You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by he...@apache.org on 2022/02/04 19:44:37 UTC

[geode-benchmarks] branch develop updated: Geode for Redis PubSub Benchmarks (#162)

This is an automated email from the ASF dual-hosted git repository.

heybales pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode-benchmarks.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4ee8f96  Geode for Redis PubSub Benchmarks (#162)
4ee8f96 is described below

commit 4ee8f965f6c28e9a4005eeeebf403503c505e0cf
Author: Eric Zoerner <zo...@vmware.com>
AuthorDate: Fri Feb 4 11:44:28 2022 -0800

    Geode for Redis PubSub Benchmarks (#162)
    
    The new benchmarks are currently turned off due to high variation in average latency, which would fail the CI threshold. We will revisit this in the future. For other uses, these benchmarks can provide throughput data.
---
 geode-benchmarks/build.gradle                      |   2 +
 .../benchmark/redis/tasks/JedisClientManager.java  |  72 ++++++-
 .../redis/tasks/LettuceClientManager.java          |  26 ++-
 .../redis/tasks/LettucePubSubClientManager.java    | 216 +++++++++++++++++++++
 .../benchmark/redis/tasks/PublishRedisTask.java    |  74 +++++++
 .../geode/benchmark/redis/tasks/RedisClient.java   |  30 +++
 .../benchmark/redis/tasks/StopPubSubRedisTask.java |  36 ++++
 .../benchmark/redis/tasks/SubscribeRedisTask.java  | 188 ++++++++++++++++++
 .../redis/tests/PubSubBenchmarkConfiguration.java  | 135 +++++++++++++
 .../tests/PubSubLargeBenchmarkConfiguration.java   |  81 ++++++++
 .../tests/PubSubSmallBenchmarkConfiguration.java   |  80 ++++++++
 .../benchmark/redis/tests/RedisBenchmark.java      |  31 +--
 .../redis/tests/RedisPubSubLargeBenchmark.java     |  35 ++++
 .../tests/RedisPubSubPatternLargeBenchmark.java    |  34 ++++
 .../tests/RedisPubSubPatternSmallBenchmark.java    |  34 ++++
 .../redis/tests/RedisPubSubSmallBenchmark.java     |  35 ++++
 .../geode/benchmark/tasks/WeightedTasksTest.java   |   2 +-
 gradle/dependency-versions.properties              |   1 +
 18 files changed, 1089 insertions(+), 23 deletions(-)

diff --git a/geode-benchmarks/build.gradle b/geode-benchmarks/build.gradle
index 73db68c..53280be 100644
--- a/geode-benchmarks/build.gradle
+++ b/geode-benchmarks/build.gradle
@@ -80,6 +80,8 @@ dependencies {
   implementation(group: 'io.lettuce', name: 'lettuce-core', version: project.'lettuce.version') {
     exclude group: 'io.netty'
   }
+  implementation(group: 'io.vavr', name: 'vavr', version: project.'vavr.version')
+
 
   if (VersionNumber.parse(geodeVersion) >= VersionNumber.parse("1.15.0.+")) {
     runtimeOnly(group: 'org.apache.geode', name: 'geode-for-redis')
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/JedisClientManager.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/JedisClientManager.java
index a4d6ef4..9cb7be5 100644
--- a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/JedisClientManager.java
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/JedisClientManager.java
@@ -24,12 +24,16 @@ import java.util.HashSet;
 import java.util.Set;
 import java.util.stream.Collectors;
 
+import io.vavr.Function3;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import redis.clients.jedis.HostAndPort;
 import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisCluster;
 import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.JedisPubSub;
+
+import org.apache.geode.benchmark.redis.tests.PubSubBenchmarkConfiguration;
 
 public final class JedisClientManager implements RedisClientManager {
   private static final Logger logger = LoggerFactory.getLogger(RedisClientManager.class);
@@ -58,28 +62,67 @@ public final class JedisClientManager implements RedisClientManager {
     }
 
     @Override
-    public long zadd(String key, double score, String value) {
+    public long zadd(final String key, final double score, final String value) {
       return jedisCluster.zadd(key, score, value);
     }
 
     @Override
-    public long zrem(String key, String value) {
+    public long zrem(final String key, final String value) {
       return jedisCluster.zrem(key, value);
     }
 
     @Override
-    public Set<String> zrange(String key, long start, long stop) {
+    public Set<String> zrange(final String key, final long start, final long stop) {
       return jedisCluster.zrange(key, start, stop);
     }
 
     @Override
-    public Set<String> zrangeByScore(String key, long start, long stop) {
+    public Set<String> zrangeByScore(final String key, final long start, final long stop) {
       return jedisCluster.zrangeByScore(key, start, stop);
     }
 
     @Override
+    public SubscriptionListener createSubscriptionListener(
+        final PubSubBenchmarkConfiguration pubSubConfig,
+        final Function3<String, String, Unsubscriber, Void> channelMessageConsumer) {
+      return new JedisSubscriptionListener(new JedisPubSub() {
+        @Override
+        public void onPMessage(final String pattern, final String channel, final String message) {
+          super.onPMessage(pattern, channel, message);
+          final Unsubscriber unsubscriber =
+              channels -> punsubscribe(channels.toArray(new String[] {}));
+          channelMessageConsumer.apply(channel, message, unsubscriber);
+        }
+
+        @Override
+        public void onMessage(final String channel, final String message) {
+          super.onMessage(channel, message);
+          final Unsubscriber unsubscriber =
+              channels -> unsubscribe(channels.toArray(new String[] {}));
+          channelMessageConsumer.apply(channel, message, unsubscriber);
+        }
+      });
+    }
+
+    @Override
+    public void subscribe(final SubscriptionListener listener, final String... channels) {
+      jedisCluster.subscribe(((JedisSubscriptionListener) listener).getJedisPubSub(), channels);
+    }
+
+    @Override
+    public void psubscribe(final SubscriptionListener listener, final String... channelPatterns) {
+      jedisCluster.psubscribe(((JedisSubscriptionListener) listener).getJedisPubSub(),
+          channelPatterns);
+    }
+
+    @Override
+    public void publish(final String channel, final String message) {
+      jedisCluster.publish(channel, message);
+    }
+
+    @Override
     public void flushdb() {
-      Set<String> seen = new HashSet<>();
+      final Set<String> seen = new HashSet<>();
       for (int i = 0; i < HASHSLOTS; ++i) {
         try (final Jedis connectionFromSlot = jedisCluster.getConnectionFromSlot(i)) {
           if (seen.add(connectionFromSlot.getClient().getHost())) {
@@ -104,7 +147,7 @@ public final class JedisClientManager implements RedisClientManager {
     poolConfig.setLifo(false);
     final JedisCluster jedisCluster = new JedisCluster(nodes, Integer.MAX_VALUE, poolConfig);
 
-    long start = System.nanoTime();
+    final long start = System.nanoTime();
     while (true) {
       try (final Jedis jedis = jedisCluster.getConnectionFromSlot(0)) {
         logger.info("Waiting for cluster to come up.");
@@ -113,13 +156,13 @@ public final class JedisClientManager implements RedisClientManager {
           break;
         }
         logger.debug(clusterInfo);
-      } catch (Exception e) {
+      } catch (final Exception e) {
         if (System.nanoTime() - start > CONNECT_TIMEOUT.toNanos()) {
           throw e;
         }
         try {
           Thread.sleep(50);
-        } catch (InterruptedException interruptedException) {
+        } catch (final InterruptedException interruptedException) {
           throw new RuntimeException(e);
         }
         logger.info("Failed connecting.", e);
@@ -142,4 +185,17 @@ public final class JedisClientManager implements RedisClientManager {
 
     return redisClient;
   }
+
+  static class JedisSubscriptionListener implements RedisClient.SubscriptionListener {
+    private final JedisPubSub jedisPubSub;
+
+    public JedisSubscriptionListener(final JedisPubSub jedisPubSub) {
+      this.jedisPubSub = jedisPubSub;
+    }
+
+    JedisPubSub getJedisPubSub() {
+      return jedisPubSub;
+    }
+  }
+
 }
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/LettuceClientManager.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/LettuceClientManager.java
index 012220a..c917873 100644
--- a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/LettuceClientManager.java
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/LettuceClientManager.java
@@ -30,9 +30,12 @@ import io.lettuce.core.RedisURI;
 import io.lettuce.core.cluster.RedisClusterClient;
 import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
 import io.lettuce.core.cluster.api.sync.RedisAdvancedClusterCommands;
+import io.vavr.Function3;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.geode.benchmark.redis.tests.PubSubBenchmarkConfiguration;
+
 public final class LettuceClientManager implements RedisClientManager {
   private static final Logger logger = LoggerFactory.getLogger(LettuceClientManager.class);
 
@@ -48,7 +51,6 @@ public final class LettuceClientManager implements RedisClientManager {
         return redisClusterConnection.sync();
       });
 
-
   private static final RedisClient redisClient = new RedisClient() {
     @Override
     public String get(final String key) {
@@ -92,6 +94,28 @@ public final class LettuceClientManager implements RedisClientManager {
     }
 
     @Override
+    public SubscriptionListener createSubscriptionListener(
+        final PubSubBenchmarkConfiguration pubSubConfig,
+        final Function3<String, String, Unsubscriber, Void> channelMessageConsumer) {
+      throw new UnsupportedOperationException("not a pubsub client");
+    }
+
+    @Override
+    public void subscribe(final SubscriptionListener listener, final String... channels) {
+      throw new UnsupportedOperationException("not a pubsub client");
+    }
+
+    @Override
+    public void psubscribe(final SubscriptionListener listener, final String... channelPatterns) {
+      throw new UnsupportedOperationException("not a pubsub client");
+    }
+
+    @Override
+    public void publish(final String channel, final String message) {
+      throw new UnsupportedOperationException("not a pubsub client");
+    }
+
+    @Override
     public void flushdb() {
       redisAdvancedClusterCommands.get().flushdb();
     }
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/LettucePubSubClientManager.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/LettucePubSubClientManager.java
new file mode 100644
index 0000000..fc1536c
--- /dev/null
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/LettucePubSubClientManager.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.benchmark.redis.tasks;
+
+import static java.lang.Thread.currentThread;
+
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import io.lettuce.core.Range;
+import io.lettuce.core.RedisURI;
+import io.lettuce.core.cluster.RedisClusterClient;
+import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
+import io.lettuce.core.cluster.pubsub.api.sync.RedisClusterPubSubCommands;
+import io.lettuce.core.pubsub.RedisPubSubAdapter;
+import io.lettuce.core.pubsub.RedisPubSubListener;
+import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
+import io.vavr.Function3;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.geode.benchmark.redis.tests.PubSubBenchmarkConfiguration;
+
+public final class LettucePubSubClientManager implements RedisClientManager {
+  private static final Logger logger = LoggerFactory.getLogger(LettucePubSubClientManager.class);
+
+  private static RedisClusterClient redisClusterClient;
+
+  private static final ThreadLocal<RedisClusterPubSubCommands<String, String>> redisClusterCommands =
+      ThreadLocal.withInitial(() -> {
+        logger.info("Setup for thread {}", Thread.currentThread().getId());
+
+        final StatefulRedisClusterPubSubConnection<String, String> redisClusterPubSubConnection =
+            redisClusterClient.connectPubSub();
+        return redisClusterPubSubConnection.sync();
+      });
+
+  private static final RedisClient redisClient = new RedisClient() {
+    @Override
+    public String get(final String key) {
+      return LettucePubSubClientManager.redisClusterCommands.get().get(key);
+    }
+
+    @Override
+    public String set(final String key, final String value) {
+      return LettucePubSubClientManager.redisClusterCommands.get().set(key, value);
+    }
+
+    @Override
+    public String hget(final String key, final String field) {
+      return LettucePubSubClientManager.redisClusterCommands.get().hget(key, field);
+    }
+
+    @Override
+    public boolean hset(final String key, final String field, final String value) {
+      return LettucePubSubClientManager.redisClusterCommands.get().hset(key, field, value);
+    }
+
+    @Override
+    public long zadd(String key, double score, String value) {
+      return LettucePubSubClientManager.redisClusterCommands.get().zadd(key, score, value);
+    }
+
+    @Override
+    public long zrem(String key, String value) {
+      return LettucePubSubClientManager.redisClusterCommands.get().zrem(key, value);
+    }
+
+    @Override
+    public Set<String> zrange(String key, long start, long stop) {
+      return new HashSet<>(
+          LettucePubSubClientManager.redisClusterCommands.get().zrange(key, start, stop));
+    }
+
+    @Override
+    public Set<String> zrangeByScore(String key, long start, long stop) {
+      return new HashSet<>(
+          LettucePubSubClientManager.redisClusterCommands.get().zrangebyscore(key,
+              Range.create(start, stop)));
+    }
+
+    @Override
+    public SubscriptionListener createSubscriptionListener(
+        final PubSubBenchmarkConfiguration pubSubConfig,
+        final Function3<String, String, Unsubscriber, Void> channelMessageConsumer) {
+      return new LettuceSubscriptionListener(new RedisPubSubAdapter<String, String>() {
+        @Override
+        public void message(final String pattern, final String channel, final String message) {
+          super.message(pattern, channel, message);
+          final Unsubscriber unsubscriber =
+              channels -> LettucePubSubClientManager.redisClusterCommands.get()
+                  .punsubscribe(channels.toArray(new String[] {}));
+          channelMessageConsumer.apply(channel, message, unsubscriber);
+        }
+
+        @Override
+        public void message(final String channel, final String message) {
+          super.message(channel, message);
+          final Unsubscriber unsubscriber =
+              channels -> LettucePubSubClientManager.redisClusterCommands.get()
+                  .unsubscribe(channels.toArray(new String[] {}));
+          channelMessageConsumer.apply(channel, message, unsubscriber);
+        }
+      });
+    }
+
+    @Override
+    public void subscribe(final SubscriptionListener listener, final String... channels) {
+      final StatefulRedisPubSubConnection<String, String> connection =
+          LettucePubSubClientManager.redisClusterCommands.get().getStatefulConnection();
+
+      connection.addListener(((LettuceSubscriptionListener) listener).getListener());
+      LettucePubSubClientManager.redisClusterCommands.get().subscribe(channels);
+    }
+
+    @Override
+    public void psubscribe(final SubscriptionListener listener, final String... channels) {
+      final StatefulRedisPubSubConnection<String, String> connection =
+          LettucePubSubClientManager.redisClusterCommands.get().getStatefulConnection();
+
+      connection.addListener(((LettuceSubscriptionListener) listener).getListener());
+      LettucePubSubClientManager.redisClusterCommands.get().psubscribe(channels);
+    }
+
+    @Override
+    public void publish(String channel, String message) {
+      LettucePubSubClientManager.redisClusterCommands.get().publish(channel, message);
+    }
+
+    @Override
+    public void flushdb() {
+      redisClusterCommands.get().flushdb();
+    }
+  };
+
+  @Override
+  public void connect(final Collection<InetSocketAddress> servers) {
+    logger.info("Connect RedisClient on thread {}.", currentThread());
+
+    final List<RedisURI> nodes = servers.stream()
+        .map(i -> RedisURI.create(i.getHostString(), i.getPort())).collect(Collectors.toList());
+
+    final RedisClusterClient redisClusterClient = RedisClusterClient.create(nodes);
+
+    long start = System.nanoTime();
+    while (true) {
+      try (final StatefulRedisClusterPubSubConnection<String, String> connection =
+          redisClusterClient.connectPubSub()) {
+        logger.info("Waiting for cluster to come up.");
+        final String clusterInfo = connection.sync().clusterInfo();
+        if (clusterInfo.contains("cluster_state:ok")) {
+          break;
+        }
+        logger.debug(clusterInfo);
+      } catch (Exception e) {
+        if (System.nanoTime() - start > CONNECT_TIMEOUT.toNanos()) {
+          throw e;
+        }
+        try {
+          Thread.sleep(50);
+        } catch (InterruptedException interruptedException) {
+          throw new RuntimeException(e);
+        }
+        logger.info("Failed connecting.", e);
+      }
+    }
+
+    redisClusterClient.refreshPartitions();
+
+    LettucePubSubClientManager.redisClusterClient = redisClusterClient;
+  }
+
+  @Override
+  public void close() {
+    logger.info("Close RedisClient on thread {}.", currentThread());
+
+    redisClusterClient.shutdown();
+  }
+
+  @Override
+  public RedisClient get() {
+    logger.info("Getting RedisClient on thread {}.", currentThread());
+
+    return redisClient;
+  }
+
+  static class LettuceSubscriptionListener implements RedisClient.SubscriptionListener {
+    private final RedisPubSubListener<String, String> listener;
+
+    public LettuceSubscriptionListener(
+        RedisPubSubListener<String, String> listener) {
+      this.listener = listener;
+    }
+
+    RedisPubSubListener<String, String> getListener() {
+      return listener;
+    }
+  }
+}
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/PublishRedisTask.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/PublishRedisTask.java
new file mode 100644
index 0000000..f9b977f
--- /dev/null
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/PublishRedisTask.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.benchmark.redis.tasks;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yardstickframework.BenchmarkDriverAdapter;
+
+import org.apache.geode.benchmark.redis.tests.PubSubBenchmarkConfiguration;
+
+public class PublishRedisTask extends BenchmarkDriverAdapter implements Serializable {
+  private static final Logger logger = LoggerFactory.getLogger(PublishRedisTask.class);
+  private final PubSubBenchmarkConfiguration pubSubConfig;
+  private final RedisClientManager publisherClientManager;
+
+  public PublishRedisTask(final PubSubBenchmarkConfiguration pubSubConfig,
+      final RedisClientManager publisherClientManager) {
+    this.pubSubConfig = pubSubConfig;
+    this.publisherClientManager = publisherClientManager;
+    logger.info("Initialized: PublishRedisTask");
+  }
+
+  @Override
+  public boolean test(final Map<Object, Object> ctx) throws Exception {
+    final CyclicBarrier barrier = pubSubConfig.getCyclicBarrier();
+    final RedisClient redisClient = publisherClientManager.get();
+
+    for (final String channel : pubSubConfig.getBenchmarkPublishChannels()) {
+      for (int i = 0; i < pubSubConfig.getNumMessagesPerChannelOperation(); i++) {
+        final String message = Strings.repeat(String.valueOf((char) ('A' + i)),
+            pubSubConfig.getMessageLength());
+        redisClient.publish(channel, message);
+      }
+    }
+
+    // waits for all subscribers to receive all messages, then barrier is reset automatically
+    // for the next test iteration; fails test if times out
+    barrier.await(10, TimeUnit.SECONDS);
+    return true;
+  }
+
+  @Override
+  public void tearDown() throws Exception {
+    super.tearDown();
+    logger.info("PublishRedisTask: Sending END message");
+    assertThat(pubSubConfig.getCyclicBarrier().getNumberWaiting()).isZero();
+    assertThat(pubSubConfig.getCyclicBarrier().isBroken()).isFalse();
+    publisherClientManager.get().publish(pubSubConfig.getControlChannel(),
+        pubSubConfig.getEndMessage());
+  }
+}
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/RedisClient.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/RedisClient.java
index cf796f3..0cf7e74 100644
--- a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/RedisClient.java
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/RedisClient.java
@@ -15,8 +15,13 @@
 
 package org.apache.geode.benchmark.redis.tasks;
 
+import java.util.List;
 import java.util.Set;
 
+import io.vavr.Function3;
+
+import org.apache.geode.benchmark.redis.tests.PubSubBenchmarkConfiguration;
+
 public interface RedisClient {
   String get(String key);
 
@@ -35,4 +40,29 @@ public interface RedisClient {
   Set<String> zrange(String key, long start, long stop);
 
   Set<String> zrangeByScore(String key, long start, long stop);
+
+  /**
+   * Create a subscription listener.
+   *
+   * @param channelMessageConsumer a function that accepts the channel, the message, and
+   *        a consumer that will unsubscribe the listener from the list of channels
+   *        passed in.
+   * @return the subscription listener
+   */
+  SubscriptionListener createSubscriptionListener(PubSubBenchmarkConfiguration pubSubConfig,
+      Function3<String, String, Unsubscriber, Void> channelMessageConsumer);
+
+  void subscribe(SubscriptionListener control, String... channels);
+
+  void psubscribe(SubscriptionListener control, String... channelPatterns);
+
+  void publish(String channel, String message);
+
+  interface SubscriptionListener {
+  }
+
+  @FunctionalInterface
+  interface Unsubscriber {
+    void unsubscribe(List<String> channels);
+  }
 }
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/StopPubSubRedisTask.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/StopPubSubRedisTask.java
new file mode 100644
index 0000000..94caa7c
--- /dev/null
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/StopPubSubRedisTask.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.benchmark.redis.tasks;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.geode.perftest.Task;
+import org.apache.geode.perftest.TestContext;
+
+public class StopPubSubRedisTask implements Task {
+  private static final Logger logger = LoggerFactory.getLogger(StopPubSubRedisTask.class);
+
+  public StopPubSubRedisTask() {
+    logger.info("Initialized: PubSubEndRedisTask");
+  }
+
+  @Override
+  public void run(final TestContext context) throws Exception {
+    SubscribeRedisTask.shutdown(context);
+  }
+}
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/SubscribeRedisTask.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/SubscribeRedisTask.java
new file mode 100644
index 0000000..6cda0d5
--- /dev/null
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tasks/SubscribeRedisTask.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.benchmark.redis.tasks;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.geode.benchmark.redis.tests.PubSubBenchmarkConfiguration;
+import org.apache.geode.perftest.Task;
+import org.apache.geode.perftest.TestContext;
+
+public class SubscribeRedisTask implements Task {
+  private static final Logger logger = LoggerFactory.getLogger(SubscribeRedisTask.class);
+
+  // TestContext keys for shared objects between the SubscribeRedisTask (before) and
+  // the StopPubSubRedisTask (after)
+  private static final String SUBSCRIBERS_CONTEXT_KEY = "subscribers";
+  private static final String SUBSCRIBERS_THREAD_POOL = "threadPool";
+
+  private final List<RedisClientManager> subscriberClientManagers;
+  private final boolean validate;
+  private final PubSubBenchmarkConfiguration pubSubConfig;
+
+  public SubscribeRedisTask(final PubSubBenchmarkConfiguration pubSubConfig,
+      final List<RedisClientManager> subscriberClientManagers,
+      final boolean validate) {
+    this.pubSubConfig = pubSubConfig;
+    logger.info(
+        "Initialized: SubscribeRedisTask numChannels={}, numMessagesPerChannel={}, messageLength={}, validate={}, useChannelPattern={}",
+        pubSubConfig.getNumChannels(), pubSubConfig.getNumMessagesPerChannelOperation(),
+        pubSubConfig.getMessageLength(), validate, pubSubConfig.shouldUseChannelPattern());
+    this.subscriberClientManagers = subscriberClientManagers;
+    this.validate = validate;
+  }
+
+  @Override
+  public void run(final TestContext context) throws Exception {
+    final CyclicBarrier barrier = pubSubConfig.getCyclicBarrier();
+
+    // save subscribers in the TestContext, as this will be shared with
+    // the after tasks which will call shutdown()
+    final List<Subscriber> subscribers = subscriberClientManagers.stream()
+        .map(cm -> new Subscriber(cm.get(), barrier, context))
+        .collect(Collectors.toList());
+    context.setAttribute(SUBSCRIBERS_CONTEXT_KEY, subscribers);
+
+    // save thread pool in TestContext, so it can be shutdown cleanly after
+    final ExecutorService subscriberThreadPool =
+        Executors.newFixedThreadPool(subscriberClientManagers.size());
+    context.setAttribute(SUBSCRIBERS_THREAD_POOL, subscriberThreadPool);
+
+    for (final Subscriber subscriber : subscribers) {
+      subscriber.subscribeAsync(subscriberThreadPool, context);
+    }
+
+    // sleep to try to make sure subscribe is complete before continuing
+    Thread.sleep(1000);
+  }
+
+  public static void shutdown(final TestContext cxt) throws Exception {
+    // precondition: method run has been previously executed in this Worker
+    // and therefore subscribers and threadPool are available
+    @SuppressWarnings("unchecked")
+    final List<Subscriber> subscribers =
+        (List<Subscriber>) cxt.getAttribute(SUBSCRIBERS_CONTEXT_KEY);
+
+    for (final SubscribeRedisTask.Subscriber subscriber : subscribers) {
+      subscriber.waitForCompletion(cxt);
+    }
+
+    logger.info("Shutting down thread pool…");
+
+    final ExecutorService threadPool = (ExecutorService) cxt.getAttribute(SUBSCRIBERS_THREAD_POOL);
+    threadPool.shutdownNow();
+    // noinspection ResultOfMethodCallIgnored
+    threadPool.awaitTermination(5, TimeUnit.MINUTES);
+
+    logger.info("Thread pool terminated");
+  }
+
+  public class Subscriber {
+    private final AtomicInteger messagesReceived;
+    private final int numMessagesExpected;
+    private final RedisClient client;
+    private final RedisClient.SubscriptionListener listener;
+    private CompletableFuture<Void> future;
+
+    Subscriber(final RedisClient client, final CyclicBarrier barrier, final TestContext context) {
+      this.messagesReceived = new AtomicInteger(0);
+      this.client = client;
+
+      numMessagesExpected =
+          pubSubConfig.getNumChannels() * pubSubConfig.getNumMessagesPerChannelOperation();
+
+      listener = client.createSubscriptionListener(pubSubConfig,
+          (String channel, String message, RedisClient.Unsubscriber unsubscriber) -> {
+            if (channel.equals(pubSubConfig.getControlChannel())) {
+              if (message.equals(pubSubConfig.getEndMessage())) {
+                unsubscriber.unsubscribe(pubSubConfig.getAllSubscribeChannels());
+                logger.info("Subscriber thread unsubscribed.");
+              } else {
+                throw new AssertionError("Unrecognized control message: " + message);
+              }
+            } else if (receiveMessageAndIsComplete(channel, message, context)) {
+              try {
+                reset();
+                barrier.await(10, TimeUnit.SECONDS);
+              } catch (final TimeoutException e) {
+                throw new RuntimeException("Subscriber timed out while waiting on barrier");
+              } catch (final InterruptedException | BrokenBarrierException ignored) {
+              }
+            }
+            return null;
+          });
+    }
+
+    public void subscribeAsync(final ExecutorService threadPool, final TestContext context) {
+      future = CompletableFuture.runAsync(
+          () -> {
+            final List<String> channels = pubSubConfig.getAllSubscribeChannels();
+            if (pubSubConfig.shouldUseChannelPattern()) {
+              context.logProgress("Subscribing to channel patterns " + channels);
+              client.psubscribe(listener, channels.toArray(new String[] {}));
+            } else {
+              context.logProgress("Subscribing to channels " + channels);
+              client.subscribe(listener, channels.toArray(new String[] {}));
+            }
+          }, threadPool);
+      future.whenComplete((result, ex) -> {
+        logger.info("Subscriber thread completed");
+        if (ex != null) {
+          ex.printStackTrace();
+          context.logProgress(String.format("Subscriber completed with exception '%s')", ex));
+        }
+      });
+    }
+
+    public void waitForCompletion(final TestContext ctx) throws Exception {
+      if (future == null) {
+        return;
+      }
+      assertThat(future.get(10, TimeUnit.SECONDS)).isNull();
+    }
+
+    // Receive a message and return true if all messages have been received
+    private boolean receiveMessageAndIsComplete(final String channel, final String message,
+        final TestContext context) {
+      if (validate) {
+        context.logProgress(String.format(
+            "Received message %s of length %d on channel %s; messagesReceived=%d; messagesExpected=%d",
+            message, message.length(), channel, messagesReceived.get() + 1, numMessagesExpected));
+        assertThat(message.length()).isEqualTo(pubSubConfig.getMessageLength());
+      }
+      return messagesReceived.incrementAndGet() >= numMessagesExpected;
+    }
+
+    private void reset() {
+      messagesReceived.set(0);
+    }
+  }
+}
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/PubSubBenchmarkConfiguration.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/PubSubBenchmarkConfiguration.java
new file mode 100644
index 0000000..67b6388
--- /dev/null
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/PubSubBenchmarkConfiguration.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.benchmark.redis.tests;
+
+import static java.util.concurrent.TimeUnit.MINUTES;
+import static org.apache.geode.benchmark.Config.after;
+import static org.apache.geode.benchmark.Config.before;
+import static org.apache.geode.benchmark.Config.workload;
+import static org.apache.geode.benchmark.redis.tests.RedisBenchmark.RedisClusterImplementation.Manual;
+import static org.apache.geode.benchmark.topology.Roles.CLIENT;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CyclicBarrier;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import org.apache.geode.benchmark.redis.tasks.FlushDbTask;
+import org.apache.geode.benchmark.redis.tasks.JedisClientManager;
+import org.apache.geode.benchmark.redis.tasks.LettucePubSubClientManager;
+import org.apache.geode.benchmark.redis.tasks.PublishRedisTask;
+import org.apache.geode.benchmark.redis.tasks.RedisClientManager;
+import org.apache.geode.benchmark.redis.tasks.StartRedisClient;
+import org.apache.geode.benchmark.redis.tasks.StopPubSubRedisTask;
+import org.apache.geode.benchmark.redis.tasks.StopRedisClient;
+import org.apache.geode.benchmark.redis.tasks.SubscribeRedisTask;
+import org.apache.geode.perftest.TestConfig;
+
+/**
+ * Base settings for the Redis pub/sub benchmarks. Concrete subclasses supply the sizing values;
+ * this class derives the channel names from them and wires the publish/subscribe tasks into a
+ * {@link TestConfig}.
+ */
+public abstract class PubSubBenchmarkConfiguration implements Serializable {
+
+  /** Benchmark run time applied by {@link #configurePubSubTest}: ten minutes, in seconds. */
+  public static final long DURATION_SECONDS = MINUTES.toSeconds(10);
+
+  // Values supplied by the concrete configurations.
+
+  public abstract CyclicBarrier getCyclicBarrier();
+
+  public abstract int getNumSubscribers();
+
+  public abstract int getNumChannels();
+
+  public abstract int getNumMessagesPerChannelOperation();
+
+  public abstract int getMessageLength();
+
+  public abstract String getControlChannel();
+
+  public abstract String getEndMessage();
+
+  public abstract boolean shouldUseChannelPattern();
+
+  /**
+   * Returns the channels (or the single pattern {@code "channel*"} when patterns are enabled)
+   * that subscribers listen on for benchmark traffic.
+   */
+  public List<String> getBenchmarkSubscribeChannels() {
+    if (shouldUseChannelPattern()) {
+      return Collections.singletonList("channel*");
+    }
+    return getBenchmarkPublishChannels();
+  }
+
+  /** Returns the channel names {@code channel0 .. channel(N-1)} for N = number of channels. */
+  public List<String> getBenchmarkPublishChannels() {
+    final int channelCount = getNumChannels();
+    return IntStream.range(0, channelCount)
+        .mapToObj(index -> "channel" + index)
+        .collect(Collectors.toList());
+  }
+
+  /** Returns the benchmark subscribe channels followed by the control channel. */
+  public List<String> getAllSubscribeChannels() {
+    final Stream<String> benchmarkChannels = getBenchmarkSubscribeChannels().stream();
+    final Stream<String> controlChannel = Stream.of(getControlChannel());
+    return Stream.concat(benchmarkChannels, controlChannel).collect(Collectors.toList());
+  }
+
+  /**
+   * Configures {@code config} for a pub/sub run: cluster topology, a single workload thread,
+   * the run duration, and the before/workload/after tasks for the publisher and subscribers.
+   *
+   * @param benchmark the benchmark being configured; its {@code redisClientManager} is replaced
+   *        with the publisher's client manager
+   * @param config the test configuration to populate
+   */
+  public void configurePubSubTest(final RedisBenchmark benchmark,
+      final TestConfig config) {
+
+    benchmark.configureClusterTopology(config);
+
+    // A single publisher thread by design; subscriber threads are set up by the subscribe task.
+    config.threads(1);
+
+    // Extended run time: each publish operation is long compared to typical benchmark operations.
+    config.durationSeconds(DURATION_SECONDS);
+
+    final Supplier<RedisClientManager> clientManagerSupplier = clientManagerSupplierFor(benchmark);
+
+    // The publisher's client manager is created first, then one per subscriber.
+    benchmark.redisClientManager = clientManagerSupplier.get();
+    final List<RedisClientManager> subscriberClients =
+        Stream.generate(clientManagerSupplier)
+            .limit(getNumSubscribers())
+            .collect(Collectors.toList());
+
+    before(config, new StartRedisClient(benchmark.redisClientManager), CLIENT);
+
+    final SubscribeRedisTask subscribeTask =
+        new SubscribeRedisTask(this, subscriberClients, benchmark.isValidationEnabled());
+    before(config, subscribeTask, CLIENT);
+
+    if (benchmark.getRedisClusterImplementation() == Manual) {
+      before(config, new FlushDbTask(benchmark.redisClientManager), CLIENT);
+    }
+
+    workload(config, new PublishRedisTask(this, benchmark.redisClientManager), CLIENT);
+
+    after(config, new StopPubSubRedisTask(), CLIENT);
+    after(config, new StopRedisClient(benchmark.redisClientManager), CLIENT);
+    for (final RedisClientManager subscriberClient : subscriberClients) {
+      after(config, new StopRedisClient(subscriberClient), CLIENT);
+    }
+  }
+
+  /** Picks the client-manager factory matching the benchmark's Redis client implementation. */
+  private static Supplier<RedisClientManager> clientManagerSupplierFor(
+      final RedisBenchmark benchmark) {
+    switch (benchmark.getRedisClientImplementation()) {
+      case Jedis:
+        return JedisClientManager::new;
+      case Lettuce:
+        return LettucePubSubClientManager::new;
+      default:
+        throw new AssertionError("unexpected RedisClientImplementation");
+    }
+  }
+
+}
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/PubSubLargeBenchmarkConfiguration.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/PubSubLargeBenchmarkConfiguration.java
new file mode 100644
index 0000000..54df524
--- /dev/null
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/PubSubLargeBenchmarkConfiguration.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.benchmark.redis.tests;
+
+import java.util.concurrent.CyclicBarrier;
+
+/**
+ * "Large" pub/sub settings: 10 subscribers, 5 channels, 40000 messages per channel operation
+ * and a message length of 500.
+ */
+public class PubSubLargeBenchmarkConfiguration extends PubSubBenchmarkConfiguration {
+
+  private static final int SUBSCRIBER_COUNT = 10;
+  private static final int CHANNEL_COUNT = 5;
+  private static final int MESSAGES_PER_CHANNEL_OPERATION = 40000;
+  private static final int MESSAGE_SIZE = 500;
+  private static final String CONTROL_CHANNEL_NAME = "__control__";
+  private static final String END_OF_RUN_MESSAGE = "END";
+
+  // One party per subscriber plus one extra party (presumably the publisher - confirm in tasks).
+  private static final CyclicBarrier SHARED_BARRIER = new CyclicBarrier(SUBSCRIBER_COUNT + 1);
+
+  private final boolean shouldUseChannelPattern;
+
+  /** Creates a configuration that does not use channel patterns. */
+  public PubSubLargeBenchmarkConfiguration() {
+    this(false);
+  }
+
+  /**
+   * @param shouldUseChannelPattern value reported by {@link #shouldUseChannelPattern()}
+   */
+  public PubSubLargeBenchmarkConfiguration(final boolean shouldUseChannelPattern) {
+    this.shouldUseChannelPattern = shouldUseChannelPattern;
+  }
+
+  @Override
+  public CyclicBarrier getCyclicBarrier() {
+    return SHARED_BARRIER;
+  }
+
+  @Override
+  public int getNumSubscribers() {
+    return SUBSCRIBER_COUNT;
+  }
+
+  @Override
+  public int getNumChannels() {
+    return CHANNEL_COUNT;
+  }
+
+  @Override
+  public int getNumMessagesPerChannelOperation() {
+    return MESSAGES_PER_CHANNEL_OPERATION;
+  }
+
+  @Override
+  public int getMessageLength() {
+    return MESSAGE_SIZE;
+  }
+
+  @Override
+  public String getControlChannel() {
+    return CONTROL_CHANNEL_NAME;
+  }
+
+  @Override
+  public String getEndMessage() {
+    return END_OF_RUN_MESSAGE;
+  }
+
+  @Override
+  public boolean shouldUseChannelPattern() {
+    return shouldUseChannelPattern;
+  }
+
+}
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/PubSubSmallBenchmarkConfiguration.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/PubSubSmallBenchmarkConfiguration.java
new file mode 100644
index 0000000..c9a27a9
--- /dev/null
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/PubSubSmallBenchmarkConfiguration.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.benchmark.redis.tests;
+
+import java.util.concurrent.CyclicBarrier;
+
+/**
+ * "Small" pub/sub settings: 1 subscriber, 1 channel, 1 message per channel operation and a
+ * message length of 1.
+ */
+public class PubSubSmallBenchmarkConfiguration extends PubSubBenchmarkConfiguration {
+
+  private static final int SUBSCRIBER_COUNT = 1;
+  private static final int CHANNEL_COUNT = 1;
+  private static final int MESSAGES_PER_CHANNEL_OPERATION = 1;
+  private static final int MESSAGE_SIZE = 1;
+  private static final String CONTROL_CHANNEL_NAME = "__control__";
+  private static final String END_OF_RUN_MESSAGE = "END";
+
+  // One party per subscriber plus one extra party (presumably the publisher - confirm in tasks).
+  private static final CyclicBarrier SHARED_BARRIER = new CyclicBarrier(SUBSCRIBER_COUNT + 1);
+
+  private final boolean shouldUseChannelPattern;
+
+  /** Creates a configuration that does not use channel patterns. */
+  public PubSubSmallBenchmarkConfiguration() {
+    this(false);
+  }
+
+  /**
+   * @param shouldUseChannelPattern value reported by {@link #shouldUseChannelPattern()}
+   */
+  public PubSubSmallBenchmarkConfiguration(final boolean shouldUseChannelPattern) {
+    this.shouldUseChannelPattern = shouldUseChannelPattern;
+  }
+
+  @Override
+  public CyclicBarrier getCyclicBarrier() {
+    return SHARED_BARRIER;
+  }
+
+  @Override
+  public int getNumSubscribers() {
+    return SUBSCRIBER_COUNT;
+  }
+
+  @Override
+  public int getNumChannels() {
+    return CHANNEL_COUNT;
+  }
+
+  @Override
+  public int getNumMessagesPerChannelOperation() {
+    return MESSAGES_PER_CHANNEL_OPERATION;
+  }
+
+  @Override
+  public int getMessageLength() {
+    return MESSAGE_SIZE;
+  }
+
+  @Override
+  public String getControlChannel() {
+    return CONTROL_CHANNEL_NAME;
+  }
+
+  @Override
+  public String getEndMessage() {
+    return END_OF_RUN_MESSAGE;
+  }
+
+  @Override
+  public boolean shouldUseChannelPattern() {
+    return shouldUseChannelPattern;
+  }
+}
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisBenchmark.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisBenchmark.java
index 79a2a71..ffd079e 100644
--- a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisBenchmark.java
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisBenchmark.java
@@ -97,19 +97,10 @@ public class RedisBenchmark extends AbstractPerformanceTest {
 
   @Override
   public TestConfig configure() {
+
     TestConfig config = GeodeBenchmark.createConfig();
 
-    switch (getRedisClusterImplementation()) {
-      case Redis:
-        RedisTopology.configure(config);
-        break;
-      case Geode:
-        GeodeTopology.configure(config);
-        break;
-      case Manual:
-        ManualRedisTopology.configure(config);
-        break;
-    }
+    configureClusterTopology(config);
 
     switch (getRedisClientImplementation()) {
       case Jedis:
@@ -131,7 +122,21 @@ public class RedisBenchmark extends AbstractPerformanceTest {
     return config;
   }
 
-  private RedisClientImplementation getRedisClientImplementation() {
+  /**
+   * Applies the topology that matches the selected Redis cluster implementation to the given
+   * test configuration. Package-private so that other classes in this package can reuse it.
+   *
+   * @param config the test configuration passed to the chosen topology's {@code configure}
+   */
+  void configureClusterTopology(final TestConfig config) {
+    switch (getRedisClusterImplementation()) {
+      case Redis:
+        RedisTopology.configure(config);
+        break;
+      case Geode:
+        GeodeTopology.configure(config);
+        break;
+      case Manual:
+        ManualRedisTopology.configure(config);
+        break;
+    }
+  }
+
+  RedisClientImplementation getRedisClientImplementation() {
     final String sniProp = System.getProperty(WITH_REDIS_CLIENT_PROPERTY);
     if (isNullOrEmpty(sniProp)) {
       return Jedis;
@@ -140,7 +145,7 @@ public class RedisBenchmark extends AbstractPerformanceTest {
     return RedisClientImplementation.valueOfIgnoreCase(sniProp);
   }
 
-  private RedisClusterImplementation getRedisClusterImplementation() {
+  RedisClusterImplementation getRedisClusterImplementation() {
     final String sniProp = System.getProperty(WITH_REDIS_CLUSTER_PROPERTY);
     if (isNullOrEmpty(sniProp)) {
       return Geode;
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisPubSubLargeBenchmark.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisPubSubLargeBenchmark.java
new file mode 100644
index 0000000..1eab809
--- /dev/null
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisPubSubLargeBenchmark.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.benchmark.redis.tests;
+
+import org.junit.jupiter.api.Disabled;
+
+import org.apache.geode.benchmark.tests.GeodeBenchmark;
+import org.apache.geode.perftest.TestConfig;
+
+/**
+ * Pub/sub benchmark using {@link PubSubLargeBenchmarkConfiguration} without channel patterns.
+ */
+@Disabled("Disabled due to high variation in average latency")
+public class RedisPubSubLargeBenchmark extends RedisBenchmark {
+
+  /** Builds a fresh test config and lets the large pub/sub configuration populate it. */
+  @Override
+  public TestConfig configure() {
+    final TestConfig config = GeodeBenchmark.createConfig();
+    new PubSubLargeBenchmarkConfiguration().configurePubSubTest(this, config);
+    return config;
+  }
+
+}
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisPubSubPatternLargeBenchmark.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisPubSubPatternLargeBenchmark.java
new file mode 100644
index 0000000..dad999e
--- /dev/null
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisPubSubPatternLargeBenchmark.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.benchmark.redis.tests;
+
+import org.junit.jupiter.api.Disabled;
+
+import org.apache.geode.benchmark.tests.GeodeBenchmark;
+import org.apache.geode.perftest.TestConfig;
+
+/**
+ * Pub/sub benchmark using {@link PubSubLargeBenchmarkConfiguration} with channel patterns
+ * enabled.
+ */
+@Disabled("Disabled due to high variation in average latency")
+public class RedisPubSubPatternLargeBenchmark extends RedisBenchmark {
+
+  /** Builds a fresh test config and lets the large, pattern-based configuration populate it. */
+  @Override
+  public TestConfig configure() {
+    final TestConfig config = GeodeBenchmark.createConfig();
+    new PubSubLargeBenchmarkConfiguration(true).configurePubSubTest(this, config);
+    return config;
+  }
+}
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisPubSubPatternSmallBenchmark.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisPubSubPatternSmallBenchmark.java
new file mode 100644
index 0000000..8c557ee
--- /dev/null
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisPubSubPatternSmallBenchmark.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.benchmark.redis.tests;
+
+import org.junit.jupiter.api.Disabled;
+
+import org.apache.geode.benchmark.tests.GeodeBenchmark;
+import org.apache.geode.perftest.TestConfig;
+
+/**
+ * Pub/sub benchmark using {@link PubSubSmallBenchmarkConfiguration} with channel patterns
+ * enabled.
+ */
+@Disabled("Disabled due to high variation in average latency")
+public class RedisPubSubPatternSmallBenchmark extends RedisBenchmark {
+
+  /** Builds a fresh test config and lets the small, pattern-based configuration populate it. */
+  @Override
+  public TestConfig configure() {
+    final TestConfig config = GeodeBenchmark.createConfig();
+    new PubSubSmallBenchmarkConfiguration(true).configurePubSubTest(this, config);
+    return config;
+  }
+}
diff --git a/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisPubSubSmallBenchmark.java b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisPubSubSmallBenchmark.java
new file mode 100644
index 0000000..92d9bde
--- /dev/null
+++ b/geode-benchmarks/src/main/java/org/apache/geode/benchmark/redis/tests/RedisPubSubSmallBenchmark.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.geode.benchmark.redis.tests;
+
+import org.junit.jupiter.api.Disabled;
+
+import org.apache.geode.benchmark.tests.GeodeBenchmark;
+import org.apache.geode.perftest.TestConfig;
+
+/**
+ * Pub/sub benchmark using {@link PubSubSmallBenchmarkConfiguration} without channel patterns.
+ */
+@Disabled("Disabled due to high variation in average latency")
+public class RedisPubSubSmallBenchmark extends RedisBenchmark {
+
+  /** Builds a fresh test config and lets the small pub/sub configuration populate it. */
+  @Override
+  public TestConfig configure() {
+    final TestConfig config = GeodeBenchmark.createConfig();
+    new PubSubSmallBenchmarkConfiguration().configurePubSubTest(this, config);
+    return config;
+  }
+}
diff --git a/geode-benchmarks/src/test/java/org/apache/geode/benchmark/tasks/WeightedTasksTest.java b/geode-benchmarks/src/test/java/org/apache/geode/benchmark/tasks/WeightedTasksTest.java
index 0200601..df7dff6 100644
--- a/geode-benchmarks/src/test/java/org/apache/geode/benchmark/tasks/WeightedTasksTest.java
+++ b/geode-benchmarks/src/test/java/org/apache/geode/benchmark/tasks/WeightedTasksTest.java
@@ -16,7 +16,7 @@ package org.apache.geode.benchmark.tasks;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.data.Offset.offset;
-import static org.mockito.Matchers.same;
+import static org.mockito.ArgumentMatchers.same;
 import static org.mockito.Mockito.atLeastOnce;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockingDetails;
diff --git a/gradle/dependency-versions.properties b/gradle/dependency-versions.properties
index 7a5a52f..d496b8a 100644
--- a/gradle/dependency-versions.properties
+++ b/gradle/dependency-versions.properties
@@ -30,3 +30,4 @@ JSON.version = 20210307
 jedis.version = 3.6.0
 lettuce.version = 6.1.1.RELEASE
 classgraph.version = 4.8.105
+vavr.version = 1.0.0-alpha-4