You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2020/04/01 15:23:03 UTC

[geode] branch develop updated: GEODE-7925: Add concurrency test to PubSubDUnitTest (#4884)

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9036ab9  GEODE-7925: Add concurrency test to PubSubDUnitTest (#4884)
9036ab9 is described below

commit 9036ab930baf535d08101b625af1d1bc0dbe6373
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Wed Apr 1 08:22:37 2020 -0700

    GEODE-7925: Add concurrency test to PubSubDUnitTest (#4884)
    
    - Use ConfigurationProperties instead of DistributionConfig constants
    - Switch to using an ExecutorServiceRule instead of raw threads
    
    Authored-by: Jens Deppe <jd...@vmware.com>
---
 .../org/apache/geode/redis/PubSubDUnitTest.java    | 80 ++++++++++++++++++----
 1 file changed, 68 insertions(+), 12 deletions(-)

diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/PubSubDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/PubSubDUnitTest.java
index 4e930bc..c55a96b 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/PubSubDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/PubSubDUnitTest.java
@@ -18,8 +18,12 @@ package org.apache.geode.redis;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.BeforeClass;
@@ -27,17 +31,23 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import redis.clients.jedis.Jedis;
 
+import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.internal.AvailablePortHelper;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.rules.ClusterStartupRule;
 import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
 
 public class PubSubDUnitTest {
 
   public static final String CHANNEL_NAME = "salutations";
+
   @ClassRule
   public static ClusterStartupRule cluster = new ClusterStartupRule();
 
+  @ClassRule
+  public static ExecutorServiceRule executor = new ExecutorServiceRule();
+
   private static int[] ports;
 
   private static MemberVM server1, server2;
@@ -46,13 +56,13 @@ public class PubSubDUnitTest {
   public static void beforeClass() {
     ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);
     Properties redisProps = new Properties();
-    redisProps.put("redis-bind-address", "localhost");
+    redisProps.put(ConfigurationProperties.REDIS_BIND_ADDRESS, "localhost");
 
     MemberVM locator = cluster.startLocatorVM(0);
 
-    redisProps.put("redis-port", Integer.toString(ports[0]));
+    redisProps.put(ConfigurationProperties.REDIS_PORT, Integer.toString(ports[0]));
     server1 = cluster.startServerVM(1, redisProps, locator.getPort());
-    redisProps.put("redis-port", Integer.toString(ports[1]));
+    redisProps.put(ConfigurationProperties.REDIS_PORT, Integer.toString(ports[1]));
     server2 = cluster.startServerVM(2, redisProps, locator.getPort());
   }
 
@@ -66,13 +76,10 @@ public class PubSubDUnitTest {
     MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
     MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
 
-    Runnable runnable1 = () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME);
-    Thread subscriberThread1 = new Thread(runnable1);
-    subscriberThread1.start();
-
-    Runnable runnable2 = () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME);
-    Thread subscriberThread2 = new Thread(runnable2);
-    subscriberThread2.start();
+    Future<Void> subscriber1Future = executor.submit(
+        () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+    Future<Void> subscriber2Future = executor.submit(
+        () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
 
     assertThat(latch.await(30, TimeUnit.SECONDS))
         .as("channel subscription was not received")
@@ -84,8 +91,57 @@ public class PubSubDUnitTest {
     mockSubscriber1.unsubscribe(CHANNEL_NAME);
     mockSubscriber2.unsubscribe(CHANNEL_NAME);
 
-    GeodeAwaitility.await().untilAsserted(subscriberThread1::join);
-    GeodeAwaitility.await().untilAsserted(subscriberThread2::join);
+    GeodeAwaitility.await().untilAsserted(subscriber1Future::get);
+    GeodeAwaitility.await().untilAsserted(subscriber2Future::get);
+  }
+
+  @Test
+  public void testConcurrentPubSub() throws Exception {
+    int CLIENT_COUNT = 10;
+    int ITERATIONS = 1000;
+
+    Jedis subscriber1 = new Jedis("localhost", ports[0]);
+    Jedis subscriber2 = new Jedis("localhost", ports[1]);
+
+    CountDownLatch latch = new CountDownLatch(2);
+    MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
+    MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
+
+    Future<Void> subscriber1Future = executor.submit(
+        () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+    Future<Void> subscriber2Future = executor.submit(
+        () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
+
+    assertThat(latch.await(30, TimeUnit.SECONDS))
+        .as("channel subscription was not received")
+        .isTrue();
+
+    List<Future<Void>> futures = new LinkedList<>();
+    for (int i = 0; i < CLIENT_COUNT; i++) {
+      Jedis client = new Jedis("localhost", ports[i % 2]);
+
+      Callable<Void> callable = () -> {
+        for (int j = 0; j < ITERATIONS; j++) {
+          client.publish(CHANNEL_NAME, "hello");
+        }
+        return null;
+      };
+
+      futures.add(executor.submit(callable));
+    }
+
+    for (Future<Void> future : futures) {
+      GeodeAwaitility.await().untilAsserted(future::get);
+    }
+
+    mockSubscriber1.unsubscribe(CHANNEL_NAME);
+    mockSubscriber2.unsubscribe(CHANNEL_NAME);
+
+    GeodeAwaitility.await().untilAsserted(subscriber1Future::get);
+    GeodeAwaitility.await().untilAsserted(subscriber2Future::get);
+
+    assertThat(mockSubscriber1.getReceivedMessages().size()).isEqualTo(CLIENT_COUNT * ITERATIONS);
+    assertThat(mockSubscriber2.getReceivedMessages().size()).isEqualTo(CLIENT_COUNT * ITERATIONS);
   }
 
 }