You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by do...@apache.org on 2022/02/01 18:27:20 UTC

[geode] branch support/1.15 updated: GEODE-9835: Add SSCAN to Redis supported commands (#7278)

This is an automated email from the ASF dual-hosted git repository.

donalevans pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.15 by this push:
     new 3f56bd3  GEODE-9835: Add SSCAN to Redis supported commands (#7278)
3f56bd3 is described below

commit 3f56bd36a698f05ade37fa0eaa629dfaacf5148f
Author: Bala Kaza Venkata <43...@users.noreply.github.com>
AuthorDate: Tue Feb 1 12:31:22 2022 -0500

    GEODE-9835: Add SSCAN to Redis supported commands (#7278)
    
    Co-authored-by: Bala Kaza Venkata <bk...@vmware.com>
    Co-authored-by: Steve Sienkowski <ss...@vmware.com>
    (cherry picked from commit da6043239eb637994eaba64c3d59cf327aac0f7d)
---
 .../server/AbstractHitsMissesIntegrationTest.java  |  10 +-
 .../executor/set/AbstractSScanIntegrationTest.java | 466 +++++++++++++--------
 .../executor/set/SScanIntegrationTest.java         |  50 ++-
 .../apache/geode/codeAnalysis/excludedClasses.txt  |   2 +-
 .../redis/internal/commands/RedisCommandType.java  |   4 +-
 .../commands/executor/set/SScanExecutor.java       |  94 +----
 .../apache/geode/redis/internal/data/RedisSet.java |  51 +--
 .../SizeableObjectOpenCustomHashSet.java           |  81 ----
 .../SizeableObjectOpenCustomHashSetWithCursor.java | 194 +++++++++
 .../internal/netty/ExecutionHandlerContext.java    |  10 -
 ...ytes2ObjectOpenCustomHashMapWithCursorTest.java |  10 +-
 .../SizeableObjectOpenCustomHashSetTest.java       |  62 ---
 ...eableObjectOpenCustomHashSetWithCursorTest.java | 284 +++++++++++++
 13 files changed, 837 insertions(+), 481 deletions(-)

diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java
index 0933ed6..4b07ff9 100644
--- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java
+++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java
@@ -426,6 +426,11 @@ public abstract class AbstractHitsMissesIntegrationTest implements RedisIntegrat
   }
 
   @Test
+  public void testSscan() {
+    runCommandAndAssertHitsAndMisses(SET_KEY, k -> jedis.sscan(k, "0"));
+  }
+
+  @Test
   public void testSunion() {
     runMultiKeyCommandAndAssertHitsAndMisses(SET_KEY, (k1, k2) -> jedis.sunion(k1, k2));
   }
@@ -566,11 +571,6 @@ public abstract class AbstractHitsMissesIntegrationTest implements RedisIntegrat
     runCommandAndAssertNoStatUpdates(SET_KEY, k -> jedis.spop(k));
   }
 
-  @Test
-  public void testSscan() {
-    runCommandAndAssertHitsAndMisses(SET_KEY, k -> jedis.sscan(k, "0"));
-  }
-
   /************* Helper Methods *************/
   private void runCommandAndAssertHitsAndMisses(String key, Consumer<String> command) {
     Map<String, String> info = RedisTestHelper.getInfo(jedis);
diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSScanIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSScanIntegrationTest.java
index a39f77a..2e6adda 100755
--- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSScanIntegrationTest.java
+++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSScanIntegrationTest.java
@@ -14,37 +14,55 @@
  */
 package org.apache.geode.redis.internal.commands.executor.set;
 
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
 import static org.apache.geode.redis.internal.RedisConstants.ERROR_CURSOR;
 import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
 import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
 import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
+import java.math.BigInteger;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Jedis;
 import redis.clients.jedis.JedisCluster;
 import redis.clients.jedis.Protocol;
 import redis.clients.jedis.ScanParams;
 import redis.clients.jedis.ScanResult;
 
+import org.apache.geode.redis.ConcurrentLoopingThreads;
 import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.data.KeyHashUtil;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
 
 public abstract class AbstractSScanIntegrationTest implements RedisIntegrationTest {
   protected JedisCluster jedis;
-  private static final int REDIS_CLIENT_TIMEOUT =
-      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  public static final String KEY = "key";
+  public static final byte[] KEY_BYTES = KEY.getBytes();
+  public static final int SLOT_FOR_KEY = KeyHashUtil.slotForKey(KEY_BYTES);
+  public static final String ZERO_CURSOR = "0";
+  public static final byte[] ZERO_CURSOR_BYTES = ZERO_CURSOR.getBytes();
+  public static final BigInteger SIGNED_LONG_MAX = new BigInteger(Long.toString(Long.MAX_VALUE));
+  public static final BigInteger SIGNED_LONG_MIN = new BigInteger(Long.toString(Long.MIN_VALUE));
+  public static final String MEMBER_ONE = "1";
+  public static final String MEMBER_TWELVE = "12";
+  public static final String MEMBER_THREE = "3";
+  public static final String BASE_MEMBER_NAME = "baseMember_";
 
   @Before
   public void setUp() {
-    jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+    jedis = new JedisCluster(new HostAndPort(RedisClusterStartupRule.BIND_ADDRESS, getPort()),
+        RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT);
   }
 
   @After
@@ -54,295 +72,405 @@ public abstract class AbstractSScanIntegrationTest implements RedisIntegrationTe
   }
 
   @Test
-  public void givenNoKeyArgument_returnsWrongNumberOfArgumentsError() {
-    assertThatThrownBy(() -> jedis.sendCommand("key", Protocol.Command.SSCAN))
-        .hasMessageContaining("ERR wrong number of arguments for 'sscan' command");
+  public void givenLessThanTwoArguments_returnsWrongNumberOfArgumentsError() {
+    assertAtLeastNArgs(jedis, Protocol.Command.SSCAN, 2);
   }
 
   @Test
-  public void givenNoCursorArgument_returnsWrongNumberOfArgumentsError() {
-    assertThatThrownBy(() -> jedis.sendCommand("key", Protocol.Command.SSCAN, "key"))
-        .hasMessageContaining("ERR wrong number of arguments for 'sscan' command");
+  public void givenNonexistentKey_returnsEmptyArray() {
+    ScanResult<String> result = jedis.sscan("nonexistentKey", ZERO_CURSOR);
+    assertThat(result.isCompleteIteration()).isTrue();
+    assertThat(result.getResult()).isEmpty();
   }
 
   @Test
-  public void givenArgumentsAreNotOddAndKeyExists_returnsSyntaxError() {
-    jedis.sadd("a", "1");
-    assertThatThrownBy(() -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "a*"))
-        .hasMessageContaining(ERROR_SYNTAX);
+  public void givenNonexistentKeyAndIncorrectOptionalArguments_returnsEmptyArray() {
+    ScanResult<byte[]> result =
+        sendCustomSscanCommand("nonexistentKey", "nonexistentKey", ZERO_CURSOR, "ANY");
+    assertThat(result.getResult()).isEmpty();
   }
 
   @Test
-  @SuppressWarnings("unchecked")
-  public void givenArgumentsAreNotOddAndKeyDoesNotExist_returnsEmptyArray() {
-    List<Object> result =
-        (List<Object>) jedis.sendCommand("key!", Protocol.Command.SSCAN, "key!", "0", "a*");
+  public void givenIncorrectOptionalArgumentsAndKeyExists_returnsSyntaxError() {
+    jedis.sadd(KEY, MEMBER_ONE);
+    assertThatThrownBy(
+        () -> jedis.sendCommand(KEY, Protocol.Command.SSCAN, KEY, ZERO_CURSOR, "a*"))
+            .hasMessageContaining(ERROR_SYNTAX);
+  }
+
+  @Test
+  public void givenMatchArgumentWithoutPatternOnExistingKey_returnsSyntaxError() {
+    jedis.sadd(KEY, MEMBER_ONE);
+    assertThatThrownBy(
+        () -> jedis.sendCommand(KEY, Protocol.Command.SSCAN, KEY, ZERO_CURSOR, "MATCH"))
+            .hasMessageContaining(ERROR_SYNTAX);
+  }
 
-    assertThat((byte[]) result.get(0)).isEqualTo("0".getBytes());
-    assertThat((List<Object>) result.get(1)).isEmpty();
+  @Test
+  public void givenCountArgumentWithoutNumberOnExistingKey_returnsSyntaxError() {
+    jedis.sadd(KEY, MEMBER_ONE);
+    assertThatThrownBy(
+        () -> jedis.sendCommand(KEY, Protocol.Command.SSCAN, KEY, ZERO_CURSOR, "COUNT"))
+            .hasMessageContaining(ERROR_SYNTAX);
   }
 
   @Test
-  public void givenMatchOrCountKeywordNotSpecified_returnsSyntaxError() {
-    jedis.sadd("a", "1");
-    assertThatThrownBy(() -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "a*", "1"))
-        .hasMessageContaining(ERROR_SYNTAX);
+  public void givenAdditionalArgumentNotEqualToMatchOrCount_returnsSyntaxError() {
+    jedis.sadd(KEY, MEMBER_ONE);
+    assertThatThrownBy(
+        () -> jedis.sendCommand(KEY, Protocol.Command.SSCAN, KEY, ZERO_CURSOR, "a*", "1"))
+            .hasMessageContaining(ERROR_SYNTAX);
   }
 
   @Test
   public void givenCount_whenCountParameterIsNotAnInteger_returnsNotIntegerError() {
-    jedis.sadd("a", "1");
+    jedis.sadd(KEY, MEMBER_ONE);
     assertThatThrownBy(
-        () -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "COUNT", "MATCH"))
-            .hasMessageContaining(ERROR_NOT_INTEGER);
+        () -> jedis.sendCommand(KEY, Protocol.Command.SSCAN, KEY, ZERO_CURSOR, "COUNT",
+            "notAnInteger"))
+                .hasMessageContaining(ERROR_NOT_INTEGER);
   }
 
   @Test
   public void givenMultipleCounts_whenAnyCountParameterIsNotAnInteger_returnsNotIntegerError() {
-    jedis.sadd("a", "1");
-    assertThatThrownBy(() -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "COUNT", "2",
-        "COUNT", "sjlfs", "COUNT", "1"))
-            .hasMessageContaining(ERROR_NOT_INTEGER);
+    jedis.sadd(KEY, MEMBER_ONE);
+    assertThatThrownBy(
+        () -> jedis.sendCommand(KEY, Protocol.Command.SSCAN, KEY, ZERO_CURSOR, "COUNT", "12",
+            "COUNT", "notAnInteger", "COUNT", "1"))
+                .hasMessageContaining(ERROR_NOT_INTEGER);
   }
 
   @Test
   public void givenMultipleCounts_whenAnyCountParameterIsLessThanOne_returnsSyntaxError() {
-    jedis.sadd("a", "1");
-    assertThatThrownBy(() -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "COUNT", "2",
-        "COUNT", "0", "COUNT", "1"))
-            .hasMessageContaining(ERROR_SYNTAX);
+    jedis.sadd(KEY, MEMBER_ONE);
+    assertThatThrownBy(
+        () -> jedis.sendCommand(KEY, Protocol.Command.SSCAN, KEY, ZERO_CURSOR, "COUNT", "12",
+            "COUNT", "0", "COUNT", "1"))
+                .hasMessageContaining(ERROR_SYNTAX);
   }
 
   @Test
   public void givenCount_whenCountParameterIsZero_returnsSyntaxError() {
-    jedis.sadd("a", "1");
-
-    assertThatThrownBy(() -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "COUNT", "0"))
-        .hasMessageContaining(ERROR_SYNTAX);
+    jedis.sadd(KEY, MEMBER_ONE);
+    assertThatThrownBy(
+        () -> jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, new ScanParams().count(0)))
+            .hasMessageContaining(ERROR_SYNTAX);
   }
 
   @Test
   public void givenCount_whenCountParameterIsNegative_returnsSyntaxError() {
-    jedis.sadd("a", "1");
-
+    jedis.sadd(KEY, MEMBER_ONE);
     assertThatThrownBy(
-        () -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "COUNT", "-37"))
+        () -> jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, new ScanParams().count(-37)))
             .hasMessageContaining(ERROR_SYNTAX);
   }
 
   @Test
   public void givenKeyIsNotASet_returnsWrongTypeError() {
-    jedis.hset("a", "b", "1");
+    jedis.hset(KEY, "b", MEMBER_ONE);
+    assertThatThrownBy(
+        () -> jedis.sscan(KEY, ZERO_CURSOR))
+            .hasMessageContaining(ERROR_WRONG_TYPE);
+  }
 
