You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2020/04/22 13:08:53 UTC

[geode] branch develop updated: GEODE-8002: Extract common concurrent execution test code into LoopingThreads class (#4973)

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7dbd970  GEODE-8002: Extract common concurrent execution test code into LoopingThreads class (#4973)
7dbd970 is described below

commit 7dbd9705970db2fe07bff8d21ed4bd0dc00fa8b0
Author: Murtuza Boxwala <1m...@gmail.com>
AuthorDate: Wed Apr 22 09:08:07 2020 -0400

    GEODE-8002: Extract common concurrent execution test code into LoopingThreads class (#4973)
---
 .../apache/geode/redis/HashesIntegrationTest.java  | 314 ++++++---------------
 .../apache/geode/redis/StringsIntegrationTest.java |  82 ++----
 .../redis/general/ConcurrentLoopingThreads.java    |  88 ++++++
 .../geode/redis/general/ExistsIntegrationTest.java |  72 ++---
 4 files changed, 221 insertions(+), 335 deletions(-)

diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/HashesIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/HashesIntegrationTest.java
index 4d35d48..776d6ee 100755
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/HashesIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/HashesIntegrationTest.java
@@ -38,14 +38,11 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.RandomStringUtils;
+import org.assertj.core.util.Maps;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -59,6 +56,7 @@ import redis.clients.jedis.exceptions.JedisDataException;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.GemFireCache;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.redis.general.ConcurrentLoopingThreads;
 import org.apache.geode.test.junit.categories.RedisTest;
 
 @Category({RedisTest.class})
@@ -478,52 +476,33 @@ public class HashesIntegrationTest {
 
   @Test
   public void testConcurrentHMSet_differentKeyPerClient() throws InterruptedException {
-    String key1 = "HMSET" + randString();
-    String key2 = "HMSET" + randString();
-    Map<String, String> record1 = new HashMap<String, String>();
-    Map<String, String> record2 = new HashMap<String, String>();
-
-    Runnable runnable1 = () -> doABunchOfHMSets(key1, record1, jedis);
-    Runnable runnable2 = () -> doABunchOfHMSets(key2, record2, jedis2);
-    Thread thread1 = new Thread(runnable1);
-    Thread thread2 = new Thread(runnable2);
-    thread1.start();
-    thread2.start();
-    thread1.join();
-    thread2.join();
+    String key1 = "HMSET1";
+    String key2 = "HMSET2";
+    Map<String, String> expectedMap = new HashMap<String, String>();
+    for (int i = 0; i < ITERATION_COUNT; i++) {
+      expectedMap.put("field" + i, "value" + i);
+    }
 
-    Map<String, String> result = jedis.hgetAll(key1);
-    assertEquals(record1.size(), result.size());
-    assertThat(result.keySet().containsAll(record1.keySet())).isTrue();
-    assertThat(result.values().containsAll(record1.values())).isTrue();
-
-    Map<String, String> result2 = jedis.hgetAll(key2);
-    assertEquals(record2.size(), result2.size());
-    assertThat(result2.keySet().containsAll(record2.keySet())).isTrue();
-    assertThat(result2.values().containsAll(record2.values())).isTrue();
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> jedis.hmset(key1, Maps.newHashMap("field" + i, "value" + i)),
+        (i) -> jedis2.hmset(key2, Maps.newHashMap("field" + i, "value" + i)))
+            .run();
+
+    assertThat(jedis.hgetAll(key1)).isEqualTo(expectedMap);
+    assertThat(jedis.hgetAll(key2)).isEqualTo(expectedMap);
   }
 
   @Test
   public void testConcurrentHMSet_sameKeyPerClient() throws InterruptedException {
-    String key = "HMSET" + randString();
-    Map<String, String> record1 = new HashMap<String, String>();
-    Map<String, String> record2 = new HashMap<String, String>();
-
-    Runnable runnable1 = () -> doABunchOfHMSets(key, record1, jedis);
-    Runnable runnable2 = () -> doABunchOfHMSets(key, record2, jedis2);
-    Thread thread1 = new Thread(runnable1);
-    Thread thread2 = new Thread(runnable2);
-    thread1.start();
-    thread2.start();
-    thread1.join();
-    thread2.join();
+    String key = "HMSET1";
+
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> jedis.hmset(key, Maps.newHashMap("fieldA" + i, "valueA" + i)),
+        (i) -> jedis2.hmset(key, Maps.newHashMap("fieldB" + i, "valueB" + i)))
+            .run();
 
     Map<String, String> result = jedis.hgetAll(key);
-    assertEquals(record1.size() + record2.size(), result.size());
-    assertThat(result.keySet().containsAll(record1.keySet())).isTrue();
-    assertThat(result.keySet().containsAll(record2.keySet())).isTrue();
-    assertThat(result.values().containsAll(record1.values())).isTrue();
-    assertThat(result.values().containsAll(record2.values())).isTrue();
+    assertThat(result).hasSize(ITERATION_COUNT * 2);
   }
 
   @Test
@@ -536,109 +515,60 @@ public class HashesIntegrationTest {
       fields.add(randString());
     }
 
+    AtomicLong successCount = new AtomicLong();
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> successCount.addAndGet(jedis.hsetnx(key, "field" + i, "A")),
+        (i) -> successCount.addAndGet(jedis2.hsetnx(key, "field" + i, "B")))
+            .run();
 
-    CountDownLatch latch = new CountDownLatch(1);
-    ExecutorService pool = Executors.newFixedThreadPool(2);
-    Callable<Integer> callable1 = () -> doABunchOfHSetNXs(key, fields, "Thread1", jedis, latch);
-    Callable<Integer> callable2 = () -> doABunchOfHSetNXs(key, fields, "Thread2", jedis2, latch);
-    Future<Integer> future1 = pool.submit(callable1);
-    Future<Integer> future2 = pool.submit(callable2);
-
-    latch.countDown();
-    assertThat(future1.get() + future2.get()).isEqualTo(ITERATION_COUNT);
-
-    pool.shutdown();
+    assertThat(successCount.get()).isEqualTo(ITERATION_COUNT);
   }
 
