You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2021/05/14 19:44:08 UTC

[geode] branch develop updated (78e5016 -> c407da1)

This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a change to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git.


    from 78e5016  GEODE-9220: Switch String integration tests to use JedisCluster. (#6444)
     new c8ab8f5  Extend the fastutil hashmap with a stateless cursor method
     new c407da1  GEODE-9256: Use a stateless hscan implementation

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 geode-apis-compatible-with-redis/build.gradle      |   2 +
 .../hash/AbstractHScanIntegrationTest.java         |   2 +-
 .../executor/hash/HScanIntegrationTest.java        |  28 ---
 .../hash/MemoryOverheadIntegrationTest.java        |   2 +-
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   1 +
 .../Object2ObjectOpenCustomHashMapWithCursor.java  | 185 +++++++++++++++++++
 .../geode/redis/internal/data/RedisHash.java       | 184 +++----------------
 .../data/RedisHashCommandsFunctionExecutor.java    |   5 +-
 .../redis/internal/executor/CommandFunction.java   |   4 +-
 .../internal/executor/hash/HScanExecutor.java      |  11 +-
 .../internal/executor/hash/RedisHashCommands.java  |   3 +-
 .../hash/RedisHashCommandsFunctionInvoker.java     |   5 +-
 .../apache/geode/redis/internal/netty/Client.java  |   8 -
 .../internal/netty/ExecutionHandlerContext.java    |  15 --
 ...tOpenCustomHashMapWithCursorQuickCheckTest.java |  89 +++++++++
 ...ject2ObjectOpenCustomHashMapWithCursorTest.java | 204 +++++++++++++++++++++
 .../geode/redis/internal/data/RedisHashTest.java   | 105 ++---------
 17 files changed, 532 insertions(+), 321 deletions(-)
 create mode 100644 geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/Object2ObjectOpenCustomHashMapWithCursor.java
 create mode 100644 geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/Object2ObjectOpenCustomHashMapWithCursorQuickCheckTest.java
 create mode 100644 geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/Object2ObjectOpenCustomHashMapWithCursorTest.java

[geode] 02/02: GEODE-9256: Use a stateless hscan implementation

Posted by up...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c407da1306990885052c411d3e679cafedda1e08
Author: Dan Smith <da...@vmware.com>
AuthorDate: Fri May 7 16:07:11 2021 -0700

    GEODE-9256: Use a stateless hscan implementation
    
    Replacing our stateful hscan implementation with a stateless implementation
    based on the same algorithm as Redis. This uses the stateless cursor method
    added to Object2ObjectOpenCustomHashMap in the previous commit.
---
 .../hash/AbstractHScanIntegrationTest.java         |   2 +-
 .../executor/hash/HScanIntegrationTest.java        |  28 ----
 .../hash/MemoryOverheadIntegrationTest.java        |   2 +-
 .../geode/redis/internal/data/RedisHash.java       | 184 +++------------------
 .../data/RedisHashCommandsFunctionExecutor.java    |   5 +-
 .../redis/internal/executor/CommandFunction.java   |   4 +-
 .../internal/executor/hash/HScanExecutor.java      |  11 +-
 .../internal/executor/hash/RedisHashCommands.java  |   3 +-
 .../hash/RedisHashCommandsFunctionInvoker.java     |   5 +-
 .../apache/geode/redis/internal/netty/Client.java  |   8 -
 .../internal/netty/ExecutionHandlerContext.java    |  15 --
 .../geode/redis/internal/data/RedisHashTest.java   | 105 +++---------
 12 files changed, 51 insertions(+), 321 deletions(-)

diff --git a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
index c02538f..f0146b7 100755
--- a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
+++ b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
@@ -453,7 +453,7 @@ public abstract class AbstractHScanIntegrationTest implements RedisIntegrationTe
     ScanResult<Map.Entry<String, String>> result = jedis.hscan("colors", "5");
 
     assertThat(new HashSet<>(result.getResult()))
-        .containsExactlyInAnyOrderElementsOf(data.entrySet());
+        .isSubsetOf(data.entrySet());
   }
 
   /**** Concurrency ***/
diff --git a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/HScanIntegrationTest.java b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/HScanIntegrationTest.java
index 2a15f3c..4a8ba99 100755
--- a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/HScanIntegrationTest.java
+++ b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/HScanIntegrationTest.java
@@ -17,17 +17,12 @@ package org.apache.geode.redis.internal.executor.hash;
 
 
 import static org.apache.geode.redis.internal.RedisConstants.ERROR_CURSOR;
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Map;
 
 import org.junit.ClassRule;
 import org.junit.Test;
-import redis.clients.jedis.ScanParams;
-import redis.clients.jedis.ScanResult;
 
 import org.apache.geode.redis.GeodeRedisServerRule;
 
@@ -64,27 +59,4 @@ public class HScanIntegrationTest extends AbstractHScanIntegrationTest {
     assertThatThrownBy(() -> jedis.hscan("a", tooSmallCursor.toString()))
         .hasMessageContaining(ERROR_CURSOR);
   }
-
-
-  @Test
-  public void givenCount_shouldReturnExpectedNumberOfEntries() {
-    Map<byte[], byte[]> entryMap = new HashMap<>();
-    entryMap.put("1".getBytes(), "yellow".getBytes());
-    entryMap.put("2".getBytes(), "green".getBytes());
-    entryMap.put("3".getBytes(), "orange".getBytes());
-    jedis.hmset("colors".getBytes(), entryMap);
-
-    int COUNT_PARAM = 2;
-
-    ScanParams scanParams = new ScanParams();
-    scanParams.count(COUNT_PARAM);
-    ScanResult<Map.Entry<byte[], byte[]>> result;
-
-    String cursor = "0";
-
-    result = jedis.hscan("colors".getBytes(), cursor.getBytes(), scanParams);
-
-    assertThat(result.getResult().size()).isEqualTo(COUNT_PARAM);
-  }
-
 }
diff --git a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/MemoryOverheadIntegrationTest.java b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/MemoryOverheadIntegrationTest.java
index eae64bb..ac66c5a 100755
--- a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/MemoryOverheadIntegrationTest.java
+++ b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/MemoryOverheadIntegrationTest.java
@@ -56,7 +56,7 @@ public class MemoryOverheadIntegrationTest extends AbstractMemoryOverheadIntegra
     result.put(Measurement.STRING, 185);
     result.put(Measurement.SET, 386);
     result.put(Measurement.SET_ENTRY, 72);