+  @Test
+  public void givenKeyIsNotASetAndCountIsNegative_returnsWrongTypeError() {
+    jedis.hset(KEY, "b", MEMBER_ONE);
     assertThatThrownBy(
-        () -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "COUNT", "-37"))
+        () -> jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, new ScanParams().count(-37)))
             .hasMessageContaining(ERROR_WRONG_TYPE);
   }
 
   @Test
   public void givenKeyIsNotASet_andCursorIsNotAnInteger_returnsInvalidCursorError() {
-    jedis.hset("a", "b", "1");
-
-    assertThatThrownBy(() -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "sjfls"))
-        .hasMessageContaining(ERROR_CURSOR);
+    jedis.hset(KEY, "b", MEMBER_ONE);
+    assertThatThrownBy(
+        () -> jedis.sscan(KEY, "notAnInteger"))
+            .hasMessageContaining(ERROR_CURSOR);
   }
 
   @Test
   public void givenNonexistentKey_andCursorIsNotInteger_returnsInvalidCursorError() {
     assertThatThrownBy(
-        () -> jedis.sendCommand("notReal", Protocol.Command.SSCAN, "notReal", "notReal", "sjfls"))
+        () -> jedis.sscan("nonexistentKey", "notAnInteger"))
             .hasMessageContaining(ERROR_CURSOR);
   }
 
   @Test
   public void givenExistentSetKey_andCursorIsNotAnInteger_returnsInvalidCursorError() {
-    jedis.set("a", "b");
-
-    assertThatThrownBy(() -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "sjfls"))
-        .hasMessageContaining(ERROR_CURSOR);
+    jedis.set(KEY, "b");
+    assertThatThrownBy(
+        () -> jedis.sscan(KEY, "notAnInteger"))
+            .hasMessageContaining(ERROR_CURSOR);
   }
 
   @Test
