You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2021/07/16 22:27:01 UTC

[GitHub] [geode] nonbinaryprogrammer opened a new pull request #6704: GEODE-9338: remove publish strong guarantees

nonbinaryprogrammer opened a new pull request #6704:
URL: https://github.com/apache/geode/pull/6704


   Thank you for submitting a contribution to Apache Geode.
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `develop`)?
   
   - [ ] Is your initial contribution a single, squashed commit?
   
   - [ ] Does `gradlew build` run cleanly?
   
   - [ ] Have you written or updated unit tests to verify your changes?
   
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   
   ### Note:
   Please ensure that once the PR is submitted, check Concourse for build issues and
   submit an update to your PR as soon as possible. If you need help, please send an
   email to dev@geode.apache.org.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r676959824



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license

Review comment:
       Replaced all original license text.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r678395771



##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testShutdownNow() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    Thread.sleep(500);
+
+    assertThat(pool.isTerminated()).isFalse();
+    Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+    assertThat(pool.isShutdown()).isTrue();
+    assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+    assertThat(pool.isTerminated()).isTrue();
+
+    assertThat(unfinishedJobs.size() > 0).isTrue();
+
+    assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
+  }
+
+  @Test
+  public void testSingleStripeCallableWithCompletionService()
+      throws InterruptedException, ExecutionException {
+    ExecutorService pool = new StripedExecutorService();
+    final CompletionService<Integer> cs = new ExecutorCompletionService<>(
+        pool);
+
+    Thread testSubmitter = new Thread("TestSubmitter") {
+      public void run() {
+        Object stripe = new Object();
+        for (int i = 0; i < 50; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          interrupt();
+        }
+        for (int i = 50; i < 100; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+      }
+    };
+    testSubmitter.start();
+
+    for (int i = 0; i < 100; i++) {
+      int actual = cs.take().get().intValue();
+      System.out.println("Retrieved " + actual);
+      assertThat(actual).isEqualTo(i);
+    }
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+    testSubmitter.join();
+  }
+
+  @Test
+  public void testUnstripedRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestUnstripedRunnable(actual, i));
+    }
+    pool.shutdown();
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+    assertThat(TestUnstripedRunnable.outOfSequence)
+        .as("Expected at least some out-of-sequence runnables to execute")
+        .isTrue();
+  }
+
+  @Test
+  public void testMultipleStripes() throws InterruptedException {
+    final ExecutorService pool = new StripedExecutorService();
+    ExecutorService producerPool = Executors.newCachedThreadPool();
+    for (int i = 0; i < 20; i++) {
+      producerPool.submit(new Runnable() {
+        public void run() {
+          Object stripe = new Object();
+          AtomicInteger actual = new AtomicInteger(0);
+          for (int i = 0; i < 100; i++) {
+            pool.submit(new TestRunnable(stripe, actual, i));
+          }
+        }
+      });
+    }
+    producerPool.shutdown();
+
+    while (!producerPool.awaitTermination(1, TimeUnit.MINUTES)) {
+      ;
+    }

Review comment:
       Done

##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testShutdownNow() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    Thread.sleep(500);
+
+    assertThat(pool.isTerminated()).isFalse();
+    Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+    assertThat(pool.isShutdown()).isTrue();
+    assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+    assertThat(pool.isTerminated()).isTrue();
+
+    assertThat(unfinishedJobs.size() > 0).isTrue();
+
+    assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
+  }
+
+  @Test
+  public void testSingleStripeCallableWithCompletionService()
+      throws InterruptedException, ExecutionException {
+    ExecutorService pool = new StripedExecutorService();
+    final CompletionService<Integer> cs = new ExecutorCompletionService<>(
+        pool);
+
+    Thread testSubmitter = new Thread("TestSubmitter") {
+      public void run() {
+        Object stripe = new Object();
+        for (int i = 0; i < 50; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          interrupt();
+        }
+        for (int i = 50; i < 100; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+      }
+    };
+    testSubmitter.start();
+
+    for (int i = 0; i < 100; i++) {
+      int actual = cs.take().get().intValue();
+      System.out.println("Retrieved " + actual);
+      assertThat(actual).isEqualTo(i);
+    }
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+    testSubmitter.join();
+  }
+
+  @Test
+  public void testUnstripedRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestUnstripedRunnable(actual, i));
+    }
+    pool.shutdown();
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+    assertThat(TestUnstripedRunnable.outOfSequence)
+        .as("Expected at least some out-of-sequence runnables to execute")
+        .isTrue();
+  }
+
+  @Test
+  public void testMultipleStripes() throws InterruptedException {
+    final ExecutorService pool = new StripedExecutorService();
+    ExecutorService producerPool = Executors.newCachedThreadPool();
+    for (int i = 0; i < 20; i++) {
+      producerPool.submit(new Runnable() {
+        public void run() {
+          Object stripe = new Object();
+          AtomicInteger actual = new AtomicInteger(0);
+          for (int i = 0; i < 100; i++) {
+            pool.submit(new TestRunnable(stripe, actual, i));
+          }
+        }
+      });
+    }
+    producerPool.shutdown();
+
+    while (!producerPool.awaitTermination(1, TimeUnit.MINUTES)) {
+      ;
+    }
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.DAYS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+  }
+
+
+  @Test
+  public void testMultipleFastStripes() throws InterruptedException {

Review comment:
       Done

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+  /**
+   * The wrapped ExecutorService that will actually execute our
+   * tasks.
+   */
+  private final ExecutorService executor;
+
+  /**
+   * The lock prevents shutdown from being called in the middle
+   * of a submit. It also guards the executors IdentityHashMap.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * This condition allows us to cleanly terminate this executor
+   * service.
+   */
+  private final Condition terminating = lock.newCondition();
+
+  /**
+   * Whenever a new StripedObject is submitted to the pool, it
+   * is added to this IdentityHashMap. As soon as the
+   * SerialExecutor is empty, the entry is removed from the map,
+   * in order to avoid a memory leak.
+   */
+  private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+  /**
+   * The default submit() method creates a new FutureTask and
+   * wraps our StripedRunnable with it. We thus need to
+   * remember the stripe object somewhere. In our case, we will
+   * do this inside the ThreadLocal "stripes". Before the
+   * thread returns from submitting the runnable, it will always
+   * remove the thread local entry.
+   */
+  private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+  /**
+   * Valid states are RUNNING and SHUTDOWN. We rely on the
+   * underlying executor service for the remaining states.
+   */
+  private State state = State.RUNNING;
+
+  private enum State {
+    RUNNING, SHUTDOWN
+  }
+
+  /**
+   * Take care using this constructor. The original visibility was private
+   * since users should not shutdown their executors directly,
+   * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+   * passed in.
+   *
+   * @param executor the executor service that we use to execute
+   *        the tasks
+   */
+  public StripedExecutorService(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a
+   * cached thread pool.
+   */
+  public StripedExecutorService() {
+    this(Executors.newCachedThreadPool());
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a fixed
+   * thread pool with the given number of threads.
+   */
+  public StripedExecutorService(int numberOfThreads) {
+    this(Executors.newFixedThreadPool(numberOfThreads));
+  }
+
+  /**
+   * If the runnable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual runnable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+    saveStripedObject(runnable);
+    return super.newTaskFor(runnable, value);
+  }
+
+  /**
+   * If the callable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual callable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+    saveStripedObject(callable);
+    return super.newTaskFor(callable);
+  }
+
+  /**
+   * Saves the stripe in a ThreadLocal until we can use it to
+   * schedule the task into our pool.
+   */
+  private void saveStripedObject(Object task) {
+    if (isStripedObject(task)) {
+      stripes.set(((StripedObject) task).getStripe());
+    }
+  }
+
+  /**
+   * Returns true if the object implements the StripedObject
+   * interface.
+   */
+  private static boolean isStripedObject(Object o) {
+    return o instanceof StripedObject;
+  }
+
+  /**
+   * Delegates the call to submit(task, null).
+   */
+  public Future<?> submit(Runnable task) {
+    return submit(task, null);
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Runnable task, T result) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task, result);
+      } else { // bypass the serial executors
+        return executor.submit(task, result);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Callable<T> task) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task);
+      } else { // bypass the serial executors
+        return executor.submit(task);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Throws a RejectedExecutionException if the state is not
+   * RUNNING.
+   */
+  private void checkPoolIsRunning() {
+    assert lock.isHeldByCurrentThread();
+    if (state != State.RUNNING) {
+      throw new RejectedExecutionException("executor not running");
+    }
+  }
+
+  /**
+   * Executes the command. If command implements StripedObject,
+   * we execute it with a SerialExecutor. This method can be
+   * called directly by clients or it may be called by the
+   * AbstractExecutorService's submit() methods. In that case,
+   * we check whether the stripes thread local has been set. If
+   * it is, we remove it and use it to determine the
+   * StripedObject and execute it with a SerialExecutor. If no
+   * StripedObject is set, we instead pass the command to the
+   * wrapped ExecutorService directly.
+   */
+  public void execute(Runnable command) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      Object stripe = getStripe(command);
+      if (stripe != null) {
+        SerialExecutor ser_exec = executors.get(stripe);
+        if (ser_exec == null) {
+          executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+        }
+        ser_exec.execute(command);
+      } else {
+        executor.execute(command);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * We get the stripe object either from the Runnable if it
+   * also implements StripedObject, or otherwise from the thread
+   * local temporary storage. Result may be null.
+   */
+  private Object getStripe(Runnable command) {
+    Object stripe;
+    if (command instanceof StripedObject) {
+      stripe = (((StripedObject) command).getStripe());
+    } else {
+      stripe = stripes.get();
+    }
+    stripes.remove();
+    return stripe;
+  }
+
+  /**
+   * Shuts down the StripedExecutorService. No more tasks will
+   * be submitted. If the map of SerialExecutors is empty, we
+   * shut down the wrapped executor.
+   */
+  public void shutdown() {
+    lock.lock();
+    try {
+      state = State.SHUTDOWN;
+      if (executors.isEmpty()) {
+        executor.shutdown();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * All the tasks in each of the SerialExecutors are drained
+   * to a list, as well as the tasks inside the wrapped
+   * ExecutorService. This is then returned to the user. Also,
+   * the shutdownNow method of the wrapped executor is called.
+   */
+  public List<Runnable> shutdownNow() {
+    lock.lock();
+    try {
+      shutdown();
+      List<Runnable> result = new ArrayList<>();
+      for (SerialExecutor ser_ex : executors.values()) {
+        ser_ex.tasks.drainTo(result);
+      }
+      result.addAll(executor.shutdownNow());
+      return result;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if shutdown() or shutdownNow() have been
+   * called; false otherwise.
+   */
+  public boolean isShutdown() {
+    lock.lock();
+    try {
+      return state == State.SHUTDOWN;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if this pool has been terminated, that is, all
+   * the SerialExecutors are empty and the wrapped
+   * ExecutorService has been terminated.
+   */
+  public boolean isTerminated() {
+    lock.lock();
+    try {
+      if (state == State.RUNNING)
+        return false;
+      for (SerialExecutor executor : executors.values()) {
+        if (!executor.isEmpty())
+          return false;

Review comment:
       Done

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+  /**
+   * The wrapped ExecutorService that will actually execute our
+   * tasks.
+   */
+  private final ExecutorService executor;
+
+  /**
+   * The lock prevents shutdown from being called in the middle
+   * of a submit. It also guards the executors IdentityHashMap.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * This condition allows us to cleanly terminate this executor
+   * service.
+   */
+  private final Condition terminating = lock.newCondition();
+
+  /**
+   * Whenever a new StripedObject is submitted to the pool, it
+   * is added to this IdentityHashMap. As soon as the
+   * SerialExecutor is empty, the entry is removed from the map,
+   * in order to avoid a memory leak.
+   */
+  private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+  /**
+   * The default submit() method creates a new FutureTask and
+   * wraps our StripedRunnable with it. We thus need to
+   * remember the stripe object somewhere. In our case, we will
+   * do this inside the ThreadLocal "stripes". Before the
+   * thread returns from submitting the runnable, it will always
+   * remove the thread local entry.
+   */
+  private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+  /**
+   * Valid states are RUNNING and SHUTDOWN. We rely on the
+   * underlying executor service for the remaining states.
+   */
+  private State state = State.RUNNING;
+
+  private enum State {
+    RUNNING, SHUTDOWN
+  }
+
+  /**
+   * Take care using this constructor. The original visibility was private
+   * since users should not shutdown their executors directly,
+   * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+   * passed in.
+   *
+   * @param executor the executor service that we use to execute
+   *        the tasks
+   */
+  public StripedExecutorService(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a
+   * cached thread pool.
+   */
+  public StripedExecutorService() {
+    this(Executors.newCachedThreadPool());
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a fixed
+   * thread pool with the given number of threads.
+   */
+  public StripedExecutorService(int numberOfThreads) {
+    this(Executors.newFixedThreadPool(numberOfThreads));
+  }
+
+  /**
+   * If the runnable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual runnable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+    saveStripedObject(runnable);
+    return super.newTaskFor(runnable, value);
+  }
+
+  /**
+   * If the callable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual callable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+    saveStripedObject(callable);
+    return super.newTaskFor(callable);
+  }
+
+  /**
+   * Saves the stripe in a ThreadLocal until we can use it to
+   * schedule the task into our pool.
+   */
+  private void saveStripedObject(Object task) {
+    if (isStripedObject(task)) {
+      stripes.set(((StripedObject) task).getStripe());
+    }
+  }
+
+  /**
+   * Returns true if the object implements the StripedObject
+   * interface.
+   */
+  private static boolean isStripedObject(Object o) {
+    return o instanceof StripedObject;
+  }
+
+  /**
+   * Delegates the call to submit(task, null).
+   */
+  public Future<?> submit(Runnable task) {
+    return submit(task, null);
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Runnable task, T result) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task, result);
+      } else { // bypass the serial executors
+        return executor.submit(task, result);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Callable<T> task) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task);
+      } else { // bypass the serial executors
+        return executor.submit(task);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Throws a RejectedExecutionException if the state is not
+   * RUNNING.
+   */
+  private void checkPoolIsRunning() {
+    assert lock.isHeldByCurrentThread();
+    if (state != State.RUNNING) {
+      throw new RejectedExecutionException("executor not running");
+    }
+  }
+
+  /**
+   * Executes the command. If command implements StripedObject,
+   * we execute it with a SerialExecutor. This method can be
+   * called directly by clients or it may be called by the
+   * AbstractExecutorService's submit() methods. In that case,
+   * we check whether the stripes thread local has been set. If
+   * it is, we remove it and use it to determine the
+   * StripedObject and execute it with a SerialExecutor. If no
+   * StripedObject is set, we instead pass the command to the
+   * wrapped ExecutorService directly.
+   */
+  public void execute(Runnable command) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      Object stripe = getStripe(command);
+      if (stripe != null) {
+        SerialExecutor ser_exec = executors.get(stripe);
+        if (ser_exec == null) {
+          executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+        }
+        ser_exec.execute(command);
+      } else {
+        executor.execute(command);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * We get the stripe object either from the Runnable if it
+   * also implements StripedObject, or otherwise from the thread
+   * local temporary storage. Result may be null.
+   */
+  private Object getStripe(Runnable command) {
+    Object stripe;
+    if (command instanceof StripedObject) {
+      stripe = (((StripedObject) command).getStripe());
+    } else {
+      stripe = stripes.get();
+    }
+    stripes.remove();
+    return stripe;
+  }
+
+  /**
+   * Shuts down the StripedExecutorService. No more tasks will
+   * be submitted. If the map of SerialExecutors is empty, we
+   * shut down the wrapped executor.
+   */
+  public void shutdown() {
+    lock.lock();
+    try {
+      state = State.SHUTDOWN;
+      if (executors.isEmpty()) {
+        executor.shutdown();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * All the tasks in each of the SerialExecutors are drained
+   * to a list, as well as the tasks inside the wrapped
+   * ExecutorService. This is then returned to the user. Also,
+   * the shutdownNow method of the wrapped executor is called.
+   */
+  public List<Runnable> shutdownNow() {
+    lock.lock();
+    try {
+      shutdown();
+      List<Runnable> result = new ArrayList<>();
+      for (SerialExecutor ser_ex : executors.values()) {
+        ser_ex.tasks.drainTo(result);
+      }
+      result.addAll(executor.shutdownNow());
+      return result;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if shutdown() or shutdownNow() have been
+   * called; false otherwise.
+   */
+  public boolean isShutdown() {
+    lock.lock();
+    try {
+      return state == State.SHUTDOWN;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if this pool has been terminated, that is, all
+   * the SerialExecutors are empty and the wrapped
+   * ExecutorService has been terminated.
+   */
+  public boolean isTerminated() {
+    lock.lock();
+    try {
+      if (state == State.RUNNING)
+        return false;
+      for (SerialExecutor executor : executors.values()) {
+        if (!executor.isEmpty())
+          return false;
+      }
+      return executor.isTerminated();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if the wrapped ExecutorService terminates
+   * within the allotted amount of time.
+   */
+  public boolean awaitTermination(long timeout, TimeUnit unit)
+      throws InterruptedException {
+    lock.lock();
+    try {
+      long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+      long remainingTime;
+      while ((remainingTime = waitUntil - System.nanoTime()) > 0
+          && !executors.isEmpty()) {
+        terminating.awaitNanos(remainingTime);
+      }
+      if (remainingTime <= 0)
+        return false;

Review comment:
       Done

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+  /**
+   * The wrapped ExecutorService that will actually execute our
+   * tasks.
+   */
+  private final ExecutorService executor;
+
+  /**
+   * The lock prevents shutdown from being called in the middle
+   * of a submit. It also guards the executors IdentityHashMap.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * This condition allows us to cleanly terminate this executor
+   * service.
+   */
+  private final Condition terminating = lock.newCondition();
+
+  /**
+   * Whenever a new StripedObject is submitted to the pool, it
+   * is added to this IdentityHashMap. As soon as the
+   * SerialExecutor is empty, the entry is removed from the map,
+   * in order to avoid a memory leak.
+   */
+  private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+  /**
+   * The default submit() method creates a new FutureTask and
+   * wraps our StripedRunnable with it. We thus need to
+   * remember the stripe object somewhere. In our case, we will
+   * do this inside the ThreadLocal "stripes". Before the
+   * thread returns from submitting the runnable, it will always
+   * remove the thread local entry.
+   */
+  private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+  /**
+   * Valid states are RUNNING and SHUTDOWN. We rely on the
+   * underlying executor service for the remaining states.
+   */
+  private State state = State.RUNNING;
+
+  private enum State {
+    RUNNING, SHUTDOWN
+  }
+
+  /**
+   * Take care using this constructor. The original visibility was private
+   * since users should not shutdown their executors directly,
+   * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+   * passed in.
+   *
+   * @param executor the executor service that we use to execute
+   *        the tasks
+   */
+  public StripedExecutorService(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a
+   * cached thread pool.
+   */
+  public StripedExecutorService() {
+    this(Executors.newCachedThreadPool());
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a fixed
+   * thread pool with the given number of threads.
+   */
+  public StripedExecutorService(int numberOfThreads) {
+    this(Executors.newFixedThreadPool(numberOfThreads));
+  }
+
+  /**
+   * If the runnable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual runnable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+    saveStripedObject(runnable);
+    return super.newTaskFor(runnable, value);
+  }
+
+  /**
+   * If the callable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual callable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+    saveStripedObject(callable);
+    return super.newTaskFor(callable);
+  }
+
+  /**
+   * Saves the stripe in a ThreadLocal until we can use it to
+   * schedule the task into our pool.
+   */
+  private void saveStripedObject(Object task) {
+    if (isStripedObject(task)) {
+      stripes.set(((StripedObject) task).getStripe());
+    }
+  }
+
+  /**
+   * Returns true if the object implements the StripedObject
+   * interface.
+   */
+  private static boolean isStripedObject(Object o) {
+    return o instanceof StripedObject;
+  }
+
+  /**
+   * Delegates the call to submit(task, null).
+   */
+  public Future<?> submit(Runnable task) {
+    return submit(task, null);
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Runnable task, T result) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task, result);
+      } else { // bypass the serial executors
+        return executor.submit(task, result);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Callable<T> task) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task);
+      } else { // bypass the serial executors
+        return executor.submit(task);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Throws a RejectedExecutionException if the state is not
+   * RUNNING.
+   */
+  private void checkPoolIsRunning() {
+    assert lock.isHeldByCurrentThread();
+    if (state != State.RUNNING) {
+      throw new RejectedExecutionException("executor not running");
+    }
+  }
+
+  /**
+   * Executes the command. If command implements StripedObject,
+   * we execute it with a SerialExecutor. This method can be
+   * called directly by clients or it may be called by the
+   * AbstractExecutorService's submit() methods. In that case,
+   * we check whether the stripes thread local has been set. If
+   * it is, we remove it and use it to determine the
+   * StripedObject and execute it with a SerialExecutor. If no
+   * StripedObject is set, we instead pass the command to the
+   * wrapped ExecutorService directly.
+   */
+  public void execute(Runnable command) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      Object stripe = getStripe(command);
+      if (stripe != null) {
+        SerialExecutor ser_exec = executors.get(stripe);
+        if (ser_exec == null) {
+          executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+        }
+        ser_exec.execute(command);
+      } else {
+        executor.execute(command);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * We get the stripe object either from the Runnable if it
+   * also implements StripedObject, or otherwise from the thread
+   * local temporary storage. Result may be null.
+   */
+  private Object getStripe(Runnable command) {
+    Object stripe;
+    if (command instanceof StripedObject) {
+      stripe = (((StripedObject) command).getStripe());
+    } else {
+      stripe = stripes.get();
+    }
+    stripes.remove();
+    return stripe;
+  }
+
+  /**
+   * Shuts down the StripedExecutorService. No more tasks will
+   * be submitted. If the map of SerialExecutors is empty, we
+   * shut down the wrapped executor.
+   */
+  public void shutdown() {
+    lock.lock();
+    try {
+      state = State.SHUTDOWN;
+      if (executors.isEmpty()) {
+        executor.shutdown();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * All the tasks in each of the SerialExecutors are drained
+   * to a list, as well as the tasks inside the wrapped
+   * ExecutorService. This is then returned to the user. Also,
+   * the shutdownNow method of the wrapped executor is called.
+   */
+  public List<Runnable> shutdownNow() {
+    lock.lock();
+    try {
+      shutdown();
+      List<Runnable> result = new ArrayList<>();
+      for (SerialExecutor ser_ex : executors.values()) {
+        ser_ex.tasks.drainTo(result);
+      }
+      result.addAll(executor.shutdownNow());
+      return result;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if shutdown() or shutdownNow() have been
+   * called; false otherwise.
+   */
+  public boolean isShutdown() {
+    lock.lock();
+    try {
+      return state == State.SHUTDOWN;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if this pool has been terminated, that is, all
+   * the SerialExecutors are empty and the wrapped
+   * ExecutorService has been terminated.
+   */
+  public boolean isTerminated() {
+    lock.lock();
+    try {
+      if (state == State.RUNNING)
+        return false;
+      for (SerialExecutor executor : executors.values()) {
+        if (!executor.isEmpty())
+          return false;
+      }
+      return executor.isTerminated();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if the wrapped ExecutorService terminates
+   * within the allotted amount of time.
+   */
+  public boolean awaitTermination(long timeout, TimeUnit unit)
+      throws InterruptedException {
+    lock.lock();
+    try {
+      long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+      long remainingTime;
+      while ((remainingTime = waitUntil - System.nanoTime()) > 0
+          && !executors.isEmpty()) {
+        terminating.awaitNanos(remainingTime);
+      }
+      if (remainingTime <= 0)
+        return false;
+      if (executors.isEmpty()) {

Review comment:
       Good point.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r678396782



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+  /**
+   * The wrapped ExecutorService that will actually execute our
+   * tasks.
+   */
+  private final ExecutorService executor;
+
+  /**
+   * The lock prevents shutdown from being called in the middle
+   * of a submit. It also guards the executors IdentityHashMap.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * This condition allows us to cleanly terminate this executor
+   * service.
+   */
+  private final Condition terminating = lock.newCondition();
+
+  /**
+   * Whenever a new StripedObject is submitted to the pool, it
+   * is added to this IdentityHashMap. As soon as the
+   * SerialExecutor is empty, the entry is removed from the map,
+   * in order to avoid a memory leak.
+   */
+  private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+  /**
+   * The default submit() method creates a new FutureTask and
+   * wraps our StripedRunnable with it. We thus need to
+   * remember the stripe object somewhere. In our case, we will
+   * do this inside the ThreadLocal "stripes". Before the
+   * thread returns from submitting the runnable, it will always
+   * remove the thread local entry.
+   */
+  private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+  /**
+   * Valid states are RUNNING and SHUTDOWN. We rely on the
+   * underlying executor service for the remaining states.
+   */
+  private State state = State.RUNNING;
+
+  private enum State {
+    RUNNING, SHUTDOWN
+  }
+
+  /**
+   * Take care using this constructor. The original visibility was private
+   * since users should not shutdown their executors directly,
+   * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+   * passed in.
+   *
+   * @param executor the executor service that we use to execute
+   *        the tasks
+   */
+  public StripedExecutorService(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a
+   * cached thread pool.
+   */
+  public StripedExecutorService() {
+    this(Executors.newCachedThreadPool());
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a fixed
+   * thread pool with the given number of threads.
+   */
+  public StripedExecutorService(int numberOfThreads) {
+    this(Executors.newFixedThreadPool(numberOfThreads));
+  }
+
+  /**
+   * If the runnable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual runnable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+    saveStripedObject(runnable);
+    return super.newTaskFor(runnable, value);
+  }
+
+  /**
+   * If the callable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual callable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+    saveStripedObject(callable);
+    return super.newTaskFor(callable);
+  }
+
+  /**
+   * Saves the stripe in a ThreadLocal until we can use it to
+   * schedule the task into our pool.
+   */
+  private void saveStripedObject(Object task) {
+    if (isStripedObject(task)) {
+      stripes.set(((StripedObject) task).getStripe());
+    }
+  }
+
+  /**
+   * Returns true if the object implements the StripedObject
+   * interface.
+   */
+  private static boolean isStripedObject(Object o) {
+    return o instanceof StripedObject;
+  }
+
+  /**
+   * Delegates the call to submit(task, null).
+   */
+  public Future<?> submit(Runnable task) {
+    return submit(task, null);
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Runnable task, T result) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task, result);
+      } else { // bypass the serial executors
+        return executor.submit(task, result);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Callable<T> task) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task);
+      } else { // bypass the serial executors
+        return executor.submit(task);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Throws a RejectedExecutionException if the state is not
+   * RUNNING.
+   */
+  private void checkPoolIsRunning() {
+    assert lock.isHeldByCurrentThread();
+    if (state != State.RUNNING) {
+      throw new RejectedExecutionException("executor not running");
+    }
+  }
+
+  /**
+   * Executes the command. If command implements StripedObject,
+   * we execute it with a SerialExecutor. This method can be
+   * called directly by clients or it may be called by the
+   * AbstractExecutorService's submit() methods. In that case,
+   * we check whether the stripes thread local has been set. If
+   * it is, we remove it and use it to determine the
+   * StripedObject and execute it with a SerialExecutor. If no
+   * StripedObject is set, we instead pass the command to the
+   * wrapped ExecutorService directly.
+   */
+  public void execute(Runnable command) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      Object stripe = getStripe(command);
+      if (stripe != null) {
+        SerialExecutor ser_exec = executors.get(stripe);
+        if (ser_exec == null) {
+          executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+        }
+        ser_exec.execute(command);
+      } else {
+        executor.execute(command);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * We get the stripe object either from the Runnable if it
+   * also implements StripedObject, or otherwise from the thread
+   * local temporary storage. Result may be null.
+   */
+  private Object getStripe(Runnable command) {
+    Object stripe;
+    if (command instanceof StripedObject) {
+      stripe = (((StripedObject) command).getStripe());
+    } else {
+      stripe = stripes.get();
+    }
+    stripes.remove();
+    return stripe;
+  }
+
+  /**
+   * Shuts down the StripedExecutorService. No more tasks will
+   * be submitted. If the map of SerialExecutors is empty, we
+   * shut down the wrapped executor.
+   */
+  public void shutdown() {
+    lock.lock();
+    try {
+      state = State.SHUTDOWN;
+      if (executors.isEmpty()) {
+        executor.shutdown();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * All the tasks in each of the SerialExecutors are drained
+   * to a list, as well as the tasks inside the wrapped
+   * ExecutorService. This is then returned to the user. Also,
+   * the shutdownNow method of the wrapped executor is called.
+   */
+  public List<Runnable> shutdownNow() {
+    lock.lock();
+    try {
+      shutdown();
+      List<Runnable> result = new ArrayList<>();
+      for (SerialExecutor ser_ex : executors.values()) {
+        ser_ex.tasks.drainTo(result);
+      }
+      result.addAll(executor.shutdownNow());
+      return result;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if shutdown() or shutdownNow() have been
+   * called; false otherwise.
+   */
+  public boolean isShutdown() {
+    lock.lock();
+    try {
+      return state == State.SHUTDOWN;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if this pool has been terminated, that is, all
+   * the SerialExecutors are empty and the wrapped
+   * ExecutorService has been terminated.
+   */
+  public boolean isTerminated() {
+    lock.lock();
+    try {
+      if (state == State.RUNNING)
+        return false;
+      for (SerialExecutor executor : executors.values()) {
+        if (!executor.isEmpty())
+          return false;
+      }
+      return executor.isTerminated();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if the wrapped ExecutorService terminates
+   * within the allotted amount of time.
+   */
+  public boolean awaitTermination(long timeout, TimeUnit unit)
+      throws InterruptedException {
+    lock.lock();
+    try {
+      long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+      long remainingTime;
+      while ((remainingTime = waitUntil - System.nanoTime()) > 0
+          && !executors.isEmpty()) {
+        terminating.awaitNanos(remainingTime);
+      }
+      if (remainingTime <= 0)
+        return false;
+      if (executors.isEmpty()) {
+        return executor.awaitTermination(
+            remainingTime, TimeUnit.NANOSECONDS);
+      }
+      return false;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * As soon as a SerialExecutor is empty, we remove it from the
+   * executors map. We might thus remove the SerialExecutors
+   * more quickly than necessary, but at least we can avoid a
+   * memory leak.
+   */
+  private void removeEmptySerialExecutor(Object stripe, SerialExecutor ser_ex) {
+    assert ser_ex == executors.get(stripe);
+    assert lock.isHeldByCurrentThread();
+    assert ser_ex.isEmpty();

Review comment:
       I've converted these to explicit checks that throw exceptions now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] onichols-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
onichols-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r675894174



##########
File path: geode-assembly/src/integrationTest/resources/dependency_classpath.txt
##########
@@ -87,4 +87,4 @@ lucene-queryparser-6.6.6.jar
 lucene-core-6.6.6.jar
 lucene-queries-6.6.6.jar
 geo-0.7.7.jar
-netty-all-4.1.59.Final.jar
+netty-all-4.1.66.Final.jar

Review comment:
       may need a similar change in geode-assembly/src/integrationTest/resources/assembly_content.txt




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r676960935



##########
File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
##########
@@ -209,20 +210,18 @@ public void shouldContinueToFunction_whenOneServerShutsDownGracefully_givenTwoSu
     MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
     MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
 
-    Future<Void> subscriber1Future =
-        executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+    executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
     Future<Void> subscriber2Future =
         executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
 
     assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
         .isTrue();
 
-    Long result = publisher1.publish(CHANNEL_NAME, "hello");
-    assertThat(result).isEqualTo(2);
+    publisher1.publish(CHANNEL_NAME, "hello");

Review comment:
       We do have tests that cover that - see for example `AbstractPubSubIntegrationTest.testOneSubscriberOneChannel`. Although, I don't see how the response can be meaningful to a client anymore.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r678395286



##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testShutdownNow() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    Thread.sleep(500);
+
+    assertThat(pool.isTerminated()).isFalse();
+    Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+    assertThat(pool.isShutdown()).isTrue();
+    assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+    assertThat(pool.isTerminated()).isTrue();
+
+    assertThat(unfinishedJobs.size() > 0).isTrue();
+
+    assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
+  }
+
+  @Test
+  public void testSingleStripeCallableWithCompletionService()
+      throws InterruptedException, ExecutionException {
+    ExecutorService pool = new StripedExecutorService();
+    final CompletionService<Integer> cs = new ExecutorCompletionService<>(
+        pool);
+
+    Thread testSubmitter = new Thread("TestSubmitter") {
+      public void run() {
+        Object stripe = new Object();
+        for (int i = 0; i < 50; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          interrupt();
+        }

Review comment:
       I don't know why that's there. Removing it doesn't seem to affect the test.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] upthewaterspout commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
upthewaterspout commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r685574132



##########
File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
##########
@@ -244,18 +243,17 @@ public void shouldContinueToFunction_whenOneServerShutsDownAbruptly_givenTwoSubs
     Future<Void> subscriber2Future =
         executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
 
-    assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
-        .isTrue();
+    assertThat(latch.await(30, TimeUnit.SECONDS))

Review comment:
       Maybe just use our default of 5 minutes then? GeodeAwaitility.DEFAULT_TIMEOUT.

##########
File path: geode-apis-compatible-with-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubNativeRedisAcceptanceTest.java
##########
@@ -15,17 +15,65 @@
 
 package org.apache.geode.redis.internal.executor.pubsub;
 
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.time.Duration;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.buildobjects.process.ProcBuilder;
+import org.buildobjects.process.StreamConsumer;
+import org.junit.AfterClass;
 import org.junit.ClassRule;
 
 import org.apache.geode.NativeRedisTestRule;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
 
 public class PubSubNativeRedisAcceptanceTest extends AbstractPubSubIntegrationTest {
+
+  private static final Logger logger = LogService.getLogger();
+
   @ClassRule
   public static NativeRedisTestRule redis = new NativeRedisTestRule();
 
+  @AfterClass
+  public static void cleanup() {
+    // This test consumes a lot of sockets and any subsequent tests may fail because of spurious
+    // bind exceptions. Even though sockets are closed, they will remain in TIME_WAIT state so we
+    // need to wait for that to clear up. It shouldn't take more than a minute or so.
+    GeodeAwaitility.await().pollInterval(Duration.ofSeconds(10))
+        .until(() -> countTimeWait() < 100);

Review comment:
       This seems kinda sketchy. I know for example Dale is working on running tests in parallel - wouldn't that mean 100s of sockets in TIMED_WAIT all the time? I always get worried about tests that are OS dependent or trying to make general assertions about the state of the machine at the time the test is running.
   
   What sockets are getting bind exceptions? Geode uses SO_REUSEADDR. Is redis not doing that? I was able to start redis twice on the same port from the command line.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r676915223



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license

Review comment:
       Oops - I think IJ replaced the original license when I added it to the project.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r678398392



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+  /**
+   * The wrapped ExecutorService that will actually execute our
+   * tasks.
+   */
+  private final ExecutorService executor;
+
+  /**
+   * The lock prevents shutdown from being called in the middle
+   * of a submit. It also guards the executors IdentityHashMap.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * This condition allows us to cleanly terminate this executor
+   * service.
+   */
+  private final Condition terminating = lock.newCondition();
+
+  /**
+   * Whenever a new StripedObject is submitted to the pool, it
+   * is added to this IdentityHashMap. As soon as the
+   * SerialExecutor is empty, the entry is removed from the map,
+   * in order to avoid a memory leak.
+   */
+  private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+  /**
+   * The default submit() method creates a new FutureTask and
+   * wraps our StripedRunnable with it. We thus need to
+   * remember the stripe object somewhere. In our case, we will
+   * do this inside the ThreadLocal "stripes". Before the
+   * thread returns from submitting the runnable, it will always
+   * remove the thread local entry.
+   */
+  private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+  /**
+   * Valid states are RUNNING and SHUTDOWN. We rely on the
+   * underlying executor service for the remaining states.
+   */
+  private State state = State.RUNNING;
+
+  private enum State {
+    RUNNING, SHUTDOWN
+  }
+
+  /**
+   * Take care using this constructor. The original visibility was private
+   * since users should not shutdown their executors directly,
+   * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+   * passed in.
+   *
+   * @param executor the executor service that we use to execute
+   *        the tasks
+   */
+  public StripedExecutorService(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a
+   * cached thread pool.
+   */
+  public StripedExecutorService() {
+    this(Executors.newCachedThreadPool());
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a fixed
+   * thread pool with the given number of threads.
+   */
+  public StripedExecutorService(int numberOfThreads) {
+    this(Executors.newFixedThreadPool(numberOfThreads));
+  }
+
+  /**
+   * If the runnable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual runnable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+    saveStripedObject(runnable);
+    return super.newTaskFor(runnable, value);
+  }
+
+  /**
+   * If the callable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual callable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+    saveStripedObject(callable);
+    return super.newTaskFor(callable);
+  }
+
+  /**
+   * Saves the stripe in a ThreadLocal until we can use it to
+   * schedule the task into our pool.
+   */
+  private void saveStripedObject(Object task) {
+    if (isStripedObject(task)) {
+      stripes.set(((StripedObject) task).getStripe());
+    }
+  }
+
+  /**
+   * Returns true if the object implements the StripedObject
+   * interface.
+   */
+  private static boolean isStripedObject(Object o) {
+    return o instanceof StripedObject;
+  }
+
+  /**
+   * Delegates the call to submit(task, null).
+   */
+  public Future<?> submit(Runnable task) {
+    return submit(task, null);
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Runnable task, T result) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task, result);
+      } else { // bypass the serial executors
+        return executor.submit(task, result);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Callable<T> task) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task);
+      } else { // bypass the serial executors
+        return executor.submit(task);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Throws a RejectedExecutionException if the state is not
+   * RUNNING.
+   */
+  private void checkPoolIsRunning() {
+    assert lock.isHeldByCurrentThread();
+    if (state != State.RUNNING) {
+      throw new RejectedExecutionException("executor not running");
+    }
+  }
+
+  /**
+   * Executes the command. If command implements StripedObject,
+   * we execute it with a SerialExecutor. This method can be
+   * called directly by clients or it may be called by the
+   * AbstractExecutorService's submit() methods. In that case,
+   * we check whether the stripes thread local has been set. If
+   * it is, we remove it and use it to determine the
+   * StripedObject and execute it with a SerialExecutor. If no
+   * StripedObject is set, we instead pass the command to the
+   * wrapped ExecutorService directly.
+   */
+  public void execute(Runnable command) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      Object stripe = getStripe(command);
+      if (stripe != null) {
+        SerialExecutor ser_exec = executors.get(stripe);
+        if (ser_exec == null) {
+          executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+        }
+        ser_exec.execute(command);
+      } else {
+        executor.execute(command);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * We get the stripe object either from the Runnable if it
+   * also implements StripedObject, or otherwise from the thread
+   * local temporary storage. Result may be null.
+   */
+  private Object getStripe(Runnable command) {
+    Object stripe;
+    if (command instanceof StripedObject) {
+      stripe = (((StripedObject) command).getStripe());
+    } else {
+      stripe = stripes.get();
+    }
+    stripes.remove();
+    return stripe;
+  }
+
+  /**
+   * Shuts down the StripedExecutorService. No more tasks will
+   * be submitted. If the map of SerialExecutors is empty, we
+   * shut down the wrapped executor.
+   */
+  public void shutdown() {
+    lock.lock();
+    try {
+      state = State.SHUTDOWN;
+      if (executors.isEmpty()) {
+        executor.shutdown();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * All the tasks in each of the SerialExecutors are drained
+   * to a list, as well as the tasks inside the wrapped
+   * ExecutorService. This is then returned to the user. Also,
+   * the shutdownNow method of the wrapped executor is called.
+   */
+  public List<Runnable> shutdownNow() {
+    lock.lock();
+    try {
+      shutdown();
+      List<Runnable> result = new ArrayList<>();
+      for (SerialExecutor ser_ex : executors.values()) {
+        ser_ex.tasks.drainTo(result);
+      }
+      result.addAll(executor.shutdownNow());
+      return result;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if shutdown() or shutdownNow() have been
+   * called; false otherwise.
+   */
+  public boolean isShutdown() {
+    lock.lock();
+    try {
+      return state == State.SHUTDOWN;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if this pool has been terminated, that is, all
+   * the SerialExecutors are empty and the wrapped
+   * ExecutorService has been terminated.
+   */
+  public boolean isTerminated() {
+    lock.lock();
+    try {
+      if (state == State.RUNNING)
+        return false;
+      for (SerialExecutor executor : executors.values()) {
+        if (!executor.isEmpty())
+          return false;
+      }
+      return executor.isTerminated();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if the wrapped ExecutorService terminates
+   * within the allotted amount of time.
+   */
+  public boolean awaitTermination(long timeout, TimeUnit unit)
+      throws InterruptedException {
+    lock.lock();
+    try {
+      long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+      long remainingTime;
+      while ((remainingTime = waitUntil - System.nanoTime()) > 0
+          && !executors.isEmpty()) {
+        terminating.awaitNanos(remainingTime);
+      }
+      if (remainingTime <= 0)
+        return false;
+      if (executors.isEmpty()) {
+        return executor.awaitTermination(
+            remainingTime, TimeUnit.NANOSECONDS);
+      }
+      return false;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * As soon as a SerialExecutor is empty, we remove it from the
+   * executors map. We might thus remove the SerialExecutors
+   * more quickly than necessary, but at least we can avoid a
+   * memory leak.
+   */
+  private void removeEmptySerialExecutor(Object stripe, SerialExecutor ser_ex) {
+    assert ser_ex == executors.get(stripe);
+    assert lock.isHeldByCurrentThread();
+    assert ser_ex.isEmpty();
+
+    executors.remove(stripe);
+    terminating.signalAll();
+    if (state == State.SHUTDOWN && executors.isEmpty()) {
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Prints information about current state of this executor, the
+   * wrapped executor and the serial executors.
+   */
+  public String toString() {
+    lock.lock();
+    try {
+      return "StripedExecutorService: state=" + state + ", " +
+          "executor=" + executor + ", " +
+          "serialExecutors=" + executors;
+    } finally {
+      lock.unlock();
+    }
+
+  }
+
+  /**
+   * This field is used for conditional compilation. If it is
+   * false, then the finalize method is an empty method, in
+   * which case the SerialExecutor will not be registered with
+   * the Finalizer.
+   */
+  private static boolean DEBUG = false;
+
+  /**
+   * SerialExecutor is based on the construct with the same name
+   * described in the {@link Executor} JavaDocs. The difference
+   * with our SerialExecutor is that it can be terminated. It
+   * also removes itself automatically once the queue is empty.
+   */
+  private class SerialExecutor implements Executor {
+    /**
+     * The queue of unexecuted tasks.
+     */
+    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
+    /**
+     * The runnable that we are currently busy with.
+     */
+    private Runnable active;
+    /**
+     * The stripe that this SerialExecutor was defined for. It
+     * is needed so that we can remove this executor from the
+     * map once it is empty.
+     */
+    private final Object stripe;
+
+    /**
+     * Creates a SerialExecutor for a particular stripe.
+     */
+    private SerialExecutor(Object stripe) {
+      this.stripe = stripe;
+      if (DEBUG) {
+        System.out.println("SerialExecutor created " + stripe);

Review comment:
       Added a `debug` log statement




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r678382809



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -494,6 +506,25 @@ private void runSubscribeAndPublish(int index, int minimumIterations, AtomicBool
     return mockSubscriber.getReceivedEvents();
   }
 
+  @Test
+  public void ensureOrderingWithOneSubscriberMultiplePublishes() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+    executor.submit(() -> subscriber.subscribe(mockSubscriber, "salutations"));

Review comment:
       I can understand the suggestion, but actually, I'd prefer not to. Maybe just me, but I find that constants in tests add a tiny layer of abstraction that actually detract from understanding what's going on. In this case, maybe less so since the constant is a string and would obviously be something like `SALUTATIONS` or `SALUTATIONS_CHANNEL`. But, even then it's ambiguous. Is it a regular channel or is it a pattern channel? The ambiguity is more so when the constant refers to a number, say `VALUE_TWO`. Does that mean the integer `2` or a string `"2"` or the word `two`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: remove publish strong guarantees

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r675760429



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
##########
@@ -31,10 +31,8 @@
   /**
    * Publish a message on a channel
    *
-   *
    * @param channel to publish to
    * @param message to publish
-   * @return the number of messages published
    */
   long publish(RegionProvider regionProvider, byte[] channel, byte[] message);

Review comment:
       Docs updated with better comments.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] DonalEvans commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
DonalEvans commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r676841873



##########
File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
##########
@@ -244,18 +243,17 @@ public void shouldContinueToFunction_whenOneServerShutsDownAbruptly_givenTwoSubs
     Future<Void> subscriber2Future =
         executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
 
-    assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
-        .isTrue();
+    assertThat(latch.await(30, TimeUnit.SECONDS))

Review comment:
       Is 30 seconds just an arbitrary amount of time to wait here, or is there some significance to this value?

##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testShutdownNow() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    Thread.sleep(500);
+
+    assertThat(pool.isTerminated()).isFalse();
+    Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+    assertThat(pool.isShutdown()).isTrue();
+    assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+    assertThat(pool.isTerminated()).isTrue();
+
+    assertThat(unfinishedJobs.size() > 0).isTrue();
+
+    assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
+  }
+
+  @Test
+  public void testSingleStripeCallableWithCompletionService()
+      throws InterruptedException, ExecutionException {
+    ExecutorService pool = new StripedExecutorService();
+    final CompletionService<Integer> cs = new ExecutorCompletionService<>(
+        pool);
+
+    Thread testSubmitter = new Thread("TestSubmitter") {
+      public void run() {
+        Object stripe = new Object();
+        for (int i = 0; i < 50; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          interrupt();
+        }
+        for (int i = 50; i < 100; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+      }
+    };
+    testSubmitter.start();
+
+    for (int i = 0; i < 100; i++) {
+      int actual = cs.take().get().intValue();
+      System.out.println("Retrieved " + actual);
+      assertThat(actual).isEqualTo(i);
+    }
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+    testSubmitter.join();
+  }
+
+  @Test
+  public void testUnstripedRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestUnstripedRunnable(actual, i));
+    }
+    pool.shutdown();
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+    assertThat(TestUnstripedRunnable.outOfSequence)
+        .as("Expected at least some out-of-sequence runnables to execute")
+        .isTrue();
+  }
+
+  @Test
+  public void testMultipleStripes() throws InterruptedException {
+    final ExecutorService pool = new StripedExecutorService();
+    ExecutorService producerPool = Executors.newCachedThreadPool();
+    for (int i = 0; i < 20; i++) {
+      producerPool.submit(new Runnable() {

Review comment:
       This `new Runnable()` can be replaced with a lambda.

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -330,11 +335,14 @@ public void testSubscribeAndPublishUsingBinaryData() {
     Long result = publisher.publish(binaryBlob, binaryBlob);
     assertThat(result).isEqualTo(1);
 
+    GeodeAwaitility.await().untilAsserted(
+        () -> assertThat(mockSubscriber.getReceivedMessages()).isNotEmpty());
+    assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo(binaryBlob);

Review comment:
       See above comment about possibly combining these into one assertion.

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -303,11 +306,13 @@ public void testPublishBinaryData() {
     Long result = publisher.publish("salutations".getBytes(), expectedMessage);
     assertThat(result).isEqualTo(1);
 
+    GeodeAwaitility.await()
+        .untilAsserted(() -> assertThat(mockSubscriber.getReceivedMessages()).isNotEmpty());
+    assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo(expectedMessage);

Review comment:
       Do we expect that there could be more than one received message here? If not, then these assertions could be combined to one:
   ```
   GeodeAwaitility.await()
           .untilAsserted(() -> assertThat(mockSubscriber.getReceivedMessages()).containsExactly(expectedMessage));
   ```

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
##########
@@ -42,13 +47,45 @@
 public class PubSubImpl implements PubSub {
   public static final String REDIS_PUB_SUB_FUNCTION_ID = "redisPubSubFunctionID";
 
+  private static final int DEFAULT_PUBLISH_THREAD_COUNT =
+      Integer.getInteger("redis.publish-thread-count", 100);
+
   private static final Logger logger = LogService.getLogger();
 
   private final Subscriptions subscriptions;
+  private final ExecutorService executor;
+
+  /**
+   * Inner class to wrap the publish action and pass it to the {@link StripedExecutorService}.
+   */
+  private static class PublishingRunnable implements StripedRunnable {
+
+    private final Runnable runnable;
+    private final Object stripeIdentity;

Review comment:
       Is there a reason this field and the argument that the constructor takes can't be a `Client`? The only use of the constructor always passes a `Client` in, so it seems like we can be certain of the type.

##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testShutdownNow() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    Thread.sleep(500);
+
+    assertThat(pool.isTerminated()).isFalse();
+    Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+    assertThat(pool.isShutdown()).isTrue();
+    assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+    assertThat(pool.isTerminated()).isTrue();
+
+    assertThat(unfinishedJobs.size() > 0).isTrue();
+
+    assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);

Review comment:
       Could this 100 and the one in the for loop be extracted to a variable, to make it explicit what we're asserting here?

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -715,6 +751,34 @@ public void testPatternWithoutAGlob() {
     assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello");
   }
 
+  @Test
+  public void concurrentSubscribers_andPublishers_doesNotHang()
+      throws InterruptedException, ExecutionException {
+    AtomicBoolean running = new AtomicBoolean(true);
+
+    Future<Integer> makeSubscribersFuture1 =
+        executor.submit(() -> makeSubscribers(10000, running));
+    Future<Integer> makeSubscribersFuture2 =
+        executor.submit(() -> makeSubscribers(10000, running));
+
+    Future<Integer> publish1 = executor.submit(() -> doPublishing(1, 10000, running));
+    Future<Integer> publish2 = executor.submit(() -> doPublishing(2, 10000, running));
+    Future<Integer> publish3 = executor.submit(() -> doPublishing(3, 10000, running));
+    Future<Integer> publish4 = executor.submit(() -> doPublishing(4, 10000, running));
+    Future<Integer> publish5 = executor.submit(() -> doPublishing(5, 10000, running));
+
+    running.set(false);
+
+    assertThat(makeSubscribersFuture1.get()).isGreaterThanOrEqualTo(10);
+    assertThat(makeSubscribersFuture2.get()).isGreaterThanOrEqualTo(10);

Review comment:
       Why are these assertions using a value of 10 specifically? Running locally, both futures return the `minimumIterations` value of 10,000, so it seems like it should be safe to use a much larger value for this check (though possibly not 10,000 exactly, if there's a possibility of the test becoming flaky as a result). 
   
   With a value as low as 10 it seems like there's a potential for a race condition where 10 subscriptions are able to created before publishing begins, which could cause the test to pass even if the publishing somehow caused a problem with creating the subscriptions.

##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testShutdownNow() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    Thread.sleep(500);

Review comment:
       Could this sleep be replaced with a suitable `await()`?

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -494,6 +506,25 @@ private void runSubscribeAndPublish(int index, int minimumIterations, AtomicBool
     return mockSubscriber.getReceivedEvents();
   }
 
+  @Test
+  public void ensureOrderingWithOneSubscriberMultiplePublishes() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+    executor.submit(() -> subscriber.subscribe(mockSubscriber, "salutations"));

Review comment:
       Could this String be extracted to a constant? It seems to be used a lot in this class, and it would be better to not have it hard-coded everywhere.

##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testShutdownNow() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    Thread.sleep(500);
+
+    assertThat(pool.isTerminated()).isFalse();
+    Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+    assertThat(pool.isShutdown()).isTrue();
+    assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+    assertThat(pool.isTerminated()).isTrue();
+
+    assertThat(unfinishedJobs.size() > 0).isTrue();
+
+    assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
+  }
+
+  @Test
+  public void testSingleStripeCallableWithCompletionService()
+      throws InterruptedException, ExecutionException {
+    ExecutorService pool = new StripedExecutorService();
+    final CompletionService<Integer> cs = new ExecutorCompletionService<>(
+        pool);
+
+    Thread testSubmitter = new Thread("TestSubmitter") {
+      public void run() {
+        Object stripe = new Object();
+        for (int i = 0; i < 50; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          interrupt();
+        }

Review comment:
       What is the purpose of this 2-second wait between submitting the first and second set of 50 callables? Could it be removed?

##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);

Review comment:
       Could this be replaced with an `await()`?

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -436,11 +446,11 @@ public void testPunsubscribingImplicitlyFromAllChannels() {
   }
 
   @Test
-  public void ensureOrderingOfPublishedMessages() throws Exception {
+  public void ensureOrderingOfPublishedMessagesWithTwoSubscriptions() throws Exception {
     AtomicBoolean running = new AtomicBoolean(true);
 
     Future<Void> future1 =
-        executor.submit(() -> runSubscribeAndPublish(1, 10000, running));
+        executor.submit(() -> runSubscribeAndPublish(1, 3000, running));

Review comment:
       A little outside the scope of this PR perhaps, but the `index` field can be safely removed from this method signature and others in this test, since it's always 1 and was only used to differentiate between multiple subscriptions, rendering it redundant here.

##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testShutdownNow() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    Thread.sleep(500);
+
+    assertThat(pool.isTerminated()).isFalse();
+    Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+    assertThat(pool.isShutdown()).isTrue();
+    assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+    assertThat(pool.isTerminated()).isTrue();
+
+    assertThat(unfinishedJobs.size() > 0).isTrue();
+
+    assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
+  }
+
+  @Test
+  public void testSingleStripeCallableWithCompletionService()
+      throws InterruptedException, ExecutionException {
+    ExecutorService pool = new StripedExecutorService();
+    final CompletionService<Integer> cs = new ExecutorCompletionService<>(
+        pool);
+
+    Thread testSubmitter = new Thread("TestSubmitter") {
+      public void run() {
+        Object stripe = new Object();
+        for (int i = 0; i < 50; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          interrupt();
+        }
+        for (int i = 50; i < 100; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+      }
+    };
+    testSubmitter.start();
+
+    for (int i = 0; i < 100; i++) {
+      int actual = cs.take().get().intValue();
+      System.out.println("Retrieved " + actual);
+      assertThat(actual).isEqualTo(i);
+    }
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+    testSubmitter.join();
+  }
+
+  @Test
+  public void testUnstripedRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestUnstripedRunnable(actual, i));
+    }
+    pool.shutdown();
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+    assertThat(TestUnstripedRunnable.outOfSequence)
+        .as("Expected at least some out-of-sequence runnables to execute")
+        .isTrue();
+  }
+
+  @Test
+  public void testMultipleStripes() throws InterruptedException {
+    final ExecutorService pool = new StripedExecutorService();
+    ExecutorService producerPool = Executors.newCachedThreadPool();
+    for (int i = 0; i < 20; i++) {
+      producerPool.submit(new Runnable() {
+        public void run() {
+          Object stripe = new Object();
+          AtomicInteger actual = new AtomicInteger(0);
+          for (int i = 0; i < 100; i++) {
+            pool.submit(new TestRunnable(stripe, actual, i));
+          }
+        }
+      });
+    }
+    producerPool.shutdown();
+
+    while (!producerPool.awaitTermination(1, TimeUnit.MINUTES)) {
+      ;
+    }

Review comment:
       Can this be replaced with an `await()`?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+  /**
+   * The wrapped ExecutorService that will actually execute our
+   * tasks.
+   */
+  private final ExecutorService executor;
+
+  /**
+   * The lock prevents shutdown from being called in the middle
+   * of a submit. It also guards the executors IdentityHashMap.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * This condition allows us to cleanly terminate this executor
+   * service.
+   */
+  private final Condition terminating = lock.newCondition();
+
+  /**
+   * Whenever a new StripedObject is submitted to the pool, it
+   * is added to this IdentityHashMap. As soon as the
+   * SerialExecutor is empty, the entry is removed from the map,
+   * in order to avoid a memory leak.
+   */
+  private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+  /**
+   * The default submit() method creates a new FutureTask and
+   * wraps our StripedRunnable with it. We thus need to
+   * remember the stripe object somewhere. In our case, we will
+   * do this inside the ThreadLocal "stripes". Before the
+   * thread returns from submitting the runnable, it will always
+   * remove the thread local entry.
+   */
+  private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+  /**
+   * Valid states are RUNNING and SHUTDOWN. We rely on the
+   * underlying executor service for the remaining states.
+   */
+  private State state = State.RUNNING;
+
+  private enum State {
+    RUNNING, SHUTDOWN
+  }
+
+  /**
+   * Take care using this constructor. The original visibility was private
+   * since users should not shutdown their executors directly,
+   * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+   * passed in.
+   *
+   * @param executor the executor service that we use to execute
+   *        the tasks
+   */
+  public StripedExecutorService(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a
+   * cached thread pool.
+   */
+  public StripedExecutorService() {
+    this(Executors.newCachedThreadPool());
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a fixed
+   * thread pool with the given number of threads.
+   */
+  public StripedExecutorService(int numberOfThreads) {
+    this(Executors.newFixedThreadPool(numberOfThreads));
+  }
+
+  /**
+   * If the runnable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual runnable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+    saveStripedObject(runnable);
+    return super.newTaskFor(runnable, value);
+  }
+
+  /**
+   * If the callable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual callable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+    saveStripedObject(callable);
+    return super.newTaskFor(callable);
+  }
+
+  /**
+   * Saves the stripe in a ThreadLocal until we can use it to
+   * schedule the task into our pool.
+   */
+  private void saveStripedObject(Object task) {
+    if (isStripedObject(task)) {
+      stripes.set(((StripedObject) task).getStripe());
+    }
+  }
+
+  /**
+   * Returns true if the object implements the StripedObject
+   * interface.
+   */
+  private static boolean isStripedObject(Object o) {
+    return o instanceof StripedObject;
+  }
+
+  /**
+   * Delegates the call to submit(task, null).
+   */
+  public Future<?> submit(Runnable task) {
+    return submit(task, null);
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Runnable task, T result) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task, result);
+      } else { // bypass the serial executors
+        return executor.submit(task, result);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Callable<T> task) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task);
+      } else { // bypass the serial executors
+        return executor.submit(task);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Throws a RejectedExecutionException if the state is not
+   * RUNNING.
+   */
+  private void checkPoolIsRunning() {
+    assert lock.isHeldByCurrentThread();
+    if (state != State.RUNNING) {
+      throw new RejectedExecutionException("executor not running");
+    }
+  }
+
+  /**
+   * Executes the command. If command implements StripedObject,
+   * we execute it with a SerialExecutor. This method can be
+   * called directly by clients or it may be called by the
+   * AbstractExecutorService's submit() methods. In that case,
+   * we check whether the stripes thread local has been set. If
+   * it is, we remove it and use it to determine the
+   * StripedObject and execute it with a SerialExecutor. If no
+   * StripedObject is set, we instead pass the command to the
+   * wrapped ExecutorService directly.
+   */
+  public void execute(Runnable command) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      Object stripe = getStripe(command);
+      if (stripe != null) {
+        SerialExecutor ser_exec = executors.get(stripe);
+        if (ser_exec == null) {
+          executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+        }
+        ser_exec.execute(command);
+      } else {
+        executor.execute(command);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * We get the stripe object either from the Runnable if it
+   * also implements StripedObject, or otherwise from the thread
+   * local temporary storage. Result may be null.
+   */
+  private Object getStripe(Runnable command) {
+    Object stripe;
+    if (command instanceof StripedObject) {
+      stripe = (((StripedObject) command).getStripe());
+    } else {
+      stripe = stripes.get();
+    }
+    stripes.remove();
+    return stripe;
+  }
+
+  /**
+   * Shuts down the StripedExecutorService. No more tasks will
+   * be submitted. If the map of SerialExecutors is empty, we
+   * shut down the wrapped executor.
+   */
+  public void shutdown() {
+    lock.lock();
+    try {
+      state = State.SHUTDOWN;
+      if (executors.isEmpty()) {
+        executor.shutdown();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * All the tasks in each of the SerialExecutors are drained
+   * to a list, as well as the tasks inside the wrapped
+   * ExecutorService. This is then returned to the user. Also,
+   * the shutdownNow method of the wrapped executor is called.
+   */
+  public List<Runnable> shutdownNow() {
+    lock.lock();
+    try {
+      shutdown();
+      List<Runnable> result = new ArrayList<>();
+      for (SerialExecutor ser_ex : executors.values()) {
+        ser_ex.tasks.drainTo(result);
+      }
+      result.addAll(executor.shutdownNow());
+      return result;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if shutdown() or shutdownNow() have been
+   * called; false otherwise.
+   */
+  public boolean isShutdown() {
+    lock.lock();
+    try {
+      return state == State.SHUTDOWN;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if this pool has been terminated, that is, all
+   * the SerialExecutors are empty and the wrapped
+   * ExecutorService has been terminated.
+   */
+  public boolean isTerminated() {
+    lock.lock();
+    try {
+      if (state == State.RUNNING)
+        return false;
+      for (SerialExecutor executor : executors.values()) {
+        if (!executor.isEmpty())
+          return false;
+      }
+      return executor.isTerminated();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if the wrapped ExecutorService terminates
+   * within the allotted amount of time.
+   */
+  public boolean awaitTermination(long timeout, TimeUnit unit)
+      throws InterruptedException {
+    lock.lock();
+    try {
+      long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+      long remainingTime;
+      while ((remainingTime = waitUntil - System.nanoTime()) > 0
+          && !executors.isEmpty()) {
+        terminating.awaitNanos(remainingTime);
+      }
+      if (remainingTime <= 0)
+        return false;
+      if (executors.isEmpty()) {

Review comment:
       This statement should always be true when reached, as the return statement above it ensures that it's only reachable if we exited the while statement due to `executors.isEmpty()` returning true rather than due to exceeding the timeout. Given that `executors` is only added to while holding the lock, it shouldn't be possible for the value of `executors.isEmpty()` to change after it evaluates to true in the while loop.

##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testShutdownNow() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    Thread.sleep(500);
+
+    assertThat(pool.isTerminated()).isFalse();
+    Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+    assertThat(pool.isShutdown()).isTrue();
+    assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+    assertThat(pool.isTerminated()).isTrue();
+
+    assertThat(unfinishedJobs.size() > 0).isTrue();
+
+    assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
+  }
+
+  @Test
+  public void testSingleStripeCallableWithCompletionService()
+      throws InterruptedException, ExecutionException {
+    ExecutorService pool = new StripedExecutorService();
+    final CompletionService<Integer> cs = new ExecutorCompletionService<>(
+        pool);
+
+    Thread testSubmitter = new Thread("TestSubmitter") {
+      public void run() {
+        Object stripe = new Object();
+        for (int i = 0; i < 50; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          interrupt();
+        }
+        for (int i = 50; i < 100; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+      }
+    };
+    testSubmitter.start();
+
+    for (int i = 0; i < 100; i++) {
+      int actual = cs.take().get().intValue();

Review comment:
       The call to `intValue()` is unnecessary here and can be removed.

##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testShutdownNow() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    Thread.sleep(500);
+
+    assertThat(pool.isTerminated()).isFalse();
+    Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+    assertThat(pool.isShutdown()).isTrue();
+    assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+    assertThat(pool.isTerminated()).isTrue();
+
+    assertThat(unfinishedJobs.size() > 0).isTrue();
+
+    assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
+  }
+
+  @Test
+  public void testSingleStripeCallableWithCompletionService()
+      throws InterruptedException, ExecutionException {
+    ExecutorService pool = new StripedExecutorService();
+    final CompletionService<Integer> cs = new ExecutorCompletionService<>(
+        pool);
+
+    Thread testSubmitter = new Thread("TestSubmitter") {
+      public void run() {
+        Object stripe = new Object();
+        for (int i = 0; i < 50; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          interrupt();
+        }
+        for (int i = 50; i < 100; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+      }
+    };
+    testSubmitter.start();
+
+    for (int i = 0; i < 100; i++) {
+      int actual = cs.take().get().intValue();
+      System.out.println("Retrieved " + actual);
+      assertThat(actual).isEqualTo(i);
+    }
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+    testSubmitter.join();
+  }
+
+  @Test
+  public void testUnstripedRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestUnstripedRunnable(actual, i));
+    }
+    pool.shutdown();
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+
+    assertThat(TestUnstripedRunnable.outOfSequence)
+        .as("Expected at least some out-of-sequence runnables to execute")
+        .isTrue();
+  }
+
+  @Test
+  public void testMultipleStripes() throws InterruptedException {
+    final ExecutorService pool = new StripedExecutorService();
+    ExecutorService producerPool = Executors.newCachedThreadPool();
+    for (int i = 0; i < 20; i++) {
+      producerPool.submit(new Runnable() {
+        public void run() {
+          Object stripe = new Object();
+          AtomicInteger actual = new AtomicInteger(0);
+          for (int i = 0; i < 100; i++) {
+            pool.submit(new TestRunnable(stripe, actual, i));
+          }
+        }
+      });
+    }
+    producerPool.shutdown();
+
+    while (!producerPool.awaitTermination(1, TimeUnit.MINUTES)) {
+      ;
+    }
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.DAYS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+  }
+
+
+  @Test
+  public void testMultipleFastStripes() throws InterruptedException {

Review comment:
       The comments that apply to `testMultipleStripes()` also apply here.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+  /**
+   * The wrapped ExecutorService that will actually execute our
+   * tasks.
+   */
+  private final ExecutorService executor;
+
+  /**
+   * The lock prevents shutdown from being called in the middle
+   * of a submit. It also guards the executors IdentityHashMap.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * This condition allows us to cleanly terminate this executor
+   * service.
+   */
+  private final Condition terminating = lock.newCondition();
+
+  /**
+   * Whenever a new StripedObject is submitted to the pool, it
+   * is added to this IdentityHashMap. As soon as the
+   * SerialExecutor is empty, the entry is removed from the map,
+   * in order to avoid a memory leak.
+   */
+  private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+  /**
+   * The default submit() method creates a new FutureTask and
+   * wraps our StripedRunnable with it. We thus need to
+   * remember the stripe object somewhere. In our case, we will
+   * do this inside the ThreadLocal "stripes". Before the
+   * thread returns from submitting the runnable, it will always
+   * remove the thread local entry.
+   */
+  private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+  /**
+   * Valid states are RUNNING and SHUTDOWN. We rely on the
+   * underlying executor service for the remaining states.
+   */
+  private State state = State.RUNNING;
+
+  private enum State {
+    RUNNING, SHUTDOWN
+  }
+
+  /**
+   * Take care using this constructor. The original visibility was private
+   * since users should not shutdown their executors directly,
+   * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+   * passed in.
+   *
+   * @param executor the executor service that we use to execute
+   *        the tasks
+   */
+  public StripedExecutorService(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a
+   * cached thread pool.
+   */
+  public StripedExecutorService() {
+    this(Executors.newCachedThreadPool());
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a fixed
+   * thread pool with the given number of threads.
+   */
+  public StripedExecutorService(int numberOfThreads) {
+    this(Executors.newFixedThreadPool(numberOfThreads));
+  }
+
+  /**
+   * If the runnable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual runnable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+    saveStripedObject(runnable);
+    return super.newTaskFor(runnable, value);
+  }
+
+  /**
+   * If the callable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual callable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+    saveStripedObject(callable);
+    return super.newTaskFor(callable);
+  }
+
+  /**
+   * Saves the stripe in a ThreadLocal until we can use it to
+   * schedule the task into our pool.
+   */
+  private void saveStripedObject(Object task) {
+    if (isStripedObject(task)) {
+      stripes.set(((StripedObject) task).getStripe());
+    }
+  }
+
+  /**
+   * Returns true if the object implements the StripedObject
+   * interface.
+   */
+  private static boolean isStripedObject(Object o) {
+    return o instanceof StripedObject;
+  }
+
+  /**
+   * Delegates the call to submit(task, null).
+   */
+  public Future<?> submit(Runnable task) {
+    return submit(task, null);
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Runnable task, T result) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task, result);
+      } else { // bypass the serial executors
+        return executor.submit(task, result);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Callable<T> task) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task);
+      } else { // bypass the serial executors
+        return executor.submit(task);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Throws a RejectedExecutionException if the state is not
+   * RUNNING.
+   */
+  private void checkPoolIsRunning() {
+    assert lock.isHeldByCurrentThread();
+    if (state != State.RUNNING) {
+      throw new RejectedExecutionException("executor not running");
+    }
+  }
+
+  /**
+   * Executes the command. If command implements StripedObject,
+   * we execute it with a SerialExecutor. This method can be
+   * called directly by clients or it may be called by the
+   * AbstractExecutorService's submit() methods. In that case,
+   * we check whether the stripes thread local has been set. If
+   * it is, we remove it and use it to determine the
+   * StripedObject and execute it with a SerialExecutor. If no
+   * StripedObject is set, we instead pass the command to the
+   * wrapped ExecutorService directly.
+   */
+  public void execute(Runnable command) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      Object stripe = getStripe(command);
+      if (stripe != null) {
+        SerialExecutor ser_exec = executors.get(stripe);
+        if (ser_exec == null) {
+          executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+        }
+        ser_exec.execute(command);
+      } else {
+        executor.execute(command);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * We get the stripe object either from the Runnable if it
+   * also implements StripedObject, or otherwise from the thread
+   * local temporary storage. Result may be null.
+   */
+  private Object getStripe(Runnable command) {
+    Object stripe;
+    if (command instanceof StripedObject) {
+      stripe = (((StripedObject) command).getStripe());
+    } else {
+      stripe = stripes.get();
+    }
+    stripes.remove();
+    return stripe;
+  }
+
+  /**
+   * Shuts down the StripedExecutorService. No more tasks will
+   * be submitted. If the map of SerialExecutors is empty, we
+   * shut down the wrapped executor.
+   */
+  public void shutdown() {
+    lock.lock();
+    try {
+      state = State.SHUTDOWN;
+      if (executors.isEmpty()) {
+        executor.shutdown();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * All the tasks in each of the SerialExecutors are drained
+   * to a list, as well as the tasks inside the wrapped
+   * ExecutorService. This is then returned to the user. Also,
+   * the shutdownNow method of the wrapped executor is called.
+   */
+  public List<Runnable> shutdownNow() {
+    lock.lock();
+    try {
+      shutdown();
+      List<Runnable> result = new ArrayList<>();
+      for (SerialExecutor ser_ex : executors.values()) {
+        ser_ex.tasks.drainTo(result);
+      }
+      result.addAll(executor.shutdownNow());
+      return result;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if shutdown() or shutdownNow() have been
+   * called; false otherwise.
+   */
+  public boolean isShutdown() {
+    lock.lock();
+    try {
+      return state == State.SHUTDOWN;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if this pool has been terminated, that is, all
+   * the SerialExecutors are empty and the wrapped
+   * ExecutorService has been terminated.
+   */
+  public boolean isTerminated() {
+    lock.lock();
+    try {
+      if (state == State.RUNNING)
+        return false;
+      for (SerialExecutor executor : executors.values()) {
+        if (!executor.isEmpty())
+          return false;

Review comment:
       These if statements should have curly braces.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+  /**
+   * The wrapped ExecutorService that will actually execute our
+   * tasks.
+   */
+  private final ExecutorService executor;
+
+  /**
+   * The lock prevents shutdown from being called in the middle
+   * of a submit. It also guards the executors IdentityHashMap.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * This condition allows us to cleanly terminate this executor
+   * service.
+   */
+  private final Condition terminating = lock.newCondition();
+
+  /**
+   * Whenever a new StripedObject is submitted to the pool, it
+   * is added to this IdentityHashMap. As soon as the
+   * SerialExecutor is empty, the entry is removed from the map,
+   * in order to avoid a memory leak.
+   */
+  private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+  /**
+   * The default submit() method creates a new FutureTask and
+   * wraps our StripedRunnable with it. We thus need to
+   * remember the stripe object somewhere. In our case, we will
+   * do this inside the ThreadLocal "stripes". Before the
+   * thread returns from submitting the runnable, it will always
+   * remove the thread local entry.
+   */
+  private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+  /**
+   * Valid states are RUNNING and SHUTDOWN. We rely on the
+   * underlying executor service for the remaining states.
+   */
+  private State state = State.RUNNING;
+
+  private enum State {
+    RUNNING, SHUTDOWN
+  }
+
+  /**
+   * Take care using this constructor. The original visibility was private
+   * since users should not shutdown their executors directly,
+   * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+   * passed in.
+   *
+   * @param executor the executor service that we use to execute
+   *        the tasks
+   */
+  public StripedExecutorService(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a
+   * cached thread pool.
+   */
+  public StripedExecutorService() {
+    this(Executors.newCachedThreadPool());
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a fixed
+   * thread pool with the given number of threads.
+   */
+  public StripedExecutorService(int numberOfThreads) {
+    this(Executors.newFixedThreadPool(numberOfThreads));
+  }
+
+  /**
+   * If the runnable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual runnable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+    saveStripedObject(runnable);
+    return super.newTaskFor(runnable, value);
+  }
+
+  /**
+   * If the callable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual callable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+    saveStripedObject(callable);
+    return super.newTaskFor(callable);
+  }
+
+  /**
+   * Saves the stripe in a ThreadLocal until we can use it to
+   * schedule the task into our pool.
+   */
+  private void saveStripedObject(Object task) {
+    if (isStripedObject(task)) {
+      stripes.set(((StripedObject) task).getStripe());
+    }
+  }
+
+  /**
+   * Returns true if the object implements the StripedObject
+   * interface.
+   */
+  private static boolean isStripedObject(Object o) {
+    return o instanceof StripedObject;
+  }
+
+  /**
+   * Delegates the call to submit(task, null).
+   */
+  public Future<?> submit(Runnable task) {
+    return submit(task, null);
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Runnable task, T result) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task, result);
+      } else { // bypass the serial executors
+        return executor.submit(task, result);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Callable<T> task) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task);
+      } else { // bypass the serial executors
+        return executor.submit(task);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Throws a RejectedExecutionException if the state is not
+   * RUNNING.
+   */
+  private void checkPoolIsRunning() {
+    assert lock.isHeldByCurrentThread();
+    if (state != State.RUNNING) {
+      throw new RejectedExecutionException("executor not running");
+    }
+  }
+
+  /**
+   * Executes the command. If command implements StripedObject,
+   * we execute it with a SerialExecutor. This method can be
+   * called directly by clients or it may be called by the
+   * AbstractExecutorService's submit() methods. In that case,
+   * we check whether the stripes thread local has been set. If
+   * it is, we remove it and use it to determine the
+   * StripedObject and execute it with a SerialExecutor. If no
+   * StripedObject is set, we instead pass the command to the
+   * wrapped ExecutorService directly.
+   */
+  public void execute(Runnable command) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      Object stripe = getStripe(command);
+      if (stripe != null) {
+        SerialExecutor ser_exec = executors.get(stripe);
+        if (ser_exec == null) {
+          executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+        }
+        ser_exec.execute(command);
+      } else {
+        executor.execute(command);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * We get the stripe object either from the Runnable if it
+   * also implements StripedObject, or otherwise from the thread
+   * local temporary storage. Result may be null.
+   */
+  private Object getStripe(Runnable command) {
+    Object stripe;
+    if (command instanceof StripedObject) {
+      stripe = (((StripedObject) command).getStripe());
+    } else {
+      stripe = stripes.get();
+    }
+    stripes.remove();
+    return stripe;
+  }
+
+  /**
+   * Shuts down the StripedExecutorService. No more tasks will
+   * be submitted. If the map of SerialExecutors is empty, we
+   * shut down the wrapped executor.
+   */
+  public void shutdown() {
+    lock.lock();
+    try {
+      state = State.SHUTDOWN;
+      if (executors.isEmpty()) {
+        executor.shutdown();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * All the tasks in each of the SerialExecutors are drained
+   * to a list, as well as the tasks inside the wrapped
+   * ExecutorService. This is then returned to the user. Also,
+   * the shutdownNow method of the wrapped executor is called.
+   */
+  public List<Runnable> shutdownNow() {
+    lock.lock();
+    try {
+      shutdown();
+      List<Runnable> result = new ArrayList<>();
+      for (SerialExecutor ser_ex : executors.values()) {
+        ser_ex.tasks.drainTo(result);
+      }
+      result.addAll(executor.shutdownNow());
+      return result;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if shutdown() or shutdownNow() have been
+   * called; false otherwise.
+   */
+  public boolean isShutdown() {
+    lock.lock();
+    try {
+      return state == State.SHUTDOWN;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if this pool has been terminated, that is, all
+   * the SerialExecutors are empty and the wrapped
+   * ExecutorService has been terminated.
+   */
+  public boolean isTerminated() {
+    lock.lock();
+    try {
+      if (state == State.RUNNING)
+        return false;
+      for (SerialExecutor executor : executors.values()) {
+        if (!executor.isEmpty())
+          return false;
+      }
+      return executor.isTerminated();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if the wrapped ExecutorService terminates
+   * within the allotted amount of time.
+   */
+  public boolean awaitTermination(long timeout, TimeUnit unit)
+      throws InterruptedException {
+    lock.lock();
+    try {
+      long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+      long remainingTime;
+      while ((remainingTime = waitUntil - System.nanoTime()) > 0
+          && !executors.isEmpty()) {
+        terminating.awaitNanos(remainingTime);
+      }
+      if (remainingTime <= 0)
+        return false;
+      if (executors.isEmpty()) {
+        return executor.awaitTermination(
+            remainingTime, TimeUnit.NANOSECONDS);
+      }
+      return false;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * As soon as a SerialExecutor is empty, we remove it from the
+   * executors map. We might thus remove the SerialExecutors
+   * more quickly than necessary, but at least we can avoid a
+   * memory leak.
+   */
+  private void removeEmptySerialExecutor(Object stripe, SerialExecutor ser_ex) {
+    assert ser_ex == executors.get(stripe);
+    assert lock.isHeldByCurrentThread();
+    assert ser_ex.isEmpty();
+
+    executors.remove(stripe);
+    terminating.signalAll();
+    if (state == State.SHUTDOWN && executors.isEmpty()) {
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Prints information about current state of this executor, the
+   * wrapped executor and the serial executors.
+   */
+  public String toString() {
+    lock.lock();
+    try {
+      return "StripedExecutorService: state=" + state + ", " +
+          "executor=" + executor + ", " +
+          "serialExecutors=" + executors;
+    } finally {
+      lock.unlock();
+    }
+
+  }
+
+  /**
+   * This field is used for conditional compilation. If it is
+   * false, then the finalize method is an empty method, in
+   * which case the SerialExecutor will not be registered with
+   * the Finalizer.
+   */
+  private static boolean DEBUG = false;

Review comment:
       This can be `final`.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+  /**
+   * The wrapped ExecutorService that will actually execute our
+   * tasks.
+   */
+  private final ExecutorService executor;
+
+  /**
+   * The lock prevents shutdown from being called in the middle
+   * of a submit. It also guards the executors IdentityHashMap.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * This condition allows us to cleanly terminate this executor
+   * service.
+   */
+  private final Condition terminating = lock.newCondition();
+
+  /**
+   * Whenever a new StripedObject is submitted to the pool, it
+   * is added to this IdentityHashMap. As soon as the
+   * SerialExecutor is empty, the entry is removed from the map,
+   * in order to avoid a memory leak.
+   */
+  private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+  /**
+   * The default submit() method creates a new FutureTask and
+   * wraps our StripedRunnable with it. We thus need to
+   * remember the stripe object somewhere. In our case, we will
+   * do this inside the ThreadLocal "stripes". Before the
+   * thread returns from submitting the runnable, it will always
+   * remove the thread local entry.
+   */
+  private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+  /**
+   * Valid states are RUNNING and SHUTDOWN. We rely on the
+   * underlying executor service for the remaining states.
+   */
+  private State state = State.RUNNING;
+
+  private enum State {
+    RUNNING, SHUTDOWN
+  }
+
+  /**
+   * Take care using this constructor. The original visibility was private
+   * since users should not shutdown their executors directly,
+   * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+   * passed in.
+   *
+   * @param executor the executor service that we use to execute
+   *        the tasks
+   */
+  public StripedExecutorService(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a
+   * cached thread pool.
+   */
+  public StripedExecutorService() {
+    this(Executors.newCachedThreadPool());
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a fixed
+   * thread pool with the given number of threads.
+   */
+  public StripedExecutorService(int numberOfThreads) {
+    this(Executors.newFixedThreadPool(numberOfThreads));
+  }
+
+  /**
+   * If the runnable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual runnable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+    saveStripedObject(runnable);
+    return super.newTaskFor(runnable, value);
+  }
+
+  /**
+   * If the callable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual callable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+    saveStripedObject(callable);
+    return super.newTaskFor(callable);
+  }
+
+  /**
+   * Saves the stripe in a ThreadLocal until we can use it to
+   * schedule the task into our pool.
+   */
+  private void saveStripedObject(Object task) {
+    if (isStripedObject(task)) {
+      stripes.set(((StripedObject) task).getStripe());
+    }
+  }
+
+  /**
+   * Returns true if the object implements the StripedObject
+   * interface.
+   */
+  private static boolean isStripedObject(Object o) {
+    return o instanceof StripedObject;
+  }
+
+  /**
+   * Delegates the call to submit(task, null).
+   */
+  public Future<?> submit(Runnable task) {
+    return submit(task, null);
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Runnable task, T result) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task, result);
+      } else { // bypass the serial executors
+        return executor.submit(task, result);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Callable<T> task) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task);
+      } else { // bypass the serial executors
+        return executor.submit(task);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Throws a RejectedExecutionException if the state is not
+   * RUNNING.
+   */
+  private void checkPoolIsRunning() {
+    assert lock.isHeldByCurrentThread();
+    if (state != State.RUNNING) {
+      throw new RejectedExecutionException("executor not running");
+    }
+  }
+
+  /**
+   * Executes the command. If command implements StripedObject,
+   * we execute it with a SerialExecutor. This method can be
+   * called directly by clients or it may be called by the
+   * AbstractExecutorService's submit() methods. In that case,
+   * we check whether the stripes thread local has been set. If
+   * it is, we remove it and use it to determine the
+   * StripedObject and execute it with a SerialExecutor. If no
+   * StripedObject is set, we instead pass the command to the
+   * wrapped ExecutorService directly.
+   */
+  public void execute(Runnable command) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      Object stripe = getStripe(command);
+      if (stripe != null) {
+        SerialExecutor ser_exec = executors.get(stripe);
+        if (ser_exec == null) {
+          executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+        }
+        ser_exec.execute(command);
+      } else {
+        executor.execute(command);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * We get the stripe object either from the Runnable if it
+   * also implements StripedObject, or otherwise from the thread
+   * local temporary storage. Result may be null.
+   */
+  private Object getStripe(Runnable command) {
+    Object stripe;
+    if (command instanceof StripedObject) {
+      stripe = (((StripedObject) command).getStripe());
+    } else {
+      stripe = stripes.get();
+    }
+    stripes.remove();
+    return stripe;
+  }
+
+  /**
+   * Shuts down the StripedExecutorService. No more tasks will
+   * be submitted. If the map of SerialExecutors is empty, we
+   * shut down the wrapped executor.
+   */
+  public void shutdown() {
+    lock.lock();
+    try {
+      state = State.SHUTDOWN;
+      if (executors.isEmpty()) {
+        executor.shutdown();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * All the tasks in each of the SerialExecutors are drained
+   * to a list, as well as the tasks inside the wrapped
+   * ExecutorService. This is then returned to the user. Also,
+   * the shutdownNow method of the wrapped executor is called.
+   */
+  public List<Runnable> shutdownNow() {
+    lock.lock();
+    try {
+      shutdown();
+      List<Runnable> result = new ArrayList<>();
+      for (SerialExecutor ser_ex : executors.values()) {
+        ser_ex.tasks.drainTo(result);
+      }
+      result.addAll(executor.shutdownNow());
+      return result;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if shutdown() or shutdownNow() have been
+   * called; false otherwise.
+   */
+  public boolean isShutdown() {
+    lock.lock();
+    try {
+      return state == State.SHUTDOWN;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if this pool has been terminated, that is, all
+   * the SerialExecutors are empty and the wrapped
+   * ExecutorService has been terminated.
+   */
+  public boolean isTerminated() {
+    lock.lock();
+    try {
+      if (state == State.RUNNING)
+        return false;
+      for (SerialExecutor executor : executors.values()) {
+        if (!executor.isEmpty())
+          return false;
+      }
+      return executor.isTerminated();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if the wrapped ExecutorService terminates
+   * within the allotted amount of time.
+   */
+  public boolean awaitTermination(long timeout, TimeUnit unit)
+      throws InterruptedException {
+    lock.lock();
+    try {
+      long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+      long remainingTime;
+      while ((remainingTime = waitUntil - System.nanoTime()) > 0
+          && !executors.isEmpty()) {
+        terminating.awaitNanos(remainingTime);
+      }
+      if (remainingTime <= 0)
+        return false;

Review comment:
       This if statement should have curly braces.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+  /**
+   * The wrapped ExecutorService that will actually execute our
+   * tasks.
+   */
+  private final ExecutorService executor;
+
+  /**
+   * The lock prevents shutdown from being called in the middle
+   * of a submit. It also guards the executors IdentityHashMap.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * This condition allows us to cleanly terminate this executor
+   * service.
+   */
+  private final Condition terminating = lock.newCondition();
+
+  /**
+   * Whenever a new StripedObject is submitted to the pool, it
+   * is added to this IdentityHashMap. As soon as the
+   * SerialExecutor is empty, the entry is removed from the map,
+   * in order to avoid a memory leak.
+   */
+  private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+  /**
+   * The default submit() method creates a new FutureTask and
+   * wraps our StripedRunnable with it. We thus need to
+   * remember the stripe object somewhere. In our case, we will
+   * do this inside the ThreadLocal "stripes". Before the
+   * thread returns from submitting the runnable, it will always
+   * remove the thread local entry.
+   */
+  private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+  /**
+   * Valid states are RUNNING and SHUTDOWN. We rely on the
+   * underlying executor service for the remaining states.
+   */
+  private State state = State.RUNNING;
+
+  private enum State {
+    RUNNING, SHUTDOWN
+  }
+
+  /**
+   * Take care using this constructor. The original visibility was private
+   * since users should not shutdown their executors directly,
+   * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+   * passed in.
+   *
+   * @param executor the executor service that we use to execute
+   *        the tasks
+   */
+  public StripedExecutorService(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a
+   * cached thread pool.
+   */
+  public StripedExecutorService() {
+    this(Executors.newCachedThreadPool());
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a fixed
+   * thread pool with the given number of threads.
+   */
+  public StripedExecutorService(int numberOfThreads) {
+    this(Executors.newFixedThreadPool(numberOfThreads));
+  }
+
+  /**
+   * If the runnable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual runnable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+    saveStripedObject(runnable);
+    return super.newTaskFor(runnable, value);
+  }
+
+  /**
+   * If the callable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual callable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+    saveStripedObject(callable);
+    return super.newTaskFor(callable);
+  }
+
+  /**
+   * Saves the stripe in a ThreadLocal until we can use it to
+   * schedule the task into our pool.
+   */
+  private void saveStripedObject(Object task) {
+    if (isStripedObject(task)) {
+      stripes.set(((StripedObject) task).getStripe());
+    }
+  }
+
+  /**
+   * Returns true if the object implements the StripedObject
+   * interface.
+   */
+  private static boolean isStripedObject(Object o) {
+    return o instanceof StripedObject;
+  }
+
+  /**
+   * Delegates the call to submit(task, null).
+   */
+  public Future<?> submit(Runnable task) {
+    return submit(task, null);
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Runnable task, T result) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task, result);
+      } else { // bypass the serial executors
+        return executor.submit(task, result);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Callable<T> task) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task);
+      } else { // bypass the serial executors
+        return executor.submit(task);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Throws a RejectedExecutionException if the state is not
+   * RUNNING.
+   */
+  private void checkPoolIsRunning() {
+    assert lock.isHeldByCurrentThread();
+    if (state != State.RUNNING) {
+      throw new RejectedExecutionException("executor not running");
+    }
+  }
+
+  /**
+   * Executes the command. If command implements StripedObject,
+   * we execute it with a SerialExecutor. This method can be
+   * called directly by clients or it may be called by the
+   * AbstractExecutorService's submit() methods. In that case,
+   * we check whether the stripes thread local has been set. If
+   * it is, we remove it and use it to determine the
+   * StripedObject and execute it with a SerialExecutor. If no
+   * StripedObject is set, we instead pass the command to the
+   * wrapped ExecutorService directly.
+   */
+  public void execute(Runnable command) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      Object stripe = getStripe(command);
+      if (stripe != null) {
+        SerialExecutor ser_exec = executors.get(stripe);
+        if (ser_exec == null) {
+          executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+        }
+        ser_exec.execute(command);
+      } else {
+        executor.execute(command);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * We get the stripe object either from the Runnable if it
+   * also implements StripedObject, or otherwise from the thread
+   * local temporary storage. Result may be null.
+   */
+  private Object getStripe(Runnable command) {
+    Object stripe;
+    if (command instanceof StripedObject) {
+      stripe = (((StripedObject) command).getStripe());
+    } else {
+      stripe = stripes.get();
+    }
+    stripes.remove();
+    return stripe;
+  }
+
+  /**
+   * Shuts down the StripedExecutorService. No more tasks will
+   * be submitted. If the map of SerialExecutors is empty, we
+   * shut down the wrapped executor.
+   */
+  public void shutdown() {
+    lock.lock();
+    try {
+      state = State.SHUTDOWN;
+      if (executors.isEmpty()) {
+        executor.shutdown();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * All the tasks in each of the SerialExecutors are drained
+   * to a list, as well as the tasks inside the wrapped
+   * ExecutorService. This is then returned to the user. Also,
+   * the shutdownNow method of the wrapped executor is called.
+   */
+  public List<Runnable> shutdownNow() {
+    lock.lock();
+    try {
+      shutdown();
+      List<Runnable> result = new ArrayList<>();
+      for (SerialExecutor ser_ex : executors.values()) {
+        ser_ex.tasks.drainTo(result);
+      }
+      result.addAll(executor.shutdownNow());
+      return result;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if shutdown() or shutdownNow() have been
+   * called; false otherwise.
+   */
+  public boolean isShutdown() {
+    lock.lock();
+    try {
+      return state == State.SHUTDOWN;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if this pool has been terminated, that is, all
+   * the SerialExecutors are empty and the wrapped
+   * ExecutorService has been terminated.
+   */
+  public boolean isTerminated() {
+    lock.lock();
+    try {
+      if (state == State.RUNNING)
+        return false;
+      for (SerialExecutor executor : executors.values()) {
+        if (!executor.isEmpty())
+          return false;
+      }
+      return executor.isTerminated();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if the wrapped ExecutorService terminates
+   * within the allotted amount of time.
+   */
+  public boolean awaitTermination(long timeout, TimeUnit unit)
+      throws InterruptedException {
+    lock.lock();
+    try {
+      long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+      long remainingTime;
+      while ((remainingTime = waitUntil - System.nanoTime()) > 0
+          && !executors.isEmpty()) {
+        terminating.awaitNanos(remainingTime);
+      }
+      if (remainingTime <= 0)
+        return false;
+      if (executors.isEmpty()) {
+        return executor.awaitTermination(
+            remainingTime, TimeUnit.NANOSECONDS);
+      }
+      return false;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * As soon as a SerialExecutor is empty, we remove it from the
+   * executors map. We might thus remove the SerialExecutors
+   * more quickly than necessary, but at least we can avoid a
+   * memory leak.
+   */
+  private void removeEmptySerialExecutor(Object stripe, SerialExecutor ser_ex) {
+    assert ser_ex == executors.get(stripe);
+    assert lock.isHeldByCurrentThread();
+    assert ser_ex.isEmpty();
+
+    executors.remove(stripe);
+    terminating.signalAll();
+    if (state == State.SHUTDOWN && executors.isEmpty()) {
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Prints information about current state of this executor, the
+   * wrapped executor and the serial executors.
+   */
+  public String toString() {
+    lock.lock();
+    try {
+      return "StripedExecutorService: state=" + state + ", " +
+          "executor=" + executor + ", " +
+          "serialExecutors=" + executors;
+    } finally {
+      lock.unlock();
+    }
+
+  }
+
+  /**
+   * This field is used for conditional compilation. If it is
+   * false, then the finalize method is an empty method, in
+   * which case the SerialExecutor will not be registered with
+   * the Finalizer.
+   */
+  private static boolean DEBUG = false;
+
+  /**
+   * SerialExecutor is based on the construct with the same name
+   * described in the {@link Executor} JavaDocs. The difference
+   * with our SerialExecutor is that it can be terminated. It
+   * also removes itself automatically once the queue is empty.
+   */
+  private class SerialExecutor implements Executor {
+    /**
+     * The queue of unexecuted tasks.
+     */
+    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
+    /**
+     * The runnable that we are currently busy with.
+     */
+    private Runnable active;
+    /**
+     * The stripe that this SerialExecutor was defined for. It
+     * is needed so that we can remove this executor from the
+     * map once it is empty.
+     */
+    private final Object stripe;
+
+    /**
+     * Creates a SerialExecutor for a particular stripe.
+     */
+    private SerialExecutor(Object stripe) {
+      this.stripe = stripe;
+      if (DEBUG) {
+        System.out.println("SerialExecutor created " + stripe);

Review comment:
       Rather than the `DEBUG` boolean and a `System.out` statement, might it be better to modify this to make use of logging?

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+  /**
+   * The wrapped ExecutorService that will actually execute our
+   * tasks.
+   */
+  private final ExecutorService executor;
+
+  /**
+   * The lock prevents shutdown from being called in the middle
+   * of a submit. It also guards the executors IdentityHashMap.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * This condition allows us to cleanly terminate this executor
+   * service.
+   */
+  private final Condition terminating = lock.newCondition();
+
+  /**
+   * Whenever a new StripedObject is submitted to the pool, it
+   * is added to this IdentityHashMap. As soon as the
+   * SerialExecutor is empty, the entry is removed from the map,
+   * in order to avoid a memory leak.
+   */
+  private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+  /**
+   * The default submit() method creates a new FutureTask and
+   * wraps our StripedRunnable with it. We thus need to
+   * remember the stripe object somewhere. In our case, we will
+   * do this inside the ThreadLocal "stripes". Before the
+   * thread returns from submitting the runnable, it will always
+   * remove the thread local entry.
+   */
+  private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+  /**
+   * Valid states are RUNNING and SHUTDOWN. We rely on the
+   * underlying executor service for the remaining states.
+   */
+  private State state = State.RUNNING;
+
+  private enum State {
+    RUNNING, SHUTDOWN
+  }
+
+  /**
+   * Take care using this constructor. The original visibility was private
+   * since users should not shutdown their executors directly,
+   * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+   * passed in.
+   *
+   * @param executor the executor service that we use to execute
+   *        the tasks
+   */
+  public StripedExecutorService(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a
+   * cached thread pool.
+   */
+  public StripedExecutorService() {
+    this(Executors.newCachedThreadPool());
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a fixed
+   * thread pool with the given number of threads.
+   */
+  public StripedExecutorService(int numberOfThreads) {
+    this(Executors.newFixedThreadPool(numberOfThreads));
+  }
+
+  /**
+   * If the runnable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual runnable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+    saveStripedObject(runnable);
+    return super.newTaskFor(runnable, value);
+  }
+
+  /**
+   * If the callable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual callable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+    saveStripedObject(callable);
+    return super.newTaskFor(callable);
+  }
+
+  /**
+   * Saves the stripe in a ThreadLocal until we can use it to
+   * schedule the task into our pool.
+   */
+  private void saveStripedObject(Object task) {
+    if (isStripedObject(task)) {
+      stripes.set(((StripedObject) task).getStripe());
+    }
+  }
+
+  /**
+   * Returns true if the object implements the StripedObject
+   * interface.
+   */
+  private static boolean isStripedObject(Object o) {
+    return o instanceof StripedObject;
+  }
+
+  /**
+   * Delegates the call to submit(task, null).
+   */
+  public Future<?> submit(Runnable task) {
+    return submit(task, null);
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Runnable task, T result) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task, result);
+      } else { // bypass the serial executors
+        return executor.submit(task, result);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Callable<T> task) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task);
+      } else { // bypass the serial executors
+        return executor.submit(task);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Throws a RejectedExecutionException if the state is not
+   * RUNNING.
+   */
+  private void checkPoolIsRunning() {
+    assert lock.isHeldByCurrentThread();
+    if (state != State.RUNNING) {
+      throw new RejectedExecutionException("executor not running");
+    }
+  }
+
+  /**
+   * Executes the command. If command implements StripedObject,
+   * we execute it with a SerialExecutor. This method can be
+   * called directly by clients or it may be called by the
+   * AbstractExecutorService's submit() methods. In that case,
+   * we check whether the stripes thread local has been set. If
+   * it is, we remove it and use it to determine the
+   * StripedObject and execute it with a SerialExecutor. If no
+   * StripedObject is set, we instead pass the command to the
+   * wrapped ExecutorService directly.
+   */
+  public void execute(Runnable command) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      Object stripe = getStripe(command);
+      if (stripe != null) {
+        SerialExecutor ser_exec = executors.get(stripe);
+        if (ser_exec == null) {
+          executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+        }
+        ser_exec.execute(command);
+      } else {
+        executor.execute(command);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * We get the stripe object either from the Runnable if it
+   * also implements StripedObject, or otherwise from the thread
+   * local temporary storage. Result may be null.
+   */
+  private Object getStripe(Runnable command) {
+    Object stripe;
+    if (command instanceof StripedObject) {
+      stripe = (((StripedObject) command).getStripe());
+    } else {
+      stripe = stripes.get();
+    }
+    stripes.remove();
+    return stripe;
+  }
+
+  /**
+   * Shuts down the StripedExecutorService. No more tasks will
+   * be submitted. If the map of SerialExecutors is empty, we
+   * shut down the wrapped executor.
+   */
+  public void shutdown() {
+    lock.lock();
+    try {
+      state = State.SHUTDOWN;
+      if (executors.isEmpty()) {
+        executor.shutdown();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * All the tasks in each of the SerialExecutors are drained
+   * to a list, as well as the tasks inside the wrapped
+   * ExecutorService. This is then returned to the user. Also,
+   * the shutdownNow method of the wrapped executor is called.
+   */
+  public List<Runnable> shutdownNow() {
+    lock.lock();
+    try {
+      shutdown();
+      List<Runnable> result = new ArrayList<>();
+      for (SerialExecutor ser_ex : executors.values()) {
+        ser_ex.tasks.drainTo(result);
+      }
+      result.addAll(executor.shutdownNow());
+      return result;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if shutdown() or shutdownNow() have been
+   * called; false otherwise.
+   */
+  public boolean isShutdown() {
+    lock.lock();
+    try {
+      return state == State.SHUTDOWN;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if this pool has been terminated, that is, all
+   * the SerialExecutors are empty and the wrapped
+   * ExecutorService has been terminated.
+   */
+  public boolean isTerminated() {
+    lock.lock();
+    try {
+      if (state == State.RUNNING)
+        return false;
+      for (SerialExecutor executor : executors.values()) {
+        if (!executor.isEmpty())
+          return false;
+      }
+      return executor.isTerminated();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if the wrapped ExecutorService terminates
+   * within the allotted amount of time.
+   */
+  public boolean awaitTermination(long timeout, TimeUnit unit)
+      throws InterruptedException {
+    lock.lock();
+    try {
+      long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+      long remainingTime;
+      while ((remainingTime = waitUntil - System.nanoTime()) > 0
+          && !executors.isEmpty()) {
+        terminating.awaitNanos(remainingTime);
+      }
+      if (remainingTime <= 0)
+        return false;
+      if (executors.isEmpty()) {
+        return executor.awaitTermination(
+            remainingTime, TimeUnit.NANOSECONDS);
+      }
+      return false;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * As soon as a SerialExecutor is empty, we remove it from the
+   * executors map. We might thus remove the SerialExecutors
+   * more quickly than necessary, but at least we can avoid a
+   * memory leak.
+   */
+  private void removeEmptySerialExecutor(Object stripe, SerialExecutor ser_ex) {
+    assert ser_ex == executors.get(stripe);
+    assert lock.isHeldByCurrentThread();
+    assert ser_ex.isEmpty();

Review comment:
       Is it common for users to run Geode with assertions enabled? Since they're not enabled by default, might it be safer to use a slightly different approach here?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] nonbinaryprogrammer merged pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
nonbinaryprogrammer merged pull request #6704:
URL: https://github.com/apache/geode/pull/6704


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r678395538



##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testShutdownNow() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    Thread.sleep(500);
+
+    assertThat(pool.isTerminated()).isFalse();
+    Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+    assertThat(pool.isShutdown()).isTrue();
+    assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+    assertThat(pool.isTerminated()).isTrue();
+
+    assertThat(unfinishedJobs.size() > 0).isTrue();
+
+    assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);
+  }
+
+  @Test
+  public void testSingleStripeCallableWithCompletionService()
+      throws InterruptedException, ExecutionException {
+    ExecutorService pool = new StripedExecutorService();
+    final CompletionService<Integer> cs = new ExecutorCompletionService<>(
+        pool);
+
+    Thread testSubmitter = new Thread("TestSubmitter") {
+      public void run() {
+        Object stripe = new Object();
+        for (int i = 0; i < 50; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+        try {
+          Thread.sleep(2000);
+        } catch (InterruptedException e) {
+          interrupt();
+        }
+        for (int i = 50; i < 100; i++) {
+          cs.submit(new TestCallable(stripe, i));
+        }
+      }
+    };
+    testSubmitter.start();
+
+    for (int i = 0; i < 100; i++) {
+      int actual = cs.take().get().intValue();

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] dschneider-pivotal commented on a change in pull request #6704: GEODE-9338: remove publish strong guarantees

Posted by GitBox <gi...@apache.org>.
dschneider-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r674993888



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
##########
@@ -58,23 +62,25 @@ public int getSubscriptionCount() {
 
   @Override
   public long publish(RegionProvider regionProvider, byte[] channel, byte[] message) {
-    Set<DistributedMember> membersWithDataRegion = regionProvider.getRegionMembers();
-    @SuppressWarnings("unchecked")
-    ResultCollector<String[], List<Long>> subscriberCountCollector = FunctionService
-        .onMembers(membersWithDataRegion)
-        .setArguments(new Object[] {channel, message})
-        .execute(REDIS_PUB_SUB_FUNCTION_ID);
+    executor.submit(() -> internalPublish(regionProvider, channel, message));
 
-    List<Long> subscriberCounts = null;
+    return subscriptions.findSubscriptions(channel).size();

Review comment:
       consider calling findSubscriptions(channel).size() first and saving its value in "long result".
   Then only call executor.submit if result > 0.
   Then return result.

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
##########
@@ -31,10 +31,8 @@
   /**
    * Publish a message on a channel
    *
-   *
    * @param channel to publish to
    * @param message to publish
-   * @return the number of messages published
    */
   long publish(RegionProvider regionProvider, byte[] channel, byte[] message);

Review comment:
       since publish still returns a long it seems like you should still have a comment describing what is returned. How about "return an estimate of the number of messages published by this call". It might also be worth enhancing this comment to say that this call may return before the actual publish has been done. Something about it being async.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r676959619



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
##########
@@ -42,13 +47,45 @@
 public class PubSubImpl implements PubSub {
   public static final String REDIS_PUB_SUB_FUNCTION_ID = "redisPubSubFunctionID";
 
+  private static final int DEFAULT_PUBLISH_THREAD_COUNT =
+      Integer.getInteger("redis.publish-thread-count", 100);

Review comment:
       Switch to that and capped it at 10 threads now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r678382809



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -494,6 +506,25 @@ private void runSubscribeAndPublish(int index, int minimumIterations, AtomicBool
     return mockSubscriber.getReceivedEvents();
   }
 
+  @Test
+  public void ensureOrderingWithOneSubscriberMultiplePublishes() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+    executor.submit(() -> subscriber.subscribe(mockSubscriber, "salutations"));

Review comment:
       I can understand the suggestion, but actually, I'd prefer not to. Maybe just me, but I find that constants in tests add a tiny layer of abstraction that actually detract from understanding what's going on. In this case, maybe less so since the constant is a string and would obviously be something like `SALUTATIONS` or `SALUTATIONS_CHANNEL`. But, even then it's ambiguous. Is it a regular channel or is it a pattern channel? The ambiguity is more so when the constant refers to a number, say `VALUE_TWO`. Does that mean the integer `2` or a string `"2"` or the word `"two"`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r678397959



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,522 @@
+/*
+ * Copyright (C) 2000-2012 Heinz Max Kabutz
+ *
+ * See the NOTICE file distributed with this work for additional
+ * information regarding copyright ownership. Heinz Max Kabutz licenses
+ * this file to you under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License. You may
+ * obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import java.util.ArrayList;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.AbstractExecutorService;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * The StripedExecutorService accepts Runnable/Callable objects
+ * that also implement the StripedObject interface. It executes
+ * all the tasks for a single "stripe" consecutively.
+ * <p/>
+ * In this version, submitted tasks do not necessarily have to
+ * implement the StripedObject interface. If they do not, then
+ * they will simply be passed onto the wrapped ExecutorService
+ * directly.
+ * <p/>
+ * Idea inspired by Glenn McGregor on the Concurrency-interest
+ * mailing list and using the SerialExecutor presented in the
+ * Executor interface's JavaDocs.
+ * <p/>
+ * http://cs.oswego.edu/mailman/listinfo/concurrency-interest
+ *
+ * @author Dr Heinz M. Kabutz
+ */
+public class StripedExecutorService extends AbstractExecutorService {
+  /**
+   * The wrapped ExecutorService that will actually execute our
+   * tasks.
+   */
+  private final ExecutorService executor;
+
+  /**
+   * The lock prevents shutdown from being called in the middle
+   * of a submit. It also guards the executors IdentityHashMap.
+   */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /**
+   * This condition allows us to cleanly terminate this executor
+   * service.
+   */
+  private final Condition terminating = lock.newCondition();
+
+  /**
+   * Whenever a new StripedObject is submitted to the pool, it
+   * is added to this IdentityHashMap. As soon as the
+   * SerialExecutor is empty, the entry is removed from the map,
+   * in order to avoid a memory leak.
+   */
+  private final Map<Object, SerialExecutor> executors = new IdentityHashMap<>();
+
+  /**
+   * The default submit() method creates a new FutureTask and
+   * wraps our StripedRunnable with it. We thus need to
+   * remember the stripe object somewhere. In our case, we will
+   * do this inside the ThreadLocal "stripes". Before the
+   * thread returns from submitting the runnable, it will always
+   * remove the thread local entry.
+   */
+  private static final ThreadLocal<Object> stripes = new ThreadLocal<>();
+
+  /**
+   * Valid states are RUNNING and SHUTDOWN. We rely on the
+   * underlying executor service for the remaining states.
+   */
+  private State state = State.RUNNING;
+
+  private enum State {
+    RUNNING, SHUTDOWN
+  }
+
+  /**
+   * Take care using this constructor. The original visibility was private
+   * since users should not shutdown their executors directly,
+   * otherwise jobs might get stuck in our queues. Do not shutdown the executor
+   * passed in.
+   *
+   * @param executor the executor service that we use to execute
+   *        the tasks
+   */
+  public StripedExecutorService(ExecutorService executor) {
+    this.executor = executor;
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a
+   * cached thread pool.
+   */
+  public StripedExecutorService() {
+    this(Executors.newCachedThreadPool());
+  }
+
+  /**
+   * This constructs a StripedExecutorService that wraps a fixed
+   * thread pool with the given number of threads.
+   */
+  public StripedExecutorService(int numberOfThreads) {
+    this(Executors.newFixedThreadPool(numberOfThreads));
+  }
+
+  /**
+   * If the runnable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual runnable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
+    saveStripedObject(runnable);
+    return super.newTaskFor(runnable, value);
+  }
+
+  /**
+   * If the callable also implements StripedObject, we store the
+   * stripe object in a thread local, since the actual callable
+   * will be wrapped with a FutureTask.
+   */
+  @Override
+  protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
+    saveStripedObject(callable);
+    return super.newTaskFor(callable);
+  }
+
+  /**
+   * Saves the stripe in a ThreadLocal until we can use it to
+   * schedule the task into our pool.
+   */
+  private void saveStripedObject(Object task) {
+    if (isStripedObject(task)) {
+      stripes.set(((StripedObject) task).getStripe());
+    }
+  }
+
+  /**
+   * Returns true if the object implements the StripedObject
+   * interface.
+   */
+  private static boolean isStripedObject(Object o) {
+    return o instanceof StripedObject;
+  }
+
+  /**
+   * Delegates the call to submit(task, null).
+   */
+  public Future<?> submit(Runnable task) {
+    return submit(task, null);
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Runnable task, T result) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task, result);
+      } else { // bypass the serial executors
+        return executor.submit(task, result);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * If the task is a StripedObject, we execute it in-order by
+   * its stripe, otherwise we submit it directly to the wrapped
+   * executor. If the pool is not running, we throw a
+   * RejectedExecutionException.
+   */
+  public <T> Future<T> submit(Callable<T> task) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      if (isStripedObject(task)) {
+        return super.submit(task);
+      } else { // bypass the serial executors
+        return executor.submit(task);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Throws a RejectedExecutionException if the state is not
+   * RUNNING.
+   */
+  private void checkPoolIsRunning() {
+    assert lock.isHeldByCurrentThread();
+    if (state != State.RUNNING) {
+      throw new RejectedExecutionException("executor not running");
+    }
+  }
+
+  /**
+   * Executes the command. If command implements StripedObject,
+   * we execute it with a SerialExecutor. This method can be
+   * called directly by clients or it may be called by the
+   * AbstractExecutorService's submit() methods. In that case,
+   * we check whether the stripes thread local has been set. If
+   * it is, we remove it and use it to determine the
+   * StripedObject and execute it with a SerialExecutor. If no
+   * StripedObject is set, we instead pass the command to the
+   * wrapped ExecutorService directly.
+   */
+  public void execute(Runnable command) {
+    lock.lock();
+    try {
+      checkPoolIsRunning();
+      Object stripe = getStripe(command);
+      if (stripe != null) {
+        SerialExecutor ser_exec = executors.get(stripe);
+        if (ser_exec == null) {
+          executors.put(stripe, ser_exec = new SerialExecutor(stripe));
+        }
+        ser_exec.execute(command);
+      } else {
+        executor.execute(command);
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * We get the stripe object either from the Runnable if it
+   * also implements StripedObject, or otherwise from the thread
+   * local temporary storage. Result may be null.
+   */
+  private Object getStripe(Runnable command) {
+    Object stripe;
+    if (command instanceof StripedObject) {
+      stripe = (((StripedObject) command).getStripe());
+    } else {
+      stripe = stripes.get();
+    }
+    stripes.remove();
+    return stripe;
+  }
+
+  /**
+   * Shuts down the StripedExecutorService. No more tasks will
+   * be submitted. If the map of SerialExecutors is empty, we
+   * shut down the wrapped executor.
+   */
+  public void shutdown() {
+    lock.lock();
+    try {
+      state = State.SHUTDOWN;
+      if (executors.isEmpty()) {
+        executor.shutdown();
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * All the tasks in each of the SerialExecutors are drained
+   * to a list, as well as the tasks inside the wrapped
+   * ExecutorService. This is then returned to the user. Also,
+   * the shutdownNow method of the wrapped executor is called.
+   */
+  public List<Runnable> shutdownNow() {
+    lock.lock();
+    try {
+      shutdown();
+      List<Runnable> result = new ArrayList<>();
+      for (SerialExecutor ser_ex : executors.values()) {
+        ser_ex.tasks.drainTo(result);
+      }
+      result.addAll(executor.shutdownNow());
+      return result;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if shutdown() or shutdownNow() have been
+   * called; false otherwise.
+   */
+  public boolean isShutdown() {
+    lock.lock();
+    try {
+      return state == State.SHUTDOWN;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if this pool has been terminated, that is, all
+   * the SerialExecutors are empty and the wrapped
+   * ExecutorService has been terminated.
+   */
+  public boolean isTerminated() {
+    lock.lock();
+    try {
+      if (state == State.RUNNING)
+        return false;
+      for (SerialExecutor executor : executors.values()) {
+        if (!executor.isEmpty())
+          return false;
+      }
+      return executor.isTerminated();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns true if the wrapped ExecutorService terminates
+   * within the allotted amount of time.
+   */
+  public boolean awaitTermination(long timeout, TimeUnit unit)
+      throws InterruptedException {
+    lock.lock();
+    try {
+      long waitUntil = System.nanoTime() + unit.toNanos(timeout);
+      long remainingTime;
+      while ((remainingTime = waitUntil - System.nanoTime()) > 0
+          && !executors.isEmpty()) {
+        terminating.awaitNanos(remainingTime);
+      }
+      if (remainingTime <= 0)
+        return false;
+      if (executors.isEmpty()) {
+        return executor.awaitTermination(
+            remainingTime, TimeUnit.NANOSECONDS);
+      }
+      return false;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * As soon as a SerialExecutor is empty, we remove it from the
+   * executors map. We might thus remove the SerialExecutors
+   * more quickly than necessary, but at least we can avoid a
+   * memory leak.
+   */
+  private void removeEmptySerialExecutor(Object stripe, SerialExecutor ser_ex) {
+    assert ser_ex == executors.get(stripe);
+    assert lock.isHeldByCurrentThread();
+    assert ser_ex.isEmpty();
+
+    executors.remove(stripe);
+    terminating.signalAll();
+    if (state == State.SHUTDOWN && executors.isEmpty()) {
+      executor.shutdown();
+    }
+  }
+
+  /**
+   * Prints information about current state of this executor, the
+   * wrapped executor and the serial executors.
+   */
+  public String toString() {
+    lock.lock();
+    try {
+      return "StripedExecutorService: state=" + state + ", " +
+          "executor=" + executor + ", " +
+          "serialExecutors=" + executors;
+    } finally {
+      lock.unlock();
+    }
+
+  }
+
+  /**
+   * This field is used for conditional compilation. If it is
+   * false, then the finalize method is an empty method, in
+   * which case the SerialExecutor will not be registered with
+   * the Finalizer.
+   */
+  private static boolean DEBUG = false;

Review comment:
       I've removed all this debug stuff since we wouldn't have any means of switching it on if we thought there was a problem. We'd have to introduce a system variable to set it externally.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: remove publish strong guarantees

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r675762310



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
##########
@@ -58,23 +62,25 @@ public int getSubscriptionCount() {
 
   @Override
   public long publish(RegionProvider regionProvider, byte[] channel, byte[] message) {
-    Set<DistributedMember> membersWithDataRegion = regionProvider.getRegionMembers();
-    @SuppressWarnings("unchecked")
-    ResultCollector<String[], List<Long>> subscriberCountCollector = FunctionService
-        .onMembers(membersWithDataRegion)
-        .setArguments(new Object[] {channel, message})
-        .execute(REDIS_PUB_SUB_FUNCTION_ID);
+    executor.submit(() -> internalPublish(regionProvider, channel, message));
 
-    List<Long> subscriberCounts = null;
+    return subscriptions.findSubscriptions(channel).size();

Review comment:
       `findSubscriptions` only considers local subscribers and doesn't account for remote ones. So, unfortunately, we still need to always issue the fn call.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r676919420



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
##########
@@ -42,13 +47,45 @@
 public class PubSubImpl implements PubSub {
   public static final String REDIS_PUB_SUB_FUNCTION_ID = "redisPubSubFunctionID";
 
+  private static final int DEFAULT_PUBLISH_THREAD_COUNT =
+      Integer.getInteger("redis.publish-thread-count", 100);

Review comment:
       We could probably just use a `CachedThreadPool`, although it would be nice to have one that's also capped.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r678386363



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -715,6 +751,34 @@ public void testPatternWithoutAGlob() {
     assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello");
   }
 
+  @Test
+  public void concurrentSubscribers_andPublishers_doesNotHang()
+      throws InterruptedException, ExecutionException {
+    AtomicBoolean running = new AtomicBoolean(true);
+
+    Future<Integer> makeSubscribersFuture1 =
+        executor.submit(() -> makeSubscribers(10000, running));
+    Future<Integer> makeSubscribersFuture2 =
+        executor.submit(() -> makeSubscribers(10000, running));
+
+    Future<Integer> publish1 = executor.submit(() -> doPublishing(1, 10000, running));
+    Future<Integer> publish2 = executor.submit(() -> doPublishing(2, 10000, running));
+    Future<Integer> publish3 = executor.submit(() -> doPublishing(3, 10000, running));
+    Future<Integer> publish4 = executor.submit(() -> doPublishing(4, 10000, running));
+    Future<Integer> publish5 = executor.submit(() -> doPublishing(5, 10000, running));
+
+    running.set(false);
+
+    assertThat(makeSubscribersFuture1.get()).isGreaterThanOrEqualTo(10);
+    assertThat(makeSubscribersFuture2.get()).isGreaterThanOrEqualTo(10);

Review comment:
       The value here is actually a count of messages published so no race exists since you need to have subscribed before you can receive a published message. I did adjust the assertion to simply be `> 0` since `10` is somewhat arbitrary and confusing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r678394175



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
##########
@@ -42,13 +47,45 @@
 public class PubSubImpl implements PubSub {
   public static final String REDIS_PUB_SUB_FUNCTION_ID = "redisPubSubFunctionID";
 
+  private static final int DEFAULT_PUBLISH_THREAD_COUNT =
+      Integer.getInteger("redis.publish-thread-count", 100);
+
   private static final Logger logger = LogService.getLogger();
 
   private final Subscriptions subscriptions;
+  private final ExecutorService executor;
+
+  /**
+   * Inner class to wrap the publish action and pass it to the {@link StripedExecutorService}.
+   */
+  private static class PublishingRunnable implements StripedRunnable {
+
+    private final Runnable runnable;
+    private final Object stripeIdentity;

Review comment:
       I don't want to obscure the fact that the API only cares about the stripe object reference, but since it's a very specific use I can see your point.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] upthewaterspout commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
upthewaterspout commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r676848022



##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/services/StripedExecutorService.java
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license

Review comment:
       Since this file came from a third party, I think you need to leave their license in place at the top of the file - https://www.apache.org/legal/src-headers.html#3party

##########
File path: geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
##########
@@ -42,13 +47,45 @@
 public class PubSubImpl implements PubSub {
   public static final String REDIS_PUB_SUB_FUNCTION_ID = "redisPubSubFunctionID";
 
+  private static final int DEFAULT_PUBLISH_THREAD_COUNT =
+      Integer.getInteger("redis.publish-thread-count", 100);

Review comment:
       Huh, that's a lot of threads. I wonder if we really should default to this many. I guess there is no way to have this StripedExecutorService thing adapt to changing load?

##########
File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
##########
@@ -209,20 +210,18 @@ public void shouldContinueToFunction_whenOneServerShutsDownGracefully_givenTwoSu
     MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
     MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
 
-    Future<Void> subscriber1Future =
-        executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+    executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
     Future<Void> subscriber2Future =
         executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
 
     assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
         .isTrue();
 
-    Long result = publisher1.publish(CHANNEL_NAME, "hello");
-    assertThat(result).isEqualTo(2);
+    publisher1.publish(CHANNEL_NAME, "hello");

Review comment:
       We changed the meaning and behavior of the results of the publish command - now it is only returning the number of subscribers on the current node. Does that mean we no longer know what to assert here? Are there other tests that are checking that what we return from publish makes sense?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on pull request #6704:
URL: https://github.com/apache/geode/pull/6704#issuecomment-887172730


   I think it would be good to have some implementation specific stats on the pubsub subsytem:
   - messages published
   - publish time
   - queue depth (messages waiting to publish)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r678372249



##########
File path: geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
##########
@@ -244,18 +243,17 @@ public void shouldContinueToFunction_whenOneServerShutsDownAbruptly_givenTwoSubs
     Future<Void> subscriber2Future =
         executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
 
-    assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
-        .isTrue();
+    assertThat(latch.await(30, TimeUnit.SECONDS))

Review comment:
       No significance.

##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -303,11 +306,13 @@ public void testPublishBinaryData() {
     Long result = publisher.publish("salutations".getBytes(), expectedMessage);
     assertThat(result).isEqualTo(1);
 
+    GeodeAwaitility.await()
+        .untilAsserted(() -> assertThat(mockSubscriber.getReceivedMessages()).isNotEmpty());
+    assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo(expectedMessage);

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r678394441



##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);

Review comment:
       Done

##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testShutdownNow() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    Thread.sleep(500);

Review comment:
       Done

##########
File path: geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/services/StripedExecutorServiceJUnitTest.java
##########
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Collection;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * @author Heinz Kabutz
+ */
+public class StripedExecutorServiceJUnitTest {
+  @Before
+  public void initialize() {
+    TestRunnable.outOfSequence = false;
+    TestUnstripedRunnable.outOfSequence = false;
+    TestFastRunnable.outOfSequence = false;
+  }
+
+  @Test
+  public void testSingleStripeRunnable() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    assertThat(pool.isTerminated()).isFalse();
+    assertThat(pool.isShutdown()).isFalse();
+
+    pool.shutdown();
+
+    assertThat(pool.awaitTermination(1, TimeUnit.HOURS)).isTrue();
+    assertThat(TestRunnable.outOfSequence)
+        .as("Expected no out-of-sequence runnables to execute")
+        .isFalse();
+    assertThat(pool.isTerminated()).isTrue();
+  }
+
+  @Test
+  public void testShutdown() throws InterruptedException {
+    ThreadGroup group = new ThreadGroup("stripetestgroup");
+    Thread starter = new Thread(group, "starter") {
+      public void run() {
+        ExecutorService pool = new StripedExecutorService();
+        Object stripe = new Object();
+        AtomicInteger actual = new AtomicInteger(0);
+        for (int i = 0; i < 100; i++) {
+          pool.submit(new TestRunnable(stripe, actual, i));
+        }
+        pool.shutdown();
+      }
+    };
+    starter.start();
+    starter.join();
+
+    for (int i = 0; i < 100; i++) {
+      if (group.activeCount() == 0) {
+        return;
+      }
+      Thread.sleep(100);
+    }
+
+    assertThat(group.activeCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testShutdownNow() throws InterruptedException {
+    ExecutorService pool = new StripedExecutorService();
+    Object stripe = new Object();
+    AtomicInteger actual = new AtomicInteger(0);
+    for (int i = 0; i < 100; i++) {
+      pool.submit(new TestRunnable(stripe, actual, i));
+    }
+    Thread.sleep(500);
+
+    assertThat(pool.isTerminated()).isFalse();
+    Collection<Runnable> unfinishedJobs = pool.shutdownNow();
+
+    assertThat(pool.isShutdown()).isTrue();
+    assertThat(pool.awaitTermination(1, TimeUnit.MINUTES)).isTrue();
+    assertThat(pool.isTerminated()).isTrue();
+
+    assertThat(unfinishedJobs.size() > 0).isTrue();
+
+    assertThat(unfinishedJobs.size() + actual.intValue()).isEqualTo(100);

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] jdeppe-pivotal commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
jdeppe-pivotal commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r678399010



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -330,11 +335,14 @@ public void testSubscribeAndPublishUsingBinaryData() {
     Long result = publisher.publish(binaryBlob, binaryBlob);
     assertThat(result).isEqualTo(1);
 
+    GeodeAwaitility.await().untilAsserted(
+        () -> assertThat(mockSubscriber.getReceivedMessages()).isNotEmpty());
+    assertThat(mockSubscriber.getReceivedMessages().get(0)).isEqualTo(binaryBlob);

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [geode] DonalEvans commented on a change in pull request #6704: GEODE-9338: Remove strong guarantees for Radish PUBLISH command

Posted by GitBox <gi...@apache.org>.
DonalEvans commented on a change in pull request #6704:
URL: https://github.com/apache/geode/pull/6704#discussion_r678463633



##########
File path: geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/AbstractPubSubIntegrationTest.java
##########
@@ -494,6 +506,25 @@ private void runSubscribeAndPublish(int index, int minimumIterations, AtomicBool
     return mockSubscriber.getReceivedEvents();
   }
 
+  @Test
+  public void ensureOrderingWithOneSubscriberMultiplePublishes() {
+    MockSubscriber mockSubscriber = new MockSubscriber();
+    executor.submit(() -> subscriber.subscribe(mockSubscriber, "salutations"));

Review comment:
       Fair enough, that makes sense.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@geode.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org