-  private int doABunchOfHSetNXs(String key, ArrayList<String> fields, String fieldValue,
-      Jedis jedis, CountDownLatch latch) throws InterruptedException {
-    int successes = 0;
-
-    latch.await();
+  @Test
+  public void testConcurrentHSet_differentKeyPerClient() {
+    String key1 = "HSET1";
+    String key2 = "HSET2";
+    Map<String, String> expectedMap = new HashMap<String, String>();
     for (int i = 0; i < ITERATION_COUNT; i++) {
-      if (jedis.hsetnx(key, fields.get(i), fieldValue) == 1) {
-        successes++;
-        Thread.yield();
-      }
+      expectedMap.put("field" + i, "value" + i);
     }
-    return successes;
-  }
 
-  @Test
-  public void testConcurrentHSet_differentKeyPerClient() throws InterruptedException {
-    String key1 = "HSET" + randString();
-    String key2 = "HSET" + randString();
-    Map<String, String> record1 = new HashMap<String, String>();
-    Map<String, String> record2 = new HashMap<String, String>();
-
-    Runnable runnable1 = () -> doABunchOfHSets(key1, record1, jedis);
-    Runnable runnable2 = () -> doABunchOfHSets(key2, record2, jedis2);
-    Thread thread1 = new Thread(runnable1);
-    Thread thread2 = new Thread(runnable2);
-    thread1.start();
-    thread2.start();
-    thread1.join();
-    thread2.join();
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> jedis.hset(key1, "field" + i, "value" + i),
+        (i) -> jedis2.hset(key2, "field" + i, "value" + i))
+            .run();
 
-    Map<String, String> result = jedis.hgetAll(key1);
-    assertEquals(record1.size(), result.size());
-    assertThat(result.keySet().containsAll(record1.keySet())).isTrue();
-    assertThat(result.values().containsAll(record1.values())).isTrue();
-
-    Map<String, String> result2 = jedis.hgetAll(key2);
-    assertEquals(record2.size(), result2.size());
-    assertThat(result2.keySet().containsAll(record2.keySet())).isTrue();
-    assertThat(result2.values().containsAll(record2.values())).isTrue();
+    assertThat(jedis.hgetAll(key1)).isEqualTo(expectedMap);
+    assertThat(jedis.hgetAll(key2)).isEqualTo(expectedMap);
   }
 
   @Test
   public void testConcurrentHSet_sameKeyPerClient() throws InterruptedException {
-    String key1 = "HSET" + randString();
-    Map<String, String> record1 = new HashMap<String, String>();
-    Map<String, String> record2 = new HashMap<String, String>();
-
-    Runnable runnable1 = () -> doABunchOfHSets(key1, record1, jedis);
-    Runnable runnable2 = () -> doABunchOfHSets(key1, record2, jedis2);
-    Thread thread1 = new Thread(runnable1);
-    Thread thread2 = new Thread(runnable2);
-    thread1.start();
-    thread2.start();
-    thread1.join();
-    thread2.join();
+    String key1 = "HSET1";
 
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> jedis.hset(key1, "fieldA" + i, "value" + i),
+        (i) -> jedis2.hset(key1, "fieldB" + i, "value" + i))
+            .run();
     Map<String, String> result = jedis.hgetAll(key1);