-  public void givenNonexistentKey_returnsEmptyArray() {
-    ScanResult<String> result = jedis.sscan("nonexistent", "0");
-
-    assertThat(result.isCompleteIteration()).isTrue();
-    assertThat(result.getResult()).isEmpty();
+  public void givenNegativeCursor_doesNotError() {
+    initializeThousandMemberSet();
+    assertThatNoException().isThrownBy(() -> jedis.sscan(KEY, "-1"));
   }
 
   @Test
   public void givenSetWithOneMember_returnsMember() {
-    jedis.sadd("a", "1");
-    ScanResult<String> result = jedis.sscan("a", "0");
+    jedis.sadd(KEY, MEMBER_ONE);
+
+    ScanResult<String> result = jedis.sscan(KEY, ZERO_CURSOR);
 
     assertThat(result.isCompleteIteration()).isTrue();
-    assertThat(result.getResult()).containsExactly("1");
+    assertThat(result.getResult()).containsOnly(MEMBER_ONE);
   }
 
   @Test
-  public void givenSetWithMultipleMembers_returnsAllMembers() {
-    jedis.sadd("a", "1", "2", "3");
-    ScanResult<String> result = jedis.sscan("a", "0");
+  public void givenSetWithMultipleMembers_returnsSubsetOfMembers() {
+    Set<String> initialMemberData = initializeThousandMemberSet();
 
-    assertThat(result.isCompleteIteration()).isTrue();
-    assertThat(result.getResult()).containsExactlyInAnyOrder("1", "2", "3");
+    ScanResult<String> result = jedis.sscan(KEY, ZERO_CURSOR);
+
+    assertThat(result.getResult()).isSubsetOf(initialMemberData);
   }
 
   @Test
   public void givenCount_returnsAllMembersWithoutDuplicates() {
-    jedis.sadd("a", "1", "2", "3");
+    Set<byte[]> initialTotalSet = initializeThousandMemberByteSet();
+    int count = 99;
 
-    ScanParams scanParams = new ScanParams();
-    scanParams.count(1);
-    String cursor = "0";
-    ScanResult<byte[]> result;
-    List<byte[]> allMembersFromScan = new ArrayList<>();
+    ScanResult<byte[]> result =
+        jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, new ScanParams().count(count));
 
-    do {
-      result = jedis.sscan("a".getBytes(), cursor.getBytes(), scanParams);
-      allMembersFromScan.addAll(result.getResult());
-      cursor = result.getCursor();
-    } while (!result.isCompleteIteration());
+    assertThat(result.getResult().size()).isGreaterThanOrEqualTo(count);
+    assertThat(result.getResult()).isSubsetOf(initialTotalSet);
+  }
 
-    assertThat(allMembersFromScan).containsExactlyInAnyOrder("1".getBytes(),
-        "2".getBytes(),
-        "3".getBytes());
+  @Test
+  public void givenMultipleCounts_usesLastCountSpecified() {
+    Set<byte[]> initialMemberData = initializeThousandMemberByteSet();
+    // Choose two COUNT arguments with a large difference, so that it's extremely unlikely that if
+    // the first COUNT is used, a number of members greater than or equal to the second COUNT will
+    // be returned.
+    int firstCount = 1;
+    int secondCount = 500;
+    ScanResult<byte[]> result = sendCustomSscanCommand(KEY, KEY, ZERO_CURSOR,
+        "COUNT", String.valueOf(firstCount),
+        "COUNT", String.valueOf(secondCount));
+
+    List<byte[]> returnedMembers = result.getResult();
+    assertThat(returnedMembers.size()).isGreaterThanOrEqualTo(secondCount);
+    assertThat(returnedMembers).isSubsetOf(initialMemberData);
   }
 
   @Test
-  @SuppressWarnings("unchecked")
-  public void givenMultipleCounts_returnsAllEntriesWithoutDuplicates() {
-    jedis.sadd("a", "1", "12", "3");
+  public void givenSetWithThreeEntriesAndMatch_returnsOnlyMatchingElements() {
+    jedis.sadd(KEY, MEMBER_ONE, MEMBER_TWELVE, MEMBER_THREE);
+    ScanParams scanParams = new ScanParams().match("1*");
+
+    ScanResult<byte[]> result = jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, scanParams);
 
-    List<Object> result;
+    assertThat(result.isCompleteIteration()).isTrue();
+    assertThat(result.getResult()).containsOnly(MEMBER_ONE.getBytes(),
+        MEMBER_TWELVE.getBytes());
+  }
 
-    List<byte[]> allEntries = new ArrayList<>();
-    String cursor = "0";
+  @Test
+  public void givenSetWithThreeEntriesAndMultipleMatchArguments_returnsOnlyElementsMatchingLastMatchArgument() {
+    jedis.sadd(KEY, MEMBER_ONE, MEMBER_TWELVE, MEMBER_THREE);
 
-    do {
-      result =
-          (List<Object>) jedis.sendCommand("a", Protocol.Command.SSCAN, "a", cursor, "COUNT", "2",
-              "COUNT", "1");
-      allEntries.addAll((List<byte[]>) result.get(1));
-      cursor = new String((byte[]) result.get(0));
-    } while (!Arrays.equals((byte[]) result.get(0), "0".getBytes()));
+    ScanResult<byte[]> result =
+        sendCustomSscanCommand(KEY, KEY, ZERO_CURSOR, "MATCH", "3*", "MATCH", "1*");
 
-    assertThat((byte[]) result.get(0)).isEqualTo("0".getBytes());
-    assertThat(allEntries).containsExactlyInAnyOrder("1".getBytes(),
-        "12".getBytes(),
-        "3".getBytes());
+    assertThat(result.getCursor()).isEqualTo(ZERO_CURSOR);
+    assertThat(result.getResult()).containsOnly(MEMBER_ONE.getBytes(),
+        MEMBER_TWELVE.getBytes());
   }
 
   @Test
-  public void givenMatch_returnsAllMatchingMembersWithoutDuplicates() {
-    jedis.sadd("a", "1", "12", "3");
-
+  public void givenLargeCountAndMatch_returnsOnlyMatchingMembers() {
+    Set<byte[]> initialMemberData = initializeThousandMemberByteSet();
     ScanParams scanParams = new ScanParams();
-    scanParams.match("1*");
+    // There are 111 matching members in the set 0..999
+    scanParams.match("9*");
+    // Choose a large COUNT to ensure that some matching members are returned
+    scanParams.count(950);
+
+    ScanResult<byte[]> result = jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, scanParams);
+
+    List<byte[]> returnedMembers = result.getResult();
+    // We know that we must have found at least 61 matching members, given the size of COUNT and the
+    // number of matching members in the set
+    assertThat(returnedMembers.size()).isGreaterThanOrEqualTo(61);
+    assertThat(returnedMembers).isSubsetOf(initialMemberData);
+    assertThat(returnedMembers).allSatisfy(bytes -> assertThat(new String(bytes)).startsWith("9"));
+  }
 
-    ScanResult<byte[]> result =
-        jedis.sscan("a".getBytes(), "0".getBytes(), scanParams);
+  @Test
+  public void givenMultipleCountAndMatch_usesLastSpecified() {
+    Set<byte[]> initialMemberData = initializeThousandMemberByteSet();
+    // Choose a large COUNT to ensure that some matching members are returned
+    // There are 111 matching members in the set 0..999
+
+    ScanResult<byte[]> result = sendCustomSscanCommand(KEY, KEY, ZERO_CURSOR,
+        "COUNT", "20",
+        "MATCH", "1*",
+        "COUNT", "950",
+        "MATCH", "9*");
+
+    List<byte[]> returnedMembers = result.getResult();
+    // We know that we must have found at least 61 matching members, given the size of COUNT and the
+    // number of matching members in the set
+    assertThat(returnedMembers.size()).isGreaterThanOrEqualTo(61);
+    assertThat(returnedMembers).isSubsetOf(initialMemberData);
+    assertThat(returnedMembers).allSatisfy(bytes -> assertThat(new String(bytes)).startsWith("9"));
+  }
 
-    assertThat(result.isCompleteIteration()).isTrue();
-    assertThat(result.getResult()).containsExactlyInAnyOrder("1".getBytes(),
-        "12".getBytes());
+  @Test
+  public void givenNonMatchingPattern_returnsEmptyResult() {
+    jedis.sadd(KEY, "cat dog elk");
+    ScanParams scanParams = new ScanParams().match("*fish*");
+
+    ScanResult<byte[]> result = jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, scanParams);
+
+    assertThat(result.getResult()).isEmpty();
   }
 
   @Test
-  @SuppressWarnings("unchecked")
-  public void givenMultipleMatches_returnsMembersMatchingLastMatchParameter() {
-    jedis.sadd("a", "1", "12", "3");
+  public void should_notReturnValue_givenValueWasRemovedBeforeSscanIsCalled() {
+    initializeThreeMemberSet();
+    jedis.srem(KEY, MEMBER_THREE);
+    GeodeAwaitility.await().untilAsserted(
+        () -> assertThat(jedis.sismember(KEY, MEMBER_THREE)).isFalse());
 
-    List<Object> result = (List<Object>) jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0",
-        "MATCH", "3*", "MATCH", "1*");
+    ScanResult<String> result = jedis.sscan(KEY, ZERO_CURSOR);
 
-    assertThat((byte[]) result.get(0)).isEqualTo("0".getBytes());
-    assertThat((List<byte[]>) result.get(1)).containsExactlyInAnyOrder("1".getBytes(),
-        "12".getBytes());
+    assertThat(result.getResult()).doesNotContain(MEMBER_THREE);
   }
 
   @Test
-  public void givenMatchAndCount_returnsAllMembersWithoutDuplicates() {
-    jedis.sadd("a", "1", "12", "3");
+  public void should_notErrorGivenNonzeroCursorOnFirstCall() {
+    initializeThreeMemberSet();
+    assertThatNoException().isThrownBy(() -> jedis.sscan(KEY, "5"));
+  }
 
-    ScanParams scanParams = new ScanParams();
-    scanParams.count(1);
-    scanParams.match("1*");
-    ScanResult<byte[]> result;
-    List<byte[]> allMembersFromScan = new ArrayList<>();
-    String cursor = "0";
+  @Test
+  public void should_notErrorGivenCountEqualToIntegerMaxValue() {
+    Set<byte[]> set = initializeThreeMemberByteSet();
+    ScanParams scanParams = new ScanParams().count(Integer.MAX_VALUE);
 
-    do {
-      result = jedis.sscan("a".getBytes(), cursor.getBytes(), scanParams);
-      allMembersFromScan.addAll(result.getResult());
-      cursor = result.getCursor();
-    } while (!result.isCompleteIteration());
+    ScanResult<byte[]> result = jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, scanParams);
 
-    assertThat(allMembersFromScan).containsExactlyInAnyOrder("1".getBytes(),
-        "12".getBytes());
+    assertThat(result.getResult())
+        .containsExactlyInAnyOrderElementsOf(set);
   }
 
   @Test
