You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2021/05/14 19:44:10 UTC
[geode] 02/02: GEODE-9256: Use a stateless hscan implementation
This is an automated email from the ASF dual-hosted git repository.
upthewaterspout pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit c407da1306990885052c411d3e679cafedda1e08
Author: Dan Smith <da...@vmware.com>
AuthorDate: Fri May 7 16:07:11 2021 -0700
GEODE-9256: Use a stateless hscan implementation
Replacing our stateful hscan implementation with a stateless implementation
based on the same algorithm as Redis. This uses the stateless cursor method
added to Object2ObjectOpenCustomHashMap in the previous commit.
---
.../hash/AbstractHScanIntegrationTest.java | 2 +-
.../executor/hash/HScanIntegrationTest.java | 28 ----
.../hash/MemoryOverheadIntegrationTest.java | 2 +-
.../geode/redis/internal/data/RedisHash.java | 184 +++------------------
.../data/RedisHashCommandsFunctionExecutor.java | 5 +-
.../redis/internal/executor/CommandFunction.java | 4 +-
.../internal/executor/hash/HScanExecutor.java | 11 +-
.../internal/executor/hash/RedisHashCommands.java | 3 +-
.../hash/RedisHashCommandsFunctionInvoker.java | 5 +-
.../apache/geode/redis/internal/netty/Client.java | 8 -
.../internal/netty/ExecutionHandlerContext.java | 15 --
.../geode/redis/internal/data/RedisHashTest.java | 105 +++---------
12 files changed, 51 insertions(+), 321 deletions(-)
diff --git a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
index c02538f..f0146b7 100755
--- a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
+++ b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/AbstractHScanIntegrationTest.java
@@ -453,7 +453,7 @@ public abstract class AbstractHScanIntegrationTest implements RedisIntegrationTe
ScanResult<Map.Entry<String, String>> result = jedis.hscan("colors", "5");
assertThat(new HashSet<>(result.getResult()))
- .containsExactlyInAnyOrderElementsOf(data.entrySet());
+ .isSubsetOf(data.entrySet());
}
/**** Concurrency ***/
diff --git a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/HScanIntegrationTest.java b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/HScanIntegrationTest.java
index 2a15f3c..4a8ba99 100755
--- a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/HScanIntegrationTest.java
+++ b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/HScanIntegrationTest.java
@@ -17,17 +17,12 @@ package org.apache.geode.redis.internal.executor.hash;
import static org.apache.geode.redis.internal.RedisConstants.ERROR_CURSOR;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Map;
import org.junit.ClassRule;
import org.junit.Test;
-import redis.clients.jedis.ScanParams;
-import redis.clients.jedis.ScanResult;
import org.apache.geode.redis.GeodeRedisServerRule;
@@ -64,27 +59,4 @@ public class HScanIntegrationTest extends AbstractHScanIntegrationTest {
assertThatThrownBy(() -> jedis.hscan("a", tooSmallCursor.toString()))
.hasMessageContaining(ERROR_CURSOR);
}
-
-
- @Test
- public void givenCount_shouldReturnExpectedNumberOfEntries() {
- Map<byte[], byte[]> entryMap = new HashMap<>();
- entryMap.put("1".getBytes(), "yellow".getBytes());
- entryMap.put("2".getBytes(), "green".getBytes());
- entryMap.put("3".getBytes(), "orange".getBytes());
- jedis.hmset("colors".getBytes(), entryMap);
-
- int COUNT_PARAM = 2;
-
- ScanParams scanParams = new ScanParams();
- scanParams.count(COUNT_PARAM);
- ScanResult<Map.Entry<byte[], byte[]>> result;
-
- String cursor = "0";
-
- result = jedis.hscan("colors".getBytes(), cursor.getBytes(), scanParams);
-
- assertThat(result.getResult().size()).isEqualTo(COUNT_PARAM);
- }
-
}
diff --git a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/MemoryOverheadIntegrationTest.java b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/MemoryOverheadIntegrationTest.java
index eae64bb..ac66c5a 100755
--- a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/MemoryOverheadIntegrationTest.java
+++ b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/MemoryOverheadIntegrationTest.java
@@ -56,7 +56,7 @@ public class MemoryOverheadIntegrationTest extends AbstractMemoryOverheadIntegra
result.put(Measurement.STRING, 185);
result.put(Measurement.SET, 386);
result.put(Measurement.SET_ENTRY, 72);
- result.put(Measurement.HASH, 490);
+ result.put(Measurement.HASH, 338);
result.put(Measurement.HASH_ENTRY, 50);
return result;
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
index eea510c..8550651 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
@@ -16,9 +16,6 @@
package org.apache.geode.redis.internal.data;
-import static java.lang.System.currentTimeMillis;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.apache.geode.logging.internal.executors.LoggingExecutors.newSingleThreadScheduledExecutor;
import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
import static org.apache.geode.redis.internal.RedisConstants.ERROR_OVERFLOW;
@@ -33,15 +30,10 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.regex.Pattern;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
-import it.unimi.dsi.fastutil.objects.Object2ObjectOpenCustomHashMap;
import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.geode.DataSerializer;
import org.apache.geode.annotations.VisibleForTesting;
@@ -49,100 +41,47 @@ import org.apache.geode.cache.Region;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.redis.internal.collections.Object2ObjectOpenCustomHashMapWithCursor;
import org.apache.geode.redis.internal.delta.AddsDeltaInfo;
import org.apache.geode.redis.internal.delta.DeltaInfo;
import org.apache.geode.redis.internal.delta.RemsDeltaInfo;
import org.apache.geode.redis.internal.netty.Coder;
public class RedisHash extends AbstractRedisData {
- private Object2ObjectOpenCustomHashMap<byte[], byte[]> hash;
- private final ConcurrentHashMap<UUID, List<byte[]>> hScanSnapShots = new ConcurrentHashMap<>();
- private final ConcurrentHashMap<UUID, Long> hScanSnapShotCreationTimes =
- new ConcurrentHashMap<>();
- private ScheduledExecutorService HSCANSnapshotExpirationExecutor = null;
-
- private int sizeInBytes = BASE_REDIS_HASH_OVERHEAD;
-
// the following constants were calculated using reflection and math. you can find the tests for
// these values in RedisHashTest, which show the way these numbers were calculated. the constants
// have the advantage of saving us a lot of computation that would happen every time a new key was
// added. if our internal implementation changes, these values may be incorrect. the tests will
// catch this change. an increase in overhead should be carefully considered.
- protected static final int BASE_REDIS_HASH_OVERHEAD = 336;
+ protected static final int BASE_REDIS_HASH_OVERHEAD = 184;
protected static final int HASH_MAP_VALUE_PAIR_OVERHEAD = 48;
- private static final int defaultHscanSnapshotsExpireCheckFrequency =
- Integer.getInteger("redis.hscan-snapshot-cleanup-interval", 30000);
-
- private static final int defaultHscanSnapshotsMillisecondsToLive =
- Integer.getInteger("redis.hscan-snapshot-expiry", 30000);
+ private Object2ObjectOpenCustomHashMapWithCursor<byte[], byte[]> hash;
- private int HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS =
- defaultHscanSnapshotsExpireCheckFrequency;
- private int MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE =
- defaultHscanSnapshotsMillisecondsToLive;
+ private int sizeInBytes = BASE_REDIS_HASH_OVERHEAD;
@VisibleForTesting
- public RedisHash(List<byte[]> fieldsToSet, int hscanSnapShotExpirationCheckFrequency,
- int minimumLifeForHscanSnaphot) {
+ public RedisHash(List<byte[]> fieldsToSet) {
final int numKeysAndValues = fieldsToSet.size();
if (numKeysAndValues % 2 != 0) {
throw new IllegalStateException(
"fieldsToSet should have an even number of elements but was size " + numKeysAndValues);
}
- HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS = hscanSnapShotExpirationCheckFrequency;
- MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE = minimumLifeForHscanSnaphot;
-
- hash = new Object2ObjectOpenCustomHashMap<>(numKeysAndValues / 2, ByteArrays.HASH_STRATEGY);
+ hash = new Object2ObjectOpenCustomHashMapWithCursor<>(numKeysAndValues / 2,
+ ByteArrays.HASH_STRATEGY);
Iterator<byte[]> iterator = fieldsToSet.iterator();
while (iterator.hasNext()) {
hashPut(iterator.next(), iterator.next());
}
}
- public RedisHash(List<byte[]> fieldsToSet) {
- this(fieldsToSet,
- defaultHscanSnapshotsExpireCheckFrequency,
- defaultHscanSnapshotsMillisecondsToLive);
- }
-
/**
* For deserialization only.
*/
public RedisHash() {}
- private void expireHScanSnapshots() {
- hScanSnapShotCreationTimes.forEach((client, creationTime) -> {
- long millisecondsSinceCreation = currentTimeMillis() - creationTime;
-
- if (millisecondsSinceCreation >= MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE) {
- removeHSCANSnapshot(client);
- }
- });
- }
-
- @VisibleForTesting
- public ConcurrentHashMap<UUID, List<byte[]>> getHscanSnapShots() {
- return hScanSnapShots;
- }
-
- private void startHscanSnapshotScheduledRemoval() {
- final int DELAY = HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS;
-
- HSCANSnapshotExpirationExecutor =
- newSingleThreadScheduledExecutor("GemFireRedis-HSCANSnapshotRemoval-");
-
- HSCANSnapshotExpirationExecutor.scheduleWithFixedDelay(
- this::expireHScanSnapshots, DELAY, DELAY, MILLISECONDS);
- }
-
- private void shutDownHscanSnapshotScheduledRemoval() {
- HSCANSnapshotExpirationExecutor.shutdown();
- HSCANSnapshotExpirationExecutor = null;
- }
-
/**
* Since GII (getInitialImage) can come in and call toData while other threads are modifying this
* object, the striped executor will not protect toData. So any methods that modify "hash" needs
@@ -166,7 +105,7 @@ public class RedisHash extends AbstractRedisData {
throws IOException, ClassNotFoundException {
super.fromData(in, context);
int size = DataSerializer.readInteger(in);
- hash = new Object2ObjectOpenCustomHashMap<>(size, ByteArrays.HASH_STRATEGY);
+ hash = new Object2ObjectOpenCustomHashMapWithCursor<>(size, ByteArrays.HASH_STRATEGY);
for (int i = 0; i < size; i++) {
hash.put(DataSerializer.readByteArray(in), DataSerializer.readByteArray(in));
}
@@ -332,107 +271,30 @@ public class RedisHash extends AbstractRedisData {
return new ArrayList<>(hash.keySet());
}
- public ImmutablePair<Integer, List<byte[]>> hscan(UUID clientID, Pattern matchPattern,
+ public ImmutablePair<Integer, List<byte[]>> hscan(Pattern matchPattern,
int count,
- int startCursor) {
-
- List<byte[]> keysToScan = getSnapShotOfKeySet(clientID);
+ int cursor) {
- Pair<Integer, List<byte[]>> resultsPair =
- getResultsPair(keysToScan, startCursor, count, matchPattern);
+ ArrayList<byte[]> resultList = new ArrayList<>(count + 2);
+ do {
+ cursor = hash.scan(cursor, 1,
+ (list, key, value) -> addIfMatching(matchPattern, list, key, value), resultList);
+ } while (cursor != 0 && resultList.size() < (count * 2));
- List<byte[]> resultList = resultsPair.getRight();
-
- Integer numberOfIterationsCompleted = resultsPair.getLeft();
-
- int returnCursorValueAsInt =
- getCursorValueToReturn(startCursor, numberOfIterationsCompleted, keysToScan);
-
- if (returnCursorValueAsInt == 0) {
- removeHSCANSnapshot(clientID);
- }
-
- return new ImmutablePair<>(returnCursorValueAsInt, resultList);
+ return new ImmutablePair<>(cursor, resultList);
}
- private void removeHSCANSnapshot(UUID clientID) {
- hScanSnapShots.remove(clientID);
- hScanSnapShotCreationTimes.remove(clientID);
-
- if (hScanSnapShots.isEmpty()) {
- shutDownHscanSnapshotScheduledRemoval();
- }
- }
-
- private Pair<Integer, List<byte[]>> getResultsPair(List<byte[]> keysSnapShot,
- int startCursor,
- int count,
- Pattern matchPattern) {
-
- int indexOfKeys = startCursor;
-
- List<byte[]> resultList = new ArrayList<>();
-
- for (int index = startCursor; index < keysSnapShot.size(); index++) {
- if ((index - startCursor) == count) {
- break;
- }
-
- byte[] key = keysSnapShot.get(index);
- indexOfKeys++;
-
- byte[] value = hash.get(key);
- if (value == null) {
- continue;
- }
-
- if (matchPattern != null) {
- if (matchPattern.matcher(Coder.bytesToString(key)).matches()) {
- resultList.add(key);
- resultList.add(value);
- }
- } else {
+ private void addIfMatching(Pattern matchPattern, List<byte[]> resultList, byte[] key,
+ byte[] value) {
+ if (matchPattern != null) {
+ if (matchPattern.matcher(Coder.bytesToString(key)).matches()) {
resultList.add(key);
resultList.add(value);
}
+ } else {
+ resultList.add(key);
+ resultList.add(value);
}
-
- Integer numberOfIterationsCompleted = indexOfKeys - startCursor;
-
- return new ImmutablePair<>(numberOfIterationsCompleted, resultList);
- }
-
- private int getCursorValueToReturn(int startCursor,
- int numberOfIterationsCompleted,
- List<byte[]> keySnapshot) {
-
- if (startCursor + numberOfIterationsCompleted >= keySnapshot.size()) {
- return 0;
- }
-
- return (startCursor + numberOfIterationsCompleted);
- }
-
- private List<byte[]> getSnapShotOfKeySet(UUID clientID) {
- List<byte[]> keySnapShot = this.hScanSnapShots.get(clientID);
-
- if (keySnapShot == null) {
- if (hScanSnapShots.isEmpty()) {
- startHscanSnapshotScheduledRemoval();
- }
- keySnapShot = createKeySnapShot(clientID);
- }
- return keySnapShot;
- }
-
- private List<byte[]> createKeySnapShot(UUID clientID) {
-
- List<byte[]> keySnapShot = new ArrayList<>(hash.keySet());
-
- hScanSnapShots.put(clientID, keySnapShot);
- hScanSnapShotCreationTimes.put(clientID, currentTimeMillis());
-
- return keySnapShot;
}
public long hincrby(Region<RedisKey, RedisData> region, RedisKey key,
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHashCommandsFunctionExecutor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHashCommandsFunctionExecutor.java
index 0aa7c20..7fa4a05 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHashCommandsFunctionExecutor.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHashCommandsFunctionExecutor.java
@@ -19,7 +19,6 @@ package org.apache.geode.redis.internal.data;
import java.math.BigDecimal;
import java.util.Collection;
import java.util.List;
-import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.commons.lang3.tuple.Pair;
@@ -94,10 +93,10 @@ public class RedisHashCommandsFunctionExecutor extends RedisDataCommandsFunction
@Override
public Pair<Integer, List<byte[]>> hscan(RedisKey key, Pattern matchPattern,
- int count, int cursor, UUID clientID) {
+ int count, int cursor) {
return stripedExecute(key,
() -> getRedisHash(key, true)
- .hscan(clientID, matchPattern, count, cursor));
+ .hscan(matchPattern, count, cursor));
}
@Override
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java
index aab3c59..8c5c601 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java
@@ -20,7 +20,6 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.geode.cache.Region;
@@ -261,8 +260,7 @@ public class CommandFunction extends SingleResultRedisFunction {
Pattern pattern = (Pattern) args[1];
int count = (int) args[2];
int cursor = (int) args[3];
- UUID clientID = (UUID) args[4];
- return hashCommands.hscan(key, pattern, count, cursor, clientID);
+ return hashCommands.hscan(key, pattern, count, cursor);
}
case HINCRBY: {
byte[] field = (byte[]) args[1];
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
index bde3e4c..e3a9fe8 100755
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
@@ -22,7 +22,6 @@ import static org.apache.geode.redis.internal.data.RedisDataType.REDIS_HASH;
import java.math.BigInteger;
import java.util.List;
-import java.util.UUID;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@@ -49,8 +48,6 @@ public class HScanExecutor extends AbstractScanExecutor {
public RedisResponse executeCommand(Command command,
ExecutionHandlerContext context) {
- final UUID CLIENT_ID = context.getClientUUID();
-
List<byte[]> commandElems = command.getProcessedCommand();
String cursorString = Coder.bytesToString(commandElems.get(2));
@@ -65,10 +62,6 @@ public class HScanExecutor extends AbstractScanExecutor {
return RedisResponse.error(ERROR_CURSOR);
}
- if (cursor != context.getHscanCursor()) {
- cursor = 0;
- }
-
RedisKey key = command.getKey();
if (!getDataRegion(context).containsKey(key)) {
context.getRedisStats().incKeyspaceMisses();
@@ -117,9 +110,7 @@ public class HScanExecutor extends AbstractScanExecutor {
new RedisHashCommandsFunctionInvoker(context.getRegionProvider().getDataRegion());
Pair<Integer, List<byte[]>> scanResult =
- redisHashCommands.hscan(key, matchPattern, count, cursor, CLIENT_ID);
-
- context.setHscanCursor(scanResult.getLeft());
+ redisHashCommands.hscan(key, matchPattern, count, cursor);
return RedisResponse.scan(new BigInteger(String.valueOf(scanResult.getLeft())),
scanResult.getRight());
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommands.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommands.java
index b4119d8..cfe5046 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommands.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommands.java
@@ -18,7 +18,6 @@ package org.apache.geode.redis.internal.executor.hash;
import java.math.BigDecimal;
import java.util.Collection;
import java.util.List;
-import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.commons.lang3.tuple.Pair;
@@ -47,7 +46,7 @@ public interface RedisHashCommands {
Collection<byte[]> hkeys(RedisKey key);
Pair<Integer, List<byte[]>> hscan(RedisKey key, Pattern matchPattern, int count,
- int cursor, UUID clientID);
+ int cursor);
long hincrby(RedisKey key, byte[] field, long increment);
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionInvoker.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionInvoker.java
index 29cfcde..f6702fb 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionInvoker.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionInvoker.java
@@ -32,7 +32,6 @@ import static org.apache.geode.redis.internal.RedisCommandType.HVALS;
import java.math.BigDecimal;
import java.util.Collection;
import java.util.List;
-import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.commons.lang3.tuple.Pair;
@@ -107,8 +106,8 @@ public class RedisHashCommandsFunctionInvoker extends RedisCommandsFunctionInvok
@Override
public Pair<Integer, List<byte[]>> hscan(RedisKey key, Pattern matchPattern,
- int count, int cursor, UUID clientID) {
- return invokeCommandFunction(key, HSCAN, matchPattern, count, cursor, clientID);
+ int count, int cursor) {
+ return invokeCommandFunction(key, HSCAN, matchPattern, count, cursor);
}
@Override
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
index 4724ed6..4c1780a 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/Client.java
@@ -17,7 +17,6 @@
package org.apache.geode.redis.internal.netty;
import java.util.Objects;
-import java.util.UUID;
import io.netty.channel.Channel;
import io.netty.util.concurrent.Future;
@@ -27,11 +26,8 @@ import io.netty.util.concurrent.GenericFutureListener;
public class Client {
private Channel channel;
- private UUID id;
-
public Client(Channel remoteAddress) {
this.channel = remoteAddress;
- this.id = UUID.randomUUID();
}
@Override
@@ -63,8 +59,4 @@ public class Client {
public String toString() {
return channel.toString();
}
-
- public UUID getId() {
- return id;
- }
}
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
index 506056c..5abe6a3 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/netty/ExecutionHandlerContext.java
@@ -18,7 +18,6 @@ package org.apache.geode.redis.internal.netty;
import java.io.IOException;
import java.math.BigInteger;
-import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -87,7 +86,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
private final DistributedMember member;
private BigInteger scanCursor;
private BigInteger sscanCursor;
- private int hscanCursor;
private final AtomicBoolean channelInactive = new AtomicBoolean();
private final int MAX_QUEUED_COMMANDS =
Integer.getInteger("geode.redis.commandQueueSize", 1000);
@@ -131,7 +129,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
this.member = member;
this.scanCursor = new BigInteger("0");
this.sscanCursor = new BigInteger("0");
- this.hscanCursor = 0;
redisStats.addClient();
backgroundExecutor.submit(this::processCommandQueue);
@@ -407,10 +404,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
return client;
}
- public UUID getClientUUID() {
- return this.getClient().getId();
- }
-
public void shutdown() {
shutdownInvoker.run();
}
@@ -439,14 +432,6 @@ public class ExecutionHandlerContext extends ChannelInboundHandlerAdapter {
this.sscanCursor = sscanCursor;
}
- public int getHscanCursor() {
- return this.hscanCursor;
- }
-
- public void setHscanCursor(int hscanCursor) {
- this.hscanCursor = hscanCursor;
- }
-
public String getMemberName() {
return member.getUniqueId();
}
diff --git a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
index 48d8077..de71499 100644
--- a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
+++ b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
@@ -17,7 +17,6 @@
package org.apache.geode.redis.internal.data;
import static java.lang.Math.round;
-import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.geode.redis.internal.data.RedisHash.BASE_REDIS_HASH_OVERHEAD;
import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
import static org.assertj.core.api.Assertions.assertThat;
@@ -34,10 +33,10 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.ImmutablePair;
import org.assertj.core.data.Offset;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -52,7 +51,6 @@ import org.apache.geode.internal.serialization.DataSerializableFixedID;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.size.ReflectionObjectSizer;
import org.apache.geode.redis.internal.netty.Coder;
-import org.apache.geode.test.awaitility.GeodeAwaitility;
public class RedisHashTest {
private final ReflectionObjectSizer reflectionObjectSizer = ReflectionObjectSizer.getInstance();
@@ -180,85 +178,26 @@ public class RedisHashTest {
/************* HSCAN *************/
@Test
- public void hscanSnaphots_shouldBeEmpty_givenHscanHasNotBeenCalled() {
- RedisHash subject = createRedisHash(100);
- assertThat(subject.getHscanSnapShots()).isEmpty();
- }
-
- @Test
- public void hscanSnaphots_shouldContainSnapshot_givenHscanHasBeenCalled() {
-
- final List<byte[]> FIELDS_AND_VALUES_FOR_HASH = createListOfDataElements(100);
- RedisHash subject = new RedisHash(FIELDS_AND_VALUES_FOR_HASH);
- UUID clientID = UUID.randomUUID();
-
- subject.hscan(clientID, null, 10, 0);
-
- ConcurrentHashMap<UUID, List<byte[]>> hscanSnapShotMap = subject.getHscanSnapShots();
-
- assertThat(hscanSnapShotMap.containsKey(clientID)).isTrue();
-
- List<byte[]> keyList = hscanSnapShotMap.get(clientID);
- assertThat(keyList).isNotEmpty();
-
- FIELDS_AND_VALUES_FOR_HASH.forEach((entry) -> {
- if (Coder.bytesToString(entry).contains("field")) {
- assertThat(keyList).contains(entry);
- } else if (Coder.bytesToString(entry).contains("value")) {
- assertThat(keyList).doesNotContain(entry);
- }
- });
-
- }
-
- @Test
- public void hscanSnaphots_shouldContainSnapshot_givenHscanHasBeenCalled_WithNonZeroCursor() {
+ public void hscanReturnsCorrectNumberOfElements() throws IOException, ClassNotFoundException {
+ RedisHash hash = createRedisHash("k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4");
+ ImmutablePair<Integer, List<byte[]>> result =
+ hash.hscan(null, 2, 0);
- final List<byte[]> FIELDS_AND_VALUES_FOR_HASH = createListOfDataElements(100);
- RedisHash subject = new RedisHash(FIELDS_AND_VALUES_FOR_HASH);
- UUID clientID = UUID.randomUUID();
-
- subject.hscan(clientID, null, 10, 10);
-
- ConcurrentHashMap<UUID, List<byte[]>> hscanSnapShotMap = subject.getHscanSnapShots();
-
- assertThat(hscanSnapShotMap.containsKey(clientID)).isTrue();
-
- List<byte[]> keyList = hscanSnapShotMap.get(clientID);
- assertThat(keyList).isNotEmpty();
-
- FIELDS_AND_VALUES_FOR_HASH.forEach((entry) -> {
- if (Coder.bytesToString(entry).contains("field")) {
- assertThat(keyList).contains(entry);
- } else if (Coder.bytesToString(entry).contains("value")) {
- assertThat(keyList).doesNotContain(entry);
- }
- });
+ assertThat(result.left).isNotEqualTo(0);
+ assertThat(result.right).hasSize(4);
+ result = hash.hscan(null, 3, result.left);
+ assertThat(result.left).isEqualTo(0);
+ assertThat(result.right).hasSize(4);
}
@Test
- public void hscanSnaphots_shouldBeRemoved_givenCompleteIteration() {
- RedisHash subject = createRedisHashWithExpiration(1, 100000);
- UUID client_ID = UUID.randomUUID();
-
- subject.hscan(client_ID, null, 10, 0);
+ public void hscanOnlyReturnsElementsMatchingPattern() throws IOException, ClassNotFoundException {
+ RedisHash hash = createRedisHash("ak1", "v1", "k2", "v2", "ak3", "v3", "k4", "v4");
+ ImmutablePair<Integer, List<byte[]>> result =
+ hash.hscan(Pattern.compile("a.*"), 3, 0);
- ConcurrentHashMap<UUID, List<byte[]>> hscanSnapShotMap = subject.getHscanSnapShots();
- assertThat(hscanSnapShotMap).isEmpty();
- }
-
- @Test
- public void hscanSnaphots_shouldExpireAfterExpiryPeriod() {
- RedisHash subject = createRedisHashWithExpiration(1000, 1);
- UUID client_ID = UUID.randomUUID();
-
- subject.hscan(client_ID, null, 1, 0);
-
- GeodeAwaitility.await().atMost(4, SECONDS).untilAsserted(() -> {
- ConcurrentHashMap<UUID, List<byte[]>> hscanSnapShotMap =
- subject.getHscanSnapShots();
- assertThat(hscanSnapShotMap).isEmpty();
- });
+ assertThat(result.left).isEqualTo(0);
+ assertThat(toListOfStrings(result.right)).containsExactly("ak1", "v1", "ak3", "v3");
}
/************* Hash Size *************/
@@ -594,9 +533,8 @@ public class RedisHashTest {
}
/************* Helper Methods *************/
- private RedisHash createRedisHash(int NumberOfFields) {
- ArrayList<byte[]> elements = createListOfDataElements(NumberOfFields);
- return new RedisHash(elements);
+ private List<String> toListOfStrings(List<byte[]> byteList) {
+ return byteList.stream().map(Coder::bytesToString).collect(Collectors.toList());
}
private RedisHash createRedisHash(String... keysAndValues) {
@@ -616,11 +554,6 @@ public class RedisHashTest {
return elements;
}
- private RedisHash createRedisHashWithExpiration(int NumberOfFields, int hcanSnapshotExpiry) {
- ArrayList<byte[]> elements = createListOfDataElements(NumberOfFields);
- return new RedisHash(elements, hcanSnapshotExpiry, hcanSnapshotExpiry);
- }
-
private byte[] toBytes(String str) {
return Coder.stringToBytes(str);
}