-    assertEquals(record1.size() + record2.size(), result.size());
-    assertThat(result.keySet().containsAll(record1.keySet())).isTrue();
-    assertThat(result.keySet().containsAll(record2.keySet())).isTrue();
-    assertThat(result.values().containsAll(record1.values())).isTrue();
-    assertThat(result.values().containsAll(record2.values())).isTrue();
+
+    assertThat(result).hasSize(ITERATION_COUNT * 2);
   }
 
   @Test
   public void testConcurrentHIncr_sameKeyPerClient() throws InterruptedException {
-    String key1 = "HSET" + randString();
-    String field = "FIELD" + randString();
+    String key = "KEY";
+    String field = "FIELD";
 
+    jedis.hset(key, field, "0");
 
-    jedis.hset(key1, field, "0");
-
-    Runnable runnable1 = () -> doABunchOfHIncrs(key1, field, ITERATION_COUNT / 2, jedis);
-    Runnable runnable2 = () -> doABunchOfHIncrs(key1, field, ITERATION_COUNT / 2, jedis2);
-    Thread thread1 = new Thread(runnable1);
-    Thread thread2 = new Thread(runnable2);
-    thread1.start();
-    thread2.start();
-    thread1.join();
-    thread2.join();
-
-    String value = jedis.hget(key1, field);
-    assertThat(value).isEqualTo(Integer.toString(ITERATION_COUNT));
-  }
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> jedis.hincrBy(key, field, 1),
+        (i) -> jedis2.hincrBy(key, field, 1))
+            .run();
 
-  private void doABunchOfHIncrs(String key, String field, int incrCount, Jedis jedis) {
-    for (int i = 0; i < incrCount; i++) {
-      jedis.hincrBy(key, field, 1);
-    }
+    String value = jedis.hget(key, field);
+    assertThat(value).isEqualTo(Integer.toString(ITERATION_COUNT * 2));
   }
 
   @Test
@@ -646,28 +576,14 @@ public class HashesIntegrationTest {
     String key = "HSET" + randString();
     String field = "FIELD" + randString();
 
-
     jedis.hset(key, field, "0");
 
-    Runnable runnable1 = () -> doABunchOfHIncrByFloats(key, field, ITERATION_COUNT / 2, 1.0, jedis);
-    Runnable runnable2 =
-        () -> doABunchOfHIncrByFloats(key, field, ITERATION_COUNT / 2, 0.5, jedis2);
-    Thread thread1 = new Thread(runnable1);
-    Thread thread2 = new Thread(runnable2);
-    thread1.start();
-    thread2.start();
-    thread1.join();
-    thread2.join();
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> jedis.hincrByFloat(key, field, 0.5),
+        (i) -> jedis.hincrByFloat(key, field, 1.0)).run();
 
     String value = jedis.hget(key, field);
