You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2020/08/31 23:23:03 UTC

[geode] branch develop updated: GEODE-8333: Change Redis adapter threading model - fixes pubsub issues (#5488)

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 27b8e47  GEODE-8333: Change Redis adapter threading model - fixes pubsub issues (#5488)
27b8e47 is described below

commit 27b8e47e9f6e433dc71bffc5420ebcb51f79797e
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Mon Aug 31 16:22:11 2020 -0700

    GEODE-8333: Change Redis adapter threading model - fixes pubsub issues (#5488)
    
    
    - Do not use Netty threads for the entire request lifecycle. Each
      instance of ExecutionHandlerContext (essentially each client
      connection) uses a command queue which is actioned by a single thread
      taken from the Geode 'waiting pool'.
    - Every SUBSCRIBEed client is moved to a separate EventLoopGroup so that
      PUBLISHed messages are not sent back using the 'normal' Worker
      EventLoopGroup. This avoids a deadlock issue where PUBLISHed messages
      need to be sent using the same thread that the PUBLISH response needs
      to happen on.
    - Fix issues with PubSub where switching the EventLoopGroup may fail
      (because a client has already closed the connection) thus resulting in
      hanging PUBLISHers waiting for a Subscription to be marked as
      'readyForPublish'.
    - Consolidate various MockSubscriber classes
---
 .../src/test/resources/expected-pom.xml            |   6 +
 .../gradle/plugins/DependencyConstraints.groovy    |   1 +
 geode-redis/build.gradle                           |   4 +
 .../pubsub/PubSubNativeRedisAcceptanceTest.java    |   5 +-
 .../geode/redis/mocks/MockBinarySubscriber.java    |   0
 .../apache/geode/redis/mocks/MockSubscriber.java   | 100 +++++++++--
 .../org/apache/geode/redis/MockSubscriber.java     |  62 -------
 .../internal/executor/pubsub/PubSubDUnitTest.java  | 196 +++++++++++++++------
 .../geode/redis/session/RedisSessionDUnitTest.java |  45 +++++
 .../pubsub/LettucePubSubIntegrationTest.java       |  98 +++++++++++
 .../executor/pubsub/PubSubIntegrationTest.java     | 182 ++++++++++++++++++-
 .../pubsub/SubscriptionsIntegrationTest.java       |   6 +-
 .../geode/redis/mocks/DummySubscription.java       |   6 +
 .../geode/redis/internal/GeodeRedisServer.java     |   4 +-
 .../redis/internal/executor/RedisResponse.java     |  12 ++
 .../executor/pubsub/PsubscribeExecutor.java        |  46 +++--
 .../internal/executor/pubsub/PublishExecutor.java  |   9 +-
 .../executor/pubsub/SubscribeExecutor.java         |  37 +++-
 .../apache/geode/redis/internal/netty/Command.java |  30 ++++
 .../internal/netty/ExecutionHandlerContext.java    | 175 +++++++++++-------
 .../redis/internal/netty/NettyRedisServer.java     |  11 +-
 .../internal/pubsub/AbstractSubscription.java      |  72 +++++---
 .../redis/internal/pubsub/ChannelSubscription.java |   5 +-
 .../redis/internal/pubsub/PatternSubscription.java |   5 +-
 .../apache/geode/redis/internal/pubsub/PubSub.java |   8 +-
 .../geode/redis/internal/pubsub/PubSubImpl.java    |   6 +-
 .../redis/internal/pubsub/SubscribeResult.java}    |  42 ++---
 .../geode/redis/internal/pubsub/Subscription.java  |   7 +
 .../geode/redis/internal/pubsub/Subscriptions.java |  23 ++-
 .../redis/internal/pubsub/PubSubImplJUnitTest.java |   9 +-
 .../internal/pubsub/SubscriptionsJUnitTest.java    |  27 +--
 31 files changed, 930 insertions(+), 309 deletions(-)

diff --git a/boms/geode-all-bom/src/test/resources/expected-pom.xml b/boms/geode-all-bom/src/test/resources/expected-pom.xml
index 9013d57..40e1920 100644
--- a/boms/geode-all-bom/src/test/resources/expected-pom.xml
+++ b/boms/geode-all-bom/src/test/resources/expected-pom.xml
@@ -548,6 +548,12 @@
         <scope>compile</scope>
       </dependency>
       <dependency>
+        <groupId>io.lettuce</groupId>
+        <artifactId>lettuce-core</artifactId>
+        <version>5.2.1.RELEASE</version>
+        <scope>compile</scope>
+      </dependency>
+      <dependency>
         <groupId>xerces</groupId>
         <artifactId>xercesImpl</artifactId>
         <version>2.12.0</version>
diff --git a/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy b/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy
index 860dafe..37d2c2e 100644
--- a/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy
+++ b/buildSrc/src/main/groovy/org/apache/geode/gradle/plugins/DependencyConstraints.groovy
@@ -169,6 +169,7 @@ class DependencyConstraints implements Plugin<Project> {
         api(group: 'org.testcontainers', name: 'testcontainers', version: '1.13.0')
         api(group: 'pl.pragmatists', name: 'JUnitParams', version: '1.1.0')
         api(group: 'redis.clients', name: 'jedis', version: '3.2.0')
+        api(group: 'io.lettuce', name: 'lettuce-core', version: '5.2.1.RELEASE')
         api(group: 'xerces', name: 'xercesImpl', version: '2.12.0')
       }
     }
diff --git a/geode-redis/build.gradle b/geode-redis/build.gradle
index 267ea9d..0d9a3f8 100644
--- a/geode-redis/build.gradle
+++ b/geode-redis/build.gradle
@@ -45,11 +45,14 @@ dependencies {
 
   commonTestImplementation(project(':geode-junit'))
   commonTestImplementation(project(':geode-dunit'))
+  commonTestImplementation('redis.clients:jedis')
 
   integrationTestImplementation(project(':geode-dunit'))
   integrationTestImplementation(project(':geode-junit'))
   integrationTestImplementation(sourceSets.commonTest.output)
   integrationTestImplementation('redis.clients:jedis')
+  integrationTestImplementation('io.lettuce:lettuce-core')
+  integrationTestImplementation('org.apache.logging.log4j:log4j-core')
   integrationTestRuntime(project(':geode-log4j'))
 
   acceptanceTestImplementation(sourceSets.integrationTest.output)
@@ -65,6 +68,7 @@ dependencies {
   }
   acceptanceTestImplementation('org.springframework.boot:spring-boot-starter-data-redis')
   acceptanceTestImplementation('org.springframework.session:spring-session-data-redis')
+  acceptanceTestImplementation('org.apache.logging.log4j:log4j-core')
 
   distributedTestCompile('org.apache.logging.log4j:log4j-core')
   distributedTestImplementation(project(':geode-dunit'))
diff --git a/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubNativeRedisAcceptanceTest.java b/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubNativeRedisAcceptanceTest.java
index ee3ac5e..ce184ae 100644
--- a/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubNativeRedisAcceptanceTest.java
+++ b/geode-redis/src/acceptanceTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubNativeRedisAcceptanceTest.java
@@ -42,8 +42,8 @@ public class PubSubNativeRedisAcceptanceTest extends PubSubIntegrationTest {
   public static void setUp() {
     redisContainer = new GenericContainer<>("redis:5.0.6").withExposedPorts(6379);
     redisContainer.start();
-    subscriber = new Jedis("localhost", redisContainer.getFirstMappedPort(), REDIS_CLIENT_TIMEOUT);
-    publisher = new Jedis("localhost", redisContainer.getFirstMappedPort(), REDIS_CLIENT_TIMEOUT);
+    subscriber = new Jedis("localhost", redisContainer.getFirstMappedPort(), JEDIS_TIMEOUT);
+    publisher = new Jedis("localhost", redisContainer.getFirstMappedPort(), JEDIS_TIMEOUT);
   }
 
   @AfterClass
@@ -55,4 +55,5 @@ public class PubSubNativeRedisAcceptanceTest extends PubSubIntegrationTest {
   public int getPort() {
     return redisContainer.getFirstMappedPort();
   }
+
 }
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockBinarySubscriber.java b/geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockBinarySubscriber.java
similarity index 100%
rename from geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockBinarySubscriber.java
rename to geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockBinarySubscriber.java
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java b/geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
similarity index 54%
rename from geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
rename to geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
index e0ecb43..e55e4cb 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
+++ b/geode-redis/src/commonTest/java/org/apache/geode/redis/mocks/MockSubscriber.java
@@ -11,7 +11,6 @@
  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
  * or implied. See the License for the specific language governing permissions and limitations under
  * the License.
- *
  */
 
 package org.apache.geode.redis.mocks;
@@ -20,33 +19,112 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
+import redis.clients.jedis.Client;
 import redis.clients.jedis.JedisPubSub;
 
 public class MockSubscriber extends JedisPubSub {
+
+  private final CountDownLatch subscriptionLatch;
+  private final CountDownLatch unsubscriptionLatch;
   private final List<String> receivedMessages = Collections.synchronizedList(new ArrayList<>());
   private final List<String> receivedPMessages = Collections.synchronizedList(new ArrayList<>());
+  public final List<UnsubscribeInfo> unsubscribeInfos =
+      Collections.synchronizedList(new ArrayList<>());
+  public final List<UnsubscribeInfo> punsubscribeInfos =
+      Collections.synchronizedList(new ArrayList<>());
+  private String localSocketAddress;
+
+  public MockSubscriber() {
+    this(new CountDownLatch(1));
+  }
+
+  public MockSubscriber(CountDownLatch subscriptionLatch) {
+    this(subscriptionLatch, new CountDownLatch(1));
+  }
+
+  public MockSubscriber(CountDownLatch subscriptionLatch, CountDownLatch unsubscriptionLatch) {
+    this.subscriptionLatch = subscriptionLatch;
+    this.unsubscriptionLatch = unsubscriptionLatch;
+  }
+
+  @Override
+  public void proceed(Client client, String... channels) {
+    localSocketAddress = client.getSocket().getLocalSocketAddress().toString();
+    super.proceed(client, channels);
+  }
+
+  private void switchThreadName(String suffix) {
+    String threadName = Thread.currentThread().getName();
+    int suffixIndex = threadName.indexOf(" -- ");
+    if (suffixIndex >= 0) {
+      threadName = threadName.substring(0, suffixIndex);
+    }
+
+    threadName += " -- " + suffix + " [" + localSocketAddress + "]";
+    Thread.currentThread().setName(threadName);
+  }
 
   public List<String> getReceivedMessages() {
-    return new ArrayList<>(receivedMessages);
+    return receivedMessages;
   }
 
   public List<String> getReceivedPMessages() {
     return new ArrayList<>(receivedPMessages);
   }
 
-  public final List<UnsubscribeInfo> unsubscribeInfos =
-      Collections.synchronizedList(new ArrayList<>());
-  public final List<UnsubscribeInfo> punsubscribeInfos =
-      Collections.synchronizedList(new ArrayList<>());
+  @Override
+  public void onMessage(String channel, String message) {
+    switchThreadName(String.format("MESSAGE %s %s", channel, message));
+    receivedMessages.add(message);
+  }
+
+  @Override
+  public void onPMessage(String pattern, String channel, String message) {
+    switchThreadName(String.format("PMESSAGE %s %s %s", pattern, channel, message));
+    receivedPMessages.add(message);
+  }
+
+  @Override
+  public void onSubscribe(String channel, int subscribedChannels) {
+    switchThreadName(String.format("SUBSCRIBE %s", channel));
+    subscriptionLatch.countDown();
+  }
+
+  private static final int AWAIT_TIMEOUT_MILLIS = 30000;
+
+  public void awaitSubscribe(String channel) {
+    try {
+      if (!subscriptionLatch.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+        throw new RuntimeException("awaitSubscribe timed out for channel: " + channel);
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
 
   @Override
   public void onUnsubscribe(String channel, int subscribedChannels) {
+    switchThreadName(String.format("UNSUBSCRIBE %s %d", channel, subscribedChannels));
     unsubscribeInfos.add(new UnsubscribeInfo(channel, subscribedChannels));
+    unsubscriptionLatch.countDown();
+  }
+
+  public void awaitUnsubscribe(String channel) {
+    try {
+      if (!unsubscriptionLatch.await(AWAIT_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
+        throw new RuntimeException("awaitUnsubscribe timed out for channel: " + channel);
+      }
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
   }
 
   @Override
   public void onPUnsubscribe(String pattern, int subscribedChannels) {
+    switchThreadName(String.format("PUNSUBSCRIBE %s %d", pattern, subscribedChannels));
     punsubscribeInfos.add(new UnsubscribeInfo(pattern, subscribedChannels));
   }
 
@@ -86,14 +164,4 @@ public class MockSubscriber extends JedisPubSub {
     }
   }
 
-
-  @Override
-  public void onMessage(String channel, String message) {
-    receivedMessages.add(message);
-  }
-
-  @Override
-  public void onPMessage(String pattern, String channel, String message) {
-    receivedPMessages.add(message);
-  }
 }
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/MockSubscriber.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/MockSubscriber.java
deleted file mode 100644
index 5ac9c38..0000000
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/MockSubscriber.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- *
- */
-
-package org.apache.geode.redis;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import org.apache.logging.log4j.Logger;
-import redis.clients.jedis.Client;
-import redis.clients.jedis.JedisPubSub;
-
-import org.apache.geode.logging.internal.log4j.api.LogService;
-
-public class MockSubscriber extends JedisPubSub {
-  private CountDownLatch latch;
-  private List<String> receivedMessages = new ArrayList<String>();
-  private Client client;
-
-  private static final Logger logger = LogService.getLogger();
-
-  public MockSubscriber(CountDownLatch latch) {
-    this.latch = latch;
-  }
-
-  public List<String> getReceivedMessages() {
-    return receivedMessages;
-  }
-
-  @Override
-  public void onSubscribe(String channel, int subscribedChannels) {
-    // logger.info("--->>> Received subscription for " + client.getSocket());
-    latch.countDown();
-  }
-
-  @Override
-  public void onMessage(String channel, String message) {
-    receivedMessages.add(message);
-  }
-
-  @Override
-  public void proceed(Client client, String... channels) {
-    this.client = client;
-    // logger.info("--->>> Before for " + client.getSocket());
-    super.proceed(client, channels);
-    // logger.info("--->>> After for " + client.getSocket());
-  }
-}
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
index 52ed634..4ea912a 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubDUnitTest.java
@@ -30,25 +30,35 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.config.Configurator;
 import org.junit.AfterClass;
-import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.exceptions.JedisConnectionException;
 
-import org.apache.geode.redis.MockSubscriber;
+import org.apache.geode.logging.internal.executors.LoggingThread;
+import org.apache.geode.logging.internal.log4j.api.FastLogger;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.mocks.MockSubscriber;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.Host;
+import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.rules.MemberVM;
 import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
 import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 import org.apache.geode.test.junit.rules.GfshCommandRule;
 
+
 public class PubSubDUnitTest {
 
   public static final String CHANNEL_NAME = "salutations";
+  public static final int JEDIS_TIMEOUT = Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
 
   @ClassRule
   public static RedisClusterStartupRule cluster = new RedisClusterStartupRule(6);
@@ -92,6 +102,14 @@ public class PubSubDUnitTest {
     server4 = cluster.startRedisVM(4, locator.getPort());
     server5 = cluster.startServerVM(5, locator.getPort());
 
+    for (VM v : Host.getHost(0).getAllVMs()) {
+      v.invoke(() -> {
+        Logger logger = LogService.getLogger("org.apache.geode.redis");
+        Configurator.setAllLevels(logger.getName(), Level.getLevel("DEBUG"));
+        FastLogger.setDelegating(true);
+      });
+    }
+
     redisServerPort1 = cluster.getRedisPort(1);
     redisServerPort2 = cluster.getRedisPort(2);
     redisServerPort3 = cluster.getRedisPort(3);
@@ -105,12 +123,6 @@ public class PubSubDUnitTest {
     gfsh.connectAndVerify(locator);
   }
 
-  @Before
-  public void testSetup() {
-    subscriber1.flushAll();
-    subscriber2.flushAll();
-  }
-
   @AfterClass
   public static void tearDown() {
     subscriber1.disconnect();
@@ -126,6 +138,70 @@ public class PubSubDUnitTest {
   }
 
   @Test
+  public void shouldNotHang_givenPublishingAndSubscribingSimultaneously() {
+    ArrayList<Thread> threads = new ArrayList<>();
+    AtomicLong publishCount = new AtomicLong();
+    Random random = new Random();
+    int SUBSCRIBER_COUNT = 5;
+    int CHANNEL_COUNT = 200;
+
+    for (int i = 0; i < CHANNEL_COUNT; i++) {
+      String channelName = "theBestChannel" + i;
+      Thread thread = new LoggingThread(channelName, () -> {
+        ArrayList<MockSubscriber> mockSubscribers = new ArrayList<>();
+        ArrayList<Jedis> clients = new ArrayList<>();
+        ArrayList<Future<Void>> subscribeFutures = new ArrayList<>();
+
+        for (int j = 0; j < SUBSCRIBER_COUNT; j++) {
+          MockSubscriber mockSubscriber = new MockSubscriber();
+          mockSubscribers.add(mockSubscriber);
+          Jedis client = getConnection(random);
+          clients.add(client);
+
+          Future<Void> f = executor.submit(() -> {
+            client.subscribe(mockSubscriber, channelName);
+          });
+          subscribeFutures.add(f);
+        }
+
+        mockSubscribers.forEach(x -> x.awaitSubscribe(channelName));
+
+        Jedis localPublisher = getConnection(random);
+        long published = localPublisher.publish(channelName, "hi");
+        publishCount.getAndAdd(published);
+        localPublisher.close();
+
+        mockSubscribers.forEach(s -> {
+          s.unsubscribe(channelName);
+          s.awaitUnsubscribe(channelName);
+        });
+
+        subscribeFutures.forEach(x -> {
+          try {
+            x.get(JEDIS_TIMEOUT, TimeUnit.MILLISECONDS);
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+        });
+        clients.forEach(Jedis::close);
+      });
+
+      threads.add(thread);
+      thread.start();
+    }
+
+    threads.forEach(thread -> {
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    });
+
+    assertThat(publishCount.get()).isEqualTo(CHANNEL_COUNT * SUBSCRIBER_COUNT);
+  }
+
+  @Test
   public void shouldContinueToFunction_whenOneServerShutsDownGracefully_givenTwoSubscribersOnePublisher()
       throws InterruptedException {
     CountDownLatch latch = new CountDownLatch(2);
@@ -133,13 +209,12 @@ public class PubSubDUnitTest {
     MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
     MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
 
-    Future<Void> subscriber1Future = executor.submit(
-        () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
-    Future<Void> subscriber2Future = executor.submit(
-        () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
+    Future<Void> subscriber1Future =
+        executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+    Future<Void> subscriber2Future =
+        executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
 
-    assertThat(latch.await(30, TimeUnit.SECONDS))
-        .as("channel subscription was not received")
+    assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
         .isTrue();
 
     Long result = publisher1.publish(CHANNEL_NAME, "hello");
@@ -164,13 +239,12 @@ public class PubSubDUnitTest {
     MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
     MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
 
-    Future<Void> subscriber1Future = executor.submit(
-        () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
-    Future<Void> subscriber2Future = executor.submit(
-        () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
+    Future<Void> subscriber1Future =
+        executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+    Future<Void> subscriber2Future =
+        executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
 
-    assertThat(latch.await(30, TimeUnit.SECONDS))
-        .as("channel subscription was not received")
+    assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
         .isTrue();
 
     Long result = publisher1.publish(CHANNEL_NAME, "hello");
@@ -215,13 +289,12 @@ public class PubSubDUnitTest {
     MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
     MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
 
-    Future<Void> subscriber1Future = executor.submit(
-        () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
-    Future<Void> subscriber2Future = executor.submit(
-        () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
+    Future<Void> subscriber1Future =
+        executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+    Future<Void> subscriber2Future =
+        executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
 
-    assertThat(latch.await(30, TimeUnit.SECONDS))
-        .as("channel subscription was not received")
+    assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
         .isTrue();
 
     Long resultPublisher1 = publisher1.publish(CHANNEL_NAME, "hello");
@@ -248,13 +321,12 @@ public class PubSubDUnitTest {
     MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
     MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
 
-    Future<Void> subscriber1Future = executor.submit(
-        () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
-    Future<Void> subscriber2Future = executor.submit(
-        () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
+    Future<Void> subscriber1Future =
+        executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+    Future<Void> subscriber2Future =
+        executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
 
-    assertThat(latch.await(30, TimeUnit.SECONDS))
-        .as("channel subscription was not received")
+    assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
         .isTrue();
 
     Long result = publisher1.publish(CHANNEL_NAME, "hello");
@@ -276,13 +348,12 @@ public class PubSubDUnitTest {
     MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
     MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
 
-    Future<Void> subscriber1Future = executor.submit(
-        () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
-    Future<Void> subscriber2Future = executor.submit(
-        () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
+    Future<Void> subscriber1Future =
+        executor.submit(() -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+    Future<Void> subscriber2Future =
+        executor.submit(() -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
 
-    assertThat(latch.await(30, TimeUnit.SECONDS))
-        .as("channel subscription was not received")
+    assertThat(latch.await(30, TimeUnit.SECONDS)).as("channel subscription was not received")
         .isTrue();
 
     List<Future<Void>> futures = new LinkedList<>();
@@ -321,6 +392,7 @@ public class PubSubDUnitTest {
     String CHANNEL_NAME = "best_channel_ever";
 
     List<Jedis> clients = new ArrayList<>();
+    List<MockSubscriber> subscribers = new ArrayList<>();
 
     // Build up an initial set of subscribers
     for (int i = 0; i < CLIENT_COUNT; i++) {
@@ -332,6 +404,7 @@ public class PubSubDUnitTest {
       executor.submit(() -> client.subscribe(mockSubscriber, CHANNEL_NAME));
       latch.await();
 
+      subscribers.add(mockSubscriber);
       clients.add(client);
     }
 
@@ -344,13 +417,18 @@ public class PubSubDUnitTest {
 
     assertThat(result).isEqualTo(CLIENT_COUNT * 10);
 
+    subscribers.forEach(x -> {
+      x.unsubscribe();
+      x.awaitUnsubscribe(CHANNEL_NAME);
+    });
     clients.forEach(Jedis::close);
   }
 
   @Test
   public void testPubSubWithManyClientsDisconnecting() throws Exception {
-    int CLIENT_COUNT = 10;
-    int ITERATIONS = 400;
+    int CLIENT_COUNT = 1;
+    int ITERATIONS = 1000;
+    String LOCAL_CHANNEL_NAME = "disconnecting_channel";
 
     Random random = new Random();
     List<Jedis> clients = new ArrayList<>();
@@ -362,15 +440,15 @@ public class PubSubDUnitTest {
 
       CountDownLatch latch = new CountDownLatch(1);
       MockSubscriber mockSubscriber = new MockSubscriber(latch);
-      executor.submit(() -> client.subscribe(mockSubscriber, CHANNEL_NAME));
+      executor.submit(() -> client.subscribe(mockSubscriber, LOCAL_CHANNEL_NAME));
       latch.await();
     }
 
     // Start actively publishing in the background
-    Jedis publishingClient = new Jedis("localhost", redisServerPort1);
+    Jedis publishingClient = new Jedis("localhost", redisServerPort1, 60_000);
     Callable<Void> callable = () -> {
       for (int j = 0; j < ITERATIONS; j++) {
-        publishingClient.publish(CHANNEL_NAME, "hello - " + j);
+        publishingClient.publish(LOCAL_CHANNEL_NAME, "hello - " + j);
       }
       return null;
     };
@@ -385,7 +463,7 @@ public class PubSubDUnitTest {
       Jedis client = new Jedis("localhost", redisServerPort1);
       CountDownLatch latch = new CountDownLatch(1);
       MockSubscriber mockSubscriber = new MockSubscriber(latch);
-      executor.submit(() -> client.subscribe(mockSubscriber, CHANNEL_NAME));
+      executor.submit(() -> client.subscribe(mockSubscriber, LOCAL_CHANNEL_NAME));
       latch.await();
 
       clients.set(candy, client);
@@ -409,12 +487,11 @@ public class PubSubDUnitTest {
   }
 
   private void waitForRestart() {
-    await()
-        .untilAsserted(() -> gfsh.executeAndAssertThat("list members")
-            .statusIsSuccess()
-            .hasTableSection()
+    await().untilAsserted(
+        () -> gfsh.executeAndAssertThat("list members").statusIsSuccess().hasTableSection()
             .hasColumn("Name")
-            .containsOnly("locator-0", "server-1", "server-2", "server-3", "server-4", "server-5"));
+            .containsOnly("locator-0", "server-1", "server-2", "server-3", "server-4",
+                "server-5"));
   }
 
   private void reconnectSubscriber1() {
@@ -424,4 +501,25 @@ public class PubSubDUnitTest {
   private void reconnectSubscriber2() {
     subscriber2 = new Jedis(LOCAL_HOST, redisServerPort2);
   }
+
+  private Jedis getConnection(Random random) {
+    Jedis client = null;
+
+    for (int i = 0; i < 10; i++) {
+      int randPort = random.nextInt(4) + 1;
+      try {
+        client = new Jedis("localhost", cluster.getRedisPort(randPort), JEDIS_TIMEOUT);
+        client.ping();
+        return client;
+      } catch (Exception e) {
+        try {
+          if (client != null) {
+            client.close();
+          }
+        } catch (Exception exception) {
+        }
+      }
+    }
+    throw new RuntimeException("Tried 10 times, but could not get a good connection.");
+  }
 }
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java
index 639ea45..5e011e9 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/session/RedisSessionDUnitTest.java
@@ -17,16 +17,25 @@ package org.apache.geode.redis.session;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.test.junit.categories.RedisTest;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 @Category({RedisTest.class})
 public class RedisSessionDUnitTest extends SessionDUnitTest {
 
+  @Rule
+  public ExecutorServiceRule executor = new ExecutorServiceRule();
+
   @BeforeClass
   public static void setup() {
     SessionDUnitTest.setup();
@@ -45,6 +54,42 @@ public class RedisSessionDUnitTest extends SessionDUnitTest {
   }
 
   @Test
+  public void createSessionsConcurrently() throws ExecutionException, InterruptedException {
+    // Shared flag that any creator clears when it fails, so the others stop early.
+    AtomicBoolean running = new AtomicBoolean(true);
+    // Start gate: all creators block on it until every task has been submitted.
+    CountDownLatch latch = new CountDownLatch(1);
+
+    Future<?>[] creators = new Future<?>[3];
+    for (int slot = 0; slot < creators.length; slot++) {
+      int creatorIndex = slot + 1;
+      creators[slot] = executor.submit(() -> sessionCreator(creatorIndex, 100, running, latch));
+    }
+
+    latch.countDown();
+
+    // Waiting on each future surfaces any failure raised inside a creator.
+    for (Future<?> creator : creators) {
+      creator.get();
+    }
+  }
+
+  /**
+   * Repeatedly creates a session with a unique note through APP1 and checks that APP2 reads back
+   * exactly that note.
+   *
+   * @param index identifies this creator; it is embedded in every note name
+   * @param iterations maximum number of sessions to create
+   * @param running shared flag; the loop stops once it is false, and it is cleared on any failure
+   * @param latch start gate that is awaited before the first iteration
+   * @throws InterruptedException if interrupted while waiting on {@code latch}
+   * @throws RuntimeException wrapping the first failure, tagged with the offending note name
+   */
+  private void sessionCreator(int index, int iterations, AtomicBoolean running,
+      CountDownLatch latch)
+      throws InterruptedException {
+    latch.await();
+    for (int iteration = 0; iteration < iterations && running.get(); iteration++) {
+      String noteName = String.format("note-%d-%d", index, iteration);
+      try {
+        String sessionCookie = createNewSessionWithNote(APP1, noteName);
+        String[] sessionNotes = getSessionNotes(APP2, sessionCookie);
+        assertThat(sessionNotes).containsExactly(noteName);
+      } catch (Exception | AssertionError e) {
+        // AssertionError is not an Exception: catch it explicitly so that a failed assertion also
+        // stops the sibling creators and is reported together with the note name.
+        running.set(false);
+        throw new RuntimeException("BANG " + noteName, e);
+      }
+    }
+  }
+
+  @Test
   public void should_storeSession() {
     String sessionCookie = createNewSessionWithNote(APP1, "note1");
 
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubIntegrationTest.java
new file mode 100644
index 0000000..23c0073
--- /dev/null
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/LettucePubSubIntegrationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.pubsub;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Future;
+
+import io.lettuce.core.RedisClient;
+import io.lettuce.core.RedisFuture;
+import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import org.apache.geode.redis.GeodeRedisServerRule;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
+
+/**
+ * Pub/sub integration test driven through the Lettuce client: several publishers send to one
+ * channel concurrently while many subscribers listen, and the summed PUBLISH replies are checked
+ * against the expected total.
+ */
+public class LettucePubSubIntegrationTest {
+
+  private static final String CHANNEL = "best-channel";
+  private RedisClient client;
+
+  @ClassRule
+  public static GeodeRedisServerRule server = new GeodeRedisServerRule();
+
+  @ClassRule
+  public static ExecutorServiceRule executor = new ExecutorServiceRule();
+
+  @Before
+  public void before() {
+    client = RedisClient.create("redis://localhost:" + server.getPort());
+  }
+
+  @After
+  public void after() {
+    client.shutdown();
+  }
+
+
+  @Test
+  public void concurrentPublishersToMultipleSubscribers_doNotLosePublishMessages()
+      throws Exception {
+    int subscriberCount = 50;
+    int publisherCount = 10;
+    int publishIterations = 10000;
+
+    // The subscriber connections are left open for the whole test; they are only released when
+    // the client is shut down in after().
+    for (int i = 0; i < subscriberCount; i++) {
+      StatefulRedisPubSubConnection<String, String> subscriber = client.connectPubSub();
+      subscriber.sync().subscribe(CHANNEL);
+    }
+
+    List<Future<Long>> results = new ArrayList<>();
+    for (int i = 0; i < publisherCount; i++) {
+      int localI = i;
+      results.add(executor.submit(() -> publish(localI, publishIterations)));
+    }
+
+    long publishCount = 0;
+    for (Future<Long> r : results) {
+      publishCount += r.get();
+    }
+
+    // Widen before multiplying so that larger test parameters cannot overflow int arithmetic.
+    long expectedPublishCount = (long) subscriberCount * publisherCount * publishIterations;
+    assertThat(publishCount).isEqualTo(expectedPublishCount);
+  }
+
+  /**
+   * Publishes {@code iterationCount} messages to {@link #CHANNEL} on a dedicated connection
+   * without waiting between sends, then collects every reply.
+   *
+   * @param index identifies the publisher; it is embedded in each message
+   * @param iterationCount number of messages to publish
+   * @return the sum of all PUBLISH replies
+   * @throws Exception if any publish future fails
+   */
+  private Long publish(int index, int iterationCount) throws Exception {
+    StatefulRedisPubSubConnection<String, String> publisher = client.connectPubSub();
+    try {
+      long publishCount = 0;
+
+      List<RedisFuture<Long>> results = new ArrayList<>();
+      for (int i = 0; i < iterationCount; i++) {
+        results.add(publisher.async().publish(CHANNEL, "message-" + index + "-" + i));
+      }
+
+      for (RedisFuture<Long> r : results) {
+        publishCount += r.get();
+      }
+
+      return publishCount;
+    } finally {
+      // Release the publisher connection as soon as its replies are in, also on failure.
+      publisher.close();
+    }
+  }
+}
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java
index 20c4a60..5d341cf 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/pubsub/PubSubIntegrationTest.java
@@ -20,10 +20,21 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.Logger;
+import org.apache.logging.log4j.core.config.Configurator;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -32,6 +43,8 @@ import org.junit.experimental.categories.Category;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.Protocol;
 
+import org.apache.geode.logging.internal.log4j.api.FastLogger;
+import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.redis.GeodeRedisServerRule;
 import org.apache.geode.redis.mocks.MockBinarySubscriber;
 import org.apache.geode.redis.mocks.MockSubscriber;
@@ -43,7 +56,7 @@ import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 public class PubSubIntegrationTest {
   static Jedis publisher;
   static Jedis subscriber;
-  static final int REDIS_CLIENT_TIMEOUT = 100000;
+  public static final int JEDIS_TIMEOUT = Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
 
   @ClassRule
   public static GeodeRedisServerRule server = new GeodeRedisServerRule();
@@ -53,8 +66,8 @@ public class PubSubIntegrationTest {
 
   @BeforeClass
   public static void setUp() {
-    subscriber = new Jedis("localhost", server.getPort(), REDIS_CLIENT_TIMEOUT);
-    publisher = new Jedis("localhost", server.getPort(), REDIS_CLIENT_TIMEOUT);
+    subscriber = new Jedis("localhost", server.getPort(), JEDIS_TIMEOUT);
+    publisher = new Jedis("localhost", server.getPort(), JEDIS_TIMEOUT);
   }
 
   @AfterClass
@@ -379,11 +392,13 @@ public class PubSubIntegrationTest {
     waitFor(() -> !subscriberThread.isAlive());
 
     List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream()
-        .map(x -> x.channel).collect(Collectors.toList());
+        .map(x -> x.channel)
+        .collect(Collectors.toList());
     assertThat(unsubscribedChannels).containsExactlyInAnyOrder("salutations", "yuletide");
 
     List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream()
-        .map(x -> x.count).collect(Collectors.toList());
+        .map(x -> x.count)
+        .collect(Collectors.toList());
     assertThat(channelCounts).containsExactlyInAnyOrder(1, 0);
 
   }
@@ -404,11 +419,13 @@ public class PubSubIntegrationTest {
     waitFor(() -> !subscriberThread.isAlive());
 
     List<String> unsubscribedChannels = mockSubscriber.unsubscribeInfos.stream()
-        .map(x -> x.channel).collect(Collectors.toList());
+        .map(x -> x.channel)
+        .collect(Collectors.toList());
     assertThat(unsubscribedChannels).containsExactlyInAnyOrder("salutations", "yuletide");
 
     List<Integer> channelCounts = mockSubscriber.unsubscribeInfos.stream()
-        .map(x -> x.count).collect(Collectors.toList());
+        .map(x -> x.count)
+        .collect(Collectors.toList());
     assertThat(channelCounts).containsExactlyInAnyOrder(1, 0);
 
     Long result = publisher.publish("salutations", "greetings");
@@ -455,7 +472,7 @@ public class PubSubIntegrationTest {
 
   @Test
   public void testTwoSubscribersOneChannel() {
-    Jedis subscriber2 = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
+    Jedis subscriber2 = new Jedis("localhost", getPort(), JEDIS_TIMEOUT);
     MockSubscriber mockSubscriber1 = new MockSubscriber();
     MockSubscriber mockSubscriber2 = new MockSubscriber();
 
@@ -525,7 +542,7 @@ public class PubSubIntegrationTest {
 
   @Test
   public void testDeadSubscriber() {
-    Jedis deadSubscriber = new Jedis("localhost", getPort(), REDIS_CLIENT_TIMEOUT);
+    Jedis deadSubscriber = new Jedis("localhost", getPort(), JEDIS_TIMEOUT);
 
     MockSubscriber mockSubscriber = new MockSubscriber();
 
@@ -674,6 +691,153 @@ public class PubSubIntegrationTest {
     assertThat(mockSubscriber.getReceivedPMessages()).containsExactly("hello");
   }
 
+  /**
+   * Opens a Jedis connection to the test server and verifies it with a PING, retrying up to 10
+   * times with a 100 ms pause after each failed attempt.
+   *
+   * @return a connection that has answered a PING
+   * @throws RuntimeException if all 10 attempts fail (the last failure is the cause), or if the
+   *         calling thread is interrupted while pausing between attempts
+   */
+  private Jedis getConnection() {
+    Exception lastException = null;
+
+    for (int i = 0; i < 10; i++) {
+      Jedis client = null;
+      try {
+        client = new Jedis("localhost", server.getPort(), JEDIS_TIMEOUT);
+        client.ping();
+        return client;
+      } catch (Exception e) {
+        lastException = e;
+        if (client != null) {
+          try {
+            client.close();
+          } catch (Exception ignore) {
+            // best effort: the connection is already unusable
+          }
+        }
+      }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        // Someone asked this thread to stop: keep the interrupt flag set and give up instead of
+        // burning the remaining attempts.
+        Thread.currentThread().interrupt();
+        RuntimeException interrupted =
+            new RuntimeException("Interrupted while waiting to retry the connection.", e);
+        if (lastException != null) {
+          interrupted.addSuppressed(lastException);
+        }
+        throw interrupted;
+      }
+    }
+
+    throw new RuntimeException("Tried 10 times, but could not get a good connection.",
+        lastException);
+  }
+
+  /**
+   * Publishes messages of the form {@code boo-<index>-<n>} to "my-channel" on a fresh connection.
+   * At least {@code minimumIterations} messages are sent; after that, publishing continues for as
+   * long as {@code running} is still true.
+   *
+   * @return the sum of the values returned by the individual publish calls
+   */
+  int doPublishing(int index, int minimumIterations, AtomicBoolean running) {
+    Jedis publisherClient = getConnection();
+    int delivered = 0;
+    try {
+      for (int sent = 0; sent < minimumIterations || running.get(); sent++) {
+        String payload = "boo-" + index + "-" + sent;
+        delivered += publisherClient.publish("my-channel", payload);
+      }
+    } finally {
+      publisherClient.close();
+    }
+
+    return delivered;
+  }
+
+  /**
+   * Keeps submitting short-lived subscribers (subscribe to "my-channel", then unsubscribe) to a
+   * private pool while a consumer task waits for each of them to finish.
+   *
+   * <p>
+   * At least {@code minimumIterations} subscribers are submitted and processed; both loops keep
+   * going beyond that for as long as {@code running} is true.
+   *
+   * @param index identifies the caller; currently unused
+   * @param minimumIterations lower bound for submitted and processed subscribers
+   * @param running shared flag that keeps the loops alive past the minimum
+   * @return the number of subscribers whose future the consumer waited on successfully
+   * @throws ExecutionException if the consumer task failed, e.g. because a subscriber failed
+   * @throws InterruptedException if interrupted while waiting for the consumer task
+   */
+  int makeSubscribers(int index, int minimumIterations, AtomicBoolean running)
+      throws InterruptedException, ExecutionException {
+    ExecutorService executor = Executors.newFixedThreadPool(100);
+    try {
+      Queue<Future<Void>> workQ = new ConcurrentLinkedQueue<>();
+
+      Future<Integer> consumer = executor.submit(() -> {
+        int subscribersProcessed = 0;
+        while (subscribersProcessed < minimumIterations || running.get()) {
+          Future<Void> f = workQ.poll();
+          if (f == null) {
+            // nothing queued yet, let the producer run
+            Thread.yield();
+            continue;
+          }
+          try {
+            f.get();
+          } catch (Exception e) {
+            throw new RuntimeException(e);
+          }
+          subscribersProcessed++;
+        }
+        return subscribersProcessed;
+      });
+
+      int iterationCount = 0;
+      String channel = "my-channel";
+      while (iterationCount < minimumIterations || running.get()) {
+        Future<Void> f = executor.submit(() -> subscribeThenUnsubscribe(channel));
+        workQ.add(f);
+        iterationCount++;
+      }
+
+      return consumer.get();
+    } finally {
+      // Always tear the pool down, also when the consumer reports a failure.
+      executor.shutdownNow();
+    }
+  }
+
+  /**
+   * Runs one subscriber life cycle: connect, subscribe on a helper thread, wait for the
+   * subscription, unsubscribe, and wait up to 30 seconds for the blocked subscribe call to return.
+   * The connection and the helper executor are released on every exit path.
+   */
+  private Void subscribeThenUnsubscribe(String channel) throws Exception {
+    Jedis client = getConnection();
+    ExecutorService secondaryExecutor = Executors.newSingleThreadExecutor();
+    try {
+      MockSubscriber mockSubscriber = new MockSubscriber();
+      AtomicReference<Thread> innerThread = new AtomicReference<>();
+      Future<Void> inner = secondaryExecutor.submit(() -> {
+        innerThread.set(Thread.currentThread());
+        client.subscribe(mockSubscriber, channel);
+        return null;
+      });
+      mockSubscriber.awaitSubscribe(channel);
+      if (inner.isDone()) {
+        throw new RuntimeException("inner completed before unsubscribe");
+      }
+
+      mockSubscriber.unsubscribe(channel);
+
+      try {
+        inner.get(30, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        // Dump where the subscribing thread is stuck; it may not have been recorded yet.
+        Thread stuckThread = innerThread.get();
+        if (stuckThread != null) {
+          LogService.getLogger().debug("=> {} {}", stuckThread, stuckThread.getState());
+          for (StackTraceElement st : stuckThread.getStackTrace()) {
+            LogService.getLogger().debug("-> {}", st);
+          }
+        }
+        throw new RuntimeException(
+            "inner.get() errored after unsubscribe: " + e.getMessage(), e);
+      }
+
+      mockSubscriber.awaitUnsubscribe(channel);
+      return null;
+    } finally {
+      try {
+        client.close();
+      } finally {
+        secondaryExecutor.shutdownNow();
+      }
+    }
+  }
+
+  @Test
+  public void concurrentSubscribers_andPublishers_doesNotHang()
+      throws InterruptedException, ExecutionException {
+    // Switch the redis loggers to DEBUG for this scenario.
+    Logger redisLogger = LogService.getLogger("org.apache.geode.redis");
+    Configurator.setAllLevels(redisLogger.getName(), Level.getLevel("DEBUG"));
+    FastLogger.setDelegating(true);
+
+    final int minimumIterations = 10000;
+    AtomicBoolean running = new AtomicBoolean(true);
+
+    Future<Integer> subscribers1 =
+        executor.submit(() -> makeSubscribers(1, minimumIterations, running));
+    Future<Integer> subscribers2 =
+        executor.submit(() -> makeSubscribers(2, minimumIterations, running));
+
+    Future<Integer> publisher1 = executor.submit(() -> doPublishing(1, minimumIterations, running));
+    Future<Integer> publisher2 = executor.submit(() -> doPublishing(2, minimumIterations, running));
+    Future<Integer> publisher3 = executor.submit(() -> doPublishing(3, minimumIterations, running));
+    Future<Integer> publisher4 = executor.submit(() -> doPublishing(4, minimumIterations, running));
+    Future<Integer> publisher5 = executor.submit(() -> doPublishing(5, minimumIterations, running));
+
+    // From here on every worker stops as soon as it has reached its minimum iteration count.
+    running.set(false);
+
+    for (Future<Integer> subscribers : Arrays.asList(subscribers1, subscribers2)) {
+      assertThat(subscribers.get()).isGreaterThanOrEqualTo(10);
+    }
+
+    for (Future<Integer> publisher : Arrays.asList(publisher1, publisher2, publisher3,
+        publisher4, publisher5)) {
+      assertThat(publisher.get()).isGreaterThan(0);
+    }
+  }
+
   private void waitFor(Callable<Boolean> booleanCallable) {
     GeodeAwaitility.await()
         .ignoreExceptions() // ignoring socket closed exceptions
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/pubsub/SubscriptionsIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/pubsub/SubscriptionsIntegrationTest.java
index a4dc896..cf79b1c 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/pubsub/SubscriptionsIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/pubsub/SubscriptionsIntegrationTest.java
@@ -108,7 +108,8 @@ public class SubscriptionsIntegrationTest {
     for (int i = 0; i < ITERATIONS; i++) {
       Client client = new Client(mock(Channel.class));
       clients.add(client);
-      subscriptions.add(new ChannelSubscription(client, "channel".getBytes(), context));
+      subscriptions
+          .add(new ChannelSubscription(client, "channel".getBytes(), context, subscriptions));
     }
 
     new ConcurrentLoopingThreads(1,
@@ -128,7 +129,8 @@ public class SubscriptionsIntegrationTest {
     for (int i = 0; i < ITERATIONS; i++) {
       Client client = new Client(mock(Channel.class));
       clients.add(client);
-      subscriptions.add(new ChannelSubscription(client, "channel".getBytes(), context));
+      subscriptions
+          .add(new ChannelSubscription(client, "channel".getBytes(), context, subscriptions));
     }
 
     new ConcurrentLoopingThreads(1,
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java
index f814a96..cbbec95 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/DummySubscription.java
@@ -52,4 +52,10 @@ public class DummySubscription implements Subscription {
   public byte[] getChannelName() {
     return null;
   }
+
+  /** Intentionally empty: this dummy implementation ignores the ready-to-publish signal. */
+  @Override
+  public void readyToPublish() {}
+
+  /** Intentionally empty: this dummy implementation performs no work on shutdown. */
+  @Override
+  public void shutdown() {}
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
index 8f4c727..0d8bcb7 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
@@ -93,7 +93,9 @@ public class GeodeRedisServer {
         new PassiveExpirationManager(regionProvider.getDataRegion(), redisStats);
     nettyRedisServer = new NettyRedisServer(() -> cache.getInternalDistributedSystem().getConfig(),
         regionProvider, pubSub,
-        this::allowUnsupportedCommands, this::shutdown, port, bindAddress, redisStats);
+        this::allowUnsupportedCommands, this::shutdown, port, bindAddress, redisStats,
+        cache.getInternalDistributedSystem().getDistributionManager().getExecutors()
+            .getWaitingThreadPool());
   }
 
   public void setAllowUnsupportedCommands(boolean allowUnsupportedCommands) {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
index 8a6d153..4a82c4e 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
@@ -32,10 +32,22 @@ public class RedisResponse {
 
   private final Function<ByteBufAllocator, ByteBuf> coderCallback;
 
+  private Runnable afterWriteCallback;
+
   private RedisResponse(Function<ByteBufAllocator, ByteBuf> coderCallback) {
     this.coderCallback = coderCallback;
   }
 
+  /**
+   * Registers the action that {@link #afterWrite()} will run. A later call replaces any callback
+   * set earlier; passing {@code null} leaves {@link #afterWrite()} with nothing to do.
+   *
+   * @param callback action to run from {@link #afterWrite()}, may be {@code null}
+   */
+  public void setAfterWriteCallback(Runnable callback) {
+    afterWriteCallback = callback;
+  }
+
+  /**
+   * Runs the callback registered through {@link #setAfterWriteCallback(Runnable)}, if there is
+   * one; otherwise does nothing.
+   */
+  public void afterWrite() {
+    if (afterWriteCallback == null) {
+      return;
+    }
+    afterWriteCallback.run();
+  }
+
   public ByteBuf encode(ByteBufAllocator allocator) {
     return coderCallback.apply(allocator);
   }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
index 9ece01f..9244b55 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PsubscribeExecutor.java
@@ -18,34 +18,58 @@ package org.apache.geode.redis.internal.executor.pubsub;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.function.Consumer;
 
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
-import org.apache.geode.redis.internal.executor.GlobPattern;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.pubsub.SubscribeResult;
 
 public class PsubscribeExecutor extends AbstractExecutor {
 
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    Collection<Collection<?>> items = new ArrayList<>();
+
+    Collection<SubscribeResult> results = new ArrayList<>();
     for (int i = 1; i < command.getProcessedCommand().size(); i++) {
-      Collection<Object> item = new ArrayList<>();
-      byte[] pattern = command.getProcessedCommand().get(i);
-      long subscribedChannels =
-          context.getPubSub().psubscribe(
-              new GlobPattern(new String(pattern)), context, context.getClient());
+      byte[] patternBytes = command.getProcessedCommand().get(i);
+      SubscribeResult result =
+          context.getPubSub().psubscribe(patternBytes, context, context.getClient());
+      results.add(result);
+    }
 
+    Collection<Collection<?>> items = new ArrayList<>();
+    for (SubscribeResult result : results) {
+      Collection<Object> item = new ArrayList<>();
       item.add("psubscribe");
-      item.add(pattern);
-      item.add(subscribedChannels);
-
+      item.add(result.getChannel());
+      item.add(result.getChannelCount());
       items.add(item);
     }
 
-    return RedisResponse.flattenedArray(items);
+
+    Runnable callback = () -> {
+      Consumer<Boolean> innerCallback = success -> {
+        for (SubscribeResult result : results) {
+          if (result.getSubscription() != null) {
+            if (success) {
+              result.getSubscription().readyToPublish();
+            } else {
+              result.getSubscription().shutdown();
+            }
+          }
+        }
+      };
+      context.changeChannelEventLoopGroup(context.getSubscriberGroup(), innerCallback);
+    };
+
+    RedisResponse response = RedisResponse.flattenedArray(items);
+    response.setAfterWriteCallback(callback);
+
+    return response;
+
   }
 
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
index 12101d5..ee51563 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/PublishExecutor.java
@@ -27,12 +27,13 @@ public class PublishExecutor extends AbstractExecutor {
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    List<byte[]> args = command.getProcessedCommand();
 
-    long publishCount =
-        context.getPubSub().publish(getDataRegion(context), args.get(1), args.get(2));
+    List<byte[]> args = command.getProcessedCommand();
+    byte[] channelName = args.get(1);
+    byte[] message = args.get(2);
+    long publishCount = context.getPubSub()
+        .publish(context.getRegionProvider().getDataRegion(), channelName, message);
 
     return RedisResponse.integer(publishCount);
   }
-
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java
index 0cdacfb..9f2150f 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/pubsub/SubscribeExecutor.java
@@ -17,34 +17,55 @@ package org.apache.geode.redis.internal.executor.pubsub;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.function.Consumer;
 
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+import org.apache.geode.redis.internal.pubsub.SubscribeResult;
 
 public class SubscribeExecutor extends AbstractExecutor {
 
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    Collection<Collection<?>> items = new ArrayList<>();
+    Collection<SubscribeResult> results = new ArrayList<>();
     for (int i = 1; i < command.getProcessedCommand().size(); i++) {
-      Collection<Object> item = new ArrayList<>();
       byte[] channelName = command.getProcessedCommand().get(i);
-      long subscribedChannels =
+      SubscribeResult result =
           context.getPubSub().subscribe(channelName, context, context.getClient());
+      results.add(result);
+    }
 
+    Collection<Collection<?>> items = new ArrayList<>();
+    for (SubscribeResult result : results) {
+      Collection<Object> item = new ArrayList<>();
       item.add("subscribe");
-      item.add(channelName);
-      item.add(subscribedChannels);
-
+      item.add(result.getChannel());
+      item.add(result.getChannelCount());
       items.add(item);
     }
 
-    context.changeChannelEventLoopGroup(context.getSubscriberGroup());
+    Runnable callback = () -> {
+      Consumer<Boolean> innerCallback = success -> {
+        for (SubscribeResult result : results) {
+          if (result.getSubscription() != null) {
+            if (success) {
+              result.getSubscription().readyToPublish();
+            } else {
+              result.getSubscription().shutdown();
+            }
+          }
+        }
+      };
+      context.changeChannelEventLoopGroup(context.getSubscriberGroup(), innerCallback);
+    };
+
+    RedisResponse response = RedisResponse.flattenedArray(items);
+    response.setAfterWriteCallback(callback);
 
-    return RedisResponse.flattenedArray(items);
+    return response;
   }
 
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Command.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Command.java
index 14ccfb5..74e516c 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Command.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Command.java
@@ -19,6 +19,8 @@ import java.nio.channels.SocketChannel;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import io.netty.channel.ChannelHandlerContext;
+
 import org.apache.geode.redis.internal.RedisCommandType;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.executor.RedisResponse;
@@ -35,6 +37,14 @@ public class Command {
   private ByteArrayWrapper bytes;
 
   /**
+   * Constructor used to create a static marker Command for shutting down executors.
+   */
+  Command() {
+    // A marker carries neither command elements nor a command type.
+    commandElems = null;
+    commandType = null;
+  }
+
+  /**
    * Constructor for {@link Command}. Must initialize Command with a {@link SocketChannel} and a
    * {@link List} of command elements
    *
@@ -176,4 +186,24 @@ public class Command {
     return String.format("wrong number of arguments for '%s' command",
         getCommandType().toString().toLowerCase());
   }
+
+  private long asyncStartTime;
+
+  /**
+   * Stores the start time associated with this command.
+   *
+   * @param start value later returned by {@link #getAsyncStartTime()}; NOTE(review): the unit and
+   *        clock source are not visible here, confirm against the caller
+   */
+  public void setAsyncStartTime(long start) {
+    asyncStartTime = start;
+  }
+
+  /**
+   * @return the value last passed to {@link #setAsyncStartTime(long)}, or 0 if it was never set
+   */
+  public long getAsyncStartTime() {
+    return asyncStartTime;
+  }
+
+  private ChannelHandlerContext channelHandlerContext;
+
+  /**
+   * Attaches the Netty handler context to this command so that it can be retrieved later through
+   * {@link #getChannelHandlerContext()}.
+   *
+   * @param ctx the handler context to remember
+   */
+  public void setChannelHandlerContext(ChannelHandlerContext ctx) {
+    channelHandlerContext = ctx;
+  }
+
+  /**
+   * @return the context last passed to
+   *         {@link #setChannelHandlerContext(ChannelHandlerContext)}, or {@code null} if none was
+   *         set
+   */
+  public ChannelHandlerContext getChannelHandlerContext() {
+    return channelHandlerContext;
+  }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index e3f5546..ab33f84 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -17,6 +17,9 @@ package org.apache.geode.redis.internal.netty;
 
 
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 import io.netty.buffer.ByteBuf;
@@ -61,17 +64,22 @@ import org.apache.geode.redis.internal.pubsub.PubSub;
 public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
 
   private static final Logger logger = LogService.getLogger();
+  private static final Command TERMINATE_COMMAND = new Command();
 
   private final Client client;
   private final Channel channel;
   private final RegionProvider regionProvider;
   private final PubSub pubsub;
-  private final EventLoopGroup subscriberGroup;
   private final ByteBufAllocator byteBufAllocator;
   private final byte[] authPassword;
   private final Supplier<Boolean> allowUnsupportedSupplier;
   private final Runnable shutdownInvoker;
   private final RedisStats redisStats;
+  private final EventLoopGroup subscriberGroup;
+  private final int MAX_QUEUED_COMMANDS =
+      Integer.getInteger("geode.redis.commandQueueSize", 1000);
+  private final LinkedBlockingQueue<Command> commandQueue =
+      new LinkedBlockingQueue<>(MAX_QUEUED_COMMANDS);
 
   private boolean isAuthenticated;
 
@@ -82,31 +90,57 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
    * @param password Authentication password for each context, can be null
    */
   public ExecutionHandlerContext(Channel channel, RegionProvider regionProvider, PubSub pubsub,
-      EventLoopGroup subscriberGroup,
       Supplier<Boolean> allowUnsupportedSupplier,
       Runnable shutdownInvoker,
       RedisStats redisStats,
+      ExecutorService backgroundExecutor,
+      EventLoopGroup subscriberGroup,
       byte[] password) {
     this.channel = channel;
     this.regionProvider = regionProvider;
     this.pubsub = pubsub;
-    this.subscriberGroup = subscriberGroup;
     this.allowUnsupportedSupplier = allowUnsupportedSupplier;
     this.shutdownInvoker = shutdownInvoker;
     this.redisStats = redisStats;
+    this.subscriberGroup = subscriberGroup;
     this.client = new Client(channel);
     this.byteBufAllocator = this.channel.alloc();
     this.authPassword = password;
     this.isAuthenticated = password == null;
     redisStats.addClient();
-  }
 
-  public ChannelFuture writeToChannel(ByteBuf message) {
-    return channel.writeAndFlush(message, channel.newPromise());
+    backgroundExecutor.submit(this::processCommandQueue);
   }
 
   public ChannelFuture writeToChannel(RedisResponse response) {
-    return channel.writeAndFlush(response.encode(byteBufAllocator), channel.newPromise());
+    return channel.writeAndFlush(response.encode(byteBufAllocator), channel.newPromise())
+        .addListener((ChannelFutureListener) f -> {
+          response.afterWrite();
+          logResponse(response, channel.remoteAddress().toString(), f.cause());
+        });
+  }
+
+  /**
+   * Drains the command queue one command at a time until the terminate marker is taken from it.
+   * Anything thrown while executing a command is handed to {@code exceptionCaught} together with
+   * the handler context stored on that command, after which the loop continues.
+   */
+  private void processCommandQueue() {
+    for (Command next = takeCommandFromQueue(); next != TERMINATE_COMMAND;
+        next = takeCommandFromQueue()) {
+      try {
+        executeCommand(next);
+      } catch (Throwable failure) {
+        exceptionCaught(next.getChannelHandlerContext(), failure);
+      }
+    }
+  }
+
+  /**
+   * Blocks until a command is available on the queue.
+   *
+   * @return the next queued command, or the terminate marker if the waiting thread was
+   *         interrupted
+   */
+  private Command takeCommandFromQueue() {
+    try {
+      return commandQueue.take();
+    } catch (InterruptedException e) {
+      logger.info("Command queue thread interrupted");
+      // Catching InterruptedException clears the flag; set it again so that the owner of this
+      // thread can still observe the interrupt once the processing loop has returned.
+      Thread.currentThread().interrupt();
+      return TERMINATE_COMMAND;
+    }
   }
 
   /**
@@ -115,17 +149,8 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
     Command command = (Command) msg;
-    try {
-      if (logger.isDebugEnabled()) {
-        logger.debug("Executing Redis command: {}", command);
-      }
-
-      executeCommand(ctx, command);
-    } catch (Exception e) {
-      logger.warn("Execution of Redis command {} failed: {}", command, e);
-      throw e;
-    }
-
+    command.setChannelHandlerContext(ctx);
+    commandQueue.put(command);
   }
 
   /**
@@ -139,6 +164,33 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
     }
   }
 
+  public EventLoopGroup getSubscriberGroup() {
+    return subscriberGroup;
+  }
+
+  public synchronized void changeChannelEventLoopGroup(EventLoopGroup newGroup,
+      Consumer<Boolean> callback) {
+    if (newGroup.equals(channel.eventLoop())) {
+      // already registered with newGroup
+      callback.accept(true);
+      return;
+    }
+    channel.deregister().addListener((ChannelFutureListener) future -> {
+      boolean registerSuccess = true;
+      synchronized (channel) {
+        if (!channel.isRegistered()) {
+          try {
+            newGroup.register(channel).sync();
+          } catch (Exception e) {
+            logger.warn("Unable to register new EventLoopGroup: {}", e.getMessage());
+            registerSuccess = false;
+          }
+        }
+      }
+      callback.accept(registerSuccess);
+    });
+  }
+
   private RedisResponse getExceptionResponse(ChannelHandlerContext ctx, Throwable cause) {
     RedisResponse response;
     if (cause instanceof IOException) {
@@ -192,43 +244,50 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
     if (logger.isDebugEnabled()) {
       logger.debug("GeodeRedisServer-Connection closing with " + ctx.channel().remoteAddress());
     }
+    commandQueue.offer(TERMINATE_COMMAND);
     redisStats.removeClient();
     ctx.channel().close();
     ctx.close();
   }
 
-  private void executeCommand(ChannelHandlerContext ctx, Command command) {
-    RedisResponse response;
+  private void executeCommand(Command command) {
+    try {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Executing Redis command: {} - {}", command,
+            channel.remoteAddress().toString());
+      }
 
-    if (!isAuthenticated()) {
-      response = handleUnAuthenticatedCommand(command);
-      writeToChannel(response);
-      return;
-    }
+      if (!isAuthenticated()) {
+        writeToChannel(handleUnAuthenticatedCommand(command));
+        return;
+      }
 
-    if (command.isUnsupported() && !allowUnsupportedCommands()) {
-      writeToChannel(
-          RedisResponse.error(command.getCommandType() + RedisConstants.ERROR_UNSUPPORTED_COMMAND));
-      return;
-    }
+      if (command.isUnsupported() && !allowUnsupportedCommands()) {
+        writeToChannel(
+            RedisResponse
+                .error(command.getCommandType() + RedisConstants.ERROR_UNSUPPORTED_COMMAND));
+        return;
+      }
 
-    if (command.isUnimplemented()) {
-      logger.info("Failed " + command.getCommandType() + " because it is not implemented.");
-      writeToChannel(RedisResponse.error(command.getCommandType() + " is not implemented."));
-      return;
-    }
+      if (command.isUnimplemented()) {
+        logger.info("Failed " + command.getCommandType() + " because it is not implemented.");
+        writeToChannel(RedisResponse.error(command.getCommandType() + " is not implemented."));
+        return;
+      }
 
-    final long start = redisStats.startCommand(command.getCommandType());
-    try {
-      response = command.execute(this);
-      logResponse(response);
-      writeToChannel(response);
-    } finally {
-      redisStats.endCommand(command.getCommandType(), start);
-    }
+      final long start = redisStats.startCommand(command.getCommandType());
+      try {
+        writeToChannel(command.execute(this));
+      } finally {
+        redisStats.endCommand(command.getCommandType(), start);
+      }
 
-    if (command.isOfType(RedisCommandType.QUIT)) {
-      channelInactive(ctx);
+      if (command.isOfType(RedisCommandType.QUIT)) {
+        channelInactive(command.getChannelHandlerContext());
+      }
+    } catch (Exception e) {
+      logger.warn("Execution of Redis command {} failed: {}", command, e);
+      throw e;
     }
   }
 
@@ -247,25 +306,16 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
     return response;
   }
 
-  public EventLoopGroup getSubscriberGroup() {
-    return subscriberGroup;
-  }
-
-  public void changeChannelEventLoopGroup(EventLoopGroup newGroup) {
-    if (newGroup.equals(channel.eventLoop())) {
-      // already registered with newGroup
-      return;
-    }
-    channel.deregister().addListener((ChannelFutureListener) future -> {
-      newGroup.register(channel).sync();
-    });
-  }
-
-  private void logResponse(RedisResponse response) {
+  private void logResponse(RedisResponse response, String extraMessage, Throwable cause) {
     if (logger.isDebugEnabled() && response != null) {
       ByteBuf buf = response.encode(new UnpooledByteBufAllocator(false));
-      logger.debug("Redis command returned: {}",
-          Command.getHexEncodedString(buf.array(), buf.readableBytes()));
+      if (cause == null) {
+        logger.debug("Redis command returned: {} - {}",
+            Command.getHexEncodedString(buf.array(), buf.readableBytes()), extraMessage);
+      } else {
+        logger.debug("Redis command FAILED to return: {} - {}",
+            Command.getHexEncodedString(buf.array(), buf.readableBytes()), extraMessage, cause);
+      }
     }
   }
 
@@ -329,5 +379,4 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
     return pubsub;
   }
 
-
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
index e5c53dd..bdcc80b 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
@@ -26,6 +26,7 @@ import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.security.UnrecoverableKeyException;
 import java.security.cert.CertificateException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadFactory;
 import java.util.function.Supplier;
 
@@ -77,6 +78,7 @@ public class NettyRedisServer {
   private final Supplier<Boolean> allowUnsupportedSupplier;
   private final Runnable shutdownInvoker;
   private final RedisStats redisStats;
+  private final ExecutorService backgroundExecutor;
   private final EventLoopGroup selectorGroup;
   private final EventLoopGroup workerGroup;
   private final EventLoopGroup subscriberGroup;
@@ -88,13 +90,14 @@ public class NettyRedisServer {
       RegionProvider regionProvider, PubSub pubsub,
       Supplier<Boolean> allowUnsupportedSupplier,
       Runnable shutdownInvoker, int port, String requestedAddress,
-      RedisStats redisStats) {
+      RedisStats redisStats, ExecutorService backgroundExecutor) {
     this.configSupplier = configSupplier;
     this.regionProvider = regionProvider;
     this.pubsub = pubsub;
     this.allowUnsupportedSupplier = allowUnsupportedSupplier;
     this.shutdownInvoker = shutdownInvoker;
     this.redisStats = redisStats;
+    this.backgroundExecutor = backgroundExecutor;
     if (port < RANDOM_PORT_INDICATOR) {
       throw new IllegalArgumentException("Redis port cannot be less than 0");
     }
@@ -127,6 +130,7 @@ public class NettyRedisServer {
       closeFuture = serverChannel.closeFuture();
     }
     workerGroup.shutdownGracefully();
+    subscriberGroup.shutdownGracefully();
     Future<?> bossFuture = selectorGroup.shutdownGracefully();
     if (serverChannel != null) {
       serverChannel.close();
@@ -158,8 +162,9 @@ public class NettyRedisServer {
         pipeline.addLast(ByteToCommandDecoder.class.getSimpleName(), new ByteToCommandDecoder());
         pipeline.addLast(new WriteTimeoutHandler(10));
         pipeline.addLast(ExecutionHandlerContext.class.getSimpleName(),
-            new ExecutionHandlerContext(socketChannel, regionProvider, pubsub, subscriberGroup,
-                allowUnsupportedSupplier, shutdownInvoker, redisStats, redisPasswordBytes));
+            new ExecutionHandlerContext(socketChannel, regionProvider, pubsub,
+                allowUnsupportedSupplier, shutdownInvoker, redisStats, backgroundExecutor,
+                subscriberGroup, redisPasswordBytes));
       }
     };
   }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/AbstractSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/AbstractSubscription.java
index 298fc58..b446a1f 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/AbstractSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/AbstractSubscription.java
@@ -16,14 +16,14 @@
 
 package org.apache.geode.redis.internal.pubsub;
 
-import io.netty.buffer.ByteBuf;
+import java.util.concurrent.CountDownLatch;
+
 import io.netty.channel.ChannelFutureListener;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Client;
-import org.apache.geode.redis.internal.netty.Coder;
-import org.apache.geode.redis.internal.netty.CoderException;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
 
 public abstract class AbstractSubscription implements Subscription {
@@ -31,25 +31,61 @@ public abstract class AbstractSubscription implements Subscription {
   private final Client client;
   private final ExecutionHandlerContext context;
 
-  AbstractSubscription(Client client, ExecutionHandlerContext context) {
+  // Two things have to happen before we are ready to publish:
+  // 1 - we need to make sure the subscriber has switched EventLoopGroups
+  // 2 - the response to the SUBSCRIBE command has been submitted to the client
+  private final CountDownLatch readyForPublish = new CountDownLatch(1);
+  private final Subscriptions subscriptions;
+  private boolean running = true;
+
+  AbstractSubscription(Client client, ExecutionHandlerContext context,
+      Subscriptions subscriptions) {
     if (client == null) {
       throw new IllegalArgumentException("client cannot be null");
     }
     if (context == null) {
       throw new IllegalArgumentException("context cannot be null");
     }
+    if (subscriptions == null) {
+      throw new IllegalArgumentException("subscriptions cannot be null");
+    }
     this.client = client;
     this.context = context;
+    this.subscriptions = subscriptions;
+  }
+
+  @Override
+  public void readyToPublish() {
+    readyForPublish.countDown();
   }
 
   @Override
   public void publishMessage(byte[] channel, byte[] message,
       PublishResultCollector publishResultCollector) {
-    ByteBuf messageByteBuffer = constructResponse(channel, message);
-    writeToChannel(messageByteBuffer, publishResultCollector);
+    try {
+      readyForPublish.await();
+    } catch (InterruptedException e) {
+      // we must be shutting down or registration failed
+      Thread.interrupted();
+      running = false;
+    }
+
+    if (running) {
+      writeToChannel(constructResponse(channel, message), publishResultCollector);
+    } else {
+      publishResultCollector.failure(client);
+    }
   }
 
-  Client getClient() {
+  @Override
+  public synchronized void shutdown() {
+    running = false;
+    subscriptions.remove(client);
+    // release any threads currently waiting to publish
+    readyToPublish();
+  }
+
+  public Client getClient() {
     return client;
   }
 
@@ -58,25 +94,17 @@ public abstract class AbstractSubscription implements Subscription {
     return this.client.equals(client);
   }
 
-  private ByteBuf constructResponse(byte[] channel, byte[] message) {
-    ByteBuf messageByteBuffer;
-    try {
-      messageByteBuffer = Coder.getArrayResponse(context.getByteBufAllocator(),
-          createResponse(channel, message));
-    } catch (CoderException e) {
-      logger.warn("Unable to encode publish message", e);
-      return null;
-    }
-    return messageByteBuffer;
+  private RedisResponse constructResponse(byte[] channel, byte[] message) {
+    return RedisResponse.array(createResponse(channel, message));
   }
 
   /**
-   * This method turns the response into a synchronous call. We want to determine if the response,
-   * to the client, resulted in an error - for example if the client has disconnected and the write
-   * fails. In such cases we need to be able to notify the caller.
+   * Determines whether writing the response to the client resulted in an error - for example,
+   * when the client has disconnected and the write fails - so that the caller can be notified
+   * through the result collector.
    */
-  private void writeToChannel(ByteBuf messageByteBuffer, PublishResultCollector resultCollector) {
-    context.writeToChannel(messageByteBuffer)
+  private void writeToChannel(RedisResponse response, PublishResultCollector resultCollector) {
+    context.writeToChannel(response)
         .addListener((ChannelFutureListener) future -> {
           if (future.cause() == null) {
             resultCollector.success();
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java
index 5d1b69d..8b1d8d1 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/ChannelSubscription.java
@@ -28,8 +28,9 @@ import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
 class ChannelSubscription extends AbstractSubscription {
   private byte[] channel;
 
-  public ChannelSubscription(Client client, byte[] channel, ExecutionHandlerContext context) {
-    super(client, context);
+  public ChannelSubscription(Client client, byte[] channel, ExecutionHandlerContext context,
+      Subscriptions subscriptions) {
+    super(client, context, subscriptions);
 
     if (channel == null) {
       throw new IllegalArgumentException("channel cannot be null");
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java
index 554946a..d197224 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PatternSubscription.java
@@ -29,8 +29,9 @@ import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
 class PatternSubscription extends AbstractSubscription {
   final GlobPattern pattern;
 
-  public PatternSubscription(Client client, GlobPattern pattern, ExecutionHandlerContext context) {
-    super(client, context);
+  public PatternSubscription(Client client, GlobPattern pattern, ExecutionHandlerContext context,
+      Subscriptions subscriptions) {
+    super(client, context, subscriptions);
 
     if (pattern == null) {
       throw new IllegalArgumentException("pattern cannot be null");
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
index 659db58..7029b12 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
@@ -48,9 +48,9 @@ public interface PubSub {
    * @param channel to subscribe to
    * @param context ExecutionHandlerContext which will handle the client response
    * @param client a Client instance making the request
-   * @return the number of channels subscribed to
+   * @return the result of the subscribe
    */
-  long subscribe(byte[] channel, ExecutionHandlerContext context, Client client);
+  SubscribeResult subscribe(byte[] channel, ExecutionHandlerContext context, Client client);
 
   /**
    * Subscribe to a pattern
@@ -58,9 +58,9 @@ public interface PubSub {
    * @param pattern glob pattern to subscribe to
    * @param context ExecutionHandlerContext which will handle the client response
    * @param client a Client instance making the request
-   * @return the number of channels subscribed to
+   * @return the result of the subscribe
    */
-  long psubscribe(GlobPattern pattern, ExecutionHandlerContext context, Client client);
+  SubscribeResult psubscribe(byte[] pattern, ExecutionHandlerContext context, Client client);
 
   /**
    * Unsubscribe a client from a channel
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
index edbf10d..6b9643f 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
@@ -86,12 +86,13 @@ public class PubSubImpl implements PubSub {
   }
 
   @Override
-  public long subscribe(byte[] channel, ExecutionHandlerContext context, Client client) {
+  public SubscribeResult subscribe(byte[] channel, ExecutionHandlerContext context, Client client) {
     return subscriptions.subscribe(channel, context, client);
   }
 
   @Override
-  public long psubscribe(GlobPattern pattern, ExecutionHandlerContext context, Client client) {
+  public SubscribeResult psubscribe(byte[] pattern, ExecutionHandlerContext context,
+      Client client) {
     return subscriptions.psubscribe(pattern, context, client);
   }
 
@@ -142,7 +143,6 @@ public class PubSubImpl implements PubSub {
 
   @VisibleForTesting
   long publishMessageToSubscribers(byte[] channel, byte[] message) {
-
     List<Subscription> foundSubscriptions = subscriptions
         .findSubscriptions(channel);
     if (foundSubscriptions.isEmpty()) {
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriberWithLatch.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/SubscribeResult.java
similarity index 52%
rename from geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriberWithLatch.java
rename to geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/SubscribeResult.java
index a3a1476..ffe6a01 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/mocks/MockSubscriberWithLatch.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/SubscribeResult.java
@@ -14,34 +14,34 @@
  *
  */
 
-package org.apache.geode.redis.mocks;
+package org.apache.geode.redis.internal.pubsub;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
+public class SubscribeResult {
+  private final Subscription subscription;
+  private final long channelCount;
+  private final byte[] channel;
 
-import redis.clients.jedis.JedisPubSub;
-
-
-public class MockSubscriberWithLatch extends JedisPubSub {
-  private CountDownLatch latch;
-  private List<String> receivedMessages = new ArrayList<String>();
-
-  public MockSubscriberWithLatch(CountDownLatch latch) {
-    this.latch = latch;
+  public SubscribeResult(Subscription subscription, long channelCount, byte[] channel) {
+    this.subscription = subscription;
+    this.channelCount = channelCount;
+    this.channel = channel;
   }
 
-  public List<String> getReceivedMessages() {
-    return receivedMessages;
+  /**
+   * Returns the Subscription instance this subscribe operation created; possibly null.
+   */
+  public Subscription getSubscription() {
+    return subscription;
   }
 
-  @Override
-  public void onSubscribe(String channel, int subscribedChannels) {
-    latch.countDown();
+  /**
+   * Returns the total number of subscriptions the client has after this subscribe operation.
+   */
+  public long getChannelCount() {
+    return channelCount;
   }
 
-  @Override
-  public void onMessage(String channel, String message) {
-    receivedMessages.add(message);
+  public byte[] getChannel() {
+    return channel;
   }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java
index f8f4656..ba9dccd 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscription.java
@@ -57,4 +57,11 @@ public interface Subscription {
    * pattern is returned.
    */
   byte[] getChannelName();
+
+  /**
+   * Called once this subscriber is ready to have publishMessage called
+   */
+  void readyToPublish();
+
+  void shutdown();
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
index 1392ab2..76b4048 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/Subscriptions.java
@@ -98,20 +98,29 @@ public class Subscriptions {
     return subscriptions.size();
   }
 
-  public synchronized long subscribe(byte[] channel, ExecutionHandlerContext context,
+  public synchronized SubscribeResult subscribe(byte[] channel, ExecutionHandlerContext context,
       Client client) {
+    Subscription createdSubscription = null;
     if (!exists(channel, client)) {
-      add(new ChannelSubscription(client, channel, context));
+      createdSubscription = new ChannelSubscription(client, channel, context, this);
+      add(createdSubscription);
     }
-    return findSubscriptions(client).size();
+    long channelCount = findSubscriptions(client).size();
+    return new SubscribeResult(createdSubscription, channelCount, channel);
   }
 
-  public synchronized long psubscribe(GlobPattern pattern, ExecutionHandlerContext context,
+  public SubscribeResult psubscribe(byte[] patternBytes, ExecutionHandlerContext context,
       Client client) {
-    if (!exists(pattern, client)) {
-      add(new PatternSubscription(client, pattern, context));
+    GlobPattern pattern = new GlobPattern(new String(patternBytes));
+    Subscription createdSubscription = null;
+    synchronized (this) {
+      if (!exists(pattern, client)) {
+        createdSubscription = new PatternSubscription(client, pattern, context, this);
+        add(createdSubscription);
+      }
+      long channelCount = findSubscriptions(client).size();
+      return new SubscribeResult(createdSubscription, channelCount, patternBytes);
     }
-    return findSubscriptions(client).size();
   }
 
   public synchronized long unsubscribe(Object channelOrPattern, Client client) {
diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/PubSubImplJUnitTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/PubSubImplJUnitTest.java
index 8136288..f38f0b7 100644
--- a/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/PubSubImplJUnitTest.java
+++ b/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/PubSubImplJUnitTest.java
@@ -22,8 +22,6 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.DefaultChannelPromise;
@@ -43,15 +41,14 @@ public class PubSubImplJUnitTest {
     ExecutionHandlerContext mockContext = mock(ExecutionHandlerContext.class);
 
     FailingChannelFuture mockFuture = new FailingChannelFuture();
-    when(mockContext.writeToChannel((ByteBuf) any())).thenReturn(mockFuture);
-    when(mockContext.getByteBufAllocator()).thenReturn(new PooledByteBufAllocator());
+    when(mockContext.writeToChannel(any())).thenReturn(mockFuture);
 
     Client deadClient = mock(Client.class);
     when(deadClient.isDead()).thenReturn(true);
 
     ChannelSubscription subscription =
-        spy(new ChannelSubscription(deadClient,
-            "sally".getBytes(), mockContext));
+        spy(new ChannelSubscription(deadClient, "sally".getBytes(), mockContext, subscriptions));
+    subscription.readyToPublish();
 
     subscriptions.add(subscription);
 
diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/SubscriptionsJUnitTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/SubscriptionsJUnitTest.java
index e72235a..9213f99 100644
--- a/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/SubscriptionsJUnitTest.java
+++ b/geode-redis/src/test/java/org/apache/geode/redis/internal/pubsub/SubscriptionsJUnitTest.java
@@ -36,7 +36,8 @@ public class SubscriptionsJUnitTest {
     ExecutionHandlerContext context = mock(ExecutionHandlerContext.class);
     Client client = new Client(channel);
 
-    subscriptions.add(new ChannelSubscription(client, "subscriptions".getBytes(), context));
+    subscriptions
+        .add(new ChannelSubscription(client, "subscriptions".getBytes(), context, subscriptions));
 
     assertThat(subscriptions.exists("subscriptions".getBytes(), client)).isTrue();
     assertThat(subscriptions.exists("unknown".getBytes(), client)).isFalse();
@@ -51,7 +52,7 @@ public class SubscriptionsJUnitTest {
     Client client = new Client(channel);
     GlobPattern pattern = new GlobPattern("sub*s");
 
-    subscriptions.add(new PatternSubscription(client, pattern, context));
+    subscriptions.add(new PatternSubscription(client, pattern, context, subscriptions));
 
     assertThat(subscriptions.exists(pattern, client)).isTrue();
   }
@@ -66,7 +67,8 @@ public class SubscriptionsJUnitTest {
     GlobPattern globPattern1 = new GlobPattern("sub*s");
     GlobPattern globPattern2 = new GlobPattern("subscriptions");
 
-    subscriptions.add(new ChannelSubscription(client, "subscriptions".getBytes(), context));
+    subscriptions
+        .add(new ChannelSubscription(client, "subscriptions".getBytes(), context, subscriptions));
 
     assertThat(subscriptions.exists(globPattern1, client)).isFalse();
     assertThat(subscriptions.exists(globPattern2, client)).isFalse();
@@ -81,8 +83,9 @@ public class SubscriptionsJUnitTest {
     Client client = new Client(channel);
     GlobPattern globby = new GlobPattern("sub*s");
 
-    subscriptions.add(new ChannelSubscription(client, "subscriptions".getBytes(), context));
-    subscriptions.add(new PatternSubscription(client, globby, context));
+    subscriptions
+        .add(new ChannelSubscription(client, "subscriptions".getBytes(), context, subscriptions));
+    subscriptions.add(new PatternSubscription(client, globby, context, subscriptions));
 
     assertThat(subscriptions.exists(globby, client)).isTrue();
     assertThat(subscriptions.exists("subscriptions".getBytes(), client)).isTrue();
@@ -110,9 +113,9 @@ public class SubscriptionsJUnitTest {
     Client clientTwo = new Client(mockChannelTwo);
 
     ChannelSubscription subscriptionOne =
-        new ChannelSubscription(clientOne, "subscriptions".getBytes(), context);
+        new ChannelSubscription(clientOne, "subscriptions".getBytes(), context, subscriptions);
     ChannelSubscription subscriptionTwo =
-        new ChannelSubscription(clientTwo, "monkeys".getBytes(), context);
+        new ChannelSubscription(clientTwo, "monkeys".getBytes(), context, subscriptions);
 
     subscriptions.add(subscriptionOne);
     subscriptions.add(subscriptionTwo);
@@ -130,9 +133,9 @@ public class SubscriptionsJUnitTest {
     Client clientTwo = new Client(mockChannelTwo);
 
     ChannelSubscription subscriptionOne =
-        new ChannelSubscription(clientOne, "subscriptions".getBytes(), context);
+        new ChannelSubscription(clientOne, "subscriptions".getBytes(), context, subscriptions);
     ChannelSubscription subscriptionTwo =
-        new ChannelSubscription(clientTwo, "monkeys".getBytes(), context);
+        new ChannelSubscription(clientTwo, "monkeys".getBytes(), context, subscriptions);
 
     subscriptions.add(subscriptionOne);
     subscriptions.add(subscriptionTwo);
@@ -153,12 +156,12 @@ public class SubscriptionsJUnitTest {
     Client client = new Client(mockChannelOne);
 
     ChannelSubscription channelSubscriberOne =
-        new ChannelSubscription(client, "subscriptions".getBytes(), context);
+        new ChannelSubscription(client, "subscriptions".getBytes(), context, subscriptions);
     GlobPattern pattern = new GlobPattern("monkeys");
     PatternSubscription patternSubscriber = new PatternSubscription(client,
-        pattern, context);
+        pattern, context, subscriptions);
     ChannelSubscription channelSubscriberTwo =
-        new ChannelSubscription(client, "monkeys".getBytes(), context);
+        new ChannelSubscription(client, "monkeys".getBytes(), context, subscriptions);
 
     subscriptions.add(channelSubscriberOne);
     subscriptions.add(patternSubscriber);