You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by do...@apache.org on 2022/02/01 18:27:20 UTC
[geode] branch support/1.15 updated: GEODE-9835: Add SSCAN to Redis supported commands (#7278)
This is an automated email from the ASF dual-hosted git repository.
donalevans pushed a commit to branch support/1.15
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/support/1.15 by this push:
new 3f56bd3 GEODE-9835: Add SSCAN to Redis supported commands (#7278)
3f56bd3 is described below
commit 3f56bd36a698f05ade37fa0eaa629dfaacf5148f
Author: Bala Kaza Venkata <43...@users.noreply.github.com>
AuthorDate: Tue Feb 1 12:31:22 2022 -0500
GEODE-9835: Add SSCAN to Redis supported commands (#7278)
Co-authored-by: Bala Kaza Venkata <bk...@vmware.com>
Co-authored-by: Steve Sienkowski <ss...@vmware.com>
(cherry picked from commit da6043239eb637994eaba64c3d59cf327aac0f7d)
---
.../server/AbstractHitsMissesIntegrationTest.java | 10 +-
.../executor/set/AbstractSScanIntegrationTest.java | 466 +++++++++++++--------
.../executor/set/SScanIntegrationTest.java | 50 ++-
.../apache/geode/codeAnalysis/excludedClasses.txt | 2 +-
.../redis/internal/commands/RedisCommandType.java | 4 +-
.../commands/executor/set/SScanExecutor.java | 94 +----
.../apache/geode/redis/internal/data/RedisSet.java | 51 +--
.../SizeableObjectOpenCustomHashSet.java | 81 ----
.../SizeableObjectOpenCustomHashSetWithCursor.java | 194 +++++++++
.../internal/netty/ExecutionHandlerContext.java | 10 -
...ytes2ObjectOpenCustomHashMapWithCursorTest.java | 10 +-
.../SizeableObjectOpenCustomHashSetTest.java | 62 ---
...eableObjectOpenCustomHashSetWithCursorTest.java | 284 +++++++++++++
13 files changed, 837 insertions(+), 481 deletions(-)
diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java
index 0933ed6..4b07ff9 100644
--- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java
+++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/server/AbstractHitsMissesIntegrationTest.java
@@ -426,6 +426,11 @@ public abstract class AbstractHitsMissesIntegrationTest implements RedisIntegrat
}
@Test
+ public void testSscan() {
+ runCommandAndAssertHitsAndMisses(SET_KEY, k -> jedis.sscan(k, "0"));
+ }
+
+ @Test
public void testSunion() {
runMultiKeyCommandAndAssertHitsAndMisses(SET_KEY, (k1, k2) -> jedis.sunion(k1, k2));
}
@@ -566,11 +571,6 @@ public abstract class AbstractHitsMissesIntegrationTest implements RedisIntegrat
runCommandAndAssertNoStatUpdates(SET_KEY, k -> jedis.spop(k));
}
- @Test
- public void testSscan() {
- runCommandAndAssertHitsAndMisses(SET_KEY, k -> jedis.sscan(k, "0"));
- }
-
/************* Helper Methods *************/
private void runCommandAndAssertHitsAndMisses(String key, Consumer<String> command) {
Map<String, String> info = RedisTestHelper.getInfo(jedis);
diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSScanIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSScanIntegrationTest.java
index a39f77a..2e6adda 100755
--- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSScanIntegrationTest.java
+++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/AbstractSScanIntegrationTest.java
@@ -14,37 +14,55 @@
*/
package org.apache.geode.redis.internal.commands.executor.set;
+import static org.apache.geode.redis.RedisCommandArgumentsTestHelper.assertAtLeastNArgs;
import static org.apache.geode.redis.internal.RedisConstants.ERROR_CURSOR;
import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import java.math.BigInteger;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import redis.clients.jedis.HostAndPort;
+import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisCluster;
import redis.clients.jedis.Protocol;
import redis.clients.jedis.ScanParams;
import redis.clients.jedis.ScanResult;
+import org.apache.geode.redis.ConcurrentLoopingThreads;
import org.apache.geode.redis.RedisIntegrationTest;
+import org.apache.geode.redis.internal.data.KeyHashUtil;
import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
public abstract class AbstractSScanIntegrationTest implements RedisIntegrationTest {
protected JedisCluster jedis;
- private static final int REDIS_CLIENT_TIMEOUT =
- Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+ public static final String KEY = "key";
+ public static final byte[] KEY_BYTES = KEY.getBytes();
+ public static final int SLOT_FOR_KEY = KeyHashUtil.slotForKey(KEY_BYTES);
+ public static final String ZERO_CURSOR = "0";
+ public static final byte[] ZERO_CURSOR_BYTES = ZERO_CURSOR.getBytes();
+ public static final BigInteger SIGNED_LONG_MAX = new BigInteger(Long.toString(Long.MAX_VALUE));
+ public static final BigInteger SIGNED_LONG_MIN = new BigInteger(Long.toString(Long.MIN_VALUE));
+ public static final String MEMBER_ONE = "1";
+ public static final String MEMBER_TWELVE = "12";
+ public static final String MEMBER_THREE = "3";
+ public static final String BASE_MEMBER_NAME = "baseMember_";
@Before
public void setUp() {
- jedis = new JedisCluster(new HostAndPort("localhost", getPort()), REDIS_CLIENT_TIMEOUT);
+ jedis = new JedisCluster(new HostAndPort(RedisClusterStartupRule.BIND_ADDRESS, getPort()),
+ RedisClusterStartupRule.REDIS_CLIENT_TIMEOUT);
}
@After
@@ -54,295 +72,405 @@ public abstract class AbstractSScanIntegrationTest implements RedisIntegrationTe
}
@Test
- public void givenNoKeyArgument_returnsWrongNumberOfArgumentsError() {
- assertThatThrownBy(() -> jedis.sendCommand("key", Protocol.Command.SSCAN))
- .hasMessageContaining("ERR wrong number of arguments for 'sscan' command");
+ public void givenLessThanTwoArguments_returnsWrongNumberOfArgumentsError() {
+ assertAtLeastNArgs(jedis, Protocol.Command.SSCAN, 2);
}
@Test
- public void givenNoCursorArgument_returnsWrongNumberOfArgumentsError() {
- assertThatThrownBy(() -> jedis.sendCommand("key", Protocol.Command.SSCAN, "key"))
- .hasMessageContaining("ERR wrong number of arguments for 'sscan' command");
+ public void givenNonexistentKey_returnsEmptyArray() {
+ ScanResult<String> result = jedis.sscan("nonexistentKey", ZERO_CURSOR);
+ assertThat(result.isCompleteIteration()).isTrue();
+ assertThat(result.getResult()).isEmpty();
}
@Test
- public void givenArgumentsAreNotOddAndKeyExists_returnsSyntaxError() {
- jedis.sadd("a", "1");
- assertThatThrownBy(() -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "a*"))
- .hasMessageContaining(ERROR_SYNTAX);
+ public void givenNonexistentKeyAndIncorrectOptionalArguments_returnsEmptyArray() {
+ ScanResult<byte[]> result =
+ sendCustomSscanCommand("nonexistentKey", "nonexistentKey", ZERO_CURSOR, "ANY");
+ assertThat(result.getResult()).isEmpty();
}
@Test
- @SuppressWarnings("unchecked")
- public void givenArgumentsAreNotOddAndKeyDoesNotExist_returnsEmptyArray() {
- List<Object> result =
- (List<Object>) jedis.sendCommand("key!", Protocol.Command.SSCAN, "key!", "0", "a*");
+ public void givenIncorrectOptionalArgumentsAndKeyExists_returnsSyntaxError() {
+ jedis.sadd(KEY, MEMBER_ONE);
+ assertThatThrownBy(
+ () -> jedis.sendCommand(KEY, Protocol.Command.SSCAN, KEY, ZERO_CURSOR, "a*"))
+ .hasMessageContaining(ERROR_SYNTAX);
+ }
+
+ @Test
+ public void givenMatchArgumentWithoutPatternOnExistingKey_returnsSyntaxError() {
+ jedis.sadd(KEY, MEMBER_ONE);
+ assertThatThrownBy(
+ () -> jedis.sendCommand(KEY, Protocol.Command.SSCAN, KEY, ZERO_CURSOR, "MATCH"))
+ .hasMessageContaining(ERROR_SYNTAX);
+ }
- assertThat((byte[]) result.get(0)).isEqualTo("0".getBytes());
- assertThat((List<Object>) result.get(1)).isEmpty();
+ @Test
+ public void givenCountArgumentWithoutNumberOnExistingKey_returnsSyntaxError() {
+ jedis.sadd(KEY, MEMBER_ONE);
+ assertThatThrownBy(
+ () -> jedis.sendCommand(KEY, Protocol.Command.SSCAN, KEY, ZERO_CURSOR, "COUNT"))
+ .hasMessageContaining(ERROR_SYNTAX);
}
@Test
- public void givenMatchOrCountKeywordNotSpecified_returnsSyntaxError() {
- jedis.sadd("a", "1");
- assertThatThrownBy(() -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "a*", "1"))
- .hasMessageContaining(ERROR_SYNTAX);
+ public void givenAdditionalArgumentNotEqualToMatchOrCount_returnsSyntaxError() {
+ jedis.sadd(KEY, MEMBER_ONE);
+ assertThatThrownBy(
+ () -> jedis.sendCommand(KEY, Protocol.Command.SSCAN, KEY, ZERO_CURSOR, "a*", "1"))
+ .hasMessageContaining(ERROR_SYNTAX);
}
@Test
public void givenCount_whenCountParameterIsNotAnInteger_returnsNotIntegerError() {
- jedis.sadd("a", "1");
+ jedis.sadd(KEY, MEMBER_ONE);
assertThatThrownBy(
- () -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "COUNT", "MATCH"))
- .hasMessageContaining(ERROR_NOT_INTEGER);
+ () -> jedis.sendCommand(KEY, Protocol.Command.SSCAN, KEY, ZERO_CURSOR, "COUNT",
+ "notAnInteger"))
+ .hasMessageContaining(ERROR_NOT_INTEGER);
}
@Test
public void givenMultipleCounts_whenAnyCountParameterIsNotAnInteger_returnsNotIntegerError() {
- jedis.sadd("a", "1");
- assertThatThrownBy(() -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "COUNT", "2",
- "COUNT", "sjlfs", "COUNT", "1"))
- .hasMessageContaining(ERROR_NOT_INTEGER);
+ jedis.sadd(KEY, MEMBER_ONE);
+ assertThatThrownBy(
+ () -> jedis.sendCommand(KEY, Protocol.Command.SSCAN, KEY, ZERO_CURSOR, "COUNT", "12",
+ "COUNT", "notAnInteger", "COUNT", "1"))
+ .hasMessageContaining(ERROR_NOT_INTEGER);
}
@Test
public void givenMultipleCounts_whenAnyCountParameterIsLessThanOne_returnsSyntaxError() {
- jedis.sadd("a", "1");
- assertThatThrownBy(() -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "COUNT", "2",
- "COUNT", "0", "COUNT", "1"))
- .hasMessageContaining(ERROR_SYNTAX);
+ jedis.sadd(KEY, MEMBER_ONE);
+ assertThatThrownBy(
+ () -> jedis.sendCommand(KEY, Protocol.Command.SSCAN, KEY, ZERO_CURSOR, "COUNT", "12",
+ "COUNT", "0", "COUNT", "1"))
+ .hasMessageContaining(ERROR_SYNTAX);
}
@Test
public void givenCount_whenCountParameterIsZero_returnsSyntaxError() {
- jedis.sadd("a", "1");
-
- assertThatThrownBy(() -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "COUNT", "0"))
- .hasMessageContaining(ERROR_SYNTAX);
+ jedis.sadd(KEY, MEMBER_ONE);
+ assertThatThrownBy(
+ () -> jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, new ScanParams().count(0)))
+ .hasMessageContaining(ERROR_SYNTAX);
}
@Test
public void givenCount_whenCountParameterIsNegative_returnsSyntaxError() {
- jedis.sadd("a", "1");
-
+ jedis.sadd(KEY, MEMBER_ONE);
assertThatThrownBy(
- () -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "COUNT", "-37"))
+ () -> jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, new ScanParams().count(-37)))
.hasMessageContaining(ERROR_SYNTAX);
}
@Test
public void givenKeyIsNotASet_returnsWrongTypeError() {
- jedis.hset("a", "b", "1");
+ jedis.hset(KEY, "b", MEMBER_ONE);
+ assertThatThrownBy(
+ () -> jedis.sscan(KEY, ZERO_CURSOR))
+ .hasMessageContaining(ERROR_WRONG_TYPE);
+ }
+ @Test
+ public void givenKeyIsNotASetAndCountIsNegative_returnsWrongTypeError() {
+ jedis.hset(KEY, "b", MEMBER_ONE);
assertThatThrownBy(
- () -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0", "COUNT", "-37"))
+ () -> jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, new ScanParams().count(-37)))
.hasMessageContaining(ERROR_WRONG_TYPE);
}
@Test
public void givenKeyIsNotASet_andCursorIsNotAnInteger_returnsInvalidCursorError() {
- jedis.hset("a", "b", "1");
-
- assertThatThrownBy(() -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "sjfls"))
- .hasMessageContaining(ERROR_CURSOR);
+ jedis.hset(KEY, "b", MEMBER_ONE);
+ assertThatThrownBy(
+ () -> jedis.sscan(KEY, "notAnInteger"))
+ .hasMessageContaining(ERROR_CURSOR);
}
@Test
public void givenNonexistentKey_andCursorIsNotInteger_returnsInvalidCursorError() {
assertThatThrownBy(
- () -> jedis.sendCommand("notReal", Protocol.Command.SSCAN, "notReal", "notReal", "sjfls"))
+ () -> jedis.sscan("nonexistentKey", "notAnInteger"))
.hasMessageContaining(ERROR_CURSOR);
}
@Test
public void givenExistentSetKey_andCursorIsNotAnInteger_returnsInvalidCursorError() {
- jedis.set("a", "b");
-
- assertThatThrownBy(() -> jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "sjfls"))
- .hasMessageContaining(ERROR_CURSOR);
+ jedis.set(KEY, "b");
+ assertThatThrownBy(
+ () -> jedis.sscan(KEY, "notAnInteger"))
+ .hasMessageContaining(ERROR_CURSOR);
}
@Test
- public void givenNonexistentKey_returnsEmptyArray() {
- ScanResult<String> result = jedis.sscan("nonexistent", "0");
-
- assertThat(result.isCompleteIteration()).isTrue();
- assertThat(result.getResult()).isEmpty();
+ public void givenNegativeCursor_doesNotError() {
+ initializeThousandMemberSet();
+ assertThatNoException().isThrownBy(() -> jedis.sscan(KEY, "-1"));
}
@Test
public void givenSetWithOneMember_returnsMember() {
- jedis.sadd("a", "1");
- ScanResult<String> result = jedis.sscan("a", "0");
+ jedis.sadd(KEY, MEMBER_ONE);
+
+ ScanResult<String> result = jedis.sscan(KEY, ZERO_CURSOR);
assertThat(result.isCompleteIteration()).isTrue();
- assertThat(result.getResult()).containsExactly("1");
+ assertThat(result.getResult()).containsOnly(MEMBER_ONE);
}
@Test
- public void givenSetWithMultipleMembers_returnsAllMembers() {
- jedis.sadd("a", "1", "2", "3");
- ScanResult<String> result = jedis.sscan("a", "0");
+ public void givenSetWithMultipleMembers_returnsSubsetOfMembers() {
+ Set<String> initialMemberData = initializeThousandMemberSet();
- assertThat(result.isCompleteIteration()).isTrue();
- assertThat(result.getResult()).containsExactlyInAnyOrder("1", "2", "3");
+ ScanResult<String> result = jedis.sscan(KEY, ZERO_CURSOR);
+
+ assertThat(result.getResult()).isSubsetOf(initialMemberData);
}
@Test
public void givenCount_returnsAllMembersWithoutDuplicates() {
- jedis.sadd("a", "1", "2", "3");
+ Set<byte[]> initialTotalSet = initializeThousandMemberByteSet();
+ int count = 99;
- ScanParams scanParams = new ScanParams();
- scanParams.count(1);
- String cursor = "0";
- ScanResult<byte[]> result;
- List<byte[]> allMembersFromScan = new ArrayList<>();
+ ScanResult<byte[]> result =
+ jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, new ScanParams().count(count));
- do {
- result = jedis.sscan("a".getBytes(), cursor.getBytes(), scanParams);
- allMembersFromScan.addAll(result.getResult());
- cursor = result.getCursor();
- } while (!result.isCompleteIteration());
+ assertThat(result.getResult().size()).isGreaterThanOrEqualTo(count);
+ assertThat(result.getResult()).isSubsetOf(initialTotalSet);
+ }
- assertThat(allMembersFromScan).containsExactlyInAnyOrder("1".getBytes(),
- "2".getBytes(),
- "3".getBytes());
+ @Test
+ public void givenMultipleCounts_usesLastCountSpecified() {
+ Set<byte[]> initialMemberData = initializeThousandMemberByteSet();
+ // Choose two COUNT arguments with a large difference, so that it's extremely unlikely that if
+ // the first COUNT is used, a number of members greater than or equal to the second COUNT will
+ // be returned.
+ int firstCount = 1;
+ int secondCount = 500;
+ ScanResult<byte[]> result = sendCustomSscanCommand(KEY, KEY, ZERO_CURSOR,
+ "COUNT", String.valueOf(firstCount),
+ "COUNT", String.valueOf(secondCount));
+
+ List<byte[]> returnedMembers = result.getResult();
+ assertThat(returnedMembers.size()).isGreaterThanOrEqualTo(secondCount);
+ assertThat(returnedMembers).isSubsetOf(initialMemberData);
}
@Test
- @SuppressWarnings("unchecked")
- public void givenMultipleCounts_returnsAllEntriesWithoutDuplicates() {
- jedis.sadd("a", "1", "12", "3");
+ public void givenSetWithThreeEntriesAndMatch_returnsOnlyMatchingElements() {
+ jedis.sadd(KEY, MEMBER_ONE, MEMBER_TWELVE, MEMBER_THREE);
+ ScanParams scanParams = new ScanParams().match("1*");
+
+ ScanResult<byte[]> result = jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, scanParams);
- List<Object> result;
+ assertThat(result.isCompleteIteration()).isTrue();
+ assertThat(result.getResult()).containsOnly(MEMBER_ONE.getBytes(),
+ MEMBER_TWELVE.getBytes());
+ }
- List<byte[]> allEntries = new ArrayList<>();
- String cursor = "0";
+ @Test
+ public void givenSetWithThreeEntriesAndMultipleMatchArguments_returnsOnlyElementsMatchingLastMatchArgument() {
+ jedis.sadd(KEY, MEMBER_ONE, MEMBER_TWELVE, MEMBER_THREE);
- do {
- result =
- (List<Object>) jedis.sendCommand("a", Protocol.Command.SSCAN, "a", cursor, "COUNT", "2",
- "COUNT", "1");
- allEntries.addAll((List<byte[]>) result.get(1));
- cursor = new String((byte[]) result.get(0));
- } while (!Arrays.equals((byte[]) result.get(0), "0".getBytes()));
+ ScanResult<byte[]> result =
+ sendCustomSscanCommand(KEY, KEY, ZERO_CURSOR, "MATCH", "3*", "MATCH", "1*");
- assertThat((byte[]) result.get(0)).isEqualTo("0".getBytes());
- assertThat(allEntries).containsExactlyInAnyOrder("1".getBytes(),
- "12".getBytes(),
- "3".getBytes());
+ assertThat(result.getCursor()).isEqualTo(ZERO_CURSOR);
+ assertThat(result.getResult()).containsOnly(MEMBER_ONE.getBytes(),
+ MEMBER_TWELVE.getBytes());
}
@Test
- public void givenMatch_returnsAllMatchingMembersWithoutDuplicates() {
- jedis.sadd("a", "1", "12", "3");
-
+ public void givenLargeCountAndMatch_returnsOnlyMatchingMembers() {
+ Set<byte[]> initialMemberData = initializeThousandMemberByteSet();
ScanParams scanParams = new ScanParams();
- scanParams.match("1*");
+ // There are 111 matching members in the set 0..999
+ scanParams.match("9*");
+ // Choose a large COUNT to ensure that some matching members are returned
+ scanParams.count(950);
+
+ ScanResult<byte[]> result = jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, scanParams);
+
+ List<byte[]> returnedMembers = result.getResult();
+ // We know that we must have found at least 61 matching members, given the size of COUNT and the
+ // number of matching members in the set
+ assertThat(returnedMembers.size()).isGreaterThanOrEqualTo(61);
+ assertThat(returnedMembers).isSubsetOf(initialMemberData);
+ assertThat(returnedMembers).allSatisfy(bytes -> assertThat(new String(bytes)).startsWith("9"));
+ }
- ScanResult<byte[]> result =
- jedis.sscan("a".getBytes(), "0".getBytes(), scanParams);
+ @Test
+ public void givenMultipleCountAndMatch_usesLastSpecified() {
+ Set<byte[]> initialMemberData = initializeThousandMemberByteSet();
+ // Choose a large COUNT to ensure that some matching members are returned
+ // There are 111 matching members in the set 0..999
+
+ ScanResult<byte[]> result = sendCustomSscanCommand(KEY, KEY, ZERO_CURSOR,
+ "COUNT", "20",
+ "MATCH", "1*",
+ "COUNT", "950",
+ "MATCH", "9*");
+
+ List<byte[]> returnedMembers = result.getResult();
+ // We know that we must have found at least 61 matching members, given the size of COUNT and the
+ // number of matching members in the set
+ assertThat(returnedMembers.size()).isGreaterThanOrEqualTo(61);
+ assertThat(returnedMembers).isSubsetOf(initialMemberData);
+ assertThat(returnedMembers).allSatisfy(bytes -> assertThat(new String(bytes)).startsWith("9"));
+ }
- assertThat(result.isCompleteIteration()).isTrue();
- assertThat(result.getResult()).containsExactlyInAnyOrder("1".getBytes(),
- "12".getBytes());
+ @Test
+ public void givenNonMatchingPattern_returnsEmptyResult() {
+ jedis.sadd(KEY, "cat dog elk");
+ ScanParams scanParams = new ScanParams().match("*fish*");
+
+ ScanResult<byte[]> result = jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, scanParams);
+
+ assertThat(result.getResult()).isEmpty();
}
@Test
- @SuppressWarnings("unchecked")
- public void givenMultipleMatches_returnsMembersMatchingLastMatchParameter() {
- jedis.sadd("a", "1", "12", "3");
+ public void should_notReturnValue_givenValueWasRemovedBeforeSscanIsCalled() {
+ initializeThreeMemberSet();
+ jedis.srem(KEY, MEMBER_THREE);
+ GeodeAwaitility.await().untilAsserted(
+ () -> assertThat(jedis.sismember(KEY, MEMBER_THREE)).isFalse());
- List<Object> result = (List<Object>) jedis.sendCommand("a", Protocol.Command.SSCAN, "a", "0",
- "MATCH", "3*", "MATCH", "1*");
+ ScanResult<String> result = jedis.sscan(KEY, ZERO_CURSOR);
- assertThat((byte[]) result.get(0)).isEqualTo("0".getBytes());
- assertThat((List<byte[]>) result.get(1)).containsExactlyInAnyOrder("1".getBytes(),
- "12".getBytes());
+ assertThat(result.getResult()).doesNotContain(MEMBER_THREE);
}
@Test
- public void givenMatchAndCount_returnsAllMembersWithoutDuplicates() {
- jedis.sadd("a", "1", "12", "3");
+ public void should_notErrorGivenNonzeroCursorOnFirstCall() {
+ initializeThreeMemberSet();
+ assertThatNoException().isThrownBy(() -> jedis.sscan(KEY, "5"));
+ }
- ScanParams scanParams = new ScanParams();
- scanParams.count(1);
- scanParams.match("1*");
- ScanResult<byte[]> result;
- List<byte[]> allMembersFromScan = new ArrayList<>();
- String cursor = "0";
+ @Test
+ public void should_notErrorGivenCountEqualToIntegerMaxValue() {
+ Set<byte[]> set = initializeThreeMemberByteSet();
+ ScanParams scanParams = new ScanParams().count(Integer.MAX_VALUE);
- do {
- result = jedis.sscan("a".getBytes(), cursor.getBytes(), scanParams);
- allMembersFromScan.addAll(result.getResult());
- cursor = result.getCursor();
- } while (!result.isCompleteIteration());
+ ScanResult<byte[]> result = jedis.sscan(KEY_BYTES, ZERO_CURSOR_BYTES, scanParams);
- assertThat(allMembersFromScan).containsExactlyInAnyOrder("1".getBytes(),
- "12".getBytes());
+ assertThat(result.getResult())
+ .containsExactlyInAnyOrderElementsOf(set);
}
@Test
- @SuppressWarnings("unchecked")
- public void givenMultipleCountsAndMatches_returnsAllEntriesWithoutDuplicates() {
- jedis.sadd("a", "1", "12", "3");
+ public void should_notErrorGivenCountGreaterThanIntegerMaxValue() {
+ initializeThreeMemberByteSet();
+ String greaterThanInt = String.valueOf(2L * Integer.MAX_VALUE);
+
+ ScanResult<byte[]> result =
+ sendCustomSscanCommand(KEY, KEY, ZERO_CURSOR, "COUNT", greaterThanInt);
- List<Object> result;
- List<byte[]> allEntries = new ArrayList<>();
- String cursor = "0";
+ assertThat(result.getCursor()).isEqualTo(ZERO_CURSOR);
+ assertThat(result.getResult()).containsExactlyInAnyOrder(
+ MEMBER_ONE.getBytes(),
+ MEMBER_TWELVE.getBytes(),
+ MEMBER_THREE.getBytes());
+ }
- do {
- result =
- (List<Object>) jedis.sendCommand("a", Protocol.Command.SSCAN, "a", cursor, "COUNT", "37",
- "MATCH", "3*", "COUNT", "2", "COUNT", "1", "MATCH", "1*");
- allEntries.addAll((List<byte[]>) result.get(1));
- cursor = new String((byte[]) result.get(0));
- } while (!Arrays.equals((byte[]) result.get(0), "0".getBytes()));
+ /**** Concurrency ***/
- assertThat((byte[]) result.get(0)).isEqualTo("0".getBytes());
- assertThat(allEntries).containsExactlyInAnyOrder("1".getBytes(),
- "12".getBytes());
+ @Test
+ public void should_returnAllConsistentlyPresentMembers_givenConcurrentThreadsAddingAndRemovingMembers() {
+ final Set<String> initialMemberData = initializeThousandMemberSet();
+ final int iterationCount = 500;
+ Jedis jedis1 = jedis.getConnectionFromSlot(SLOT_FOR_KEY);
+ Jedis jedis2 = jedis.getConnectionFromSlot(SLOT_FOR_KEY);
+
+ new ConcurrentLoopingThreads(iterationCount,
+ (i) -> multipleSScanAndAssertOnContentOfResultSet(i, jedis1, initialMemberData),
+ (i) -> multipleSScanAndAssertOnContentOfResultSet(i, jedis2, initialMemberData),
+ (i) -> {
+ String member = "new_" + BASE_MEMBER_NAME + i;
+ jedis.sadd(KEY, member);
+ jedis.srem(KEY, member);
+ }).run();
+
+ jedis1.close();
+ jedis2.close();
}
@Test
- public void givenNegativeCursor_returnsMembersUsingAbsoluteValueOfCursor() {
- jedis.sadd("b", "green", "orange", "yellow");
+ public void should_notAlterUnderlyingData_givenMultipleConcurrentSscans() {
+ final Set<String> initialMemberData = initializeThousandMemberSet();
+ jedis.sadd(KEY, initialMemberData.toArray(new String[0]));
+ final int iterationCount = 500;
+ Jedis jedis1 = jedis.getConnectionFromSlot(SLOT_FOR_KEY);
+ Jedis jedis2 = jedis.getConnectionFromSlot(SLOT_FOR_KEY);
+
+ new ConcurrentLoopingThreads(iterationCount,
+ (i) -> multipleSScanAndAssertOnContentOfResultSet(i, jedis1, initialMemberData),
+ (i) -> multipleSScanAndAssertOnContentOfResultSet(i, jedis2, initialMemberData))
+ .run();
+ assertThat(jedis.smembers(KEY)).containsExactlyInAnyOrderElementsOf(initialMemberData);
+
+ jedis1.close();
+ jedis2.close();
+ }
+ private void multipleSScanAndAssertOnContentOfResultSet(int iteration, Jedis jedis,
+ final Set<String> initialMemberData) {
List<String> allEntries = new ArrayList<>();
-
- String cursor = "-100";
+ String cursor = ZERO_CURSOR;
ScanResult<String> result;
+
do {
- result = jedis.sscan("b", cursor);
- allEntries.addAll(result.getResult());
+ result = jedis.sscan(KEY, cursor);
cursor = result.getCursor();
+ List<String> resultEntries = result.getResult();
+ allEntries.addAll(resultEntries);
} while (!result.isCompleteIteration());
- assertThat(allEntries).containsExactlyInAnyOrder("green", "orange", "yellow");
+ assertThat(allEntries).as("failed on iteration " + iteration)
+ .containsAll(initialMemberData);
}
- @Test
- public void givenCursorGreaterThanUnsignedLongCapacity_returnsCursorError() {
- assertThatThrownBy(() -> jedis.sscan("a", "18446744073709551616"))
- .hasMessageContaining(ERROR_CURSOR);
+ @SuppressWarnings("unchecked")
+ private ScanResult<byte[]> sendCustomSscanCommand(String key, String... args) {
+ List<Object> result = (List<Object>) (jedis.sendCommand(key, Protocol.Command.SSCAN, args));
+ return new ScanResult<>((byte[]) result.get(0), (List<byte[]>) result.get(1));
}
- @Test
- public void givenNegativeCursorGreaterThanUnsignedLongCapacity_returnsCursorError() {
- assertThatThrownBy(() -> jedis.sscan("a", "-18446744073709551616"))
- .hasMessageContaining(ERROR_CURSOR);
+ private void initializeThreeMemberSet() {
+ jedis.sadd(KEY, MEMBER_ONE, MEMBER_TWELVE, MEMBER_THREE);
}
- @Test
- public void givenInvalidRegexSyntax_returnsEmptyArray() {
- jedis.sadd("a", "1");
- ScanParams scanParams = new ScanParams();
- scanParams.count(1);
- scanParams.match("\\p");
+ private Set<byte[]> initializeThreeMemberByteSet() {
+ Set<byte[]> set = new HashSet<>();
+ set.add(MEMBER_ONE.getBytes());
+ set.add(MEMBER_TWELVE.getBytes());
+ set.add(MEMBER_THREE.getBytes());
+ jedis.sadd(KEY_BYTES, MEMBER_ONE.getBytes(), MEMBER_TWELVE.getBytes(),
+ MEMBER_THREE.getBytes());
+ return set;
+ }
- ScanResult<byte[]> result =
- jedis.sscan("a".getBytes(), "0".getBytes(), scanParams);
+ private Set<String> initializeThousandMemberSet() {
+ Set<String> set = new HashSet<>();
+ int sizeOfSet = 1000;
+ for (int i = 0; i < sizeOfSet; i++) {
+ set.add((BASE_MEMBER_NAME + i));
+ }
+ jedis.sadd(KEY, set.toArray(new String[0]));
+ return set;
+ }
- assertThat(result.getResult()).isEmpty();
+ private Set<byte[]> initializeThousandMemberByteSet() {
+ Set<byte[]> set = new HashSet<>();
+ int sizeOfSet = 1000;
+ for (int i = 0; i < sizeOfSet; i++) {
+ byte[] memberToAdd = Integer.toString(i).getBytes();
+ set.add(memberToAdd);
+ jedis.sadd(KEY_BYTES, memberToAdd);
+ }
+ return set;
}
+
}
diff --git a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/SScanIntegrationTest.java b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/SScanIntegrationTest.java
index 2e552d9..6ce3c89 100755
--- a/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/SScanIntegrationTest.java
+++ b/geode-for-redis/src/integrationTest/java/org/apache/geode/redis/internal/commands/executor/set/SScanIntegrationTest.java
@@ -14,15 +14,14 @@
*/
package org.apache.geode.redis.internal.commands.executor.set;
-import static org.assertj.core.api.Assertions.assertThat;
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_CURSOR;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import java.util.ArrayList;
-import java.util.List;
+import java.math.BigInteger;
import org.junit.ClassRule;
import org.junit.Test;
-import redis.clients.jedis.ScanParams;
-import redis.clients.jedis.ScanResult;
import org.apache.geode.redis.GeodeRedisServerRule;
@@ -37,21 +36,30 @@ public class SScanIntegrationTest extends AbstractSScanIntegrationTest {
}
@Test
- public void givenDifferentCursorThanSpecifiedByPreviousSscan_returnsAllMembers() {
- List<byte[]> memberList = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- jedis.sadd("a", String.valueOf(i));
- memberList.add(String.valueOf(i).getBytes());
- }
-
- ScanParams scanParams = new ScanParams();
- scanParams.count(5);
- ScanResult<byte[]> result =
- jedis.sscan("a".getBytes(), "0".getBytes(), scanParams);
- assertThat(result.isCompleteIteration()).isFalse();
-
- result = jedis.sscan("a".getBytes(), "100".getBytes());
-
- assertThat(result.getResult()).containsExactlyInAnyOrderElementsOf(memberList);
+ public void givenCursorGreaterThanSignedLongMaxValue_returnsCursorError() {
+ assertThatThrownBy(
+ () -> jedis.sscan(KEY, SIGNED_LONG_MAX.add(BigInteger.ONE).toString()))
+ .hasMessageContaining(ERROR_CURSOR);
+ }
+
+ @Test
+ public void givenNegativeCursorLessThanSignedLongMinValue_returnsCursorError() {
+ assertThatThrownBy(
+ () -> jedis.sscan(KEY, SIGNED_LONG_MIN.subtract(BigInteger.ONE).toString()))
+ .hasMessageContaining(ERROR_CURSOR);
+ }
+
+ @Test
+ public void givenCursorEqualToSignedLongMaxValue_doesNotError() {
+ jedis.sadd(KEY, "1"); // the key must exist so the cursor is actually used for a scan
+ assertThatNoException()
+ .isThrownBy(() -> jedis.sscan(KEY, SIGNED_LONG_MAX.toString())); // upper boundary: one more than this yields ERROR_CURSOR in the sibling test
+ }
+
+ @Test
+ public void givenNegativeCursorEqualToSignedLongMinValue_doesNotError() {
+ jedis.sadd(KEY, "1");
+ assertThatNoException()
+ .isThrownBy(() -> jedis.sscan(KEY, SIGNED_LONG_MIN.toString()));
}
}
diff --git a/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt b/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
index 3d87db1..a51c9a0 100644
--- a/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
+++ b/geode-for-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/excludedClasses.txt
@@ -1,7 +1,7 @@
org/apache/geode/redis/internal/services/cluster/RedisMemberInfoRetrievalFunction
org/apache/geode/redis/internal/data/collections/Bytes2ObjectOpenHashMap
org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursor
-org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSet
+org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursor
org/apache/geode/redis/internal/data/RedisCrossSlotException
org/apache/geode/redis/internal/data/RedisDataMovedException
org/apache/geode/redis/internal/data/RedisDataTypeMismatchException
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java
index c11155b..856ea9a 100755
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/RedisCommandType.java
@@ -255,6 +255,8 @@ public enum RedisCommandType {
SRANDMEMBER(new SRandMemberExecutor(), SUPPORTED,
new Parameter().min(2).max(3, ERROR_SYNTAX).flags(READONLY, RANDOM)),
SREM(new SRemExecutor(), SUPPORTED, new Parameter().min(3).flags(WRITE, FAST)),
+ SSCAN(new SScanExecutor(), SUPPORTED, new Parameter().min(3).flags(READONLY, RANDOM),
+ new Parameter().odd(ERROR_SYNTAX)),
SUNION(new SUnionExecutor(), SUPPORTED,
new Parameter().min(2).lastKey(-1).flags(READONLY, SORT_FOR_SCRIPT)),
SUNIONSTORE(new SUnionStoreExecutor(), SUPPORTED,
@@ -357,8 +359,6 @@ public enum RedisCommandType {
SPOP(new SPopExecutor(), UNSUPPORTED,
new Parameter().min(2).max(3, ERROR_SYNTAX).flags(WRITE, RANDOM, FAST)),
- SSCAN(new SScanExecutor(), UNSUPPORTED, new Parameter().min(3).flags(READONLY, RANDOM),
- new Parameter().odd(ERROR_SYNTAX).firstKey(0)),
/*************** Server ****************/
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/set/SScanExecutor.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/set/SScanExecutor.java
index feb9914..e6df7b7 100755
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/set/SScanExecutor.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/commands/executor/set/SScanExecutor.java
@@ -15,117 +15,29 @@
package org.apache.geode.redis.internal.commands.executor.set;
-import static org.apache.geode.redis.internal.RedisConstants.ERROR_CURSOR;
-import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
-import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
-import static org.apache.geode.redis.internal.RedisConstants.ERROR_WRONG_TYPE;
import static org.apache.geode.redis.internal.data.RedisDataType.REDIS_SET;
-import static org.apache.geode.redis.internal.netty.Coder.bytesToLong;
-import static org.apache.geode.redis.internal.netty.Coder.bytesToString;
-import static org.apache.geode.redis.internal.netty.Coder.equalsIgnoreCaseBytes;
-import static org.apache.geode.redis.internal.netty.Coder.narrowLongToInt;
-import static org.apache.geode.redis.internal.netty.StringBytesGlossary.COUNT;
-import static org.apache.geode.redis.internal.netty.StringBytesGlossary.MATCH;
-import java.math.BigInteger;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.geode.redis.internal.commands.Command;
import org.apache.geode.redis.internal.commands.executor.GlobPattern;
-import org.apache.geode.redis.internal.commands.executor.RedisResponse;
import org.apache.geode.redis.internal.commands.executor.key.AbstractScanExecutor;
-import org.apache.geode.redis.internal.data.RedisData;
import org.apache.geode.redis.internal.data.RedisDataType;
-import org.apache.geode.redis.internal.data.RedisDataTypeMismatchException;
import org.apache.geode.redis.internal.data.RedisKey;
import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
public class SScanExecutor extends AbstractScanExecutor {
- private static final BigInteger UNSIGNED_LONG_CAPACITY = new BigInteger("18446744073709551615");
@Override
- public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
- List<byte[]> commandElems = command.getProcessedCommand();
-
- String cursorString = bytesToString(commandElems.get(2));
- BigInteger cursor;
- byte[] globPattern = null;
- int count = DEFAULT_COUNT;
-
- try {
- cursor = new BigInteger(cursorString).abs();
- } catch (NumberFormatException e) {
- return RedisResponse.error(ERROR_CURSOR);
- }
-
- if (cursor.compareTo(UNSIGNED_LONG_CAPACITY) > 0) {
- return RedisResponse.error(ERROR_CURSOR);
- }
-
- RedisKey key = command.getKey();
-
- RedisData value = context.getRegionProvider().getRedisData(key);
- if (value.isNull()) {
- context.getRedisStats().incKeyspaceMisses();
- return RedisResponse.emptyScan();
- }
-
- if (value.getType() != REDIS_SET) {
- throw new RedisDataTypeMismatchException(ERROR_WRONG_TYPE);
- }
-
- command.getCommandType().checkDeferredParameters(command, context);
-
- if (!cursor.equals(context.getSscanCursor())) {
- cursor = new BigInteger("0");
- }
-
- for (int i = 3; i < commandElems.size(); i = i + 2) {
- byte[] commandElemBytes = commandElems.get(i);
- if (equalsIgnoreCaseBytes(commandElemBytes, MATCH)) {
- commandElemBytes = commandElems.get(i + 1);
- globPattern = commandElemBytes;
-
- } else if (equalsIgnoreCaseBytes(commandElemBytes, COUNT)) {
- commandElemBytes = commandElems.get(i + 1);
- try {
- count = narrowLongToInt(bytesToLong(commandElemBytes));
- } catch (NumberFormatException e) {
- return RedisResponse.error(ERROR_NOT_INTEGER);
- }
-
- if (count < 1) {
- return RedisResponse.error(ERROR_SYNTAX);
- }
-
- } else {
- return RedisResponse.error(ERROR_SYNTAX);
- }
- }
-
- GlobPattern pattern = convertGlobToRegex(globPattern);
- int lCount = count;
- BigInteger lCursor = cursor;
- Pair<BigInteger, List<Object>> scanResult =
- context.setLockedExecute(key, true,
- set -> set.sscan(pattern, lCount, lCursor));
-
- context.setSscanCursor(scanResult.getLeft());
-
- return RedisResponse.scan(scanResult.getLeft().intValue(), scanResult.getRight());
- }
-
- // TODO: When SSCAN is supported, refactor to use these methods and not override executeCommand()
- @Override
protected Pair<Integer, List<byte[]>> executeScan(ExecutionHandlerContext context, RedisKey key,
GlobPattern pattern, int count, int cursor) {
- return null;
+ return context.setLockedExecute(key, true,
+ set -> set.sscan(pattern, count, cursor));
}
@Override
protected RedisDataType getDataType() {
- return null;
+ return REDIS_SET;
}
}
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
index 04654da..3fd6352 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
@@ -24,7 +24,6 @@ import static org.apache.geode.redis.internal.data.RedisDataType.REDIS_SET;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -45,7 +44,7 @@ import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.redis.internal.commands.executor.GlobPattern;
-import org.apache.geode.redis.internal.data.collections.SizeableObjectOpenCustomHashSet;
+import org.apache.geode.redis.internal.data.collections.SizeableObjectOpenCustomHashSetWithCursor;
import org.apache.geode.redis.internal.data.delta.AddByteArrays;
import org.apache.geode.redis.internal.data.delta.RemoveByteArrays;
import org.apache.geode.redis.internal.data.delta.ReplaceByteArrays;
@@ -207,42 +206,26 @@ public class RedisSet extends AbstractRedisData {
return result.size();
}
- public Pair<BigInteger, List<Object>> sscan(GlobPattern matchPattern, int count,
- BigInteger cursor) {
- List<Object> returnList = new ArrayList<>();
- int size = members.size();
- BigInteger beforeCursor = new BigInteger("0");
- int numElements = 0;
- int i = -1;
- for (byte[] value : members) {
- i++;
- if (beforeCursor.compareTo(cursor) < 0) {
- beforeCursor = beforeCursor.add(new BigInteger("1"));
- continue;
- }
+ public Pair<Integer, List<byte[]>> sscan(GlobPattern matchPattern, int count,
+ int cursor) {
+ int maximumCapacity = Math.min(count, scard() + 1);
+ List<byte[]> resultList = new ArrayList<>(maximumCapacity);
- if (matchPattern != null) {
- if (matchPattern.matches(value)) {
- returnList.add(value);
- numElements++;
- }
- } else {
- returnList.add(value);
- numElements++;
- }
+ cursor = members.scan(cursor, count,
+ (list, key) -> addIfMatching(matchPattern, list, key), resultList);
- if (numElements == count) {
- break;
- }
- }
+ return new ImmutablePair<>(cursor, resultList);
+
+ }
- Pair<BigInteger, List<Object>> scanResult;
- if (i >= size - 1) {
- scanResult = new ImmutablePair<>(new BigInteger("0"), returnList);
+ private void addIfMatching(GlobPattern matchPattern, List<byte[]> list, byte[] key) {
+ if (matchPattern != null) {
+ if (matchPattern.matches(key)) {
+ list.add(key);
+ }
} else {
- scanResult = new ImmutablePair<>(new BigInteger(String.valueOf(i + 1)), returnList);
+ list.add(key);
}
- return scanResult;
}
public Collection<byte[]> spop(Region<RedisKey, RedisData> region, RedisKey key, int popCount) {
@@ -495,7 +478,7 @@ public class RedisSet extends AbstractRedisData {
return REDIS_SET_OVERHEAD + members.getSizeInBytes();
}
- public static class MemberSet extends SizeableObjectOpenCustomHashSet<byte[]> {
+ public static class MemberSet extends SizeableObjectOpenCustomHashSetWithCursor<byte[]> {
public MemberSet() {
super(ByteArrays.HASH_STRATEGY);
}
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSet.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSet.java
deleted file mode 100644
index 1d57d8f..0000000
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSet.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.redis.internal.data.collections;
-
-
-import static org.apache.geode.internal.JvmSizeUtils.memoryOverhead;
-
-import java.util.Collection;
-
-import it.unimi.dsi.fastutil.objects.ObjectOpenCustomHashSet;
-
-import org.apache.geode.internal.size.Sizeable;
-
-public abstract class SizeableObjectOpenCustomHashSet<K> extends ObjectOpenCustomHashSet<K>
- implements Sizeable {
- private static final long serialVersionUID = 9174920505089089517L;
- private static final int OPEN_HASH_SET_OVERHEAD =
- memoryOverhead(SizeableObjectOpenCustomHashSet.class);
-
- private int memberOverhead;
-
- public SizeableObjectOpenCustomHashSet(int expected, Strategy<? super K> strategy) {
- super(expected, strategy);
- }
-
- public SizeableObjectOpenCustomHashSet(Strategy<? super K> strategy) {
- super(strategy);
- }
-
- public SizeableObjectOpenCustomHashSet(Collection<? extends K> c, Strategy<? super K> strategy) {
- super(c, strategy);
- }
-
- @Override
- public boolean add(K k) {
- boolean added = super.add(k);
- if (added) {
- memberOverhead += sizeElement(k);
- }
- return added;
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public boolean remove(Object k) {
- boolean removed = super.remove(k);
- if (removed) {
- memberOverhead -= sizeElement((K) k);
- }
- return removed;
- }
-
- public K getFromBackingArray(final int pos) {
- return key[pos];
- }
-
- public int getBackingArrayLength() {
- return key.length;
- }
-
- @Override
- public int getSizeInBytes() {
- // The object referenced by the "strategy" field is not sized
- // since it is usually a singleton instance.
- return OPEN_HASH_SET_OVERHEAD + memoryOverhead(key) + memberOverhead;
- }
-
- protected abstract int sizeElement(K element);
-}
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursor.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursor.java
new file mode 100644
index 0000000..dfb077b
--- /dev/null
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursor.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.data.collections;
+
+
+import static it.unimi.dsi.fastutil.HashCommon.mix;
+import static org.apache.geode.internal.JvmSizeUtils.memoryOverhead;
+
+import java.util.Collection;
+
+import it.unimi.dsi.fastutil.objects.ObjectOpenCustomHashSet;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.internal.size.Sizeable;
+
+public abstract class SizeableObjectOpenCustomHashSetWithCursor<E>
+ extends ObjectOpenCustomHashSet<E>
+ implements Sizeable {
+ private static final long serialVersionUID = 9174920505089089517L;
+ private static final int OPEN_HASH_SET_OVERHEAD =
+ memoryOverhead(SizeableObjectOpenCustomHashSetWithCursor.class);
+
+ private int memberOverhead;
+
+ public SizeableObjectOpenCustomHashSetWithCursor(int expected, Strategy<? super E> strategy) {
+ super(expected, strategy);
+ }
+
+ public SizeableObjectOpenCustomHashSetWithCursor(Strategy<? super E> strategy) {
+ super(strategy);
+ }
+
+ public SizeableObjectOpenCustomHashSetWithCursor(Collection<? extends E> c,
+ Strategy<? super E> strategy) {
+ super(c, strategy);
+ }
+
+ @Override
+ public boolean add(E e) {
+ boolean added = super.add(e);
+ if (added) {
+ memberOverhead += sizeElement(e);
+ }
+ return added;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public boolean remove(Object e) {
+ boolean removed = super.remove(e);
+ if (removed) {
+ memberOverhead -= sizeElement((E) e);
+ }
+ return removed;
+ }
+
+ public E getFromBackingArray(final int pos) {
+ return key[pos];
+ }
+
+ public int getBackingArrayLength() {
+ return key.length;
+ }
+
+ @Override
+ public int getSizeInBytes() {
+ // The object referenced by the "strategy" field is not sized
+ // since it is usually a singleton instance.
+ return OPEN_HASH_SET_OVERHEAD + memoryOverhead(key) + memberOverhead;
+ }
+
+ /**
+ * Scan entries and pass them to the given consumer function, starting at the passed in
+ * cursor. This method will scan until at least count entries are returned, or the entire
+ * set has been scanned. Once the returned cursor is 0, the entire set is scanned.
+ *
+ * This method may emit more than *count* number of elements if there are hash collisions.
+ *
+ * @param cursor The cursor to start from. Should be 0 for the initial scan. Subsequent calls
+ * should use the cursor returned by the previous scan call.
+ * @param count The number of elements to scan
+ * @param consumer A function to pass the scanned members
+ * @param privateData Some data to pass to the function, for example a collection to
+ * collect elements in.
+ * This allows the function to be stateless.
+ * @param <D> The type of the data passed to the function.
+ * @return The next cursor to scan from, or 0 if the scan has touched all elements.
+ */
+ public <D> int scan(int cursor, int count,
+ SizeableObjectOpenCustomHashSetWithCursor.EntryConsumer<E, D> consumer, D privateData) {
+ // Implementation notes
+ //
+ // This stateless scan cursor algorithm is based on the dictScan cursor
+ // implementation from dict.c in redis. Please see the comments in that file for the full
+ // details. That iteration algorithm was designed by Pieter Noordhuis.
+ //
+ // There is one wrinkle due to the fact that we are using a different type of hashtable here.
+ // The parent class, ObjectOpenCustomHashSet, uses open addressing with linear
+ // probing. What that means is that when there is a hash collision, instead of putting
+ // a linked list of hash entries into a single hash bucket, this implementation simply
+ // moves on to the next element to the right in the array and tries to put the inserted
+ // object there, continuing until it finds a null slot.
+ //
+ // So in order to use the redis cursor algorithm, our scan needs to probe ahead to
+ // subsequent positions to find any hash entries that match the position we are scanning.
+ // This is logically equivalent to iterating over the linked list in a hashtable bucket
+ // for a redis style closed addressing hashtable.
+ //
+
+ do {
+ // Emit all the entries at the cursor. This means looking forward in the hash
+ // table for any non-null entries that might hash to the current cursor and emitting
+ // those as well. This may even wrap around to the front of the hashtable.
+ int position = cursor;
+ while (key[position & mask] != null) {
+ E currentElement = key[position & mask];
+ if (elementHashesTo(currentElement, position, cursor & mask)) {
+ consumer.consume(privateData, currentElement);
+ count--;
+ }
+ position++;
+ }
+
+ // Increment the reversed cursor
+ cursor |= ~mask;
+ cursor = rev(cursor);
+ cursor++;
+ cursor = rev(cursor);
+
+
+ } while (count > 0 && cursor != 0);
+
+ return cursor;
+ }
+
+ /**
+ * Reverses the bits in a cursor.
+ *
+ * Package scope to allow for unit testing, to make sure we don't have any
+ * Java signed int issues.
+ *
+ * @param value the value to reverse
+ * @return the reversed bits.
+ */
+ static int rev(int value) {
+ // This implementation is also based on dict.c from redis, which was originally from
+ // http://graphics.stanford.edu/~seander/bithacks.html#ReverseParallel
+ int s = 32;
+ int mask = ~0;
+ while ((s >>>= 1) > 0) {
+ mask ^= (mask << s);
+ value = ((value >>> s) & mask) | ((value << s) & ~mask);
+ }
+ return value;
+ }
+
+ public interface EntryConsumer<E, D> {
+ void consume(D privateData, E element);
+ }
+
+ /**
+ * Check to see if given element hashes to the expected hash.
+ *
+ * @param currentElement The element to check
+ * @param currentPosition The position of the element in the key[] array
+ * @param expectedHash The expected hash of the element
+ */
+ private boolean elementHashesTo(E currentElement, int currentPosition, int expectedHash) {
+ // There is a small optimization here. If the previous element
+ // is null, we know that the element at position does hash to the expected
+ // hash because it is not here as a result of a collision at some previous position.
+ E previousElement = key[(currentPosition - 1) & mask];
+ return previousElement == null || hash(currentElement) == expectedHash;
+ }
+
+ @VisibleForTesting
+ public int hash(E element) {
+ return mix(strategy().hashCode(element)) & mask;
+ }
+
+ protected abstract int sizeElement(E element);
+}
diff --git a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index 000adf5..5177fea 100644
--- a/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ b/geode-for-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -103,7 +103,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
private final DistributedMember member;
private final RedisSecurityService securityService;
private BigInteger scanCursor;
- private BigInteger sscanCursor;
private final AtomicBoolean channelInactive = new AtomicBoolean();
private final ChannelId channelId;
@@ -133,7 +132,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
this.member = member;
this.securityService = securityService;
this.scanCursor = new BigInteger("0");
- this.sscanCursor = new BigInteger("0");
this.channelId = channel.id();
redisStats.addClient();
@@ -385,14 +383,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
this.scanCursor = scanCursor;
}
- public BigInteger getSscanCursor() {
- return sscanCursor;
- }
-
- public void setSscanCursor(BigInteger sscanCursor) {
- this.sscanCursor = sscanCursor;
- }
-
public String getMemberName() {
return member.getUniqueId();
}
diff --git a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursorTest.java b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursorTest.java
index 0d4a722..cd68c06 100644
--- a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursorTest.java
+++ b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableBytes2ObjectOpenCustomHashMapWithCursorTest.java
@@ -223,9 +223,9 @@ public class SizeableBytes2ObjectOpenCustomHashMapWithCursorTest {
@Test
public void scanWithShrinkingTable_DoesNotMissElements() {
- final int MAP_SIZE = 500;
- Bytes2StringMap map = new Bytes2StringMap(MAP_SIZE * 2); // *2 to prevent rehash
- fillMapWithUniqueHashKeys(map, MAP_SIZE);
+ final int initialMapSize = 500;
+ Bytes2StringMap map = new Bytes2StringMap(1); // 1 to ensure resizing back down
+ fillMapWithUniqueHashKeys(map, initialMapSize);
Map<byte[], String> scanned = new Object2ObjectOpenCustomHashMap<>(ByteArrays.HASH_STRATEGY);
int cursor = map.scan(0, 50, Map::put, scanned);
@@ -234,14 +234,14 @@ public class SizeableBytes2ObjectOpenCustomHashMapWithCursorTest {
// Remove a lot of elements to trigger a resize
// Remove some of the elements
Iterator<Map.Entry<byte[], String>> iterator = map.entrySet().iterator();
- int removeCount = MAP_SIZE - 100;
+ int removeCount = initialMapSize - 100;
while (removeCount > 0 && iterator.hasNext()) {
iterator.next();
iterator.remove();
removeCount--;
}
- cursor = map.scan(cursor, MAP_SIZE, Map::put, scanned);
+ cursor = map.scan(cursor, initialMapSize, Map::put, scanned);
assertThat(cursor).isEqualTo(0);
// Scan should at least have all of the remaining keys
diff --git a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetTest.java b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetTest.java
deleted file mode 100644
index a39fffa..0000000
--- a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.apache.geode.redis.internal.data.collections;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import it.unimi.dsi.fastutil.bytes.ByteArrays;
-import org.junit.Test;
-
-import org.apache.geode.cache.util.ObjectSizer;
-import org.apache.geode.internal.size.ReflectionObjectSizer;
-import org.apache.geode.redis.internal.data.RedisSet;
-
-public class SizeableObjectOpenCustomHashSetTest {
- private final ObjectSizer sizer = ReflectionObjectSizer.getInstance();
-
- private int expectedSize(RedisSet.MemberSet set) {
- return sizer.sizeof(set) - sizer.sizeof(ByteArrays.HASH_STRATEGY);
- }
-
- @Test
- public void getSizeInBytesIsAccurateForByteArrays() {
- List<byte[]> initialElements = new ArrayList<>();
- int initialNumberOfElements = 20;
- int elementsToAdd = 100;
- for (int i = 0; i < initialNumberOfElements; ++i) {
- initialElements.add(new byte[] {(byte) i});
- }
- // Create a set with an initial size and confirm that it correctly reports its size
- RedisSet.MemberSet set = new RedisSet.MemberSet(initialElements);
- assertThat(set.getSizeInBytes()).isEqualTo(expectedSize(set));
-
- // Add enough members to force a resize and assert that the size is correct after each add
- for (int i = initialNumberOfElements; i < initialNumberOfElements + elementsToAdd; ++i) {
- set.add(new byte[] {(byte) i});
- assertThat(set.getSizeInBytes()).isEqualTo(expectedSize(set));
- }
- assertThat(set.size()).isEqualTo(initialNumberOfElements + elementsToAdd);
-
- // Remove all the members and assert that the size is correct after each remove
- for (int i = 0; i < initialNumberOfElements + elementsToAdd; ++i) {
- set.remove(new byte[] {(byte) i});
- assertThat(set.getSizeInBytes()).isEqualTo(expectedSize(set));
- }
- assertThat(set.size()).isEqualTo(0);
- }
-}
diff --git a/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursorTest.java b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursorTest.java
new file mode 100644
index 0000000..a02788d
--- /dev/null
+++ b/geode-for-redis/src/test/java/org/apache/geode/redis/internal/data/collections/SizeableObjectOpenCustomHashSetWithCursorTest.java
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.redis.internal.data.collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.IntStream;
+
+import it.unimi.dsi.fastutil.bytes.ByteArrays;
+import it.unimi.dsi.fastutil.objects.ObjectIterator;
+import org.junit.Test;
+
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.internal.size.ReflectionObjectSizer;
+import org.apache.geode.redis.internal.data.RedisSet;
+
+public class SizeableObjectOpenCustomHashSetWithCursorTest {
+ private static final int INITIAL_SIZE = 20;
+ private static final int SIZE_OF_NEW_ELEMENTS_TO_ADD = 100;
+ private final ObjectSizer sizer = ReflectionObjectSizer.getInstance();
+
+ private int expectedSize(RedisSet.MemberSet set) {
+ return sizer.sizeof(set) - sizer.sizeof(ByteArrays.HASH_STRATEGY);
+ }
+
+
+ private static class ByteSet
+ extends SizeableObjectOpenCustomHashSetWithCursor<byte[]> {
+ public ByteSet() {
+ super(ByteArrays.HASH_STRATEGY);
+ }
+
+ public ByteSet(int initialSize) {
+ super(initialSize, ByteArrays.HASH_STRATEGY);
+ }
+
+ @Override
+ protected int sizeElement(byte[] element) {
+ return 0;
+ }
+ }
+
+
+ private List<byte[]> initializeMemberSet() {
+ List<byte[]> initialElements = new ArrayList<>();
+ for (int i = 0; i < INITIAL_SIZE; ++i) {
+ initialElements.add(new byte[] {(byte) i});
+ }
+ return initialElements;
+ }
+
+ @Test
+ public void getSizeInBytesIsAccurateForByteArrays() {
+ RedisSet.MemberSet set = new RedisSet.MemberSet(initializeMemberSet());
+ assertThat(set.getSizeInBytes()).isEqualTo(expectedSize(set));
+ }
+
+ @Test
+ public void addMoreMembers_assertSetSizeIsAccurate() {
+ RedisSet.MemberSet set = new RedisSet.MemberSet(initializeMemberSet());
+ for (int i = INITIAL_SIZE; i < INITIAL_SIZE + SIZE_OF_NEW_ELEMENTS_TO_ADD; ++i) {
+ set.add(new byte[] {(byte) i});
+ assertThat(set.getSizeInBytes()).isEqualTo(expectedSize(set));
+ }
+ assertThat(set.size()).isEqualTo(INITIAL_SIZE + SIZE_OF_NEW_ELEMENTS_TO_ADD);
+ }
+
+ @Test
+ public void removeAllMembers_assertSetSizeIsAccurate() {
+ RedisSet.MemberSet set = new RedisSet.MemberSet(initializeMemberSet());
+ for (int i = 0; i < INITIAL_SIZE + SIZE_OF_NEW_ELEMENTS_TO_ADD; ++i) {
+ set.remove(new byte[] {(byte) i});
+ assertThat(set.getSizeInBytes()).isEqualTo(expectedSize(set));
+ }
+ assertThat(set.size()).isEqualTo(0);
+ }
+
+ @Test
+ public void revWorksWhenSignBitIsSet() {
+ assertThat(SizeableObjectOpenCustomHashSetWithCursor.rev(0xFF000000)).isEqualTo(0xFF);
+ assertThat(SizeableObjectOpenCustomHashSetWithCursor.rev(0xFF)).isEqualTo(0xFF000000);
+ }
+
+ @Test
+ public void scanEntireSet_ReturnsExpectedElements() {
+ ByteSet set = new ByteSet();
+ IntStream.range(0, 10).forEach(i -> set.add(makeKey(i)));
+
+ List<byte[]> scanned = new ArrayList<>();
+ int result = set.scan(0, 10000, List::add, scanned);
+ assertThat(result).isZero();
+ assertThat(scanned).containsExactlyInAnyOrderElementsOf(set);
+ }
+
+ private static byte[] makeKey(Integer i) {
+ return i.toString().getBytes();
+ }
+
+ private void fillSetWithUniqueHashKeys(ByteSet set, int keysToAdd) {
+ int keyCounter = 0;
+ Set<Integer> hashesAdded = new HashSet<>();
+ while (keysToAdd > 0) {
+ byte[] key = makeKey(keyCounter);
+ int keyHash = set.hash(key);
+ if (!hashesAdded.contains(keyHash)) {
+ hashesAdded.add(keyHash);
+ set.add(key);
+ keysToAdd--;
+ }
+ keyCounter++;
+ }
+ }
+
+ private void fillSetWithCollidingHashKeys(ByteSet set, int keysToAdd) {
+ int keyCounter = 0;
+ Set<Integer> hashesAdded = new HashSet<>();
+ while (keysToAdd > 0) {
+ byte[] key = makeKey(keyCounter);
+ int keyHash = set.hash(key);
+ if (hashesAdded.isEmpty() || hashesAdded.contains(keyHash)) {
+ hashesAdded.add(keyHash);
+ set.add(key);
+ keysToAdd--;
+ }
+ keyCounter++;
+ }
+ }
+
+ @Test
+ public void twoScansWithNoModifications_ReturnsExpectedElements() {
+ final int setSize = 10;
+ ByteSet set = new ByteSet(setSize * 2); // *2 to prevent rehash
+ fillSetWithUniqueHashKeys(set, setSize);
+ List<byte[]> scanned = new ArrayList<>();
+ int scanSize = 1 + set.size() / 2;
+ // Scan part way through the set
+ int cursor = set.scan(0, scanSize, List::add, scanned);
+ assertThat(scanned).hasSize(scanSize);
+
+ // Scan past the end of the set
+ cursor = set.scan(cursor, scanSize, List::add, scanned);
+ assertThat(scanned).hasSize(set.size());
+ assertThat(cursor).isEqualTo(0);
+
+ assertThat(scanned).containsExactlyInAnyOrderElementsOf(set);
+ }
+
+ /**
+  * Removes elements through the set's iterator between two scans and checks that every element
+  * still in the set afterwards was returned by one of the scans.
+  */
+ @Test
+ public void scanWithConcurrentRemoves_ReturnsExpectedElements() {
+   final int initialSetSize = 10;
+   final int halfSize = initialSetSize / 2;
+   ByteSet set = new ByteSet(initialSetSize * 2); // *2 to prevent rehash
+   fillSetWithUniqueHashKeys(set, initialSetSize);
+   List<byte[]> scanned = new ArrayList<>();
+
+   int cursor = set.scan(0, halfSize, List::add, scanned);
+   assertThat(scanned).hasSize(halfSize);
+
+   // Remove one element fewer than half of the initial set, or stop early if the iterator ends
+   ObjectIterator<byte[]> iterator = set.iterator();
+   for (int left = halfSize - 1; left > 0 && iterator.hasNext(); left--) {
+     iterator.next();
+     iterator.remove();
+   }
+
+   cursor = set.scan(cursor, halfSize, List::add, scanned);
+   assertThat(cursor).isZero();
+
+   assertThat(scanned).containsAll(set);
+ }
+
+ /**
+  * Fills the set with keys that share a single hash and checks that the first scan returns all
+  * of them even though it asked for only one element.
+  */
+ @Test
+ public void scanWithHashcodeCollisions_ReturnsExpectedElements() {
+   final int setSize = 10;
+   ByteSet set = new ByteSet(setSize * 2); // *2 to prevent rehash
+   fillSetWithCollidingHashKeys(set, setSize);
+   List<byte[]> scanned = new ArrayList<>();
+
+   // The scan has to ignore the count and return all the elements with the same hash
+   int cursorAfterFirstScan = set.scan(0, 1, List::add, scanned);
+   assertThat(scanned).hasSize(setSize);
+   assertThat(scanned).containsExactlyInAnyOrderElementsOf(set);
+
+   // The follow-up scan is expected to finish without adding anything to the result
+   int cursorAfterSecondScan = set.scan(cursorAfterFirstScan, 1, List::add, scanned);
+   assertThat(cursorAfterSecondScan).isZero();
+   assertThat(scanned).hasSize(setSize);
+   assertThat(scanned).containsExactlyInAnyOrderElementsOf(set);
+ }
+
+ /**
+  * Combines colliding hashes with removals between scans: the first scan is expected to return
+  * every element, and the second scan, run after some elements were removed, is expected to
+  * finish without adding anything.
+  */
+ @Test
+ public void scanWithHashcodeCollisionsAndConcurrentRemoves_ReturnsExpectedElements() {
+   final int initialSetSize = 10;
+   final int halfSize = initialSetSize / 2;
+   ByteSet set = new ByteSet(initialSetSize * 2); // *2 to prevent rehash
+   fillSetWithCollidingHashKeys(set, initialSetSize);
+   List<byte[]> scanned = new ArrayList<>();
+
+   // All keys share one hash, so the whole set is expected back despite the smaller count
+   int cursor = set.scan(0, halfSize, List::add, scanned);
+   assertThat(scanned).hasSize(initialSetSize);
+
+   // Remove one element fewer than half of the initial set, or stop early if the iterator ends
+   ObjectIterator<byte[]> iterator = set.iterator();
+   for (int left = halfSize - 1; left > 0 && iterator.hasNext(); left--) {
+     iterator.next();
+     iterator.remove();
+   }
+
+   cursor = set.scan(cursor, halfSize, List::add, scanned);
+
+   assertThat(cursor).isZero();
+   assertThat(scanned).hasSize(initialSetSize);
+ }
+
+ /**
+  * Adds many elements between two scans and checks that every element present before the first
+  * scan is returned by one of the two scans.
+  */
+ @Test
+ public void scanWithGrowingTable_DoesNotMissElements() {
+   final int initialSetSize = 10;
+   ByteSet set = new ByteSet(initialSetSize * 2); // *2 to prevent rehash
+   fillSetWithUniqueHashKeys(set, initialSetSize);
+   // Snapshot of the keys that were present before any scan started
+   List<byte[]> initialKeys = new ArrayList<>(set);
+   List<byte[]> scanned = new ArrayList<>();
+
+   int cursor = set.scan(0, initialSetSize / 2, List::add, scanned);
+   assertThat(scanned).hasSize(initialSetSize / 2);
+
+   // Add a lot of elements to trigger a resize
+   for (int i = 10; i < 500; i++) {
+     set.add(makeKey(i));
+   }
+
+   cursor = set.scan(cursor, 500, List::add, scanned);
+   assertThat(cursor).isZero();
+
+   // There is no guarantee that the scan saw all of the newly added elements, only that
+   // it returned all of the original ones
+   assertThat(scanned).containsAll(initialKeys);
+ }
+
+ /**
+  * Removes most of the elements between two scans and checks that every element still in the
+  * set afterwards was returned by one of the two scans.
+  */
+ @Test
+ public void scanWithShrinkingTable_DoesNotMissElements() {
+   final int initialSetSize = 500;
+   ByteSet set = new ByteSet(1); // 1 to ensure resizing back down
+   fillSetWithUniqueHashKeys(set, initialSetSize);
+   List<byte[]> scanned = new ArrayList<>();
+
+   int cursor = set.scan(0, 50, List::add, scanned);
+   assertThat(scanned).hasSize(50);
+
+   // Remove a lot of elements to trigger a resize
+   ObjectIterator<byte[]> iterator = set.iterator();
+   for (int left = initialSetSize - 100; left > 0 && iterator.hasNext(); left--) {
+     iterator.next();
+     iterator.remove();
+   }
+
+   cursor = set.scan(cursor, initialSetSize, List::add, scanned);
+   assertThat(cursor).isZero();
+
+   // Scan should at least have all the remaining keys
+   assertThat(scanned).containsAll(set);
+ }
+
+}