-  @SuppressWarnings("unchecked")
-  public void givenMultipleCountsAndMatches_returnsAllEntriesWithoutDuplicates() {
-    jedis.sadd("a", "1", "12", "3");
+  public void should_notErrorGivenCountGreaterThanIntegerMaxValue() {
+    initializeThreeMemberByteSet();
+    String greaterThanInt = String.valueOf(2L * Integer.MAX_VALUE);
+
+    ScanResult<byte[]> result =
+        sendCustomSscanCommand(KEY, KEY, ZERO_CURSOR, "COUNT", greaterThanInt);
 
-    List<Object> result;
-    List<byte[]> allEntries = new ArrayList<>();
-    String cursor = "0";
+    assertThat(result.getCursor()).isEqualTo(ZERO_CURSOR);
+    assertThat(result.getResult()).containsExactlyInAnyOrder(
+        MEMBER_ONE.getBytes(),
+        MEMBER_TWELVE.getBytes(),
+        MEMBER_THREE.getBytes());
+  }
 
-    do {
-      result =
-          (List<Object>) jedis.sendCommand("a", Protocol.Command.SSCAN, "a", cursor, "COUNT", "37",
-              "MATCH", "3*", "COUNT", "2", "COUNT", "1", "MATCH", "1*");
-      allEntries.addAll((List<byte[]>) result.get(1));
-      cursor = new String((byte[]) result.get(0));
-    } while (!Arrays.equals((byte[]) result.get(0), "0".getBytes()));
+  /**** Concurrency ***/
 
-    assertThat((byte[]) result.get(0)).isEqualTo("0".getBytes());
-    assertThat(allEntries).containsExactlyInAnyOrder("1".getBytes(),
-        "12".getBytes());
+  @Test
+  public void should_returnAllConsistentlyPresentMembers_givenConcurrentThreadsAddingAndRemovingMembers() {
+    final Set<String> initialMemberData = initializeThousandMemberSet();
+    final int iterationCount = 500;
+    Jedis jedis1 = jedis.getConnectionFromSlot(SLOT_FOR_KEY);
+    Jedis jedis2 = jedis.getConnectionFromSlot(SLOT_FOR_KEY);
+
+    new ConcurrentLoopingThreads(iterationCount,
+        (i) -> multipleSScanAndAssertOnContentOfResultSet(i, jedis1, initialMemberData),
+        (i) -> multipleSScanAndAssertOnContentOfResultSet(i, jedis2, initialMemberData),
+        (i) -> {
+          String member = "new_" + BASE_MEMBER_NAME + i;
+          jedis.sadd(KEY, member);
+          jedis.srem(KEY, member);
+        }).run();
+
+    jedis1.close();
+    jedis2.close();
   }
 
   @Test
-  public void givenNegativeCursor_returnsMembersUsingAbsoluteValueOfCursor() {
-    jedis.sadd("b", "green", "orange", "yellow");
+  public void should_notAlterUnderlyingData_givenMultipleConcurrentSscans() {
+    final Set<String> initialMemberData = initializeThousandMemberSet();
+    jedis.sadd(KEY, initialMemberData.toArray(new String[0]));
+    final int iterationCount = 500;
+    Jedis jedis1 = jedis.getConnectionFromSlot(SLOT_FOR_KEY);
+    Jedis jedis2 = jedis.getConnectionFromSlot(SLOT_FOR_KEY);
+
+    new ConcurrentLoopingThreads(iterationCount,
+        (i) -> multipleSScanAndAssertOnContentOfResultSet(i, jedis1, initialMemberData),
+        (i) -> multipleSScanAndAssertOnContentOfResultSet(i, jedis2, initialMemberData))
+            .run();
+    assertThat(jedis.smembers(KEY)).containsExactlyInAnyOrderElementsOf(initialMemberData);
+
+    jedis1.close();
+    jedis2.close();
+  }
 
+  private void multipleSScanAndAssertOnContentOfResultSet(int iteration, Jedis jedis,
+      final Set<String> initialMemberData) {
     List<String> allEntries = new ArrayList<>();
-
-    String cursor = "-100";
+    String cursor = ZERO_CURSOR;
     ScanResult<String> result;
+
     do {
-      result = jedis.sscan("b", cursor);
-      allEntries.addAll(result.getResult());
+      result = jedis.sscan(KEY, cursor);
       cursor = result.getCursor();
+      List<String> resultEntries = result.getResult();
+      allEntries.addAll(resultEntries);
     } while (!result.isCompleteIteration());
 
-    assertThat(allEntries).containsExactlyInAnyOrder("green", "orange", "yellow");
+    assertThat(allEntries).as("failed on iteration " + iteration)
+        .containsAll(initialMemberData);
   }
 
-  @Test
-  public void givenCursorGreaterThanUnsignedLongCapacity_returnsCursorError() {
-    assertThatThrownBy(() -> jedis.sscan("a", "18446744073709551616"))
-        .hasMessageContaining(ERROR_CURSOR);
+  @SuppressWarnings("unchecked")
+  private ScanResult<byte[]> sendCustomSscanCommand(String key, String... args) {
+    List<Object> result = (List<Object>) (jedis.sendCommand(key, Protocol.Command.SSCAN, args));
+    return new ScanResult<>((byte[]) result.get(0), (List<byte[]>) result.get(1));
   }
 
-  @Test
-  public void givenNegativeCursorGreaterThanUnsignedLongCapacity_returnsCursorError() {
-    assertThatThrownBy(() -> jedis.sscan("a", "-18446744073709551616"))
-        .hasMessageContaining(ERROR_CURSOR);
+  private void initializeThreeMemberSet() {
+    jedis.sadd(KEY, MEMBER_ONE, MEMBER_TWELVE, MEMBER_THREE);
   }
 
-  @Test
-  public void givenInvalidRegexSyntax_returnsEmptyArray() {
-    jedis.sadd("a", "1");
-    ScanParams scanParams = new ScanParams();
-    scanParams.count(1);
-    scanParams.match("\\p");
+  private Set<byte[]> initializeThreeMemberByteSet() {
+    Set<byte[]> set = new HashSet<>();
+    set.add(MEMBER_ONE.getBytes());
+    set.add(MEMBER_TWELVE.getBytes());
+    set.add(MEMBER_THREE.getBytes());
+    jedis.sadd(KEY_BYTES, MEMBER_ONE.getBytes(), MEMBER_TWELVE.getBytes(),
+        MEMBER_THREE.getBytes());
+    return set;
+  }
 
-    ScanResult<byte[]> result =
-        jedis.sscan("a".getBytes(), "0".getBytes(), scanParams);
+  private Set<String> initializeThousandMemberSet() {
+    Set<String> set = new HashSet<>();
+    int sizeOfSet = 1000;
+    for (int i = 0; i < sizeOfSet; i++) {
+      set.add((BASE_MEMBER_NAME + i));
+    }
+    jedis.sadd(KEY, set.toArray(new String[0]));
+    return set;
+  }
 
-    assertThat(result.getResult()).isEmpty();
+  private Set<byte[]> initializeThousandMemberByteSet() {
+    Set<byte[]> set = new HashSet<>();
+    int sizeOfSet = 1000;
+    for (int i = 0; i < sizeOfSet; i++) {
+      byte[] memberToAdd = Integer.toString(i).getBytes();
+      set.add(memberToAdd);
+      jedis.sadd(KEY_BYTES, memberToAdd);
+    }
+    return set;
   }
+
 }
diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/SScanIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/SScanIntegrationTest.java
index 2e552d9..6ce3c89 100755
--- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/SScanIntegrationTest.java
+++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/SScanIntegrationTest.java
@@ -14,15 +14,14 @@
  */
 package org.apache.geode.redis.internal.commands.executor.set;
 
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_CURSOR;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.math.BigInteger;
 
 import org.junit.ClassRule;
 import org.junit.Test;
-import redis.clients.jedis.ScanParams;
-import redis.clients.jedis.ScanResult;
 
 import org.apache.geode.redis.GeodeRedisServerRule;
 
@@ -37,21 +36,30 @@ public class SScanIntegrationTest extends AbstractSScanIntegrationTest {
   }
 
   @Test
-  public void givenDifferentCursorThanSpecifiedByPreviousSscan_returnsAllMembers() {
-    List<byte[]> memberList = new ArrayList<>();
-    for (int i = 0; i < 10; i++) {
-      jedis.sadd("a", String.valueOf(i));
-      memberList.add(String.valueOf(i).getBytes());
-    }
-
-    ScanParams scanParams = new ScanParams();
-    scanParams.count(5);
-    ScanResult<byte[]> result =
-        jedis.sscan("a".getBytes(), "0".getBytes(), scanParams);
-    assertThat(result.isCompleteIteration()).isFalse();
-
-    result = jedis.sscan("a".getBytes(), "100".getBytes());
-
-    assertThat(result.getResult()).containsExactlyInAnyOrderElementsOf(memberList);
+  public void givenCursorGreaterThanSignedLongMaxValue_returnsCursorError() {
+    assertThatThrownBy(
+        () -> jedis.sscan(KEY, SIGNED_LONG_MAX.add(BigInteger.ONE).toString()))
+            .hasMessageContaining(ERROR_CURSOR);
+  }
+
+  @Test
+  public void givenNegativeCursorLessThanSignedLongMinValue_returnsCursorError() {
+    assertThatThrownBy(
+        () -> jedis.sscan(KEY, SIGNED_LONG_MIN.subtract(BigInteger.ONE).toString()))
+            .hasMessageContaining(ERROR_CURSOR);
+  }
+
+  @Test
+  public void givenCursorEqualToSignedLongMinValue_doesNotError() {
+    jedis.sadd(KEY, "1");
+    assertThatNoException()
+        .isThrownBy(() -> jedis.sscan(KEY, SIGNED_LONG_MAX.toString()));
+  }
+
+  @Test
+  public void givenNegativeCursorEqualToSignedLongMinValue_doesNotError() {
+    jedis.sadd(KEY, "1");
+    assertThatNoException()
+        .isThrownBy(() -> jedis.sscan(KEY, SIGNED_LONG_MIN.toString()));
   }
 }
diff --git a/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index 3d87db1..a51c9a0 100644
--- a/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -1,7 +1,7 @@
 org/apache/geode/redis/internal/services/cluster/RedisMemberInfoRetrievalFunction
 org/apache/geode/redis/internal/data/collections/Bytes2ObjectOpenHashMap
 org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursor
-org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSet
+org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursor
 org/apache/geode/redis/internal/data/RedisCrossSlotException
 org/apache/geode/redis/internal/data/RedisDataMovedException
 org/apache/geode/redis/internal/data/RedisDataTypeMismatchException
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java
index c11155b..856ea9a 100755
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java
@@ -255,6 +255,8 @@ public enum RedisCommandType {
   SRANDMEMBER(new SRandMemberExecutor(), SUPPORTED,
       new Parameter().min(2).max(3, ERROR_SYNTAX).flags(READONLY, RANDOM)),
   SREM(new SRemExecutor(), SUPPORTED, new Parameter().min(3).flags(WRITE, FAST)),
+  SSCAN(new SScanExecutor(), SUPPORTED, new Parameter().min(3).flags(READONLY, RANDOM),
+      new Parameter().odd(ERROR_SYNTAX)),
   SUNION(new SUnionExecutor(), SUPPORTED,
       new Parameter().min(2).lastKey(-1).flags(READONLY, SORT_FOR_SCRIPT)),
   SUNIONSTORE(new SUnionStoreExecutor(), SUPPORTED,
@@ -357,8 +359,6 @@ public enum RedisCommandType {
 
   SPOP(new SPopExecutor(), UNSUPPORTED,
       new Parameter().min(2).max(3, ERROR_SYNTAX).flags(WRITE, RANDOM, FAST)),
-  SSCAN(new SScanExecutor(), UNSUPPORTED, new Parameter().min(3).flags(READONLY, RANDOM),
-      new Parameter().odd(ERROR_SYNTAX).firstKey(0)),
 
   /*************** Server ****************/
 
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/set/SScanExecutor.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/set/SScanExecutor.java
index feb9914..e6df7b7 100755
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/set/SScanExecutor.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/set/SScanExecutor.java
@@ -15,117 +15,29 @@
 package org.apache.geode.redis.internal.commands.executor.set;
 
 
-import static org.apache.geode.redis.internal.RedisConstants.ERROR_CURSOR;
-import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
-import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
-import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
 import static org.apache.geode.redis.internal.data.RedisDataType.REDIS_SET;
-import static org.apache.geode.redis.internal.netty.Coder.bytesToLong;
-import static org.apache.geode.redis.internal.netty.Coder.bytesToString;
-import static org.apache.geode.redis.internal.netty.Coder.equalsIgnoreCaseBytes;
-import static org.apache.geode.redis.internal.netty.Coder.narrowLongToInt;
-import static org.apache.geode.redis.internal.netty.StringBytesGlossary.COUNT;
-import static org.apache.geode.redis.internal.netty.StringBytesGlossary.MATCH;
 
-import java.math.BigInteger;
 import java.util.List;
 
 import org.apache.commons.lang3.tuple.Pair;
 
-import org.apache.geode.redis.internal.commands.Command;
 import org.apache.geode.redis.internal.commands.executor.GlobPattern;
-import org.apache.geode.redis.internal.commands.executor.RedisResponse;
 import org.apache.geode.redis.internal.commands.executor.key.AbstractScanExecutor;
-import org.apache.geode.redis.internal.data.RedisData;
 import org.apache.geode.redis.internal.data.RedisDataType;
-import org.apache.geode.redis.internal.data.RedisDataTypeMismatchException;
 import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
 
 public class SScanExecutor extends AbstractScanExecutor {
-  private static final BigInteger UNSIGNED_LONG_CAPACITY = new BigInteger("18446744073709551615");
 
   @Override
-  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
-    List<byte[]> commandElems = command.getProcessedCommand();
-
-    String cursorString = bytesToString(commandElems.get(2));
-    BigInteger cursor;
-    byte[] globPattern = null;
-    int count = DEFAULT_COUNT;
-
-    try {
-      cursor = new BigInteger(cursorString).abs();
-    } catch (NumberFormatException e) {
-      return RedisResponse.error(ERROR_CURSOR);
-    }
-
-    if (cursor.compareTo(UNSIGNED_LONG_CAPACITY) > 0) {
-      return RedisResponse.error(ERROR_CURSOR);
-    }
-
-    RedisKey key = command.getKey();
-
-    RedisData value = context.getRegionProvider().getRedisData(key);
-    if (value.isNull()) {
-      context.getRedisStats().incKeyspaceMisses();
-      return RedisResponse.emptyScan();
-    }
-
-    if (value.getType() != REDIS_SET) {
-      throw new RedisDataTypeMismatchException(ERROR_WRONG_TYPE);
-    }
-
-    command.getCommandType().checkDeferredParameters(command, context);
-
-    if (!cursor.equals(context.getSscanCursor())) {
-      cursor = new BigInteger("0");
-    }
-
-    for (int i = 3; i < commandElems.size(); i = i + 2) {
-      byte[] commandElemBytes = commandElems.get(i);
-      if (equalsIgnoreCaseBytes(commandElemBytes, MATCH)) {
-        commandElemBytes = commandElems.get(i + 1);
-        globPattern = commandElemBytes;
-
-      } else if (equalsIgnoreCaseBytes(commandElemBytes, COUNT)) {
-        commandElemBytes = commandElems.get(i + 1);
-        try {
-          count = narrowLongToInt(bytesToLong(commandElemBytes));
-        } catch (NumberFormatException e) {
-          return RedisResponse.error(ERROR_NOT_INTEGER);
-        }
-
-        if (count < 1) {
-          return RedisResponse.error(ERROR_SYNTAX);
-        }
-
-      } else {
-        return RedisResponse.error(ERROR_SYNTAX);
-      }
-    }
-
-    GlobPattern pattern = convertGlobToRegex(globPattern);
-    int lCount = count;
-    BigInteger lCursor = cursor;
-    Pair<BigInteger, List<Object>> scanResult =
-        context.setLockedExecute(key, true,
-            set -> set.sscan(pattern, lCount, lCursor));
-
-    context.setSscanCursor(scanResult.getLeft());
-
-    return RedisResponse.scan(scanResult.getLeft().intValue(), scanResult.getRight());
-  }
-
-  // TODO: When SSCAN is supported, refactor to use these methods and not override executeCommand()
-  @Override
   protected Pair<Integer, List<byte[]>> executeScan(ExecutionHandlerContext context, RedisKey key,
       GlobPattern pattern, int count, int cursor) {
-    return null;
+    return context.setLockedExecute(key, true,
+        set -> set.sscan(pattern, count, cursor));
   }
 
   @Override
   protected RedisDataType getDataType() {
-    return null;
+    return REDIS_SET;
   }
 }
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
index 04654da..3fd6352 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
@@ -24,7 +24,6 @@ import static org.apache.geode.redis.internal.data.RedisDataType.REDIS_SET;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -45,7 +44,7 @@ import org.apache.geode.internal.serialization.DeserializationContext;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.serialization.SerializationContext;
 import org.apache.geode.redis.internal.commands.executor.GlobPattern;
-import org.apache.geode.redis.internal.data.collections.SizeableObjectOpenCustomHashSet;
+import org.apache.geode.redis.internal.data.collections.SizeableObjectOpenCustomHashSetWithCursor;
 import org.apache.geode.redis.internal.data.delta.AddByteArrays;
 import org.apache.geode.redis.internal.data.delta.RemoveByteArrays;
 import org.apache.geode.redis.internal.data.delta.ReplaceByteArrays;
@@ -207,42 +206,26 @@ public class RedisSet extends AbstractRedisData {
     return result.size();
   }
 
-  public Pair<BigInteger, List<Object>> sscan(GlobPattern matchPattern, int count,
-      BigInteger cursor) {
-    List<Object> returnList = new ArrayList<>();
-    int size = members.size();
-    BigInteger beforeCursor = new BigInteger("0");
-    int numElements = 0;
-    int i = -1;
-    for (byte[] value : members) {
-      i++;
-      if (beforeCursor.compareTo(cursor) < 0) {
-        beforeCursor = beforeCursor.add(new BigInteger("1"));
-        continue;
-      }
+  public Pair<Integer, List<byte[]>> sscan(GlobPattern matchPattern, int count,
+      int cursor) {
+    int maximumCapacity = Math.min(count, scard() + 1);
+    List<byte[]> resultList = new ArrayList<>(maximumCapacity);
 
-      if (matchPattern != null) {
-        if (matchPattern.matches(value)) {
-          returnList.add(value);
-          numElements++;
-        }
-      } else {
-        returnList.add(value);
-        numElements++;
-      }
+    cursor = members.scan(cursor, count,
+        (list, key) -> addIfMatching(matchPattern, list, key), resultList);
 
-      if (numElements == count) {
-        break;
-      }
-    }
+    return new ImmutablePair<>(cursor, resultList);
+
+  }
 
-    Pair<BigInteger, List<Object>> scanResult;
-    if (i >= size - 1) {
-      scanResult = new ImmutablePair<>(new BigInteger("0"), returnList);
+  private void addIfMatching(GlobPattern matchPattern, List<byte[]> list, byte[] key) {
+    if (matchPattern != null) {
+      if (matchPattern.matches(key)) {
+        list.add(key);
+      }
     } else {
-      scanResult = new ImmutablePair<>(new BigInteger(String.valueOf(i + 1)), returnList);
+      list.add(key);
     }
-    return scanResult;
   }
 
   public Collection<byte[]> spop(Region<RedisKey, RedisData> region, RedisKey key, int popCount) {
@@ -495,7 +478,7 @@ public class RedisSet extends AbstractRedisData {
     return REDIS_SET_OVERHEAD + members.getSizeInBytes();
   }
 
-  public static class MemberSet extends SizeableObjectOpenCustomHashSet<byte[]> {
+  public static class MemberSet extends SizeableObjectOpenCustomHashSetWithCursor<byte[]> {
     public MemberSet() {
       super(ByteArrays.HASH_STRATEGY);
     }
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSet.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSet.java
deleted file mode 100644
index 1d57d8f..0000000
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSet.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.redis.internal.data.collections;
-
-
-import static org.apache.geode.internal.JvmSizeUtils.memoryOverhead;
-
-import java.util.Collection;
-
-import it.unimi.dsi.fastutil.objects.ObjectOpenCustomHashSet;
-
-import org.apache.geode.internal.size.Sizeable;
-
-public abstract class SizeableObjectOpenCustomHashSet<K> extends ObjectOpenCustomHashSet<K>
-    implements Sizeable {
-  private static final long serialVersionUID = 9174920505089089517L;
-  private static final int OPEN_HASH_SET_OVERHEAD =
-      memoryOverhead(SizeableObjectOpenCustomHashSet.class);
-
-  private int memberOverhead;
-
-  public SizeableObjectOpenCustomHashSet(int expected, Strategy<? super K> strategy) {
-    super(expected, strategy);
-  }
-
-  public SizeableObjectOpenCustomHashSet(Strategy<? super K> strategy) {
-    super(strategy);
-  }
-
-  public SizeableObjectOpenCustomHashSet(Collection<? extends K> c, Strategy<? super K> strategy) {
-    super(c, strategy);
-  }
-
-  @Override
-  public boolean add(K k) {
-    boolean added = super.add(k);
-    if (added) {
-      memberOverhead += sizeElement(k);
-    }
-    return added;
-  }
-
-  @SuppressWarnings("unchecked")
-  @Override
-  public boolean remove(Object k) {
-    boolean removed = super.remove(k);
-    if (removed) {
-      memberOverhead -= sizeElement((K) k);
-    }
-    return removed;
-  }
-
-  public K getFromBackingArray(final int pos) {
-    return key[pos];
-  }
-
-  public int getBackingArrayLength() {
-    return key.length;
-  }
-
-  @Override
-  public int getSizeInBytes() {
-    // The object referenced by the "strategy" field is not sized
-    // since it is usually a singleton instance.
-    return OPEN_HASH_SET_OVERHEAD + memoryOverhead(key) + memberOverhead;
-  }
-
-  protected abstract int sizeElement(K element);
-}
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursor.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursor.java
new file mode 100644
index 0000000..dfb077b
--- /dev/null
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursor.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.data.collections;
+
+
+import static it.unimi.dsi.fastutil.HashCommon.mix;
+import static org.apache.geode.internal.JvmSizeUtils.memoryOverhead;
+
+import java.util.Collection;
+
+import it.unimi.dsi.fastutil.objects.ObjectOpenCustomHashSet;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.internal.size.Sizeable;
+
+public abstract class SizeableObjectOpenCustomHashSetWithCursor<E>
+    extends ObjectOpenCustomHashSet<E>
+    implements Sizeable {
+  private static final long serialVersionUID = 9174920505089089517L;
+  private static final int OPEN_HASH_SET_OVERHEAD =
+      memoryOverhead(SizeableObjectOpenCustomHashSetWithCursor.class);
+
+  private int memberOverhead;
+
+  public SizeableObjectOpenCustomHashSetWithCursor(int expected, Strategy<? super E> strategy) {
+    super(expected, strategy);
+  }
+
+  public SizeableObjectOpenCustomHashSetWithCursor(Strategy<? super E> strategy) {
+    super(strategy);
+  }
+
+  public SizeableObjectOpenCustomHashSetWithCursor(Collection<? extends E> c,
+      Strategy<? super E> strategy) {
+    super(c, strategy);
+  }
+
+  @Override
+  public boolean add(E e) {
+    boolean added = super.add(e);
+    if (added) {
+      memberOverhead += sizeElement(e);
+    }
+    return added;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public boolean remove(Object e) {
+    boolean removed = super.remove(e);
+    if (removed) {
+      memberOverhead -= sizeElement((E) e);
+    }
+    return removed;
+  }
+
+  public E getFromBackingArray(final int pos) {
+    return key[pos];
+  }
+
+  public int getBackingArrayLength() {
+    return key.length;
+  }
+
+  @Override
+  public int getSizeInBytes() {
+    // The object referenced by the "strategy" field is not sized
+    // since it is usually a singleton instance.
+    return OPEN_HASH_SET_OVERHEAD + memoryOverhead(key) + memberOverhead;
+  }
+
+  /**
+   * Scan entries and pass them to the given consumer function, starting at the passed in
+   * cursor. This method will scan until at least count entries are returned, or the entire
+   * set has been scanned. Once the returned cursor is 0, the entire set is scanned.
+   *
+   * This method may emit more than *count* number of elements if there are hash collisions.
+   *
+   * @param cursor The cursor to start from. Should be 0 for the initial scan. Subsequent calls
+   *        should use the cursor returned by the previous scan call.
+   * @param count The number of elements to scan
+   * @param consumer A function to pass the scanned members
+   * @param privateData Some data to pass to the function, for example a set to collect elements in.
+   *        This
+   *        allows the function to be stateless.
+   * @param <D> The type of the data passed to the function/
+   * @return The next cursor to scan from, or 0 if the scan has touched all elements.
+   */
+  public <D> int scan(int cursor, int count,
+      SizeableObjectOpenCustomHashSetWithCursor.EntryConsumer<E, D> consumer, D privateData) {
+    // Implementation notes
+    //
+    // This stateless scan cursor algorithm is based on the dictScan cursor
+    // implementation from dict.c in redis. Please see the comments in that class for the full
+    // details. That iteration algorithm was designed by Pieter Noordhuis.
+    //
+    // There is one wrinkle due to the fact that we are using a different type of hashtable here.
+    // The parent class, ObjectOpenCustomHashSet, uses an open addressing with a linear
+    // probe. What that means is that when there is a hash collision, instead of putting
+    // a linked list of hash entries into a single hash bucket, this implementation simply
+    // moves on to the next element to the right in the array and tries to put the inserted
+    // object there, continuing until it finds a null slot.
+    //
+    // So in order to use the redis cursor algorithm, our scan needs to probe ahead to
+    // subsequent positions to find any hash entries that match the position we are scanning.
+    // This is logically equivalent to iterating over the linked list in a hashtable bucket
+    // for a redis style closed addressing hashtable.
+    //
+
+    do {
+      // Emit all the entries at the cursor. This means looking forward in the hash
+      // table for any non-null entries that might hash to the current cursor and emitting
+      // those as well. This may even wrap around to the front of the hashtable.
+      int position = cursor;
+      while (key[position & mask] != null) {
+        E currentElement = key[position & mask];
+        if (elementHashesTo(currentElement, position, cursor & mask)) {
+          consumer.consume(privateData, currentElement);
+          count--;
+        }
+        position++;
+      }
+
+      // Increment the reversed cursor
+      cursor |= ~mask;
+      cursor = rev(cursor);
+      cursor++;
+      cursor = rev(cursor);
+
+
+    } while (count > 0 && cursor != 0);
+
+    return cursor;
+  }
+
+  /**
+   * reverse the bits in a cursor.
+   *
+   * Package scope to allow for unit testing to make sure we don't have some silly
+   * java signed int issues
+   *
+   * @param value the value to reverse
+   * @return the reversed bits.
+   */
+  static int rev(int value) {
+    // This implementation is also based on dict.c from redis, which was originally from
+    // http://graphics.stanford.edu/~seander/bithacks.html#ReverseParallel
+    int s = 32;
+    int mask = ~0;
+    while ((s >>>= 1) > 0) {
+      mask ^= (mask << s);
+      value = ((value >>> s) & mask) | ((value << s) & ~mask);
+    }
+    return value;
+  }
+
+  public interface EntryConsumer<E, D> {
+    void consume(D privateData, E element);
+  }
+
+  /**
+   * Check to see if given element hashes to the expected hash.
+   *
+   * @param currentElement The element to key
+   * @param currentPosition The position of the element in the element[] array
+   * @param expectedHash - the expected hash of the element.
+   */
+  private boolean elementHashesTo(E currentElement, int currentPosition, int expectedHash) {
+    // There is a small optimization here. If the previous element
+    // is null, we know that the element at position does hash to the expected
+    // hash because it is not here as a result of a collision at some previous position.
+    E previousElement = key[(currentPosition - 1) & mask];
+    return previousElement == null || hash(currentElement) == expectedHash;
+  }
+
+  @VisibleForTesting
+  public int hash(E element) {
+    return mix(strategy().hashCode(element)) & mask;
+  }
+
+  protected abstract int sizeElement(E element);
+}
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index 000adf5..5177fea 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -103,7 +103,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
   private final DistributedMember member;
   private final RedisSecurityService securityService;
   private BigInteger scanCursor;
-  private BigInteger sscanCursor;
   private final AtomicBoolean channelInactive = new AtomicBoolean();
   private final ChannelId channelId;
 
@@ -133,7 +132,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
     this.member = member;
     this.securityService = securityService;
     this.scanCursor = new BigInteger("0");
-    this.sscanCursor = new BigInteger("0");
     this.channelId = channel.id();
     redisStats.addClient();
 
@@ -385,14 +383,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
     this.scanCursor = scanCursor;
   }
 
-  public BigInteger getSscanCursor() {
-    return sscanCursor;
-  }
-
-  public void setSscanCursor(BigInteger sscanCursor) {
-    this.sscanCursor = sscanCursor;
-  }
-
   public String getMemberName() {
     return member.getUniqueId();
   }
diff --git a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursorTest.java b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursorTest.java
index 0d4a722..cd68c06 100644
--- a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursorTest.java
+++ b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursorTest.java
@@ -223,9 +223,9 @@ public class SizeableBytes2ObjectOpenCustomHashMapWithCursorTest {
 
   @Test
   public void scanWithShrinkingTable_DoesNotMissElements() {
-    final int MAP_SIZE = 500;
-    Bytes2StringMap map = new Bytes2StringMap(MAP_SIZE * 2); // *2 to prevent rehash
-    fillMapWithUniqueHashKeys(map, MAP_SIZE);
+    final int initialMapSize = 500;
+    Bytes2StringMap map = new Bytes2StringMap(1); // 1 to ensure resizing back down
+    fillMapWithUniqueHashKeys(map, initialMapSize);
     Map<byte[], String> scanned = new Object2ObjectOpenCustomHashMap<>(ByteArrays.HASH_STRATEGY);
 
     int cursor = map.scan(0, 50, Map::put, scanned);
@@ -234,14 +234,14 @@ public class SizeableBytes2ObjectOpenCustomHashMapWithCursorTest {
     // Remove a lot of elements to trigger a resize
     // Remove some of the elements
     Iterator<Map.Entry<byte[], String>> iterator = map.entrySet().iterator();
-    int removeCount = MAP_SIZE - 100;
+    int removeCount = initialMapSize - 100;
     while (removeCount > 0 && iterator.hasNext()) {
       iterator.next();
       iterator.remove();
       removeCount--;
     }
 
-    cursor = map.scan(cursor, MAP_SIZE, Map::put, scanned);
+    cursor = map.scan(cursor, initialMapSize, Map::put, scanned);
     assertThat(cursor).isEqualTo(0);
 
     // Scan should at least have all of the remaining keys
diff --git a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetTest.java b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetTest.java
deleted file mode 100644
index a39fffa..0000000
--- a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.redis.internal.data.collections;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import it.unimi.dsi.fastutil.bytes.ByteArrays;
-import org.junit.Test;
-
-import org.apache.geode.cache.util.ObjectSizer;
-import org.apache.geode.internal.size.ReflectionObjectSizer;
-import org.apache.geode.redis.internal.data.RedisSet;
-
-public class SizeableObjectOpenCustomHashSetTest {
-  private final ObjectSizer sizer = ReflectionObjectSizer.getInstance();
-
-  private int expectedSize(RedisSet.MemberSet set) {
-    return sizer.sizeof(set) - sizer.sizeof(ByteArrays.HASH_STRATEGY);
-  }
-
-  @Test
-  public void getSizeInBytesIsAccurateForByteArrays() {
-    List<byte[]> initialElements = new ArrayList<>();
-    int initialNumberOfElements = 20;
-    int elementsToAdd = 100;
-    for (int i = 0; i < initialNumberOfElements; ++i) {
-      initialElements.add(new byte[] {(byte) i});
-    }
-    // Create a set with an initial size and confirm that it correctly reports its size
-    RedisSet.MemberSet set = new RedisSet.MemberSet(initialElements);
-    assertThat(set.getSizeInBytes()).isEqualTo(expectedSize(set));
-
-    // Add enough members to force a resize and assert that the size is correct after each add
-    for (int i = initialNumberOfElements; i < initialNumberOfElements + elementsToAdd; ++i) {
-      set.add(new byte[] {(byte) i});
-      assertThat(set.getSizeInBytes()).isEqualTo(expectedSize(set));
-    }
-    assertThat(set.size()).isEqualTo(initialNumberOfElements + elementsToAdd);
-
-    // Remove all the members and assert that the size is correct after each remove
-    for (int i = 0; i < initialNumberOfElements + elementsToAdd; ++i) {
-      set.remove(new byte[] {(byte) i});
-      assertThat(set.getSizeInBytes()).isEqualTo(expectedSize(set));
-    }
-    assertThat(set.size()).isEqualTo(0);
-  }
-}
diff --git a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursorTest.java b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursorTest.java
new file mode 100644
index 0000000..a02788d
--- /dev/null
+++ b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursorTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.data.collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import it.unimi.dsi.fastutil.bytes.ByteArrays;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import org.junit.Test;
+
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.internal.size.ReflectionObjectSizer;
+import org.apache.geode.redis.internal.data.RedisSet;
+
+public class SizeableObjectOpenCustomHashSetWithCursorTest {
+  private static final int INITIAL_SIZE = 20;
+  private static final int SIZE_OF_NEW_ELEMENTS_TO_ADD = 100;
+  private final ObjectSizer sizer = ReflectionObjectSizer.getInstance();
+
+  private int expectedSize(RedisSet.MemberSet set) {
+    return sizer.sizeof(set) - sizer.sizeof(ByteArrays.HASH_STRATEGY);
+  }
+
+
+  private static class ByteSet
+      extends SizeableObjectOpenCustomHashSetWithCursor<byte[]> {
+    public ByteSet() {
+      super(ByteArrays.HASH_STRATEGY);
+    }
+
+    public ByteSet(int initialSize) {
+      super(initialSize, ByteArrays.HASH_STRATEGY);
+    }
+
+    @Override
+    protected int sizeElement(byte[] element) {
+      return 0;
+    }
+  }
+
+
+  private List<byte[]> initializeMemberSet() {
+    List<byte[]> initialElements = new ArrayList<>();
+    for (int i = 0; i < INITIAL_SIZE; ++i) {
+      initialElements.add(new byte[] {(byte) i});
+    }
+    return initialElements;
+  }
+
+  @Test
+  public void getSizeInBytesIsAccurateForByteArrays() {
+    RedisSet.MemberSet set = new RedisSet.MemberSet(initializeMemberSet());
+    assertThat(set.getSizeInBytes()).isEqualTo(expectedSize(set));
+  }
+
+  @Test
+  public void addMoreMembers_assertSetSizeIsAccurate() {
+    RedisSet.MemberSet set = new RedisSet.MemberSet(initializeMemberSet());
+    for (int i = INITIAL_SIZE; i < INITIAL_SIZE + SIZE_OF_NEW_ELEMENTS_TO_ADD; ++i) {
+      set.add(new byte[] {(byte) i});
+      assertThat(set.getSizeInBytes()).isEqualTo(expectedSize(set));
+    }
+    assertThat(set.size()).isEqualTo(INITIAL_SIZE + SIZE_OF_NEW_ELEMENTS_TO_ADD);
+  }
+
+  @Test
+  public void removeAllMembers_assertSetSizeIsAccurate() {
+    RedisSet.MemberSet set = new RedisSet.MemberSet(initializeMemberSet());
+    for (int i = 0; i < INITIAL_SIZE + SIZE_OF_NEW_ELEMENTS_TO_ADD; ++i) {
+      set.remove(new byte[] {(byte) i});
+      assertThat(set.getSizeInBytes()).isEqualTo(expectedSize(set));
+    }
+    assertThat(set.size()).isEqualTo(0);
+  }
+
+  @Test
+  public void revWorksWhenSignBitIsSet() {
+    assertThat(SizeableObjectOpenCustomHashSetWithCursor.rev(0xFF000000)).isEqualTo(0xFF);
+    assertThat(SizeableObjectOpenCustomHashSetWithCursor.rev(0xFF)).isEqualTo(0xFF000000);
+  }
+
+  @Test
+  public void scanEntireSet_ReturnsExpectedElements() {
+    ByteSet set = new ByteSet();
+    IntStream.range(0, 10).forEach(i -> set.add(makeKey(i)));
+
+    List<byte[]> scanned = new ArrayList<>();
+    int result = set.scan(0, 10000, List::add, scanned);
+    assertThat(result).isZero();
+    assertThat(scanned).containsExactlyInAnyOrderElementsOf(set);
+  }
+
+  private static byte[] makeKey(Integer i) {
+    return i.toString().getBytes();
+  }
+
+  private void fillSetWithUniqueHashKeys(ByteSet set, int keysToAdd) {
+    int keyCounter = 0;
+    Set<Integer> hashesAdded = new HashSet<>();
+    while (keysToAdd > 0) {
+      byte[] key = makeKey(keyCounter);
+      int keyHash = set.hash(key);
+      if (!hashesAdded.contains(keyHash)) {
+        hashesAdded.add(keyHash);
+        set.add(key);
+        keysToAdd--;
+      }
+      keyCounter++;
+    }
+  }
+
+  private void fillSetWithCollidingHashKeys(ByteSet set, int keysToAdd) {
+    int keyCounter = 0;
+    Set<Integer> hashesAdded = new HashSet<>();
+    while (keysToAdd > 0) {
+      byte[] key = makeKey(keyCounter);
+      int keyHash = set.hash(key);
+      if (hashesAdded.isEmpty() || hashesAdded.contains(keyHash)) {
+        hashesAdded.add(keyHash);
+        set.add(key);
+        keysToAdd--;
+      }
+      keyCounter++;
+    }
+  }
+
+  @Test
+  public void twoScansWithNoModifications_ReturnsExpectedElements() {
+    final int setSize = 10;
+    ByteSet set = new ByteSet(setSize * 2); // *2 to prevent rehash
+    fillSetWithUniqueHashKeys(set, setSize);
+    List<byte[]> scanned = new ArrayList<>();
+    int scanSize = 1 + set.size() / 2;
+    // Scan part way through the set
+    int cursor = set.scan(0, scanSize, List::add, scanned);
+    assertThat(scanned).hasSize(scanSize);
+
+    // Scan past the end of the set
+    cursor = set.scan(cursor, scanSize, List::add, scanned);
+    assertThat(scanned).hasSize(set.size());
+    assertThat(cursor).isEqualTo(0);
+
+    assertThat(scanned).containsExactlyInAnyOrderElementsOf(set);
+  }
+
+  @Test
+  public void scanWithConcurrentRemoves_ReturnsExpectedElements() {
+    final int initialSetSize = 10;
+    ByteSet set = new ByteSet(
+        initialSetSize * 2); // *2 to prevent rehash
+    fillSetWithUniqueHashKeys(set, initialSetSize);
+    List<byte[]> scanned = new ArrayList<>();
+    int cursor = set.scan(0, initialSetSize / 2, List::add, scanned);
+    assertThat(scanned).hasSize(initialSetSize / 2);
+
+    // Remove some elements
+    ObjectIterator<byte[]> iterator = set.iterator();
+    int removeCount = initialSetSize / 2 - 1;
+    while (removeCount > 0 && iterator.hasNext()) {
+      iterator.next();
+      iterator.remove();
+      removeCount--;
+    }
+
+    cursor = set.scan(cursor, initialSetSize / 2, List::add, scanned);
+    assertThat(cursor).isZero();
+
+    assertThat(scanned).containsAll(set);
+  }
+
+  @Test
+  public void scanWithHashcodeCollisions_ReturnsExpectedElements() {
+    final int setSize = 10;
+    ByteSet set = new ByteSet(
+        setSize * 2); // *2 to prevent rehash
+    fillSetWithCollidingHashKeys(set, setSize);
+    List<byte[]> scanned = new ArrayList<>();
+    int cursor = set.scan(0, 1, List::add, scanned);
+
+    // The scan had to ignore the count and return all the elements with the same hash
+    assertThat(scanned).hasSize(setSize);
+    assertThat(scanned).containsExactlyInAnyOrderElementsOf(set);
+    cursor = set.scan(cursor, 1, List::add, scanned);
+    assertThat(cursor).isZero();
+    assertThat(scanned).hasSize(setSize);
+    assertThat(scanned).containsExactlyInAnyOrderElementsOf(set);
+  }
+
+  @Test
+  public void scanWithHashcodeCollisionsAndConcurrentRemoves_ReturnsExpectedElements() {
+    final int initialSetSize = 10;
+    ByteSet set = new ByteSet(
+        initialSetSize * 2); // *2 to prevent rehash
+    fillSetWithCollidingHashKeys(set, initialSetSize);
+    List<byte[]> scanned = new ArrayList<>();
+
+    int cursor = set.scan(0, initialSetSize / 2, List::add, scanned);
+    assertThat(scanned).hasSize(initialSetSize);
+
+    // Remove some elements
+    ObjectIterator<byte[]> iterator = set.iterator();
+    int removeCount = initialSetSize / 2 - 1;
+    while (removeCount > 0 && iterator.hasNext()) {
+      iterator.next();
+      iterator.remove();
+      removeCount--;
+    }
+
+    cursor = set.scan(cursor, initialSetSize / 2, List::add, scanned);
+
+    assertThat(cursor).isZero();
+    assertThat(scanned).hasSize(initialSetSize);
+  }
+
+  @Test
+  public void scanWithGrowingTable_DoesNotMissElements() {
+    final int initialSetSize = 10;
+    ByteSet set =
+        new ByteSet(initialSetSize * 2); // *2 to prevent rehash
+    fillSetWithUniqueHashKeys(set, initialSetSize);
+    List<byte[]> scanned = new ArrayList<>();
+    List<byte[]> initialKeys = new ArrayList<>(set.size());
+    initialKeys.addAll(set);
+
+    int cursor = set.scan(0, initialSetSize / 2, List::add, scanned);
+    assertThat(scanned).hasSize(initialSetSize / 2);
+
+    // Add a lot of elements to trigger a resize
+    IntStream.range(10, 500).forEach(i -> set.add(makeKey(i)));
+
+    cursor = set.scan(cursor, 500, List::add, scanned);
+    assertThat(cursor).isEqualTo(0);
+
+    // We don't know that we will have all the 500 new elements, only that
+    // we should have scanned all the original elements
+    assertThat(scanned).containsAll(initialKeys);
+  }
+
+  @Test
+  public void scanWithShrinkingTable_DoesNotMissElements() {
+    final int initialSetSize = 500;
+    ByteSet set = new ByteSet(1); // 1 to ensure resizing back down
+
+    fillSetWithUniqueHashKeys(set, initialSetSize);
+    List<byte[]> scanned = new ArrayList<>();
+    int cursor = set.scan(0, 50, List::add, scanned);
+    assertThat(scanned).hasSize(50);
+
+    // Remove a lot of elements to trigger a resize
+    // Remove some elements
+    ObjectIterator<byte[]> iterator = set.iterator();
+    int removeCount = initialSetSize - 100;
+    while (removeCount > 0 && iterator.hasNext()) {
+      iterator.next();
+      iterator.remove();
+      removeCount--;
+    }
+
+    cursor = set.scan(cursor, initialSetSize, List::add, scanned);
+    assertThat(cursor).isZero();
+
+    // Scan should at least have all the remaining keys
+    assertThat(scanned).containsAll(set);
+  }
+
+}