You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2020/04/01 15:23:03 UTC
[geode] branch develop updated: GEODE-7925: Add concurrency test to
PubSubDUnitTest (#4884)
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 9036ab9 GEODE-7925: Add concurrency test to PubSubDUnitTest (#4884)
9036ab9 is described below
commit 9036ab930baf535d08101b625af1d1bc0dbe6373
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Wed Apr 1 08:22:37 2020 -0700
GEODE-7925: Add concurrency test to PubSubDUnitTest (#4884)
- Use ConfigurationProperties instead of DistributionConfig constants
- Switch to using an ExecutorServiceRule instead of raw threads
Authored-by: Jens Deppe <jd...@vmware.com>
---
.../org/apache/geode/redis/PubSubDUnitTest.java | 80 ++++++++++++++++++----
1 file changed, 68 insertions(+), 12 deletions(-)
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/PubSubDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/PubSubDUnitTest.java
index 4e930bc..c55a96b 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/PubSubDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/PubSubDUnitTest.java
@@ -18,8 +18,12 @@ package org.apache.geode.redis;
import static org.assertj.core.api.Assertions.assertThat;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Properties;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.junit.BeforeClass;
@@ -27,17 +31,23 @@ import org.junit.ClassRule;
import org.junit.Test;
import redis.clients.jedis.Jedis;
+import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.rules.ExecutorServiceRule;
public class PubSubDUnitTest {
public static final String CHANNEL_NAME = "salutations";
+
@ClassRule
public static ClusterStartupRule cluster = new ClusterStartupRule();
+ @ClassRule
+ public static ExecutorServiceRule executor = new ExecutorServiceRule();
+
private static int[] ports;
private static MemberVM server1, server2;
@@ -46,13 +56,13 @@ public class PubSubDUnitTest {
public static void beforeClass() {
ports = AvailablePortHelper.getRandomAvailableTCPPorts(2);
Properties redisProps = new Properties();
- redisProps.put("redis-bind-address", "localhost");
+ redisProps.put(ConfigurationProperties.REDIS_BIND_ADDRESS, "localhost");
MemberVM locator = cluster.startLocatorVM(0);
- redisProps.put("redis-port", Integer.toString(ports[0]));
+ redisProps.put(ConfigurationProperties.REDIS_PORT, Integer.toString(ports[0]));
server1 = cluster.startServerVM(1, redisProps, locator.getPort());
- redisProps.put("redis-port", Integer.toString(ports[1]));
+ redisProps.put(ConfigurationProperties.REDIS_PORT, Integer.toString(ports[1]));
server2 = cluster.startServerVM(2, redisProps, locator.getPort());
}
@@ -66,13 +76,10 @@ public class PubSubDUnitTest {
MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
- Runnable runnable1 = () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME);
- Thread subscriberThread1 = new Thread(runnable1);
- subscriberThread1.start();
-
- Runnable runnable2 = () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME);
- Thread subscriberThread2 = new Thread(runnable2);
- subscriberThread2.start();
+ Future<Void> subscriber1Future = executor.submit(
+ () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+ Future<Void> subscriber2Future = executor.submit(
+ () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
assertThat(latch.await(30, TimeUnit.SECONDS))
.as("channel subscription was not received")
@@ -84,8 +91,57 @@ public class PubSubDUnitTest {
mockSubscriber1.unsubscribe(CHANNEL_NAME);
mockSubscriber2.unsubscribe(CHANNEL_NAME);
- GeodeAwaitility.await().untilAsserted(subscriberThread1::join);
- GeodeAwaitility.await().untilAsserted(subscriberThread2::join);
+ GeodeAwaitility.await().untilAsserted(subscriber1Future::get);
+ GeodeAwaitility.await().untilAsserted(subscriber2Future::get);
+ }
+
+ @Test
+ public void testConcurrentPubSub() throws Exception {
+ int CLIENT_COUNT = 10;
+ int ITERATIONS = 1000;
+
+ Jedis subscriber1 = new Jedis("localhost", ports[0]);
+ Jedis subscriber2 = new Jedis("localhost", ports[1]);
+
+ CountDownLatch latch = new CountDownLatch(2);
+ MockSubscriber mockSubscriber1 = new MockSubscriber(latch);
+ MockSubscriber mockSubscriber2 = new MockSubscriber(latch);
+
+ Future<Void> subscriber1Future = executor.submit(
+ () -> subscriber1.subscribe(mockSubscriber1, CHANNEL_NAME));
+ Future<Void> subscriber2Future = executor.submit(
+ () -> subscriber2.subscribe(mockSubscriber2, CHANNEL_NAME));
+
+ assertThat(latch.await(30, TimeUnit.SECONDS))
+ .as("channel subscription was not received")
+ .isTrue();
+
+ List<Future<Void>> futures = new LinkedList<>();
+ for (int i = 0; i < CLIENT_COUNT; i++) {
+ Jedis client = new Jedis("localhost", ports[i % 2]);
+
+ Callable<Void> callable = () -> {
+ for (int j = 0; j < ITERATIONS; j++) {
+ client.publish(CHANNEL_NAME, "hello");
+ }
+ return null;
+ };
+
+ futures.add(executor.submit(callable));
+ }
+
+ for (Future<Void> future : futures) {
+ GeodeAwaitility.await().untilAsserted(future::get);
+ }
+
+ mockSubscriber1.unsubscribe(CHANNEL_NAME);
+ mockSubscriber2.unsubscribe(CHANNEL_NAME);
+
+ GeodeAwaitility.await().untilAsserted(subscriber1Future::get);
+ GeodeAwaitility.await().untilAsserted(subscriber2Future::get);
+
+ assertThat(mockSubscriber1.getReceivedMessages().size()).isEqualTo(CLIENT_COUNT * ITERATIONS);
+ assertThat(mockSubscriber2.getReceivedMessages().size()).isEqualTo(CLIENT_COUNT * ITERATIONS);
}
}