-    assertThat(value).isEqualTo(String.format("%.0f", ITERATION_COUNT * 0.75));
-  }
-
-  private void doABunchOfHIncrByFloats(String key, String field, int incrCount, double incrValue,
-      Jedis jedis) {
-    for (int i = 0; i < incrCount; i++) {
-      jedis.hincrByFloat(key, field, incrValue);
-    }
+    assertThat(value).isEqualTo(String.format("%.0f", ITERATION_COUNT * 1.5));
   }
 
   @Test
@@ -681,79 +597,52 @@ public class HashesIntegrationTest {
 
   @Test
   public void testConcurrentHSetHDel_sameKeyPerClient() throws InterruptedException {
-    String key1 = "HSET" + randString();
+    String key = "HSET1";
 
     ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(ITERATION_COUNT);
 
-    Runnable runnable1 = () -> doABunchOfHSetsWithBlockingQueue(key1, blockingQueue, jedis);
-    Runnable runnable2 = () -> doABunchOfHDelsWithBlockingQueue(key1, blockingQueue, jedis2);
-    Thread thread1 = new Thread(runnable1);
-    Thread thread2 = new Thread(runnable2);
-    thread1.start();
-    thread2.start();
-    thread1.join();
-    thread2.join();
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> {
+          jedis.hset(key, "field" + i, "value" + i);
+          blockingQueue.add("field" + i);
+        },
+        (i) -> {
+          try {
+            String fieldToDelete = blockingQueue.take();
+            jedis.hdel(key, fieldToDelete);
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        })
+            .run();
+
+    Map<String, String> result = jedis.hgetAll(key);
 
-    Map<String, String> result = jedis.hgetAll(key1);
     assertThat(result).isEmpty();
   }
 
   @Test
   public void testConcurrentHGetAll() throws InterruptedException, ExecutionException {
-    String key1 = "HSET" + randString();
+    String key = "HSET1";
     HashMap<String, String> record = new HashMap<>();
 
-    doABunchOfHSets(key1, record, jedis);
-
-    ExecutorService pool = Executors.newFixedThreadPool(2);
-    Callable<Integer> callable1 = () -> doABunchOfHGetAlls(key1, jedis);
-    Callable<Integer> callable2 = () -> doABunchOfHGetAlls(key1, jedis2);
-    Future<Integer> future1 = pool.submit(callable1);
-    Future<Integer> future2 = pool.submit(callable2);
-
-    assertThat(future1.get()).isEqualTo(ITERATION_COUNT);
-    assertThat(future2.get()).isEqualTo(ITERATION_COUNT);
+    doABunchOfHSets(key, record, jedis);
 
-    pool.shutdown();
-  }
-
-  private int doABunchOfHGetAlls(String key, Jedis jedis) {
-    int returnedCount = 0;
-    for (int i = 0; i < ITERATION_COUNT; i++) {
-      if (jedis.hgetAll(key).size() == ITERATION_COUNT) {
-        returnedCount++;
-      }
-    }
-    return returnedCount;
-  }
-
-  private void doABunchOfHDelsWithBlockingQueue(String key,
-      ArrayBlockingQueue<String> blockingQueue,
-      Jedis jedis) {
-    String field;
-    for (int i = 0; i < ITERATION_COUNT; i++) {
-      try {
-        field = blockingQueue.take();
-      } catch (InterruptedException e) {
-        throw new RuntimeException("HDel thread was interrupted unexpectedly", e);
-      }
-      jedis.hdel(key, field);
-    }
-  }
-
-  private void doABunchOfHSetsWithBlockingQueue(String key,
-      ArrayBlockingQueue<String> blockingQueue,
-      Jedis jedis) {
-    String field;
-    String fieldValue;
-    for (int i = 0; i < ITERATION_COUNT; i++) {
-      field = randString();
-      fieldValue = randString();
-
-      jedis.hset(key, field, fieldValue);
+    AtomicLong successCount = new AtomicLong();
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> {
+          if (jedis.hgetAll(key).size() == ITERATION_COUNT) {
+            successCount.incrementAndGet();
+          }
+        },
+        (i) -> {
+          if (jedis.hgetAll(key).size() == ITERATION_COUNT) {
+            successCount.incrementAndGet();
+          }
+        })
+            .run();
 
-      blockingQueue.add(field);
-    }
+    assertThat(successCount.get()).isEqualTo(ITERATION_COUNT * 2);
   }
 
   private void doABunchOfHSets(String key, Map<String, String> record, Jedis jedis) {
@@ -769,21 +658,6 @@ public class HashesIntegrationTest {
     }
   }
 
