You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2020/04/22 13:08:53 UTC
[geode] branch develop updated: GEODE-8002: Extract common
concurrent execution test code into LoopingThreads class (#4973)
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 7dbd970 GEODE-8002: Extract common concurrent execution test code into LoopingThreads class (#4973)
7dbd970 is described below
commit 7dbd9705970db2fe07bff8d21ed4bd0dc00fa8b0
Author: Murtuza Boxwala <1m...@gmail.com>
AuthorDate: Wed Apr 22 09:08:07 2020 -0400
GEODE-8002: Extract common concurrent execution test code into LoopingThreads class (#4973)
---
.../apache/geode/redis/HashesIntegrationTest.java | 314 ++++++---------------
.../apache/geode/redis/StringsIntegrationTest.java | 82 ++----
.../redis/general/ConcurrentLoopingThreads.java | 88 ++++++
.../geode/redis/general/ExistsIntegrationTest.java | 72 ++---
4 files changed, 221 insertions(+), 335 deletions(-)
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/HashesIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/HashesIntegrationTest.java
index 4d35d48..776d6ee 100755
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/HashesIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/HashesIntegrationTest.java
@@ -38,14 +38,11 @@ import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.RandomStringUtils;
+import org.assertj.core.util.Maps;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -59,6 +56,7 @@ import redis.clients.jedis.exceptions.JedisDataException;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.GemFireCache;
import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.redis.general.ConcurrentLoopingThreads;
import org.apache.geode.test.junit.categories.RedisTest;
@Category({RedisTest.class})
@@ -478,52 +476,33 @@ public class HashesIntegrationTest {
@Test
public void testConcurrentHMSet_differentKeyPerClient() throws InterruptedException {
- String key1 = "HMSET" + randString();
- String key2 = "HMSET" + randString();
- Map<String, String> record1 = new HashMap<String, String>();
- Map<String, String> record2 = new HashMap<String, String>();
-
- Runnable runnable1 = () -> doABunchOfHMSets(key1, record1, jedis);
- Runnable runnable2 = () -> doABunchOfHMSets(key2, record2, jedis2);
- Thread thread1 = new Thread(runnable1);
- Thread thread2 = new Thread(runnable2);
- thread1.start();
- thread2.start();
- thread1.join();
- thread2.join();
+ String key1 = "HMSET1";
+ String key2 = "HMSET2";
+ Map<String, String> expectedMap = new HashMap<String, String>();
+ for (int i = 0; i < ITERATION_COUNT; i++) {
+ expectedMap.put("field" + i, "value" + i);
+ }
- Map<String, String> result = jedis.hgetAll(key1);
- assertEquals(record1.size(), result.size());
- assertThat(result.keySet().containsAll(record1.keySet())).isTrue();
- assertThat(result.values().containsAll(record1.values())).isTrue();
-
- Map<String, String> result2 = jedis.hgetAll(key2);
- assertEquals(record2.size(), result2.size());
- assertThat(result2.keySet().containsAll(record2.keySet())).isTrue();
- assertThat(result2.values().containsAll(record2.values())).isTrue();
+ new ConcurrentLoopingThreads(ITERATION_COUNT,
+ (i) -> jedis.hmset(key1, Maps.newHashMap("field" + i, "value" + i)),
+ (i) -> jedis2.hmset(key2, Maps.newHashMap("field" + i, "value" + i)))
+ .run();
+
+ assertThat(jedis.hgetAll(key1)).isEqualTo(expectedMap);
+ assertThat(jedis.hgetAll(key2)).isEqualTo(expectedMap);
}
@Test
public void testConcurrentHMSet_sameKeyPerClient() throws InterruptedException {
- String key = "HMSET" + randString();
- Map<String, String> record1 = new HashMap<String, String>();
- Map<String, String> record2 = new HashMap<String, String>();
-
- Runnable runnable1 = () -> doABunchOfHMSets(key, record1, jedis);
- Runnable runnable2 = () -> doABunchOfHMSets(key, record2, jedis2);
- Thread thread1 = new Thread(runnable1);
- Thread thread2 = new Thread(runnable2);
- thread1.start();
- thread2.start();
- thread1.join();
- thread2.join();
+ String key = "HMSET1";
+
+ new ConcurrentLoopingThreads(ITERATION_COUNT,
+ (i) -> jedis.hmset(key, Maps.newHashMap("fieldA" + i, "valueA" + i)),
+ (i) -> jedis2.hmset(key, Maps.newHashMap("fieldB" + i, "valueB" + i)))
+ .run();
Map<String, String> result = jedis.hgetAll(key);
- assertEquals(record1.size() + record2.size(), result.size());
- assertThat(result.keySet().containsAll(record1.keySet())).isTrue();
- assertThat(result.keySet().containsAll(record2.keySet())).isTrue();
- assertThat(result.values().containsAll(record1.values())).isTrue();
- assertThat(result.values().containsAll(record2.values())).isTrue();
+ assertThat(result).hasSize(ITERATION_COUNT * 2);
}
@Test
@@ -536,109 +515,60 @@ public class HashesIntegrationTest {
fields.add(randString());
}
+ AtomicLong successCount = new AtomicLong();
+ new ConcurrentLoopingThreads(ITERATION_COUNT,
+ (i) -> successCount.addAndGet(jedis.hsetnx(key, "field" + i, "A")),
+ (i) -> successCount.addAndGet(jedis2.hsetnx(key, "field" + i, "B")))
+ .run();
- CountDownLatch latch = new CountDownLatch(1);
- ExecutorService pool = Executors.newFixedThreadPool(2);
- Callable<Integer> callable1 = () -> doABunchOfHSetNXs(key, fields, "Thread1", jedis, latch);
- Callable<Integer> callable2 = () -> doABunchOfHSetNXs(key, fields, "Thread2", jedis2, latch);
- Future<Integer> future1 = pool.submit(callable1);
- Future<Integer> future2 = pool.submit(callable2);
-
- latch.countDown();
- assertThat(future1.get() + future2.get()).isEqualTo(ITERATION_COUNT);
-
- pool.shutdown();
+ assertThat(successCount.get()).isEqualTo(ITERATION_COUNT);
}
- private int doABunchOfHSetNXs(String key, ArrayList<String> fields, String fieldValue,
- Jedis jedis, CountDownLatch latch) throws InterruptedException {
- int successes = 0;
-
- latch.await();
+ @Test
+ public void testConcurrentHSet_differentKeyPerClient() {
+ String key1 = "HSET1";
+ String key2 = "HSET2";
+ Map<String, String> expectedMap = new HashMap<String, String>();
for (int i = 0; i < ITERATION_COUNT; i++) {
- if (jedis.hsetnx(key, fields.get(i), fieldValue) == 1) {
- successes++;
- Thread.yield();
- }
+ expectedMap.put("field" + i, "value" + i);
}
- return successes;
- }
- @Test
- public void testConcurrentHSet_differentKeyPerClient() throws InterruptedException {
- String key1 = "HSET" + randString();
- String key2 = "HSET" + randString();
- Map<String, String> record1 = new HashMap<String, String>();
- Map<String, String> record2 = new HashMap<String, String>();
-
- Runnable runnable1 = () -> doABunchOfHSets(key1, record1, jedis);
- Runnable runnable2 = () -> doABunchOfHSets(key2, record2, jedis2);
- Thread thread1 = new Thread(runnable1);
- Thread thread2 = new Thread(runnable2);
- thread1.start();
- thread2.start();
- thread1.join();
- thread2.join();
+ new ConcurrentLoopingThreads(ITERATION_COUNT,
+ (i) -> jedis.hset(key1, "field" + i, "value" + i),
+ (i) -> jedis2.hset(key2, "field" + i, "value" + i))
+ .run();
- Map<String, String> result = jedis.hgetAll(key1);
- assertEquals(record1.size(), result.size());
- assertThat(result.keySet().containsAll(record1.keySet())).isTrue();
- assertThat(result.values().containsAll(record1.values())).isTrue();
-
- Map<String, String> result2 = jedis.hgetAll(key2);
- assertEquals(record2.size(), result2.size());
- assertThat(result2.keySet().containsAll(record2.keySet())).isTrue();
- assertThat(result2.values().containsAll(record2.values())).isTrue();
+ assertThat(jedis.hgetAll(key1)).isEqualTo(expectedMap);
+ assertThat(jedis.hgetAll(key2)).isEqualTo(expectedMap);
}
@Test
public void testConcurrentHSet_sameKeyPerClient() throws InterruptedException {
- String key1 = "HSET" + randString();
- Map<String, String> record1 = new HashMap<String, String>();
- Map<String, String> record2 = new HashMap<String, String>();
-
- Runnable runnable1 = () -> doABunchOfHSets(key1, record1, jedis);
- Runnable runnable2 = () -> doABunchOfHSets(key1, record2, jedis2);
- Thread thread1 = new Thread(runnable1);
- Thread thread2 = new Thread(runnable2);
- thread1.start();
- thread2.start();
- thread1.join();
- thread2.join();
+ String key1 = "HSET1";
+ new ConcurrentLoopingThreads(ITERATION_COUNT,
+ (i) -> jedis.hset(key1, "fieldA" + i, "value" + i),
+ (i) -> jedis2.hset(key1, "fieldB" + i, "value" + i))
+ .run();
Map<String, String> result = jedis.hgetAll(key1);
- assertEquals(record1.size() + record2.size(), result.size());
- assertThat(result.keySet().containsAll(record1.keySet())).isTrue();
- assertThat(result.keySet().containsAll(record2.keySet())).isTrue();
- assertThat(result.values().containsAll(record1.values())).isTrue();
- assertThat(result.values().containsAll(record2.values())).isTrue();
+
+ assertThat(result).hasSize(ITERATION_COUNT * 2);
}
@Test
public void testConcurrentHIncr_sameKeyPerClient() throws InterruptedException {
- String key1 = "HSET" + randString();
- String field = "FIELD" + randString();
+ String key = "KEY";
+ String field = "FIELD";
+ jedis.hset(key, field, "0");
- jedis.hset(key1, field, "0");
-
- Runnable runnable1 = () -> doABunchOfHIncrs(key1, field, ITERATION_COUNT / 2, jedis);
- Runnable runnable2 = () -> doABunchOfHIncrs(key1, field, ITERATION_COUNT / 2, jedis2);
- Thread thread1 = new Thread(runnable1);
- Thread thread2 = new Thread(runnable2);
- thread1.start();
- thread2.start();
- thread1.join();
- thread2.join();
-
- String value = jedis.hget(key1, field);
- assertThat(value).isEqualTo(Integer.toString(ITERATION_COUNT));
- }
+ new ConcurrentLoopingThreads(ITERATION_COUNT,
+ (i) -> jedis.hincrBy(key, field, 1),
+ (i) -> jedis2.hincrBy(key, field, 1))
+ .run();
- private void doABunchOfHIncrs(String key, String field, int incrCount, Jedis jedis) {
- for (int i = 0; i < incrCount; i++) {
- jedis.hincrBy(key, field, 1);
- }
+ String value = jedis.hget(key, field);
+ assertThat(value).isEqualTo(Integer.toString(ITERATION_COUNT * 2));
}
@Test
@@ -646,28 +576,14 @@ public class HashesIntegrationTest {
String key = "HSET" + randString();
String field = "FIELD" + randString();
-
jedis.hset(key, field, "0");
- Runnable runnable1 = () -> doABunchOfHIncrByFloats(key, field, ITERATION_COUNT / 2, 1.0, jedis);
- Runnable runnable2 =
- () -> doABunchOfHIncrByFloats(key, field, ITERATION_COUNT / 2, 0.5, jedis2);
- Thread thread1 = new Thread(runnable1);
- Thread thread2 = new Thread(runnable2);
- thread1.start();
- thread2.start();
- thread1.join();
- thread2.join();
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> jedis.hincrByFloat(key, field, 0.5),
+        (i) -> jedis2.hincrByFloat(key, field, 1.0)).run(); // one Jedis client per thread
String value = jedis.hget(key, field);
- assertThat(value).isEqualTo(String.format("%.0f", ITERATION_COUNT * 0.75));
- }
-
- private void doABunchOfHIncrByFloats(String key, String field, int incrCount, double incrValue,
- Jedis jedis) {
- for (int i = 0; i < incrCount; i++) {
- jedis.hincrByFloat(key, field, incrValue);
- }
+ assertThat(value).isEqualTo(String.format("%.0f", ITERATION_COUNT * 1.5));
}
@Test
@@ -681,79 +597,52 @@ public class HashesIntegrationTest {
@Test
public void testConcurrentHSetHDel_sameKeyPerClient() throws InterruptedException {
- String key1 = "HSET" + randString();
+ String key = "HSET1";
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(ITERATION_COUNT);
- Runnable runnable1 = () -> doABunchOfHSetsWithBlockingQueue(key1, blockingQueue, jedis);
- Runnable runnable2 = () -> doABunchOfHDelsWithBlockingQueue(key1, blockingQueue, jedis2);
- Thread thread1 = new Thread(runnable1);
- Thread thread2 = new Thread(runnable2);
- thread1.start();
- thread2.start();
- thread1.join();
- thread2.join();
+ new ConcurrentLoopingThreads(ITERATION_COUNT,
+ (i) -> {
+ jedis.hset(key, "field" + i, "value" + i);
+ blockingQueue.add("field" + i);
+ },
+ (i) -> {
+ try {
+ String fieldToDelete = blockingQueue.take();
+ jedis.hdel(key, fieldToDelete);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ })
+ .run();
+
+ Map<String, String> result = jedis.hgetAll(key);
- Map<String, String> result = jedis.hgetAll(key1);
assertThat(result).isEmpty();
}
@Test
public void testConcurrentHGetAll() throws InterruptedException, ExecutionException {
- String key1 = "HSET" + randString();
+ String key = "HSET1";
HashMap<String, String> record = new HashMap<>();
- doABunchOfHSets(key1, record, jedis);
-
- ExecutorService pool = Executors.newFixedThreadPool(2);
- Callable<Integer> callable1 = () -> doABunchOfHGetAlls(key1, jedis);
- Callable<Integer> callable2 = () -> doABunchOfHGetAlls(key1, jedis2);
- Future<Integer> future1 = pool.submit(callable1);
- Future<Integer> future2 = pool.submit(callable2);
-
- assertThat(future1.get()).isEqualTo(ITERATION_COUNT);
- assertThat(future2.get()).isEqualTo(ITERATION_COUNT);
+ doABunchOfHSets(key, record, jedis);
- pool.shutdown();
- }
-
- private int doABunchOfHGetAlls(String key, Jedis jedis) {
- int returnedCount = 0;
- for (int i = 0; i < ITERATION_COUNT; i++) {
- if (jedis.hgetAll(key).size() == ITERATION_COUNT) {
- returnedCount++;
- }
- }
- return returnedCount;
- }
-
- private void doABunchOfHDelsWithBlockingQueue(String key,
- ArrayBlockingQueue<String> blockingQueue,
- Jedis jedis) {
- String field;
- for (int i = 0; i < ITERATION_COUNT; i++) {
- try {
- field = blockingQueue.take();
- } catch (InterruptedException e) {
- throw new RuntimeException("HDel thread was interrupted unexpectedly", e);
- }
- jedis.hdel(key, field);
- }
- }
-
- private void doABunchOfHSetsWithBlockingQueue(String key,
- ArrayBlockingQueue<String> blockingQueue,
- Jedis jedis) {
- String field;
- String fieldValue;
- for (int i = 0; i < ITERATION_COUNT; i++) {
- field = randString();
- fieldValue = randString();
-
- jedis.hset(key, field, fieldValue);
+ AtomicLong successCount = new AtomicLong();
+ new ConcurrentLoopingThreads(ITERATION_COUNT,
+ (i) -> {
+ if (jedis.hgetAll(key).size() == ITERATION_COUNT) {
+ successCount.incrementAndGet();
+ }
+ },
+ (i) -> {
+ if (jedis.hgetAll(key).size() == ITERATION_COUNT) {
+ successCount.incrementAndGet();
+ }
+ })
+ .run();
- blockingQueue.add(field);
- }
+ assertThat(successCount.get()).isEqualTo(ITERATION_COUNT * 2);
}
private void doABunchOfHSets(String key, Map<String, String> record, Jedis jedis) {
@@ -769,21 +658,6 @@ public class HashesIntegrationTest {
}
}
- private void doABunchOfHMSets(String key, Map<String, String> record, Jedis jedis) {
- String field;
- String fieldValue;
- for (int i = 0; i < ITERATION_COUNT; i++) {
- field = randString();
- fieldValue = randString();
- Map<String, String> hmsetMap = new HashMap<>();
- hmsetMap.put(field, fieldValue);
-
- record.put(field, fieldValue);
-
- jedis.hmset(key, hmsetMap);
- }
- }
-
private String randString() {
int length = rand.nextInt(8) + 5;
return RandomStringUtils.randomAlphanumeric(length);
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/StringsIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/StringsIntegrationTest.java
index 9f38251..48a461a 100755
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/StringsIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/StringsIntegrationTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.AfterClass;
@@ -48,6 +49,7 @@ import redis.clients.jedis.params.SetParams;
import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.GemFireCache;
import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.redis.general.ConcurrentLoopingThreads;
import org.apache.geode.redis.internal.RedisConstants;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.junit.categories.RedisTest;
@@ -787,46 +789,27 @@ public class StringsIntegrationTest {
}
@Test
- public void testConcurrentDel_differentClients()
- throws InterruptedException, ExecutionException {
+ public void testConcurrentDel_differentClients() {
String keyBaseName = "DELBASE";
- doABunchOfSets(keyBaseName, jedis);
+ new ConcurrentLoopingThreads(
+ ITERATION_COUNT,
+ (i) -> jedis.set(keyBaseName + i, "value" + i))
+ .run();
- CountDownLatch latch = new CountDownLatch(1);
- ExecutorService pool = Executors.newFixedThreadPool(2);
- Callable<Integer> callable1 = () -> doABunchOfDels(keyBaseName, 0, jedis, latch);
- Callable<Integer> callable2 = () -> doABunchOfDels(keyBaseName, 1, jedis2, latch);
- Future<Integer> future1 = pool.submit(callable1);
- Future<Integer> future2 = pool.submit(callable2);
+ AtomicLong deletedCount = new AtomicLong();
+ new ConcurrentLoopingThreads(ITERATION_COUNT,
+ (i) -> deletedCount.addAndGet(jedis.del(keyBaseName + i)),
+ (i) -> deletedCount.addAndGet(jedis2.del(keyBaseName + i)))
+ .run();
- latch.countDown();
- assertThat(future1.get() + future2.get()).isEqualTo(ITERATION_COUNT);
+ assertThat(deletedCount.get()).isEqualTo(ITERATION_COUNT);
for (int i = 0; i < ITERATION_COUNT; i++) {
assertThat(jedis.get(keyBaseName + i)).isNull();
}
- pool.shutdown();
- }
-
- private void doABunchOfSets(String keyBaseName, Jedis jedis) {
- for (int i = 0; i < ITERATION_COUNT; i++) {
- jedis.set(keyBaseName + i, "value" + i);
- }
- }
-
- private int doABunchOfDels(String keyBaseName, int start, Jedis jedis, CountDownLatch latch)
- throws InterruptedException {
- int delCount = 0;
- latch.await();
-
- for (int i = start; i < ITERATION_COUNT; i += 2) {
- delCount += jedis.del(keyBaseName + i);
- Thread.yield();
- }
- return delCount;
}
@Test
@@ -870,29 +853,15 @@ public class StringsIntegrationTest {
public void testDecr_shouldBeAtomic() throws ExecutionException, InterruptedException {
jedis.set("contestedKey", "0");
- CountDownLatch latch = new CountDownLatch(1);
- ExecutorService pool = Executors.newFixedThreadPool(2);
- Callable<Integer> callable1 = () -> doABunchOfDecrs(jedis, latch);
- Callable<Integer> callable2 = () -> doABunchOfDecrs(jedis2, latch);
- Future<Integer> future1 = pool.submit(callable1);
- Future<Integer> future2 = pool.submit(callable2);
-
- latch.countDown();
-
- future1.get();
- future2.get();
+ new ConcurrentLoopingThreads(
+ ITERATION_COUNT,
+ (i) -> jedis.decr("contestedKey"),
+ (i) -> jedis2.decr("contestedKey"))
+ .run();
assertThat(jedis.get("contestedKey")).isEqualTo(Integer.toString(-2 * ITERATION_COUNT));
}
- private Integer doABunchOfDecrs(Jedis jedis, CountDownLatch latch) throws InterruptedException {
- latch.await();
- for (int i = 0; i < ITERATION_COUNT; i++) {
- jedis.decr("contestedKey");
- }
- return ITERATION_COUNT;
- }
-
@Test
public void testIncr() {
String oneHundredKey = randString();
@@ -945,17 +914,12 @@ public class StringsIntegrationTest {
public void testIncr_shouldBeAtomic() throws ExecutionException, InterruptedException {
jedis.set("contestedKey", "0");
- CountDownLatch latch = new CountDownLatch(1);
- ExecutorService pool = Executors.newFixedThreadPool(2);
- Callable<Integer> callable1 = () -> doABunchOfIncrs(jedis, latch);
- Callable<Integer> callable2 = () -> doABunchOfIncrs(jedis2, latch);
- Future<Integer> future1 = pool.submit(callable1);
- Future<Integer> future2 = pool.submit(callable2);
-
- latch.countDown();
+ new ConcurrentLoopingThreads(
+ ITERATION_COUNT,
+ (i) -> jedis.incr("contestedKey"),
+ (i) -> jedis2.incr("contestedKey"))
+ .run();
- future1.get();
- future2.get();
assertThat(jedis.get("contestedKey")).isEqualTo(Integer.toString(2 * ITERATION_COUNT));
}
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ConcurrentLoopingThreads.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ConcurrentLoopingThreads.java
new file mode 100644
index 0000000..b609d79
--- /dev/null
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ConcurrentLoopingThreads.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.general;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+/**
+ * Test helper that runs each supplied function {@code iterationCount} times on its own thread.
+ * All threads wait on one shared latch, so their loops begin together and overlap in time.
+ */
+public class ConcurrentLoopingThreads {
+  private final int iterationCount;
+  private final Consumer<Integer>[] functions;
+
+  /** Each function gets its own thread and is called with the indices 0..iterationCount-1. */
+  @SafeVarargs
+  public ConcurrentLoopingThreads(int iterationCount, Consumer<Integer>... functions) {
+    this.iterationCount = iterationCount;
+    this.functions = functions;
+  }
+
+  /**
+   * Starts one thread per function, releases them together and waits for all of them to finish.
+   *
+   * @throws RuntimeException if any function threw (the first failure is the cause), or if the
+   *         calling thread is interrupted while waiting
+   */
+  public void run() {
+    CountDownLatch startLatch = new CountDownLatch(1);
+    Throwable[] failures = new Throwable[functions.length];
+    Thread[] threads = new Thread[functions.length];
+    for (int t = 0; t < threads.length; t++) {
+      int slot = t;
+      threads[t] = new Thread(() -> {
+        try {
+          loop(functions[slot], startLatch);
+        } catch (Throwable failure) {
+          failures[slot] = failure; // each thread owns one slot; join() publishes it
+        }
+      });
+    }
+    // Start every thread before opening the latch. A lazy Stream pipeline must not be used for
+    // this: it would start and join one thread at a time, running the functions sequentially.
+    Arrays.stream(threads).forEach(Thread::start);
+    startLatch.countDown();
+
+    for (Thread thread : threads) {
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+    Stream.of(failures).filter(failure -> failure != null).findFirst().ifPresent(failure -> {
+      throw new RuntimeException("A looping thread failed", failure);
+    });
+  }
+
+  /** Waits for the latch, then invokes the function once per index, yielding in between. */
+  private void loop(Consumer<Integer> function, CountDownLatch startLatch) {
+    try {
+      startLatch.await();
+    } catch (InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+    for (int i = 0; i < iterationCount; i++) {
+      function.accept(i);
+      Thread.yield();
+    }
+  }
+}
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExistsIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExistsIntegrationTest.java
index 47660e1..abd1c37 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExistsIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExistsIntegrationTest.java
@@ -21,7 +21,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
import org.junit.After;
import org.junit.AfterClass;
@@ -268,75 +267,36 @@ public class ExistsIntegrationTest {
}
@Test
- public void shouldCorrectlyVerifyKeysExistConcurrently() throws InterruptedException {
+ public void shouldCorrectlyVerifyKeysExistConcurrently() {
int iterationCount = 5000;
- setKeys(jedis, iterationCount);
- AtomicLong existsCount = new AtomicLong(0);
-
- Thread thread1 =
- new LoopingThread((i) -> existsCount.addAndGet(jedis.exists(toArray("key" + i))),
- iterationCount);
- Thread thread2 =
- new LoopingThread((i) -> existsCount.addAndGet(jedis2.exists(toArray("key" + i))),
- iterationCount);
+ new ConcurrentLoopingThreads(iterationCount, (i) -> jedis.set("key" + i, "value" + i)).run();
- thread1.start();
- thread2.start();
- thread1.join();
- thread2.join();
+ AtomicLong existsCount = new AtomicLong(0);
+ new ConcurrentLoopingThreads(
+ iterationCount,
+ (i) -> existsCount.addAndGet(jedis.exists(toArray("key" + i))),
+ (i) -> existsCount.addAndGet(jedis2.exists(toArray("key" + i))))
+ .run();
assertThat(existsCount.get()).isEqualTo(2 * iterationCount);
}
@Test
- public void shouldNotThrowExceptionsWhenConcurrentlyCreatingCheckingAndDeletingKeys()
- throws InterruptedException {
- int iterationCount = 5000;
-
- Thread loopingThread1 = new LoopingThread((i) -> jedis.set("key", "value"), iterationCount);
- Thread loopingThread2 = new LoopingThread((i) -> jedis2.exists(toArray("key")), iterationCount);
- Thread loopingThread3 = new LoopingThread((i) -> jedis3.del("key"), iterationCount);
+ public void shouldNotThrowExceptionsWhenConcurrentlyCreatingCheckingAndDeletingKeys() {
- loopingThread1.start();
- loopingThread2.start();
- loopingThread3.start();
- loopingThread1.join();
- loopingThread2.join();
- loopingThread3.join();
+ int iterationCount = 5000;
+ new ConcurrentLoopingThreads(
+ iterationCount,
+ (i) -> jedis.set("key", "value"),
+ (i) -> jedis2.exists(toArray("key")),
+ (i) -> jedis3.del("key"))
+ .run();
}
public String[] toArray(String... strings) {
return strings;
}
- private void setKeys(Jedis jedis, int iterationCount) {
- for (int i = 0; i < iterationCount; i++) {
- jedis.set("key" + i, "value" + i);
- }
- }
-
- private class LoopingThread extends Thread {
- public LoopingThread(Function<Integer, Object> runnable, int iterationCount) {
- super(new LoopingRunnable(runnable, iterationCount));
- }
- }
-
- private class LoopingRunnable implements Runnable {
- private final Function<Integer, Object> runnable;
- private final int iterationCount;
-
- public LoopingRunnable(Function<Integer, Object> runnable, int iterationCount) {
- this.runnable = runnable;
- this.iterationCount = iterationCount;
- }
-
- @Override
- public void run() {
- for (int i = 0; i < iterationCount; i++) {
- runnable.apply(i);
- }
- }
- }
}