Posted to notifications@geode.apache.org by GitBox <gi...@apache.org> on 2021/02/09 19:06:30 UTC

[GitHub] [geode] nonbinaryprogrammer commented on a change in pull request #5954: GEODE-8864:finish implementation of Redis HScan Command

nonbinaryprogrammer commented on a change in pull request #5954:
URL: https://github.com/apache/geode/pull/5954#discussion_r572468638



##########
File path: geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionInvoker.java
##########
@@ -106,9 +107,10 @@ public int hstrlen(ByteArrayWrapper key, ByteArrayWrapper field) {
   }
 
   @Override
-  public Pair<BigInteger, List<Object>> hscan(ByteArrayWrapper key, Pattern matchPattern, int count,
+  public Pair<BigInteger, List<Object>> hscan(UUID clientID, ByteArrayWrapper key,
+      Pattern matchPattern, int count,
       BigInteger cursor) {
-    return invokeCommandFunction(key, HSCAN, matchPattern, count, cursor);
+    return invokeCommandFunction(key, HSCAN, matchPattern, count, cursor, clientID);

Review comment:
       might be nice to keep this consistent by putting clientID as the first parameter
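For illustration, a rough sketch of that ordering, with clientID leading the extra arguments passed to invokeCommandFunction so the call mirrors the method signature. How the executing function unpacks its arguments is not shown in this hunk, so the matching change on that side is assumed:

  @Override
  public Pair<BigInteger, List<Object>> hscan(UUID clientID, ByteArrayWrapper key,
      Pattern matchPattern, int count, BigInteger cursor) {
    // clientID first, matching its position in the method signature; the function that
    // executes HSCAN would need to read the arguments in this same order.
    return invokeCommandFunction(key, HSCAN, clientID, matchPattern, count, cursor);
  }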

##########
File path: geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.ScanResult;
+
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+
+public class HScanDunitTest {
+  @ClassRule
+  public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  private static Jedis jedis1;
+  private static Jedis jedis2;
+  private static Jedis jedis3;
+
+  private static Properties locatorProperties;
+
+  private static MemberVM locator;
+  private static MemberVM server1;
+  private static MemberVM server2;
+  private static MemberVM server3;
+
+  private static int redisServerPort1;
+  private static int redisServerPort2;
+  private static int redisServerPort3;
+
+  private final int SIZE_OF_INITIAL_HASH_DATA = 1000;
+  private final Map<String, String> INITIAL_HASH_DATA = makeEntrySet(SIZE_OF_INITIAL_HASH_DATA);
+  final String HASH_KEY = "key";
+  final String BASE_FIELD = "baseField_";
+
+  @BeforeClass
+  public static void classSetup() {
+    locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    int locatorPort = locator.getPort();
+
+    server1 = clusterStartUp
+        .startRedisVM(1,
+            x -> x.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withConnectionToLocator(locatorPort));
+
+    server2 = clusterStartUp
+        .startRedisVM(2,
+            x -> x.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withConnectionToLocator(locatorPort));
+
+    server3 = clusterStartUp
+        .startRedisVM(3,
+            x -> x.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withConnectionToLocator(locatorPort));
+
+    redisServerPort1 = clusterStartUp.getRedisPort(1);
+    redisServerPort2 = clusterStartUp.getRedisPort(2);
+    redisServerPort3 = clusterStartUp.getRedisPort(3);
+
+    jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT);
+    jedis2 = new Jedis(LOCAL_HOST, redisServerPort2, JEDIS_TIMEOUT);
+    jedis3 = new Jedis(LOCAL_HOST, redisServerPort3, JEDIS_TIMEOUT);
+  }
+
+  @Before
+  public void testSetup() {
+    jedis1.flushAll();
+    jedis2.flushAll();
+    jedis3.flushAll();
+
+    jedis1.hset(HASH_KEY, INITIAL_HASH_DATA);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    jedis1.disconnect();
+    jedis2.disconnect();
+    jedis3.disconnect();
+
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void should_stillGetHScan_givenServerCrashThreadsDoingHScans() {
+    final int ITERATION_COUNT = 500;
+
+
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> multipleHScanAndAssertOnSizeOfResultSet(jedis1),
+        (i) -> multipleHScanAndAssertOnSizeOfResultSet(jedis2),
+        (i) -> {
+          int fieldSuffix =
+              i < SIZE_OF_INITIAL_HASH_DATA ? i : (ITERATION_COUNT % SIZE_OF_INITIAL_HASH_DATA);
+
+          jedis3.hset(HASH_KEY, BASE_FIELD + fieldSuffix, "new_value_" + i);
+        }).run();
+  }
+
+  @Test
+  public void should_notLoseDataForConsistentlyPresentFields_givenConcurrentThreadsAddingAndRemovingFields() {
+    final int ITERATION_COUNT = 500;
+
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> multipleHScanAndAssertOnContentOfResultSet(jedis1),
+        (i) -> multipleHScanAndAssertOnContentOfResultSet(jedis2),
+        (i) -> {
+          String field = "new_" + BASE_FIELD + i;
+          jedis3.hset(HASH_KEY, field, "whatever");
+          jedis3.hdel(HASH_KEY, field);
+        }).run();

Review comment:
       might be good to assert that the new fields have all been deleted and all the original values remain
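A minimal sketch of that assertion, appended after the run() call; it assumes plain map equality against INITIAL_HASH_DATA is enough, since every added field is also deleted:

    // After the concurrent adds/deletes finish, only the original entries should remain.
    Map<String, String> finalData = jedis1.hgetAll(HASH_KEY);
    assertThat(finalData).isEqualTo(INITIAL_HASH_DATA);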

##########
File path: geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.ScanResult;
+
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+
+public class HScanDunitTest {
+  @ClassRule
+  public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  private static Jedis jedis1;
+  private static Jedis jedis2;
+  private static Jedis jedis3;
+
+  private static Properties locatorProperties;
+
+  private static MemberVM locator;
+  private static MemberVM server1;
+  private static MemberVM server2;
+  private static MemberVM server3;
+
+  private static int redisServerPort1;
+  private static int redisServerPort2;
+  private static int redisServerPort3;
+
+  private final int SIZE_OF_INITIAL_HASH_DATA = 1000;
+  private final Map<String, String> INITIAL_HASH_DATA = makeEntrySet(SIZE_OF_INITIAL_HASH_DATA);
+  final String HASH_KEY = "key";
+  final String BASE_FIELD = "baseField_";
+
+  @BeforeClass
+  public static void classSetup() {
+    locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    int locatorPort = locator.getPort();
+
+    server1 = clusterStartUp
+        .startRedisVM(1,
+            x -> x.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withConnectionToLocator(locatorPort));
+
+    server2 = clusterStartUp
+        .startRedisVM(2,
+            x -> x.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withConnectionToLocator(locatorPort));
+
+    server3 = clusterStartUp
+        .startRedisVM(3,
+            x -> x.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withConnectionToLocator(locatorPort));
+
+    redisServerPort1 = clusterStartUp.getRedisPort(1);
+    redisServerPort2 = clusterStartUp.getRedisPort(2);
+    redisServerPort3 = clusterStartUp.getRedisPort(3);
+
+    jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT);
+    jedis2 = new Jedis(LOCAL_HOST, redisServerPort2, JEDIS_TIMEOUT);
+    jedis3 = new Jedis(LOCAL_HOST, redisServerPort3, JEDIS_TIMEOUT);
+  }
+
+  @Before
+  public void testSetup() {
+    jedis1.flushAll();
+    jedis2.flushAll();
+    jedis3.flushAll();
+
+    jedis1.hset(HASH_KEY, INITIAL_HASH_DATA);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    jedis1.disconnect();
+    jedis2.disconnect();
+    jedis3.disconnect();
+
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void should_stillGetHScan_givenServerCrashThreadsDoingHScans() {

Review comment:
       this test name doesn't quite make sense

##########
File path: geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
##########
@@ -26,8 +27,15 @@
 public class Client {
   private Channel channel;
 
+  public UUID getId() {
+    return id;
+  }

Review comment:
       can you move this getter after the overridden methods?

##########
File path: geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
##########
@@ -161,4 +163,79 @@ public void setExpirationTimestamp_stores_delta_that_is_stable() throws IOExcept
     o2.fromDelta(in);
     assertThat(o2).isEqualTo(o1);
   }
+
+  @Test
+  public void hscanSnaphots_shouldBeEmpty_givenHscanHasNotBeenCalled() {
+    RedisHash subject = createRedisHash(100);
+    assertThat(subject.getHscanSnapShots()).isEmpty();
+  }
+
+  @Test
+  public void hscanSnaphots_shouldContainSnapshot_givenHscanHasBeenCalled() {
+
+    RedisHash subject = createRedisHash(100);
+
+    UUID clientID = UUID.randomUUID();
+
+    subject.hscan(clientID, null, 10, BigInteger.valueOf(0));

Review comment:
       might be good to add a test case for starting the scan at a non-zero index
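A sketch of such a case, reusing the helpers visible in this test class (the count and cursor values are arbitrary, and Pair would need to be imported from org.apache.commons.lang3.tuple):

  @Test
  public void hscan_shouldReturnResults_givenNonZeroStartCursor() {
    RedisHash subject = createRedisHash(100);
    UUID clientID = UUID.randomUUID();

    // First call builds the snapshot; the second call resumes from a non-zero cursor.
    subject.hscan(clientID, null, 10, BigInteger.valueOf(0));
    Pair<BigInteger, List<Object>> result =
        subject.hscan(clientID, null, 10, BigInteger.valueOf(10));

    assertThat(result.getRight()).isNotEmpty();
    assertThat(subject.getHscanSnapShots()).containsKey(clientID);
  }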

##########
File path: geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
##########
@@ -161,4 +163,79 @@ public void setExpirationTimestamp_stores_delta_that_is_stable() throws IOExcept
     o2.fromDelta(in);
     assertThat(o2).isEqualTo(o1);
   }
+
+  @Test
+  public void hscanSnaphots_shouldBeEmpty_givenHscanHasNotBeenCalled() {
+    RedisHash subject = createRedisHash(100);
+    assertThat(subject.getHscanSnapShots()).isEmpty();
+  }
+
+  @Test
+  public void hscanSnaphots_shouldContainSnapshot_givenHscanHasBeenCalled() {
+
+    RedisHash subject = createRedisHash(100);
+
+    UUID clientID = UUID.randomUUID();
+
+    subject.hscan(clientID, null, 10, BigInteger.valueOf(0));
+
+    HashMap<UUID, List<ByteArrayWrapper>> hscanSnapShotMap = subject.getHscanSnapShots();
+
+    assertThat(hscanSnapShotMap.containsKey(clientID)).isTrue();
+
+    List<ByteArrayWrapper> keyList = hscanSnapShotMap.get(clientID);
+    assertThat(keyList).isNotEmpty();
+
+    assertThat(hscanSnapShotMap.get(clientID).contains("field_1"));
+    assertThat(hscanSnapShotMap.get(clientID).contains("value_1"));
+    assertThat(hscanSnapShotMap.get(clientID).contains("field_8"));
+    assertThat(hscanSnapShotMap.get(clientID).contains("value_8"));

Review comment:
       why not iterate through all the fields to verify the data?
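For example, a sketch that assumes createRedisHash(100) populates fields named field_0 through field_99 (matching the names asserted above) and that ByteArrayWrapper equality is based on the wrapped bytes:

    for (int i = 0; i < 100; i++) {
      // Compare against ByteArrayWrapper instances rather than raw Strings.
      assertThat(keyList).contains(new ByteArrayWrapper(("field_" + i).getBytes()));
    }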

##########
File path: geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.ScanResult;
+
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+
+public class HScanDunitTest {
+  @ClassRule
+  public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  private static Jedis jedis1;
+  private static Jedis jedis2;
+  private static Jedis jedis3;
+
+  private static Properties locatorProperties;
+
+  private static MemberVM locator;
+  private static MemberVM server1;
+  private static MemberVM server2;
+  private static MemberVM server3;
+
+  private static int redisServerPort1;
+  private static int redisServerPort2;
+  private static int redisServerPort3;
+
+  private final int SIZE_OF_INITIAL_HASH_DATA = 1000;
+  private final Map<String, String> INITIAL_HASH_DATA = makeEntrySet(SIZE_OF_INITIAL_HASH_DATA);
+  final String HASH_KEY = "key";
+  final String BASE_FIELD = "baseField_";
+
+  @BeforeClass
+  public static void classSetup() {
+    locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    int locatorPort = locator.getPort();
+
+    server1 = clusterStartUp
+        .startRedisVM(1,
+            x -> x.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withConnectionToLocator(locatorPort));
+
+    server2 = clusterStartUp
+        .startRedisVM(2,
+            x -> x.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withConnectionToLocator(locatorPort));
+
+    server3 = clusterStartUp
+        .startRedisVM(3,
+            x -> x.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withConnectionToLocator(locatorPort));
+
+    redisServerPort1 = clusterStartUp.getRedisPort(1);
+    redisServerPort2 = clusterStartUp.getRedisPort(2);
+    redisServerPort3 = clusterStartUp.getRedisPort(3);
+
+    jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT);
+    jedis2 = new Jedis(LOCAL_HOST, redisServerPort2, JEDIS_TIMEOUT);
+    jedis3 = new Jedis(LOCAL_HOST, redisServerPort3, JEDIS_TIMEOUT);
+  }
+
+  @Before
+  public void testSetup() {
+    jedis1.flushAll();
+    jedis2.flushAll();
+    jedis3.flushAll();
+
+    jedis1.hset(HASH_KEY, INITIAL_HASH_DATA);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    jedis1.disconnect();
+    jedis2.disconnect();
+    jedis3.disconnect();
+
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void should_stillGetHScan_givenServerCrashThreadsDoingHScans() {
+    final int ITERATION_COUNT = 500;
+
+
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> multipleHScanAndAssertOnSizeOfResultSet(jedis1),
+        (i) -> multipleHScanAndAssertOnSizeOfResultSet(jedis2),
+        (i) -> {
+          int fieldSuffix =
+              i < SIZE_OF_INITIAL_HASH_DATA ? i : (ITERATION_COUNT % SIZE_OF_INITIAL_HASH_DATA);
+
+          jedis3.hset(HASH_KEY, BASE_FIELD + fieldSuffix, "new_value_" + i);
+        }).run();
+  }
+
+  @Test
+  public void should_notLoseDataForConsistentlyPresentFields_givenConcurrentThreadsAddingAndRemovingFields() {
+    final int ITERATION_COUNT = 500;
+
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> multipleHScanAndAssertOnContentOfResultSet(jedis1),
+        (i) -> multipleHScanAndAssertOnContentOfResultSet(jedis2),
+        (i) -> {
+          String field = "new_" + BASE_FIELD + i;
+          jedis3.hset(HASH_KEY, field, "whatever");
+          jedis3.hdel(HASH_KEY, field);
+        }).run();
+  }
+
+
+  private void multipleHScanAndAssertOnContentOfResultSet(Jedis jedis) {

Review comment:
       this does not appear to be doing multiple hscans
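A sketch of a version that does page through the hash with repeated HSCAN calls, using a small COUNT so more than one round trip is needed (the COUNT value is arbitrary, and ScanParams would need to be imported from redis.clients.jedis):

  private void multipleHScanAndAssertOnContentOfResultSet(Jedis jedis) {
    List<String> fieldsFromScan = new ArrayList<>();
    ScanParams scanParams = new ScanParams().count(10);
    ScanResult<Map.Entry<String, String>> result;
    String cursor = "0";

    // Keep issuing HSCAN until the server reports a complete iteration.
    do {
      result = jedis.hscan(HASH_KEY, cursor, scanParams);
      result.getResult().forEach(entry -> fieldsFromScan.add(entry.getKey()));
      cursor = result.getCursor();
    } while (!result.isCompleteIteration());

    assertThat(fieldsFromScan).containsAll(INITIAL_HASH_DATA.keySet());
  }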

##########
File path: geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -174,13 +211,62 @@ public void givenNonexistentKey_returnsEmptyArray() {
     assertThat(result.getResult()).isEmpty();
   }
 
+  @Test
+  public void givenNegativeCursor_returnsEntriesUsingAbsoluteValueOfCursor() {
+    Map<String, String> entryMap = new HashMap<>();
+    entryMap.put("1", "yellow");
+    entryMap.put("2", "green");
+    entryMap.put("3", "orange");
+    jedis.hmset("colors", entryMap);
+
+    String cursor = "-100";
+    ScanResult<Map.Entry<String, String>> result;
+    List<Map.Entry<String, String>> allEntries = new ArrayList<>();
+
+    do {
+      result = jedis.hscan("colors", cursor);
+      allEntries.addAll(result.getResult());
+      cursor = result.getCursor();
+    } while (!result.isCompleteIteration());
+
+    assertThat(allEntries).hasSize(3);
+    assertThat(new HashSet<>(allEntries)).isEqualTo(entryMap.entrySet());
+  }
+
+  @Test
+  public void givenCursorGreaterThanUnsignedLongCapacity_returnsCursorError() {
+    assertThatThrownBy(() -> jedis.hscan("a", "18446744073709551616"))
+        .hasMessageContaining(ERROR_CURSOR);
+  }
+
+  @Test
+  public void givenNegativeCursorGreaterThanUnsignedLongCapacity_returnsCursorError() {

Review comment:
       consider combining this and the above test into something like "givenCursorGreaterThanUnsignedLongCapacity_returnsCursorError"
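A possible combined form, using only the constants already referenced in these tests:

  @Test
  public void givenCursorGreaterThanUnsignedLongCapacity_returnsCursorError() {
    // Covers both the positive and the negative out-of-range cursor in one test.
    assertThatThrownBy(() -> jedis.hscan("a", "18446744073709551616"))
        .hasMessageContaining(ERROR_CURSOR);
    assertThatThrownBy(() -> jedis.hscan("a", "-18446744073709551616"))
        .hasMessageContaining(ERROR_CURSOR);
  }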

##########
File path: geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -336,48 +453,120 @@ public void givenMultipleCountsAndMatches_returnsEntriesMatchingLastMatchParamet
   }
 
   @Test
-  public void givenNegativeCursor_returnsEntriesUsingAbsoluteValueOfCursor() {
-    Map<String, String> entryMap = new HashMap<>();
-    entryMap.put("1", "yellow");
-    entryMap.put("2", "green");
-    entryMap.put("3", "orange");
-    jedis.hmset("colors", entryMap);
+  public void should_notReturnValue_givenValueWasRemovedBeforeHSCANISCalled() {
 
-    String cursor = "-100";
-    ScanResult<Map.Entry<String, String>> result;
-    List<Map.Entry<String, String>> allEntries = new ArrayList<>();
+    Map<String, String> originalData = new HashMap<>();
+    originalData.put("field_1", "yellow");
+    originalData.put("field_2", "green");
+    originalData.put("field_3", "grey");
+    jedis.hmset("colors", originalData);
 
-    do {
-      result = jedis.hscan("colors", cursor);
-      allEntries.addAll(result.getResult());
-      cursor = result.getCursor();
-    } while (!result.isCompleteIteration());
+    jedis.hdel("colors", "field_3");
+    originalData.remove("field_3");
 
-    assertThat(allEntries).hasSize(3);
-    assertThat(new HashSet<>(allEntries)).isEqualTo(entryMap.entrySet());
+    GeodeAwaitility.await().untilAsserted(
+        () -> assertThat(jedis.hget("colors", "field_3")).isNull());
+
+    ScanResult<Map.Entry<String, String>> result = jedis.hscan("colors", "0");
+
+    assertThat(new HashSet<>(result.getResult()))
+        .containsExactlyInAnyOrderElementsOf(originalData.entrySet());
   }
 
+
+  /**** Concurrency ***/
+
+  private final int SIZE_OF_INITIAL_HASH_DATA = 100;
+  private Map<String, String> INITIAL_HASH_DATA = makeEntrySet(SIZE_OF_INITIAL_HASH_DATA);
+  final String HASH_KEY = "key";
+  final String BASE_FIELD = "baseField_";
+
   @Test
-  public void givenCursorGreaterThanUnsignedLongCapacity_returnsCursorError() {
-    assertThatThrownBy(() -> jedis.hscan("a", "18446744073709551616"))
-        .hasMessageContaining(ERROR_CURSOR);
+  public void should_notLoseFields_givenConcurrentThreadsDoingHScansAndChangingValues() {
+    jedis.hset(HASH_KEY, INITIAL_HASH_DATA);
+    final int ITERATION_COUNT = 500;
+
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> multipleHScanAndAssertOnSizeOfResultSet(jedis),
+        (i) -> multipleHScanAndAssertOnSizeOfResultSet(jedis2),
+        (i) -> {
+          int fieldSuffix = i % SIZE_OF_INITIAL_HASH_DATA;
+          jedis3.hset(HASH_KEY, BASE_FIELD + fieldSuffix, "new_value_" + i);
+        }).run();

Review comment:
       might want to validate the data at the end of this test as well
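Since this loop overwrites values of existing fields, the end-of-test check would probably be on the field set rather than the values; a sketch:

    // Values are rewritten concurrently, but no field should be lost.
    Map<String, String> finalData = jedis.hgetAll(HASH_KEY);
    assertThat(finalData.keySet()).containsExactlyInAnyOrderElementsOf(INITIAL_HASH_DATA.keySet());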

##########
File path: geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -237,15 +326,42 @@ public void givenMultipleCounts_returnsAllEntriesWithoutDuplicates() {
     String cursor = "0";
 
     do {
-      result = (List<Object>) jedis.sendCommand(Protocol.Command.HSCAN, "colors", cursor, "COUNT",
-          "2", "COUNT", "1");
+      result = (List<Object>) jedis.sendCommand(Protocol.Command.HSCAN,
+          "colors",
+          cursor,
+          "COUNT", "2",
+          "COUNT", "1");
+
       allEntries.addAll((List<byte[]>) result.get(1));
       cursor = new String((byte[]) result.get(0));
     } while (!Arrays.equals((byte[]) result.get(0), "0".getBytes()));
 
     assertThat((byte[]) result.get(0)).isEqualTo("0".getBytes());
-    assertThat(allEntries).containsExactlyInAnyOrder("1".getBytes(), "yellow".getBytes(),
-        "12".getBytes(), "green".getBytes(), "3".getBytes(), "grey".getBytes());
+
+    assertThat(allEntries.size()).isCloseTo(3, Offset.offset(1));
+  }
+
+  @Test
+  public void givenCompleteIteration_shouldReturnCursorWithValueOfZero() {
+    Map<String, String> entryMap = new HashMap<>();
+    entryMap.put("1", "yellow");
+    entryMap.put("2", "green");
+    entryMap.put("3", "orange");
+    jedis.hmset("colors", entryMap);
+
+    ScanParams scanParams = new ScanParams();
+    scanParams.count(1);
+    ScanResult<Map.Entry<String, String>> result;
+    List<Map.Entry<String, String>> allEntries = new ArrayList<>();
+    String cursor = "0";
+
+    do {
+      result = jedis.hscan("colors", cursor, scanParams);
+      allEntries.addAll(result.getResult());
+      cursor = result.getCursor();
+    } while (!result.isCompleteIteration());
+
+    assertThat(result.getCursor()).isEqualTo("0");

Review comment:
       might be good to also check that all of the values are correct to verify that data wasn't messed with by the hscan
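One way to strengthen this, as a sketch: verify both what the scan returned and that the stored hash is unchanged afterwards.

    assertThat(new HashSet<>(allEntries)).containsAll(entryMap.entrySet());
    // HSCAN is read-only, so the stored hash should still equal the original map.
    assertThat(jedis.hgetAll("colors")).isEqualTo(entryMap);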

##########
File path: geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
##########
@@ -203,49 +261,142 @@ public int hstrlen(ByteArrayWrapper field) {
     return new ArrayList<>(hash.keySet());
   }
 
-  public Pair<BigInteger, List<Object>> hscan(Pattern matchPattern, int count, BigInteger cursor) {
-    List<Object> returnList = new ArrayList<Object>();
-    int size = hash.size();
-    BigInteger beforeCursor = new BigInteger("0");
-    int numElements = 0;
-    int i = -1;
-    for (Map.Entry<ByteArrayWrapper, ByteArrayWrapper> entry : hash.entrySet()) {
+  public Pair<BigInteger, List<Object>> hscan(UUID clientID, Pattern matchPattern,
+      int count,
+      BigInteger cursorParameter) {
+
+    int startCursor = cursorParameter.intValue();
+
+    System.out.println("hscan receiving startCursor: " + startCursor);
+
+    List<ByteArrayWrapper> keysForScan = getSnapShotOfKeySet(clientID);
+
+    keysForScan = keysForScan.subList(startCursor, keysForScan.size());
+    Map<ByteArrayWrapper, ByteArrayWrapper> dataForKeySnapShot = getDataForKeys(keysForScan);
+
+
+    List<Object> resultList =
+        getResultList(matchPattern, count, dataForKeySnapShot);
+
+    int indexOfLast = resultList.size() - 1;
+    int numberOfIterationsCompleted = Integer.parseInt(resultList.get(indexOfLast).toString());
+    resultList.remove(indexOfLast);
+
+
+    BigInteger returnCursorValue =
+        getCursorValueToReturn(count, startCursor, numberOfIterationsCompleted);
+
+    if (returnCursorValue.intValue() == 0) {
+      removeHSCANSnapshot(clientID);
+    }
+
+    System.out.println("hscan returning Cursor: " + returnCursorValue);
+
+    // returnCursorValue = new BigInteger(String.valueOf(0));
+
+    return new ImmutablePair<>(returnCursorValue, resultList);
+  }
+
+  private void removeHSCANSnapshot(UUID clientID) {
+    this.hScanSnapShots.remove(clientID);
+    this.hScanSnapShotCreationTimes.remove(clientID);
+
+    if (this.hScanSnapShots.isEmpty()) {
+      shutDownHscanSnapshotScheduledRemoval();
+    }
+  }
+
+  private List<Object> getResultList(Pattern matchPattern, int count,
+      Map<ByteArrayWrapper, ByteArrayWrapper> dataForKeySnapShot) {
+
+    int currentCursor = 0;
+    List<Object> resultList = new ArrayList<>();
+
+    for (Map.Entry<ByteArrayWrapper, ByteArrayWrapper> entry : dataForKeySnapShot.entrySet()) {
+
+      if (count == currentCursor) {
+        break;
+      }
+
+      currentCursor++;
+
       ByteArrayWrapper key = entry.getKey();
       ByteArrayWrapper value = entry.getValue();
-      i++;
-      if (beforeCursor.compareTo(cursor) < 0) {
-        beforeCursor = beforeCursor.add(new BigInteger("1"));
-        continue;
-      }
 
       if (matchPattern != null) {
-        if (matchPattern.matcher(key.toString()).matches()) {
-          returnList.add(key);
-          returnList.add(value);
-          numElements++;
-        }
+        addMatches(matchPattern, resultList, key, value);

Review comment:
       I think this would be worth inlining because it's only three lines 
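Inlined, the loop body would read roughly as follows (same behavior, just without the helper):

      if (matchPattern != null) {
        if (matchPattern.matcher(key.toString()).matches()) {
          resultList.add(key);
          resultList.add(value);
        }
      } else {
        resultList.add(key);
        resultList.add(value);
      }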

##########
File path: geode-redis/src/commonTest/java/org/apache/geode/redis/ConcurrentLoopingThreads.java
##########
@@ -63,13 +65,21 @@ private ConcurrentLoopingThreads start(boolean lockstep) {
    * operations.
    */
   public void await() {
-    loopingFutures.forEach(loopingThread -> {
-      try {
-        loopingThread.get();
-      } catch (InterruptedException | ExecutionException e) {
-        throw new RuntimeException(e);
+    boolean retry = false;

Review comment:
       this doesn't need to be set to false here. it will always get set on line 71. also maybe rename this to something like `timedOut`

##########
File path: geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -336,48 +453,120 @@ public void givenMultipleCountsAndMatches_returnsEntriesMatchingLastMatchParamet
   }
 
   @Test
-  public void givenNegativeCursor_returnsEntriesUsingAbsoluteValueOfCursor() {
-    Map<String, String> entryMap = new HashMap<>();
-    entryMap.put("1", "yellow");
-    entryMap.put("2", "green");
-    entryMap.put("3", "orange");
-    jedis.hmset("colors", entryMap);
+  public void should_notReturnValue_givenValueWasRemovedBeforeHSCANISCalled() {
 
-    String cursor = "-100";
-    ScanResult<Map.Entry<String, String>> result;
-    List<Map.Entry<String, String>> allEntries = new ArrayList<>();
+    Map<String, String> originalData = new HashMap<>();

Review comment:
       can you please rename this to either "data" or "expectedData" since you remove an element of it when deleting field_3

##########
File path: geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
##########
@@ -336,48 +453,120 @@ public void givenMultipleCountsAndMatches_returnsEntriesMatchingLastMatchParamet
   }
 
   @Test
-  public void givenNegativeCursor_returnsEntriesUsingAbsoluteValueOfCursor() {
-    Map<String, String> entryMap = new HashMap<>();
-    entryMap.put("1", "yellow");
-    entryMap.put("2", "green");
-    entryMap.put("3", "orange");
-    jedis.hmset("colors", entryMap);
+  public void should_notReturnValue_givenValueWasRemovedBeforeHSCANISCalled() {
 
-    String cursor = "-100";
-    ScanResult<Map.Entry<String, String>> result;
-    List<Map.Entry<String, String>> allEntries = new ArrayList<>();
+    Map<String, String> originalData = new HashMap<>();
+    originalData.put("field_1", "yellow");
+    originalData.put("field_2", "green");
+    originalData.put("field_3", "grey");
+    jedis.hmset("colors", originalData);
 
-    do {
-      result = jedis.hscan("colors", cursor);
-      allEntries.addAll(result.getResult());
-      cursor = result.getCursor();
-    } while (!result.isCompleteIteration());
+    jedis.hdel("colors", "field_3");
+    originalData.remove("field_3");
 
-    assertThat(allEntries).hasSize(3);
-    assertThat(new HashSet<>(allEntries)).isEqualTo(entryMap.entrySet());
+    GeodeAwaitility.await().untilAsserted(
+        () -> assertThat(jedis.hget("colors", "field_3")).isNull());
+
+    ScanResult<Map.Entry<String, String>> result = jedis.hscan("colors", "0");
+
+    assertThat(new HashSet<>(result.getResult()))
+        .containsExactlyInAnyOrderElementsOf(originalData.entrySet());
   }
 
+
+  /**** Concurrency ***/
+
+  private final int SIZE_OF_INITIAL_HASH_DATA = 100;
+  private Map<String, String> INITIAL_HASH_DATA = makeEntrySet(SIZE_OF_INITIAL_HASH_DATA);
+  final String HASH_KEY = "key";
+  final String BASE_FIELD = "baseField_";
+
   @Test
-  public void givenCursorGreaterThanUnsignedLongCapacity_returnsCursorError() {
-    assertThatThrownBy(() -> jedis.hscan("a", "18446744073709551616"))
-        .hasMessageContaining(ERROR_CURSOR);
+  public void should_notLoseFields_givenConcurrentThreadsDoingHScansAndChangingValues() {
+    jedis.hset(HASH_KEY, INITIAL_HASH_DATA);
+    final int ITERATION_COUNT = 500;
+
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> multipleHScanAndAssertOnSizeOfResultSet(jedis),
+        (i) -> multipleHScanAndAssertOnSizeOfResultSet(jedis2),
+        (i) -> {
+          int fieldSuffix = i % SIZE_OF_INITIAL_HASH_DATA;
+          jedis3.hset(HASH_KEY, BASE_FIELD + fieldSuffix, "new_value_" + i);
+        }).run();
   }
 
+
+
   @Test
-  public void givenNegativeCursorGreaterThanUnsignedLongCapacity_returnsCursorError() {
-    assertThatThrownBy(() -> jedis.hscan("a", "-18446744073709551616"))
-        .hasMessageContaining(ERROR_CURSOR);
+  public void should_notLoseDataForConsistentlyPresentFields_givenConcurrentThreadsAddingAndRemovingFields() {
+
+    Logger logger = LogManager.getLogger("org.apache.geode.redis.internal");
+    Configurator.setAllLevels(logger.getName(), Level.getLevel("DEBUG"));
+    FastLogger.setDelegating(true);
+
+    jedis.hset(HASH_KEY, INITIAL_HASH_DATA);
+    final int ITERATION_COUNT = 5;
+
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> multipleHScanAndAssertOnContentOfResultSet(jedis),
+        // (i) -> multipleHScanAndAssertOnContentOfResultSet(jedis2),
+        (i) -> {
+          String field = "new_" + BASE_FIELD + i;
+          jedis3.hset(HASH_KEY, field, "whatever");
+          jedis3.hdel(HASH_KEY, field);
+        }).run();

Review comment:
       you should also validate the data at the end of this test

##########
File path: geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
##########
@@ -203,49 +261,142 @@ public int hstrlen(ByteArrayWrapper field) {
     return new ArrayList<>(hash.keySet());
   }
 
-  public Pair<BigInteger, List<Object>> hscan(Pattern matchPattern, int count, BigInteger cursor) {
-    List<Object> returnList = new ArrayList<Object>();
-    int size = hash.size();
-    BigInteger beforeCursor = new BigInteger("0");
-    int numElements = 0;
-    int i = -1;
-    for (Map.Entry<ByteArrayWrapper, ByteArrayWrapper> entry : hash.entrySet()) {
+  public Pair<BigInteger, List<Object>> hscan(UUID clientID, Pattern matchPattern,
+      int count,
+      BigInteger cursorParameter) {
+
+    int startCursor = cursorParameter.intValue();
+
+    System.out.println("hscan receiving startCursor: " + startCursor);
+
+    List<ByteArrayWrapper> keysForScan = getSnapShotOfKeySet(clientID);
+
+    keysForScan = keysForScan.subList(startCursor, keysForScan.size());
+    Map<ByteArrayWrapper, ByteArrayWrapper> dataForKeySnapShot = getDataForKeys(keysForScan);
+
+
+    List<Object> resultList =
+        getResultList(matchPattern, count, dataForKeySnapShot);
+
+    int indexOfLast = resultList.size() - 1;
+    int numberOfIterationsCompleted = Integer.parseInt(resultList.get(indexOfLast).toString());
+    resultList.remove(indexOfLast);
+
+
+    BigInteger returnCursorValue =
+        getCursorValueToReturn(count, startCursor, numberOfIterationsCompleted);
+
+    if (returnCursorValue.intValue() == 0) {
+      removeHSCANSnapshot(clientID);
+    }
+
+    System.out.println("hscan returning Cursor: " + returnCursorValue);
+
+    // returnCursorValue = new BigInteger(String.valueOf(0));
+
+    return new ImmutablePair<>(returnCursorValue, resultList);
+  }
+
+  private void removeHSCANSnapshot(UUID clientID) {
+    this.hScanSnapShots.remove(clientID);
+    this.hScanSnapShotCreationTimes.remove(clientID);
+
+    if (this.hScanSnapShots.isEmpty()) {
+      shutDownHscanSnapshotScheduledRemoval();
+    }
+  }
+
+  private List<Object> getResultList(Pattern matchPattern, int count,
+      Map<ByteArrayWrapper, ByteArrayWrapper> dataForKeySnapShot) {
+
+    int currentCursor = 0;
+    List<Object> resultList = new ArrayList<>();
+
+    for (Map.Entry<ByteArrayWrapper, ByteArrayWrapper> entry : dataForKeySnapShot.entrySet()) {
+
+      if (count == currentCursor) {
+        break;
+      }
+
+      currentCursor++;
+
       ByteArrayWrapper key = entry.getKey();
       ByteArrayWrapper value = entry.getValue();
-      i++;
-      if (beforeCursor.compareTo(cursor) < 0) {
-        beforeCursor = beforeCursor.add(new BigInteger("1"));
-        continue;
-      }
 
       if (matchPattern != null) {
-        if (matchPattern.matcher(key.toString()).matches()) {
-          returnList.add(key);
-          returnList.add(value);
-          numElements++;
-        }
+        addMatches(matchPattern, resultList, key, value);
       } else {
-        returnList.add(key);
-        returnList.add(value);
-        numElements++;
+        resultList.add(key);
+        resultList.add(value);
       }
+    }
+    resultList.add(new ByteArrayWrapper(String.valueOf(currentCursor).getBytes()));
+    return resultList;
+  }
 
-      if (numElements == count) {
-        break;
-      }
+  private void addMatches(Pattern matchPattern, List<Object> returnList,
+      ByteArrayWrapper key,
+      ByteArrayWrapper value) {
+    if (matchPattern.matcher(key.toString()).matches()) {
+      returnList.add(key);
+      returnList.add(value);
     }
+  }
 
-    Pair<BigInteger, List<Object>> scanResult;
-    if (i >= size - 1) {
-      scanResult = new ImmutablePair<>(new BigInteger("0"), returnList);
-    } else {
-      scanResult = new ImmutablePair<>(new BigInteger(String.valueOf(i + 1)), returnList);
+  @SuppressWarnings("unchecked")
+  private Map<ByteArrayWrapper, ByteArrayWrapper> getDataForKeys(
+      List<ByteArrayWrapper> keysForScan) {
+    Map<ByteArrayWrapper, ByteArrayWrapper> dataMap = new HashMap<>();
+
+    keysForScan.forEach(key -> {
+      ByteArrayWrapper value = this.hash.get(key);
+      dataMap.put(key, value);
+    });
+
+    return dataMap;
+  }
+
+  // either increment the cursor by the num,ber iterations we did, or
+  // if we're at the end, we return 0
+  // numberOfIterations < count is assumed, is ther a better way?
+
+  private BigInteger getCursorValueToReturn(int count, int startCursor,
+      int numberOfIterationsCompleted) {
+
+    if (numberOfIterationsCompleted < count) {
+      return BigInteger.valueOf(0);

Review comment:
       why not return an integer here that the caller can then create a BigInteger from if that's what they need?
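A sketch of that change; the non-zero branch is inferred from the comment above the method, since the rest of it is cut off in this hunk:

  private int getCursorValueToReturn(int count, int startCursor, int numberOfIterationsCompleted) {
    if (numberOfIterationsCompleted < count) {
      // Reached the end of the snapshot: signal a complete iteration.
      return 0;
    }
    // Otherwise advance the cursor by the number of entries consumed (assumed behavior).
    return startCursor + numberOfIterationsCompleted;
  }

The caller could then wrap the result once, e.g. BigInteger.valueOf(getCursorValueToReturn(...)), where a BigInteger is still required.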

##########
File path: geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/hash/HScanDunitTest.java
##########
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.hash;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.ScanResult;
+
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+
+public class HScanDunitTest {
+  @ClassRule
+  public static RedisClusterStartupRule clusterStartUp = new RedisClusterStartupRule(4);
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+
+  private static Jedis jedis1;
+  private static Jedis jedis2;
+  private static Jedis jedis3;
+
+  private static Properties locatorProperties;
+
+  private static MemberVM locator;
+  private static MemberVM server1;
+  private static MemberVM server2;
+  private static MemberVM server3;
+
+  private static int redisServerPort1;
+  private static int redisServerPort2;
+  private static int redisServerPort3;
+
+  private final int SIZE_OF_INITIAL_HASH_DATA = 1000;
+  private final Map<String, String> INITIAL_HASH_DATA = makeEntrySet(SIZE_OF_INITIAL_HASH_DATA);
+  final String HASH_KEY = "key";
+  final String BASE_FIELD = "baseField_";
+
+  @BeforeClass
+  public static void classSetup() {
+    locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+    locator = clusterStartUp.startLocatorVM(0, locatorProperties);
+    int locatorPort = locator.getPort();
+
+    server1 = clusterStartUp
+        .startRedisVM(1,
+            x -> x.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withConnectionToLocator(locatorPort));
+
+    server2 = clusterStartUp
+        .startRedisVM(2,
+            x -> x.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withConnectionToLocator(locatorPort));
+
+    server3 = clusterStartUp
+        .startRedisVM(3,
+            x -> x.withProperty(ConfigurationProperties.SERIALIZABLE_OBJECT_FILTER,
+                "org.apache.commons.lang3.tuple.**;org.apache.geode.**")
+                .withConnectionToLocator(locatorPort));
+
+    redisServerPort1 = clusterStartUp.getRedisPort(1);
+    redisServerPort2 = clusterStartUp.getRedisPort(2);
+    redisServerPort3 = clusterStartUp.getRedisPort(3);
+
+    jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT);
+    jedis2 = new Jedis(LOCAL_HOST, redisServerPort2, JEDIS_TIMEOUT);
+    jedis3 = new Jedis(LOCAL_HOST, redisServerPort3, JEDIS_TIMEOUT);
+  }
+
+  @Before
+  public void testSetup() {
+    jedis1.flushAll();
+    jedis2.flushAll();
+    jedis3.flushAll();
+
+    jedis1.hset(HASH_KEY, INITIAL_HASH_DATA);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    jedis1.disconnect();
+    jedis2.disconnect();
+    jedis3.disconnect();
+
+    server1.stop();
+    server2.stop();
+    server3.stop();
+  }
+
+  @Test
+  public void should_stillGetHScan_givenServerCrashThreadsDoingHScans() {
+    final int ITERATION_COUNT = 500;
+
+
+    new ConcurrentLoopingThreads(ITERATION_COUNT,
+        (i) -> multipleHScanAndAssertOnSizeOfResultSet(jedis1),
+        (i) -> multipleHScanAndAssertOnSizeOfResultSet(jedis2),
+        (i) -> {
+          int fieldSuffix =
+              i < SIZE_OF_INITIAL_HASH_DATA ? i : (ITERATION_COUNT % SIZE_OF_INITIAL_HASH_DATA);
+
+          jedis3.hset(HASH_KEY, BASE_FIELD + fieldSuffix, "new_value_" + i);
+        }).run();
+  }
+
+  @Test
+  public void should_notLoseDataForConsistentlyPresentFields_givenConcurrentThreadsAddingAndRemovingFields() {
+    final int ITERATION_COUNT = 500;

Review comment:
       extract this out since it is used more than once
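For example, as a class-level constant next to the other shared test data fields:

  private static final int ITERATION_COUNT = 500;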




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org