-  private void doABunchOfHMSets(String key, Map<String, String> record, Jedis jedis) {
-    String field;
-    String fieldValue;
-    for (int i = 0; i < ITERATION_COUNT; i++) {
-      field = randString();
-      fieldValue = randString();
-      Map<String, String> hmsetMap = new HashMap<>();
-      hmsetMap.put(field, fieldValue);
-
-      record.put(field, fieldValue);
-
-      jedis.hmset(key, hmsetMap);
-    }
-  }
-
   private String randString() {
     int length = rand.nextInt(8) + 5;
     return RandomStringUtils.randomAlphanumeric(length);
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/StringsIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/StringsIntegrationTest.java
index 9f38251..48a461a 100755
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/StringsIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/StringsIntegrationTest.java
@@ -34,6 +34,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.junit.After;
 import org.junit.AfterClass;
@@ -48,6 +49,7 @@ import redis.clients.jedis.params.SetParams;
 import org.apache.geode.cache.CacheFactory;
 import org.apache.geode.cache.GemFireCache;
 import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.redis.general.ConcurrentLoopingThreads;
 import org.apache.geode.redis.internal.RedisConstants;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.junit.categories.RedisTest;
@@ -787,46 +789,27 @@ public class StringsIntegrationTest {
   }
 
   @Test
-  public void testConcurrentDel_differentClients()
-      throws InterruptedException, ExecutionException {
+  public void testConcurrentDel_differentClients() {
     String keyBaseName = "DELBASE";
 
-    doABunchOfSets(keyBaseName, jedis);
+    new ConcurrentLoopingThreads(
+        ITERATION_COUNT,
+        (i) -> jedis.set(keyBaseName + i, "value" + i))
+            .run();
 
-    CountDownLatch latch = new CountDownLatch(1);
-    ExecutorService pool = Executors.newFixedThreadPool(2);
-    Callable<Integer> callable1 = () -> doABunchOfDels(keyBaseName, 0, jedis, latch);
-    Callable<Integer> callable2 = () -> doABunchOfDels(keyBaseName, 1, jedis2, latch);
-    Future<Integer> future1 = pool.submit(callable1);
-    Future<Integer> future2 = pool.submit(callable2);
+    AtomicLong deletedCount = new AtomicLong();
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> deletedCount.addAndGet(jedis.del(keyBaseName + i)),
+        (i) -> deletedCount.addAndGet(jedis2.del(keyBaseName + i)))
+            .run();
 
-    latch.countDown();
 
-    assertThat(future1.get() + future2.get()).isEqualTo(ITERATION_COUNT);
+    assertThat(deletedCount.get()).isEqualTo(ITERATION_COUNT);
 
     for (int i = 0; i < ITERATION_COUNT; i++) {
       assertThat(jedis.get(keyBaseName + i)).isNull();
     }
 
-    pool.shutdown();
-  }
-
-  private void doABunchOfSets(String keyBaseName, Jedis jedis) {
-    for (int i = 0; i < ITERATION_COUNT; i++) {
-      jedis.set(keyBaseName + i, "value" + i);
-    }
-  }
-
-  private int doABunchOfDels(String keyBaseName, int start, Jedis jedis, CountDownLatch latch)
-      throws InterruptedException {
-    int delCount = 0;
-    latch.await();
-
-    for (int i = start; i < ITERATION_COUNT; i += 2) {
-      delCount += jedis.del(keyBaseName + i);
-      Thread.yield();
-    }
-    return delCount;
   }
 
   @Test
@@ -870,29 +853,15 @@ public class StringsIntegrationTest {
   public void testDecr_shouldBeAtomic() throws ExecutionException, InterruptedException {
     jedis.set("contestedKey", "0");
 
-    CountDownLatch latch = new CountDownLatch(1);
-    ExecutorService pool = Executors.newFixedThreadPool(2);
-    Callable<Integer> callable1 = () -> doABunchOfDecrs(jedis, latch);
-    Callable<Integer> callable2 = () -> doABunchOfDecrs(jedis2, latch);
-    Future<Integer> future1 = pool.submit(callable1);
-    Future<Integer> future2 = pool.submit(callable2);
-
-    latch.countDown();
-
-    future1.get();
-    future2.get();
+    new ConcurrentLoopingThreads(
+        ITERATION_COUNT,
+        (i) -> jedis.decr("contestedKey"),
+        (i) -> jedis2.decr("contestedKey"))
+            .run();
 
     assertThat(jedis.get("contestedKey")).isEqualTo(Integer.toString(-2 * ITERATION_COUNT));
   }
 
-  private Integer doABunchOfDecrs(Jedis jedis, CountDownLatch latch) throws InterruptedException {
-    latch.await();
-    for (int i = 0; i < ITERATION_COUNT; i++) {
-      jedis.decr("contestedKey");
-    }
-    return ITERATION_COUNT;
-  }
-
   @Test
   public void testIncr() {
     String oneHundredKey = randString();
@@ -945,17 +914,12 @@ public class StringsIntegrationTest {
   public void testIncr_shouldBeAtomic() throws ExecutionException, InterruptedException {
     jedis.set("contestedKey", "0");
 
-    CountDownLatch latch = new CountDownLatch(1);
-    ExecutorService pool = Executors.newFixedThreadPool(2);
-    Callable<Integer> callable1 = () -> doABunchOfIncrs(jedis, latch);
-    Callable<Integer> callable2 = () -> doABunchOfIncrs(jedis2, latch);
-    Future<Integer> future1 = pool.submit(callable1);
-    Future<Integer> future2 = pool.submit(callable2);
-
-    latch.countDown();
+    new ConcurrentLoopingThreads(
+        ITERATION_COUNT,
+        (i) -> jedis.incr("contestedKey"),
+        (i) -> jedis2.incr("contestedKey"))
+            .run();
 
-    future1.get();
-    future2.get();
 
     assertThat(jedis.get("contestedKey")).isEqualTo(Integer.toString(2 * ITERATION_COUNT));
   }
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ConcurrentLoopingThreads.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ConcurrentLoopingThreads.java
new file mode 100644
index 0000000..b609d79
--- /dev/null
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ConcurrentLoopingThreads.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.general;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
+
+/** Runs each given function {@code iterationCount} times, every function on its own thread. */
+public class ConcurrentLoopingThreads {
+  private final int iterationCount;
+  private final Consumer<Integer>[] functions;
+
+  /**
+   * @param iterationCount how many times each function is called (indices 0..iterationCount-1)
+   * @param functions one loop body per thread; each receives the current iteration index
+   */
+  @SafeVarargs
+  public ConcurrentLoopingThreads(int iterationCount, Consumer<Integer>... functions) {
+    this.iterationCount = iterationCount;
+    this.functions = functions;
+  }
+
+  /**
+   * Starts one thread per function, releases them all at once and waits until every thread ends.
+   * If the calling thread is interrupted while waiting, a RuntimeException wraps the cause.
+   */
+  public void run() {
+    CountDownLatch latch = new CountDownLatch(1);
+    Stream<Thread> created = Arrays.stream(functions)
+        .map((r) -> new Thread(new LoopingRunnable(r, iterationCount, latch)));
+    // Streams are lazy: start every thread before opening the latch, or they run one at a time.
+    Thread[] threads = created.toArray(Thread[]::new);
+    for (Thread thread : threads) {
+      thread.start();
+    }
+    latch.countDown();
+    for (Thread thread : threads) {
+      try {
+        thread.join();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /** Waits for the shared start latch, then calls the function once per iteration index. */
+  private static class LoopingRunnable implements Runnable {
+    private final Consumer<Integer> runnable;
+    private final int iterationCount;
+    private final CountDownLatch startLatch;
+
+    LoopingRunnable(Consumer<Integer> runnable, int iterationCount, CountDownLatch startLatch) {
+      this.runnable = runnable;
+      this.iterationCount = iterationCount;
+      this.startLatch = startLatch;
+    }
+
+    @Override
+    public void run() {
+      try {
+        startLatch.await();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+      for (int i = 0; i < iterationCount; i++) {
+        runnable.accept(i);
+        Thread.yield(); // give the other looping threads a chance to interleave
+      }
+    }
+  }
+}
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExistsIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExistsIntegrationTest.java
index 47660e1..abd1c37 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExistsIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/general/ExistsIntegrationTest.java
@@ -21,7 +21,6 @@ import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
 
 import org.junit.After;
 import org.junit.AfterClass;
@@ -268,75 +267,36 @@ public class ExistsIntegrationTest {
   }
 
   @Test
-  public void shouldCorrectlyVerifyKeysExistConcurrently() throws InterruptedException {
+  public void shouldCorrectlyVerifyKeysExistConcurrently() {
     int iterationCount = 5000;
-    setKeys(jedis, iterationCount);
 
-    AtomicLong existsCount = new AtomicLong(0);
-
-    Thread thread1 =
-        new LoopingThread((i) -> existsCount.addAndGet(jedis.exists(toArray("key" + i))),
-            iterationCount);
-    Thread thread2 =
-        new LoopingThread((i) -> existsCount.addAndGet(jedis2.exists(toArray("key" + i))),
-            iterationCount);
+    new ConcurrentLoopingThreads(iterationCount, (i) -> jedis.set("key" + i, "value" + i)).run();
 
-    thread1.start();
-    thread2.start();
-    thread1.join();
-    thread2.join();
+    AtomicLong existsCount = new AtomicLong(0);
+    new ConcurrentLoopingThreads(
+        iterationCount,
+        (i) -> existsCount.addAndGet(jedis.exists(toArray("key" + i))),
+        (i) -> existsCount.addAndGet(jedis2.exists(toArray("key" + i))))
+            .run();
 
     assertThat(existsCount.get()).isEqualTo(2 * iterationCount);
   }
 
   @Test
-  public void shouldNotThrowExceptionsWhenConcurrentlyCreatingCheckingAndDeletingKeys()
-      throws InterruptedException {
-    int iterationCount = 5000;
-
-    Thread loopingThread1 = new LoopingThread((i) -> jedis.set("key", "value"), iterationCount);
-    Thread loopingThread2 = new LoopingThread((i) -> jedis2.exists(toArray("key")), iterationCount);
-    Thread loopingThread3 = new LoopingThread((i) -> jedis3.del("key"), iterationCount);
+  public void shouldNotThrowExceptionsWhenConcurrentlyCreatingCheckingAndDeletingKeys() {
 
-    loopingThread1.start();
-    loopingThread2.start();
-    loopingThread3.start();
-    loopingThread1.join();
-    loopingThread2.join();
-    loopingThread3.join();
+    int iterationCount = 5000;
+    new ConcurrentLoopingThreads(
+        iterationCount,
+        (i) -> jedis.set("key", "value"),
+        (i) -> jedis2.exists(toArray("key")),
+        (i) -> jedis3.del("key"))
+            .run();
   }
 
   public String[] toArray(String... strings) {
     return strings;
   }
 
-  private void setKeys(Jedis jedis, int iterationCount) {
-    for (int i = 0; i < iterationCount; i++) {
-      jedis.set("key" + i, "value" + i);
-    }
-  }
-
-  private class LoopingThread extends Thread {
 
-    public LoopingThread(Function<Integer, Object> runnable, int iterationCount) {
-      super(new LoopingRunnable(runnable, iterationCount));
-    }
-  }
-
-  private class LoopingRunnable implements Runnable {
-    private final Function<Integer, Object> runnable;
-    private final int iterationCount;
-
-    public LoopingRunnable(Function<Integer, Object> runnable, int iterationCount) {
-      this.runnable = runnable;
-      this.iterationCount = iterationCount;
-    }
-
-    @Override
-    public void run() {
-      for (int i = 0; i < iterationCount; i++) {
-        runnable.apply(i);
-      }
-    }
-  }
 }