-    result.put(Measurement.HASH, 490);
+    result.put(Measurement.HASH, 338);
     result.put(Measurement.HASH_ENTRY, 50);
 
     return result;
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
index eea510c..8550651 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
@@ -16,9 +16,6 @@
 
 package org.apache.geode.redis.internal.data;
 
-import static java.lang.System.currentTimeMillis;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.geode.logging.internal.executors.LoggingExecutors.newSingleThreadScheduledExecutor;
 import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
 import static org.apache.geode.redis.internal.RedisConstants.ERROR_OVERFLOW;
 
@@ -33,15 +30,10 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.regex.Pattern;
 
 import it.unimi.dsi.fastutil.bytes.ByteArrays;
-import it.unimi.dsi.fastutil.objects.Object2ObjectOpenCustomHashMap;
 import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.annotations.VisibleForTesting;
@@ -49,100 +41,47 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.internal.serialization.DeserializationContext;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.redis.internal.collections.Object2ObjectOpenCustomHashMapWithCursor;
 import org.apache.geode.redis.internal.delta.AddsDeltaInfo;
 import org.apache.geode.redis.internal.delta.DeltaInfo;
 import org.apache.geode.redis.internal.delta.RemsDeltaInfo;
 import org.apache.geode.redis.internal.netty.Coder;
 
 public class RedisHash extends AbstractRedisData {
-  private Object2ObjectOpenCustomHashMap<byte[], byte[]> hash;
-  private final ConcurrentHashMap<UUID, List<byte[]>> hScanSnapShots = new ConcurrentHashMap<>();
-  private final ConcurrentHashMap<UUID, Long> hScanSnapShotCreationTimes =
-      new ConcurrentHashMap<>();
-  private ScheduledExecutorService HSCANSnapshotExpirationExecutor = null;
-
-  private int sizeInBytes = BASE_REDIS_HASH_OVERHEAD;
-
   // the following constants were calculated using reflection and math. you can find the tests for
   // these values in RedisHashTest, which show the way these numbers were calculated. the constants
   // have the advantage of saving us a lot of computation that would happen every time a new key was
   // added. if our internal implementation changes, these values may be incorrect. the tests will
   // catch this change. an increase in overhead should be carefully considered.
-  protected static final int BASE_REDIS_HASH_OVERHEAD = 336;
+  protected static final int BASE_REDIS_HASH_OVERHEAD = 184;
   protected static final int HASH_MAP_VALUE_PAIR_OVERHEAD = 48;
 
-  private static final int defaultHscanSnapshotsExpireCheckFrequency =
-      Integer.getInteger("redis.hscan-snapshot-cleanup-interval", 30000);
-
-  private static final int defaultHscanSnapshotsMillisecondsToLive =
-      Integer.getInteger("redis.hscan-snapshot-expiry", 30000);
+  private Object2ObjectOpenCustomHashMapWithCursor<byte[], byte[]> hash;
 
-  private int HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS =
-      defaultHscanSnapshotsExpireCheckFrequency;
-  private int MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE =
-      defaultHscanSnapshotsMillisecondsToLive;
+  private int sizeInBytes = BASE_REDIS_HASH_OVERHEAD;
 
 
   @VisibleForTesting
-  public RedisHash(List<byte[]> fieldsToSet, int hscanSnapShotExpirationCheckFrequency,
-      int minimumLifeForHscanSnaphot) {
+  public RedisHash(List<byte[]> fieldsToSet) {
     final int numKeysAndValues = fieldsToSet.size();
     if (numKeysAndValues % 2 != 0) {
       throw new IllegalStateException(
           "fieldsToSet should have an even number of elements but was size " + numKeysAndValues);
     }
 
-    HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS = hscanSnapShotExpirationCheckFrequency;
-    MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE = minimumLifeForHscanSnaphot;
-
-    hash = new Object2ObjectOpenCustomHashMap<>(numKeysAndValues / 2, ByteArrays.HASH_STRATEGY);
+    hash = new Object2ObjectOpenCustomHashMapWithCursor<>(numKeysAndValues / 2,
+        ByteArrays.HASH_STRATEGY);
     Iterator<byte[]> iterator = fieldsToSet.iterator();
     while (iterator.hasNext()) {
       hashPut(iterator.next(), iterator.next());
     }
   }
 
-  public RedisHash(List<byte[]> fieldsToSet) {
-    this(fieldsToSet,
-        defaultHscanSnapshotsExpireCheckFrequency,
-        defaultHscanSnapshotsMillisecondsToLive);
-  }
-
   /**
    * For deserialization only.
    */
   public RedisHash() {}
 
-  private void expireHScanSnapshots() {
-    hScanSnapShotCreationTimes.forEach((client, creationTime) -> {
-      long millisecondsSinceCreation = currentTimeMillis() - creationTime;
-
-      if (millisecondsSinceCreation >= MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE) {
-        removeHSCANSnapshot(client);
-      }
-    });
-  }
-
-  @VisibleForTesting
-  public ConcurrentHashMap<UUID, List<byte[]>> getHscanSnapShots() {
-    return hScanSnapShots;
-  }
-
-  private void startHscanSnapshotScheduledRemoval() {
-    final int DELAY = HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS;
-
-    HSCANSnapshotExpirationExecutor =
-        newSingleThreadScheduledExecutor("GemFireRedis-HSCANSnapshotRemoval-");
-
-    HSCANSnapshotExpirationExecutor.scheduleWithFixedDelay(
-        this::expireHScanSnapshots, DELAY, DELAY, MILLISECONDS);
-  }
-
-  private void shutDownHscanSnapshotScheduledRemoval() {
-    HSCANSnapshotExpirationExecutor.shutdown();
-    HSCANSnapshotExpirationExecutor = null;
-  }
-
   /**
    * Since GII (getInitialImage) can come in and call toData while other threads are modifying this
    * object, the striped executor will not protect toData. So any methods that modify "hash" needs
@@ -166,7 +105,7 @@ public class RedisHash extends AbstractRedisData {
       throws IOException, ClassNotFoundException {
     super.fromData(in, context);
     int size = DataSerializer.readInteger(in);
-    hash = new Object2ObjectOpenCustomHashMap<>(size, ByteArrays.HASH_STRATEGY);
+    hash = new Object2ObjectOpenCustomHashMapWithCursor<>(size, ByteArrays.HASH_STRATEGY);
     for (int i = 0; i < size; i++) {
       hash.put(DataSerializer.readByteArray(in), DataSerializer.readByteArray(in));
     }
@@ -332,107 +271,30 @@ public class RedisHash extends AbstractRedisData {
     return new ArrayList<>(hash.keySet());
   }
 
-  public ImmutablePair<Integer, List<byte[]>> hscan(UUID clientID, Pattern matchPattern,
+  public ImmutablePair<Integer, List<byte[]>> hscan(Pattern matchPattern,
       int count,
-      int startCursor) {
-
-    List<byte[]> keysToScan = getSnapShotOfKeySet(clientID);
+      int cursor) {
 
-    Pair<Integer, List<byte[]>> resultsPair =
-        getResultsPair(keysToScan, startCursor, count, matchPattern);
+    ArrayList<byte[]> resultList = new ArrayList<>(count + 2);
+    do {
+      cursor = hash.scan(cursor, 1,
+          (list, key, value) -> addIfMatching(matchPattern, list, key, value), resultList);
+    } while (cursor != 0 && resultList.size() < (count * 2));
 
-    List<byte[]> resultList = resultsPair.getRight();
-
-    Integer numberOfIterationsCompleted = resultsPair.getLeft();
-
-    int returnCursorValueAsInt =
-        getCursorValueToReturn(startCursor, numberOfIterationsCompleted, keysToScan);
-
-    if (returnCursorValueAsInt == 0) {
-      removeHSCANSnapshot(clientID);
-    }
-
-    return new ImmutablePair<>(returnCursorValueAsInt, resultList);
+    return new ImmutablePair<>(cursor, resultList);
   }
 
-  private void removeHSCANSnapshot(UUID clientID) {
-    hScanSnapShots.remove(clientID);
-    hScanSnapShotCreationTimes.remove(clientID);
-
-    if (hScanSnapShots.isEmpty()) {
-      shutDownHscanSnapshotScheduledRemoval();
-    }
-  }
-
-  private Pair<Integer, List<byte[]>> getResultsPair(List<byte[]> keysSnapShot,
-      int startCursor,
-      int count,
-      Pattern matchPattern) {
-
-    int indexOfKeys = startCursor;
-
-    List<byte[]> resultList = new ArrayList<>();
-
-    for (int index = startCursor; index < keysSnapShot.size(); index++) {
-      if ((index - startCursor) == count) {
-        break;
-      }
-
-      byte[] key = keysSnapShot.get(index);
-      indexOfKeys++;
-
-      byte[] value = hash.get(key);
-      if (value == null) {
-        continue;
-      }
-
-      if (matchPattern != null) {
-        if (matchPattern.matcher(Coder.bytesToString(key)).matches()) {
-          resultList.add(key);
-          resultList.add(value);
-        }
-      } else {
+  private void addIfMatching(Pattern matchPattern, List<byte[]> resultList, byte[] key,
+      byte[] value) {
+    if (matchPattern != null) {
+      if (matchPattern.matcher(Coder.bytesToString(key)).matches()) {
         resultList.add(key);
         resultList.add(value);
       }
+    } else {
+      resultList.add(key);
+      resultList.add(value);
     }
-
-    Integer numberOfIterationsCompleted = indexOfKeys - startCursor;
-
-    return new ImmutablePair<>(numberOfIterationsCompleted, resultList);
-  }
-
-  private int getCursorValueToReturn(int startCursor,
-      int numberOfIterationsCompleted,
-      List<byte[]> keySnapshot) {
-
-    if (startCursor + numberOfIterationsCompleted >= keySnapshot.size()) {
-      return 0;
-    }
-
-    return (startCursor + numberOfIterationsCompleted);
-  }
-
-  private List<byte[]> getSnapShotOfKeySet(UUID clientID) {
-    List<byte[]> keySnapShot = this.hScanSnapShots.get(clientID);
-
-    if (keySnapShot == null) {
-      if (hScanSnapShots.isEmpty()) {
-        startHscanSnapshotScheduledRemoval();
-      }
-      keySnapShot = createKeySnapShot(clientID);
-    }
-    return keySnapShot;
-  }
-
-  private List<byte[]> createKeySnapShot(UUID clientID) {
-
-    List<byte[]> keySnapShot = new ArrayList<>(hash.keySet());
-
-    hScanSnapShots.put(clientID, keySnapShot);
-    hScanSnapShotCreationTimes.put(clientID, currentTimeMillis());
-
-    return keySnapShot;
   }
 
   public long hincrby(Region<RedisKey, RedisData> region, RedisKey key,
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHashCommandsFunctionExecutor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHashCommandsFunctionExecutor.java
index 0aa7c20..7fa4a05 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHashCommandsFunctionExecutor.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHashCommandsFunctionExecutor.java
@@ -19,7 +19,6 @@ package org.apache.geode.redis.internal.data;
 import java.math.BigDecimal;
 import java.util.Collection;
 import java.util.List;
-import java.util.UUID;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -94,10 +93,10 @@ public class RedisHashCommandsFunctionExecutor extends RedisDataCommandsFunction
 
   @Override
   public Pair<Integer, List<byte[]>> hscan(RedisKey key, Pattern matchPattern,
-      int count, int cursor, UUID clientID) {
+      int count, int cursor) {
     return stripedExecute(key,
         () -> getRedisHash(key, true)
-            .hscan(clientID, matchPattern, count, cursor));
+            .hscan(matchPattern, count, cursor));
   }
 
   @Override
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java
index aab3c59..8c5c601 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java
@@ -20,7 +20,6 @@ import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.UUID;
 import java.util.regex.Pattern;
 
 import org.apache.geode.cache.Region;
@@ -261,8 +260,7 @@ public class CommandFunction extends SingleResultRedisFunction {
         Pattern pattern = (Pattern) args[1];
         int count = (int) args[2];
         int cursor = (int) args[3];
-        UUID clientID = (UUID) args[4];
-        return hashCommands.hscan(key, pattern, count, cursor, clientID);
+        return hashCommands.hscan(key, pattern, count, cursor);
       }
       case HINCRBY: {
         byte[] field = (byte[]) args[1];
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
index bde3e4c..e3a9fe8 100755
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
@@ -22,7 +22,6 @@ import static org.apache.geode.redis.internal.data.RedisDataType.REDIS_HASH;
 
 import java.math.BigInteger;
 import java.util.List;
-import java.util.UUID;
 import java.util.regex.Pattern;
 import java.util.regex.PatternSyntaxException;
 
@@ -49,8 +48,6 @@ public class HScanExecutor extends AbstractScanExecutor {
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
 
-    final UUID CLIENT_ID = context.getClientUUID();
-
     List<byte[]> commandElems = command.getProcessedCommand();
 
     String cursorString = Coder.bytesToString(commandElems.get(2));
@@ -65,10 +62,6 @@ public class HScanExecutor extends AbstractScanExecutor {
       return RedisResponse.error(ERROR_CURSOR);
     }
 
-    if (cursor != context.getHscanCursor()) {
-      cursor = 0;
-    }
-
     RedisKey key = command.getKey();
     if (!getDataRegion(context).containsKey(key)) {
       context.getRedisStats().incKeyspaceMisses();
@@ -117,9 +110,7 @@ public class HScanExecutor extends AbstractScanExecutor {
         new RedisHashCommandsFunctionInvoker(context.getRegionProvider().getDataRegion());
 
     Pair<Integer, List<byte[]>> scanResult =
-        redisHashCommands.hscan(key, matchPattern, count, cursor, CLIENT_ID);
-
-    context.setHscanCursor(scanResult.getLeft());
+        redisHashCommands.hscan(key, matchPattern, count, cursor);
 
     return RedisResponse.scan(new BigInteger(String.valueOf(scanResult.getLeft())),
         scanResult.getRight());
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommands.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommands.java
index b4119d8..cfe5046 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommands.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommands.java
@@ -18,7 +18,6 @@ package org.apache.geode.redis.internal.executor.hash;
 import java.math.BigDecimal;
 import java.util.Collection;
 import java.util.List;
-import java.util.UUID;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -47,7 +46,7 @@ public interface RedisHashCommands {
   Collection<byte[]> hkeys(RedisKey key);
 
   Pair<Integer, List<byte[]>> hscan(RedisKey key, Pattern matchPattern, int count,
-      int cursor, UUID clientID);
+      int cursor);
 
   long hincrby(RedisKey key, byte[] field, long increment);
 
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionInvoker.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionInvoker.java
index 29cfcde..f6702fb 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionInvoker.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionInvoker.java
@@ -32,7 +32,6 @@ import static org.apache.geode.redis.internal.RedisCommandType.HVALS;
 import java.math.BigDecimal;
 import java.util.Collection;
 import java.util.List;
-import java.util.UUID;
 import java.util.regex.Pattern;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -107,8 +106,8 @@ public class RedisHashCommandsFunctionInvoker extends RedisCommandsFunctionInvok
 
   @Override
   public Pair<Integer, List<byte[]>> hscan(RedisKey key, Pattern matchPattern,
-      int count, int cursor, UUID clientID) {
-    return invokeCommandFunction(key, HSCAN, matchPattern, count, cursor, clientID);
+      int count, int cursor) {
+    return invokeCommandFunction(key, HSCAN, matchPattern, count, cursor);
   }
 
   @Override
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
index 4724ed6..4c1780a 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
@@ -17,7 +17,6 @@
 package org.apache.geode.redis.internal.netty;
 
 import java.util.Objects;
-import java.util.UUID;
 
 import io.netty.channel.Channel;
 import io.netty.util.concurrent.Future;
@@ -27,11 +26,8 @@ import io.netty.util.concurrent.GenericFutureListener;
 public class Client {
   private Channel channel;
 
-  private UUID id;
-
   public Client(Channel remoteAddress) {
     this.channel = remoteAddress;
-    this.id = UUID.randomUUID();
   }
 
   @Override
@@ -63,8 +59,4 @@ public class Client {
   public String toString() {
     return channel.toString();
   }
-
-  public UUID getId() {
-    return id;
-  }
 }
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index 506056c..5abe6a3 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -18,7 +18,6 @@ package org.apache.geode.redis.internal.netty;
 
 import java.io.IOException;
 import java.math.BigInteger;
-import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -87,7 +86,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
   private final DistributedMember member;
   private BigInteger scanCursor;
   private BigInteger sscanCursor;
-  private int hscanCursor;
   private final AtomicBoolean channelInactive = new AtomicBoolean();
   private final int MAX_QUEUED_COMMANDS =
       Integer.getInteger("geode.redis.commandQueueSize", 1000);
@@ -131,7 +129,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
     this.member = member;
     this.scanCursor = new BigInteger("0");
     this.sscanCursor = new BigInteger("0");
-    this.hscanCursor = 0;
     redisStats.addClient();
 
     backgroundExecutor.submit(this::processCommandQueue);
@@ -407,10 +404,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
     return client;
   }
 
-  public UUID getClientUUID() {
-    return this.getClient().getId();
-  }
-
   public void shutdown() {
     shutdownInvoker.run();
   }
@@ -439,14 +432,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
     this.sscanCursor = sscanCursor;
   }
 
-  public int getHscanCursor() {
-    return this.hscanCursor;
-  }
-
-  public void setHscanCursor(int hscanCursor) {
-    this.hscanCursor = hscanCursor;
-  }
-
   public String getMemberName() {
     return member.getUniqueId();
   }
diff --git a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
index 48d8077..de71499 100644
--- a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
+++ b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
@@ -17,7 +17,6 @@
 package org.apache.geode.redis.internal.data;
 
 import static java.lang.Math.round;
-import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.geode.redis.internal.data.RedisHash.BASE_REDIS_HASH_OVERHEAD;
 import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -34,10 +33,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.assertj.core.data.Offset;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -52,7 +51,6 @@ import org.apache.geode.internal.serialization.DataSerializableFixedID;
 import org.apache.geode.internal.serialization.SerializationContext;
 import org.apache.geode.internal.size.ReflectionObjectSizer;
 import org.apache.geode.redis.internal.netty.Coder;
-import org.apache.geode.test.awaitility.GeodeAwaitility;
 
 public class RedisHashTest {
   private final ReflectionObjectSizer reflectionObjectSizer = ReflectionObjectSizer.getInstance();
@@ -180,85 +178,26 @@ public class RedisHashTest {
 
   /************* HSCAN *************/
   @Test
-  public void hscanSnaphots_shouldBeEmpty_givenHscanHasNotBeenCalled() {
-    RedisHash subject = createRedisHash(100);
-    assertThat(subject.getHscanSnapShots()).isEmpty();
-  }
-
-  @Test
-  public void hscanSnaphots_shouldContainSnapshot_givenHscanHasBeenCalled() {
-
-    final List<byte[]> FIELDS_AND_VALUES_FOR_HASH = createListOfDataElements(100);
-    RedisHash subject = new RedisHash(FIELDS_AND_VALUES_FOR_HASH);
-    UUID clientID = UUID.randomUUID();
-
-    subject.hscan(clientID, null, 10, 0);
-
-    ConcurrentHashMap<UUID, List<byte[]>> hscanSnapShotMap = subject.getHscanSnapShots();
-
-    assertThat(hscanSnapShotMap.containsKey(clientID)).isTrue();
-
-    List<byte[]> keyList = hscanSnapShotMap.get(clientID);
-    assertThat(keyList).isNotEmpty();
-
-    FIELDS_AND_VALUES_FOR_HASH.forEach((entry) -> {
-      if (Coder.bytesToString(entry).contains("field")) {
-        assertThat(keyList).contains(entry);
-      } else if (Coder.bytesToString(entry).contains("value")) {
-        assertThat(keyList).doesNotContain(entry);
-      }
-    });
-
-  }
-
-  @Test
-  public void hscanSnaphots_shouldContainSnapshot_givenHscanHasBeenCalled_WithNonZeroCursor() {
+  public void hscanReturnsCorrectNumberOfElements() throws IOException, ClassNotFoundException {
+    RedisHash hash = createRedisHash("k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4");
+    ImmutablePair<Integer, List<byte[]>> result =
+        hash.hscan(null, 2, 0);
 
-    final List<byte[]> FIELDS_AND_VALUES_FOR_HASH = createListOfDataElements(100);
-    RedisHash subject = new RedisHash(FIELDS_AND_VALUES_FOR_HASH);
-    UUID clientID = UUID.randomUUID();
-
-    subject.hscan(clientID, null, 10, 10);
-
-    ConcurrentHashMap<UUID, List<byte[]>> hscanSnapShotMap = subject.getHscanSnapShots();
-
-    assertThat(hscanSnapShotMap.containsKey(clientID)).isTrue();
-
-    List<byte[]> keyList = hscanSnapShotMap.get(clientID);
-    assertThat(keyList).isNotEmpty();
-
-    FIELDS_AND_VALUES_FOR_HASH.forEach((entry) -> {
-      if (Coder.bytesToString(entry).contains("field")) {
-        assertThat(keyList).contains(entry);
-      } else if (Coder.bytesToString(entry).contains("value")) {
-        assertThat(keyList).doesNotContain(entry);
-      }
-    });
+    assertThat(result.left).isNotEqualTo(0);
+    assertThat(result.right).hasSize(4);
+    result = hash.hscan(null, 3, result.left);
+    assertThat(result.left).isEqualTo(0);
+    assertThat(result.right).hasSize(4);
   }
 
   @Test
-  public void hscanSnaphots_shouldBeRemoved_givenCompleteIteration() {
-    RedisHash subject = createRedisHashWithExpiration(1, 100000);
-    UUID client_ID = UUID.randomUUID();
-
-    subject.hscan(client_ID, null, 10, 0);
+  public void hscanOnlyReturnsElementsMatchingPattern() throws IOException, ClassNotFoundException {
+    RedisHash hash = createRedisHash("ak1", "v1", "k2", "v2", "ak3", "v3", "k4", "v4");
+    ImmutablePair<Integer, List<byte[]>> result =
+        hash.hscan(Pattern.compile("a.*"), 3, 0);
 
-    ConcurrentHashMap<UUID, List<byte[]>> hscanSnapShotMap = subject.getHscanSnapShots();
-    assertThat(hscanSnapShotMap).isEmpty();
-  }
-
-  @Test
-  public void hscanSnaphots_shouldExpireAfterExpiryPeriod() {
-    RedisHash subject = createRedisHashWithExpiration(1000, 1);
-    UUID client_ID = UUID.randomUUID();
-
-    subject.hscan(client_ID, null, 1, 0);
-
-    GeodeAwaitility.await().atMost(4, SECONDS).untilAsserted(() -> {
-      ConcurrentHashMap<UUID, List<byte[]>> hscanSnapShotMap =
-          subject.getHscanSnapShots();
-      assertThat(hscanSnapShotMap).isEmpty();
-    });
+    assertThat(result.left).isEqualTo(0);
+    assertThat(toListOfStrings(result.right)).containsExactly("ak1", "v1", "ak3", "v3");
   }
 
   /************* Hash Size *************/
@@ -594,9 +533,8 @@ public class RedisHashTest {
   }
 
   /************* Helper Methods *************/
-  private RedisHash createRedisHash(int NumberOfFields) {
-    ArrayList<byte[]> elements = createListOfDataElements(NumberOfFields);
-    return new RedisHash(elements);
+  private List<String> toListOfStrings(List<byte[]> byteList) {
+    return byteList.stream().map(Coder::bytesToString).collect(Collectors.toList());
   }
 
   private RedisHash createRedisHash(String... keysAndValues) {
@@ -616,11 +554,6 @@ public class RedisHashTest {
     return elements;
   }
 
-  private RedisHash createRedisHashWithExpiration(int NumberOfFields, int hcanSnapshotExpiry) {
-    ArrayList<byte[]> elements = createListOfDataElements(NumberOfFields);
-    return new RedisHash(elements, hcanSnapshotExpiry, hcanSnapshotExpiry);
-  }
-
   private byte[] toBytes(String str) {
     return Coder.stringToBytes(str);
   }

[geode] 01/02: Extend the fastutil hashmap with a stateless cursor method

Posted by up...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git

commit c8ab8f5820ab64ede53e83302a59a491676cc84e
Author: Dan Smith <da...@vmware.com>
AuthorDate: Thu May 6 13:28:14 2021 -0700

    Extend the fastutil hashmap with a stateless cursor method
    
    Adding a new subclass, Object2ObjectOpenCustomHashMapWithCursor, that
    implements the redis HSCAN operation. This method uses the same algorithm as
    redis by incrementing the reverse of the cursor for each step. See
    https://github.com/redis/redis/blob/e504583b7806d946da9c3627784d551a742be4d0/src/dict.c#L651
---
 geode-apis-compatible-with-redis/build.gradle      |   2 +
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   1 +
 .../Object2ObjectOpenCustomHashMapWithCursor.java  | 185 +++++++++++++++++++
 ...tOpenCustomHashMapWithCursorQuickCheckTest.java |  89 +++++++++
 ...ject2ObjectOpenCustomHashMapWithCursorTest.java | 204 +++++++++++++++++++++
 5 files changed, 481 insertions(+)

diff --git a/geode-apis-compatible-with-redis/build.gradle b/geode-apis-compatible-with-redis/build.gradle
index 8ad6652..f01590a 100644
--- a/geode-apis-compatible-with-redis/build.gradle
+++ b/geode-apis-compatible-with-redis/build.gradle
@@ -45,6 +45,8 @@ dependencies {
   testImplementation(project(':geode-junit'))
   testImplementation('org.mockito:mockito-core')
   testImplementation('redis.clients:jedis')
+  testImplementation('com.pholser:junit-quickcheck-core')
+  testImplementation('com.pholser:junit-quickcheck-generators')
 
   commonTestImplementation(project(':geode-junit'))
   commonTestImplementation(project(':geode-dunit'))
diff --git a/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index e69de29..7f8e74a 100644
--- a/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -0,0 +1 @@
+org/apache/geode/redis/internal/collections/Object2ObjectOpenCustomHashMapWithCursor
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/Object2ObjectOpenCustomHashMapWithCursor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/Object2ObjectOpenCustomHashMapWithCursor.java
new file mode 100644
index 0000000..8ded564
--- /dev/null
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/collections/Object2ObjectOpenCustomHashMapWithCursor.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.collections;
+
+import static it.unimi.dsi.fastutil.HashCommon.mix;
+
+import java.util.Map;
+
+import it.unimi.dsi.fastutil.objects.Object2ObjectMap;
+import it.unimi.dsi.fastutil.objects.Object2ObjectOpenCustomHashMap;
+
+/**
+ * An extension of {@link Object2ObjectOpenCustomHashMap} that supports
+ * a method of iteration where each scan operation returns an integer cursor
+ * that allows future scan operations to start from that same point.
+ *
+ * The scan method provides the same guarantees as Redis's HSCAN, and in fact
+ * uses the same algorithm.
+ */
+public class Object2ObjectOpenCustomHashMapWithCursor<K, V>
+    extends Object2ObjectOpenCustomHashMap<K, V> {
+
+
+  public Object2ObjectOpenCustomHashMapWithCursor(int expected, float f,
+      Strategy<? super K> strategy) {
+    super(expected, f, strategy);
+  }
+
+  public Object2ObjectOpenCustomHashMapWithCursor(int expected,
+      Strategy<? super K> strategy) {
+    super(expected, strategy);
+  }
+
+  public Object2ObjectOpenCustomHashMapWithCursor(Strategy<? super K> strategy) {
+    super(strategy);
+  }
+
+  public Object2ObjectOpenCustomHashMapWithCursor(Map<? extends K, ? extends V> m, float f,
+      Strategy<? super K> strategy) {
+    super(m, f, strategy);
+  }
+
+  public Object2ObjectOpenCustomHashMapWithCursor(Map<? extends K, ? extends V> m,
+      Strategy<? super K> strategy) {
+    super(m, strategy);
+  }
+
+  public Object2ObjectOpenCustomHashMapWithCursor(
+      Object2ObjectMap<K, V> m,
+      float f,
+      Strategy<? super K> strategy) {
+    super(m, f, strategy);
+  }
+
+  public Object2ObjectOpenCustomHashMapWithCursor(Object2ObjectMap<K, V> m,
+      Strategy<? super K> strategy) {
+    super(m, strategy);
+  }
+
+  public Object2ObjectOpenCustomHashMapWithCursor(K[] k, V[] v, float f,
+      Strategy<? super K> strategy) {
+    super(k, v, f, strategy);
+  }
+
+  public Object2ObjectOpenCustomHashMapWithCursor(K[] k, V[] v,
+      Strategy<? super K> strategy) {
+    super(k, v, strategy);
+  }
+
+  /**
+   * Scan entries and pass them to the given consumer function, starting at the passed in
+   * cursor. This method will scan until at least count entries are returned, or the entire
+   * map has been scanned. Once the returned cursor is 0, the entire map is scanned.
+   *
+   * This method may emit more than *count* number of elements if there are hash collisions.
+   *
+   * @param cursor The cursor to start from. Should be 0 for the initial scan. Subsequent calls
+   *        should use the cursor returned by the previous scan call.
+   * @param count The number of elements to scan
+   * @param consumer A function to pass the scanned keys and values to
+   * @param privateData Some data to pass to the function, for example a map to collect values in.
+   *        This
+   *        allows the function to be stateless.
+   * @param <D> The type of the data passed to the function.
+   * @return The next cursor to scan from, or 0 if the scan has touched all elements.
+   */
+  public <D> int scan(int cursor, int count, EntryConsumer<K, V, D> consumer, D privateData) {
+    // Implementation notes
+    //
+    // This stateless scan cursor algorithm is based on the dictScan cursor
+    // implementation from dict.c in redis. Please see the comments in that class for the full
+    // details. That iteration algorithm was designed by Pieter Noordhuis.
+    //
+    // There is one wrinkle due to the fact that we are using a different type of hashtable here.
+    // The parent class, Object2ObjectOpenCustomHashMap, uses open addressing with linear
+    // probing. What that means is that when there is a hash collision, instead of putting
+    // a linked list of hash entries into a single hash bucket, this implementation simply
+    // moves on to the next element to the right in the array and tries to put the inserted
+    // object there, continuing until it finds a null slot.
+    //
+    // So in order to use the redis cursor algorithm, our scan needs to probe ahead to
+    // subsequent positions to find any hash entries that match the position we are scanning.
+    // This is logically equivalent to iterating over the linked list in a hashtable bucket
+    // for a redis style closed addressing hashtable.
+    //
+
+    do {
+      // Emit all of the entries at the cursor. This means looking forward in the hash
+      // table for any non-null entries that might hash to the current cursor and emitting
+      // those as well. This may even wrap around to the front of the hashtable.
+      int position = cursor;
+      while (key[position & mask] != null) {
+        if (elementAtHashesTo(position, cursor & mask)) {
+          consumer.consume(privateData, key[position & mask], value[position & mask]);
+          count--;
+        }
+        position++;
+      }
+
+      // Increment the reversed cursor
+      cursor |= ~mask;
+      cursor = rev(cursor);
+      cursor++;
+      cursor = rev(cursor);
+
+
+    } while (count > 0 && cursor != 0);
+
+    return cursor;
+  }
+
+  /**
+   * reverse the bits in a cursor.
+   *
+   * Package scope to allow for unit testing to make sure we don't have some silly
+   * java signed int issues
+   *
+   * @param value the value to reverse
+   * @return the reversed bits.
+   */
+  static int rev(int value) {
+    // This implementation is also based on dict.c from redis, which was originally from
+    // http://graphics.stanford.edu/~seander/bithacks.html#ReverseParallel
+    int s = 32;
+    int mask = ~0;
+    while ((s >>>= 1) > 0) {
+      mask ^= (mask << s);
+      value = ((value >>> s) & mask) | ((value << s) & ~mask);
+    }
+    return value;
+  }
+
+  /**
+   * Check to see if the element at position hashes to the expected hash.
+   */
+  private boolean elementAtHashesTo(int position, int expectedHash) {
+    // There is a small optimization here. If the previous element
+    // is null, we know that the element at position does hash to the expected
+    // hash because it is not here as a result of a collision at some previous position.
+
+    K previousEntry = key[(position - 1) & mask];
+    return previousEntry == null
+        || (hash(key[position & mask]) & mask) == expectedHash;
+  }
+
+  private int hash(K key) {
+    return mix(strategy.hashCode(key));
+  }
+
+  public interface EntryConsumer<K, V, D> {
+    void consume(D privateData, K key, V value);
+  }
+}
diff --git a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/Object2ObjectOpenCustomHashMapWithCursorQuickCheckTest.java b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/Object2ObjectOpenCustomHashMapWithCursorQuickCheckTest.java
new file mode 100644
index 0000000..6cd61d3
--- /dev/null
+++ b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/Object2ObjectOpenCustomHashMapWithCursorQuickCheckTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import com.pholser.junit.quickcheck.Property;
+import com.pholser.junit.quickcheck.generator.InRange;
+import com.pholser.junit.quickcheck.generator.Size;
+import com.pholser.junit.quickcheck.runner.JUnitQuickcheck;
+import it.unimi.dsi.fastutil.Hash;
+import org.junit.runner.RunWith;
+
+@RunWith(JUnitQuickcheck.class)
+public class Object2ObjectOpenCustomHashMapWithCursorQuickCheckTest {
+
+  private static final Hash.Strategy<Integer> NATURAL_HASH = new Hash.Strategy<Integer>() {
+    @Override
+    public int hashCode(Integer o) {
+      return o.hashCode();
+    }
+
+    @Override
+    public boolean equals(Integer a, Integer b) {
+      return a.equals(b);
+    }
+  };
+
+
+  @Property
+  public void scanWithConcurrentModifications_ReturnsExpectedElements(
+      @Size(min = 2, max = 500) Set<@InRange(minInt = 0, maxInt = 500) Integer> initialData,
+      @Size(max = 500) Set<@InRange(minInt = 0, maxInt = 1000) Integer> dataToAdd,
+      @Size(max = 500) Set<@InRange(minInt = 0, maxInt = 500) Integer> keysToRemove) {
+    Object2ObjectOpenCustomHashMapWithCursor<Integer, Integer> map =
+        new Object2ObjectOpenCustomHashMapWithCursor<>(NATURAL_HASH);
+    initialData.forEach(i -> map.put(i, i));
+
+    HashMap<Integer, Integer> scanned = new HashMap<>();
+    int cursor = map.scan(0, initialData.size() / 2, HashMap::put, scanned);
+
+    dataToAdd.forEach(i -> map.put(i, i));
+    map.keySet().removeAll(keysToRemove);
+
+    cursor = map.scan(cursor, 100000, HashMap::put, scanned);
+    assertThat(cursor).isEqualTo(0);
+
+    // Test that we can scan all of the entries that were in the map the entire time.
+    Set<Integer> expectedKeys = new HashSet<>(initialData);
+    expectedKeys.removeAll(keysToRemove);
+    assertThat(scanned.keySet()).containsAll(expectedKeys);
+  }
+
+  @Property
+  public void scanWithNoModificationsDoesNotReturnDuplicates(
+      @Size(min = 2, max = 500) Set<@InRange(minInt = 0, maxInt = 500) Integer> initialData) {
+    Object2ObjectOpenCustomHashMapWithCursor<Integer, Integer> map =
+        new Object2ObjectOpenCustomHashMapWithCursor<>(NATURAL_HASH);
+    initialData.forEach(i -> map.put(i, i));
+
+    List<Integer> scanned = new ArrayList<>();
+    int cursor = map.scan(0, initialData.size() / 2, (data, key, value) -> data.add(key), scanned);
+
+    cursor = map.scan(cursor, 100000, (data, key, value) -> data.add(key), scanned);
+    assertThat(cursor).isEqualTo(0);
+
+    // Test that no duplicate entries were added and no entries were missed.
+    assertThat(scanned).hasSize(initialData.size());
+    assertThat(scanned).containsExactlyInAnyOrderElementsOf(initialData);
+  }
+}
diff --git a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/Object2ObjectOpenCustomHashMapWithCursorTest.java b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/Object2ObjectOpenCustomHashMapWithCursorTest.java
new file mode 100644
index 0000000..565cb3c
--- /dev/null
+++ b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/collections/Object2ObjectOpenCustomHashMapWithCursorTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashMap;
+import java.util.stream.IntStream;
+
+import it.unimi.dsi.fastutil.Hash;
+import org.junit.Test;
+
+public class Object2ObjectOpenCustomHashMapWithCursorTest {
+  private static final Hash.Strategy<Integer> NATURAL_HASH = new Hash.Strategy<Integer>() {
+    @Override
+    public int hashCode(Integer o) {
+      return o.hashCode();
+    }
+
+    @Override
+    public boolean equals(Integer a, Integer b) {
+      return a.equals(b);
+    }
+  };
+
+  private static Hash.Strategy<Integer> COLLIDING_HASH = new Hash.Strategy<Integer>() {
+    @Override
+    public int hashCode(Integer o) {
+      return o % 5;
+    }
+
+    @Override
+    public boolean equals(Integer a, Integer b) {
+      return a.equals(b);
+    }
+  };
+
+  @Test
+  public void scanEntireMap_ReturnsExpectedElements() {
+    Object2ObjectOpenCustomHashMapWithCursor<Integer, String> map =
+        new Object2ObjectOpenCustomHashMapWithCursor<>(NATURAL_HASH);
+    IntStream.range(0, 10).forEach(i -> map.put(i, "value-" + i));
+
+    HashMap<Integer, String> scanned = new HashMap<>();
+    int result = map.scan(0, 10000, HashMap::put, scanned);
+    assertThat(result).isEqualTo(0);
+    assertThat(scanned).isEqualTo(map);
+  }
+
+  @Test
+  public void twoScansWithNoModifications_ReturnsExpectedElements() {
+    Object2ObjectOpenCustomHashMapWithCursor<Integer, String> map =
+        new Object2ObjectOpenCustomHashMapWithCursor<>(NATURAL_HASH);
+    IntStream.range(0, 10).forEach(i -> map.put(i, "value-" + i));
+
+    HashMap<Integer, String> scanned = new HashMap<>();
+    int cursor = map.scan(0, 3, HashMap::put, scanned);
+    assertThat(scanned).hasSize(3);
+    cursor = map.scan(cursor, 3, HashMap::put, scanned);
+    assertThat(scanned).hasSize(6);
+    cursor = map.scan(cursor, 4, HashMap::put, scanned);
+    assertThat(scanned).hasSize(10);
+    cursor = map.scan(cursor, 4, HashMap::put, scanned);
+    assertThat(scanned).hasSize(10);
+    assertThat(cursor).isEqualTo(0);
+
+    assertThat(scanned).isEqualTo(map);
+  }
+
+  @Test
+  public void scanWithConcurrentRemoves_ReturnsExpectedElements() {
+    Object2ObjectOpenCustomHashMapWithCursor<Integer, String> map =
+        new Object2ObjectOpenCustomHashMapWithCursor<>(NATURAL_HASH);
+    IntStream.range(0, 10).forEach(i -> map.put(i, "value-" + i));
+
+    HashMap<Integer, String> scanned = new HashMap<>();
+    int cursor = map.scan(0, 5, HashMap::put, scanned);
+    assertThat(scanned).hasSize(5);
+
+    // Remove some of the elements
+    map.remove(2);
+    map.remove(4);
+    map.remove(5);
+    map.remove(7);
+
+    cursor = map.scan(cursor, 5, HashMap::put, scanned);
+    assertThat(cursor).isEqualTo(0);
+
+    assertThat(scanned).containsKeys(0, 1, 3, 6, 8, 9);
+  }
+
+  @Test
+  public void scanWithHashcodeCollisions_ReturnsExpectedElements() {
+    Object2ObjectOpenCustomHashMapWithCursor<Integer, String> map =
+        new Object2ObjectOpenCustomHashMapWithCursor<>(COLLIDING_HASH);
+    IntStream.range(0, 10).forEach(i -> map.put(i, "value-" + i));
+
+    // The colliding hash is just key % 5. So 0 and 5 will have the same hashcode, etc.
+    HashMap<Integer, String> scanned = new HashMap<>();
+    int cursor = map.scan(0, 1, HashMap::put, scanned);
+
+    // The scan had to ignore the count and return all of the elements with the same hash
+    assertThat(scanned).hasSize(2);
+
+    cursor = map.scan(cursor, 1, HashMap::put, scanned);
+    assertThat(scanned).hasSize(4);
+    cursor = map.scan(cursor, 1, HashMap::put, scanned);
+    assertThat(scanned).hasSize(6);
+    cursor = map.scan(cursor, 1, HashMap::put, scanned);
+    assertThat(scanned).hasSize(8);
+    cursor = map.scan(cursor, 1, HashMap::put, scanned);
+    assertThat(scanned).hasSize(10);
+    cursor = map.scan(cursor, 1, HashMap::put, scanned);
+    assertThat(scanned).hasSize(10);
+
+    assertThat(cursor).isEqualTo(0);
+    assertThat(scanned).isEqualTo(map);
+  }
+
+  @Test
+  public void scanWithHashcodeCollisionsAndConcurrentRemoves_ReturnsExpectedElements() {
+    Object2ObjectOpenCustomHashMapWithCursor<Integer, String> map =
+        new Object2ObjectOpenCustomHashMapWithCursor<>(COLLIDING_HASH);
+    IntStream.range(0, 10).forEach(i -> map.put(i, "value-" + i));
+
+    HashMap<Integer, String> scanned = new HashMap<>();
+    int cursor = map.scan(0, 5, HashMap::put, scanned);
+    assertThat(scanned).hasSize(6);
+
+    // Remove some of the elements
+    map.remove(2);
+    map.remove(4);
+    map.remove(5);
+    map.remove(7);
+
+    cursor = map.scan(cursor, 5, HashMap::put, scanned);
+
+    assertThat(cursor).isEqualTo(0);
+    assertThat(scanned).containsKeys(0, 1, 3, 6, 8, 9);
+  }
+
+  @Test
+  public void scanWithGrowingTable_DoesNotMissElements() {
+    Object2ObjectOpenCustomHashMapWithCursor<Integer, String> map =
+        new Object2ObjectOpenCustomHashMapWithCursor<>(NATURAL_HASH);
+    IntStream.range(0, 10).forEach(i -> map.put(i, "value-" + i));
+
+    HashMap<Integer, String> scanned = new HashMap<>();
+    int cursor = map.scan(0, 5, HashMap::put, scanned);
+    assertThat(scanned).hasSize(5);
+
+
+    // Add a lot of elements to trigger a resize
+    IntStream.range(10, 500).forEach(i -> map.put(i, "value-" + i));
+
+    cursor = map.scan(cursor, 500, HashMap::put, scanned);
+    assertThat(cursor).isEqualTo(0);
+
+    // We don't know that we will have all of the 500 new elements, only that
+    // we should have scanned all of the original elements
+    assertThat(scanned).containsKeys(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
+  }
+
+  @Test
+  public void scanWithShrinkingTable_DoesNotMissElements() {
+    Object2ObjectOpenCustomHashMapWithCursor<Integer, String> map =
+        new Object2ObjectOpenCustomHashMapWithCursor<>(NATURAL_HASH);
+    IntStream.range(0, 500).forEach(i -> map.put(i, "value-" + i));
+
+    HashMap<Integer, String> scanned = new HashMap<>();
+    int cursor = map.scan(0, 50, HashMap::put, scanned);
+    assertThat(scanned).hasSize(50);
+
+
+    // Remove a lot of elements to trigger a resize
+    IntStream.range(100, 500).forEach(map::remove);
+
+    cursor = map.scan(cursor, 500, HashMap::put, scanned);
+    assertThat(cursor).isEqualTo(0);
+
+    // Scan should at least have all of the remaining keys
+    assertThat(scanned).containsAllEntriesOf(map);
+  }
+
+  @Test
+  public void revWorksWhenSignBitIsSet() {
+    assertThat(Object2ObjectOpenCustomHashMapWithCursor.rev(0xFF000000)).isEqualTo(0xFF);
+    assertThat(Object2ObjectOpenCustomHashMapWithCursor.rev(0xFF)).isEqualTo(0xFF000000);
+  }
+
+
+}