You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2021/03/16 18:54:35 UTC
[geode] 02/36: Correctly report hosts and ports in the CLUSTER SLOTS
response
This is an automated email from the ASF dual-hosted git repository.
upthewaterspout pushed a commit to branch feature/redis-performance-testing
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 3df53f9b56898bb0d5f538425f0de725fea67806
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Wed Feb 17 11:57:30 2021 -0800
Correctly report hosts and ports in the CLUSTER SLOTS response
---
.../cluster/RedisPartitionResolverDUnitTest.java | 23 ++++++++++-
.../geode/redis/internal/GeodeRedisServer.java | 3 ++
.../geode/redis/internal/RedisCommandType.java | 2 +-
.../internal/cluster/BucketRetrievalFunction.java | 47 ++++++++++++++++++----
.../internal/executor/cluster/ClusterExecutor.java | 31 ++++++++++----
5 files changed, 89 insertions(+), 17 deletions(-)
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
index 3304932..e4df831 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
@@ -17,7 +17,6 @@ package org.apache.geode.redis.internal.cluster;
import static org.assertj.core.api.Assertions.assertThat;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -58,6 +57,7 @@ public class RedisPartitionResolverDUnitTest {
private static int redisServerPort1;
private static int redisServerPort2;
+ private static int redisServerPort3;
@BeforeClass
public static void classSetup() {
@@ -68,6 +68,7 @@ public class RedisPartitionResolverDUnitTest {
redisServerPort1 = cluster.getRedisPort(1);
redisServerPort2 = cluster.getRedisPort(2);
+ redisServerPort3 = cluster.getRedisPort(3);
jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT);
}
@@ -99,8 +100,28 @@ public class RedisPartitionResolverDUnitTest {
assertThat(buckets1.size() + buckets2.size() + buckets3.size())
.isEqualTo(RegionProvider.REDIS_REGION_BUCKETS);
+ }
+ @Test
+ public void testClusterSlotsReferencesAllServers() {
+ int numKeys = 1000;
+ for (int i = 0; i < numKeys; i++) {
+ String key = "key-" + i;
+ jedis1.set(key, "value-" + i);
+ }
+
+ List<Object> clusterSlots = jedis1.clusterSlots();
+
+ assertThat(clusterSlots).hasSize(RegionProvider.REDIS_REGION_BUCKETS);
+
+ // Gather the unique ports across all slot entries; each entry is [start, end, [host, port]]
+ Set<Long> ports = new HashSet<>();
+ for (Object slotObj : clusterSlots) {
+ ports.add((Long) (((List<Object>) ((List<Object>) slotObj).get(2))).get(1));
+ }
+ assertThat(ports).containsExactlyInAnyOrder((long) redisServerPort1, (long) redisServerPort2,
+ (long) redisServerPort3);
}
private Map<ByteArrayWrapper, Integer> getKeyToBucketMap(MemberVM vm) {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
index 06fa6d6..880fee2 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
@@ -27,6 +27,7 @@ import org.apache.geode.internal.statistics.StatisticsClock;
import org.apache.geode.internal.statistics.StatisticsClockFactory;
import org.apache.geode.logging.internal.executors.LoggingExecutors;
import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.cluster.BucketRetrievalFunction;
import org.apache.geode.redis.internal.executor.CommandFunction;
import org.apache.geode.redis.internal.executor.StripedExecutor;
import org.apache.geode.redis.internal.executor.SynchronizedStripedExecutor;
@@ -102,6 +103,8 @@ public class GeodeRedisServer {
regionProvider, pubSub,
this::allowUnsupportedCommands, this::shutdown, port, bindAddress, redisStats,
redisCommandExecutor);
+
+ BucketRetrievalFunction.register(nettyRedisServer.getPort());
}
@VisibleForTesting
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
index 33ca297..74ac471 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
@@ -277,7 +277,7 @@ public enum RedisCommandType {
SLOWLOG(new SlowlogExecutor(), UNSUPPORTED, new SlowlogParameterRequirements()),
TIME(new TimeExecutor(), UNSUPPORTED, new ExactParameterRequirements(1)),
- /*********** CLUSTER **********/
+ /*********** CLUSTER **********/
CLUSTER(new ClusterExecutor(), UNSUPPORTED, new MinimumParameterRequirements(1)),
/////////// UNIMPLEMENTED /////////////////////
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java
index ed9144c..48c4fb6 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java
@@ -19,16 +19,21 @@ import java.io.Serializable;
import java.net.InetAddress;
import java.util.Set;
-import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.Region;
import org.apache.geode.cache.execute.FunctionContext;
-import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.partition.PartitionRegionHelper;
import org.apache.geode.internal.cache.LocalDataSet;
+import org.apache.geode.internal.cache.execute.InternalFunction;
import org.apache.geode.internal.inet.LocalHostUtil;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.ByteArrayWrapper;
-public class BucketRetrievalFunction implements Function<Void> {
+public class BucketRetrievalFunction implements InternalFunction<Void> {
+ public static final String ID = "REDIS_BUCKET_SLOT_FUNCTION";
private static final String hostAddress;
+ private final int redisPort;
static {
InetAddress localhost = null;
@@ -40,28 +45,56 @@ public class BucketRetrievalFunction implements Function<Void> {
hostAddress = localhost == null ? "localhost" : localhost.getHostAddress();
}
+ public BucketRetrievalFunction(int redisPort) {
+ this.redisPort = redisPort;
+ }
+
+ public static void register(int redisPort) {
+ FunctionService.registerFunction(new BucketRetrievalFunction(redisPort));
+ }
+
@Override
public void execute(FunctionContext<Void> context) {
- LocalDataSet local = (LocalDataSet) PartitionRegionHelper
- .getLocalDataForContext((RegionFunctionContext) context);
+ Region<ByteArrayWrapper, ByteArrayWrapper> region =
+ context.getCache().getRegion(RegionProvider.REDIS_DATA_REGION);
+
+ LocalDataSet local = (LocalDataSet) PartitionRegionHelper.getLocalPrimaryData(region);
- MemberBuckets mb = new MemberBuckets(hostAddress, local.getBucketSet());
+ MemberBuckets mb =
+ new MemberBuckets(context.getMemberName(), hostAddress, redisPort, local.getBucketSet());
context.getResultSender().lastResult(mb);
}
+ @Override
+ public String getId() {
+ return ID;
+ }
+
public static class MemberBuckets implements Serializable {
+ private final String memberId;
private final String hostAddress;
+ private final int port;
private final Set<Integer> bucketIds;
- public MemberBuckets(String hostAddress, Set<Integer> bucketIds) {
+ public MemberBuckets(String memberId, String hostAddress, int port, Set<Integer> bucketIds) {
+ this.memberId = memberId;
this.hostAddress = hostAddress;
+ this.port = port;
this.bucketIds = bucketIds;
}
+ public String getMemberId() {
+ return memberId;
+ }
+
public String getHostAddress() {
return hostAddress;
}
+ public int getPort() {
+ return port;
+ }
+
public Set<Integer> getBucketIds() {
return bucketIds;
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java
index 7fac868..f7e8315 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java
@@ -22,18 +22,27 @@ import static org.apache.geode.redis.internal.cluster.BucketRetrievalFunction.Me
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.Logger;
import org.apache.geode.cache.Region;
-import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.cache.partition.PartitionMemberInfo;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.partition.PartitionRegionInfo;
+import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.redis.internal.cluster.BucketRetrievalFunction;
+import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisData;
import org.apache.geode.redis.internal.executor.AbstractExecutor;
import org.apache.geode.redis.internal.executor.RedisResponse;
import org.apache.geode.redis.internal.netty.Command;
@@ -68,20 +77,25 @@ public class ClusterExecutor extends AbstractExecutor {
return response;
}
- // @SuppressWarnings("unchecked")
+ @SuppressWarnings("unchecked")
private RedisResponse getSlots(ExecutionHandlerContext ctx) {
- Region<?, ?> region = ctx.getRegionProvider().getDataRegion();
+ Region<ByteArrayWrapper, RedisData> dataRegion = ctx.getRegionProvider().getDataRegion();
+ PartitionRegionInfo info = PartitionRegionHelper.getPartitionRegionInfo(dataRegion);
+ Set<DistributedMember> membersWithDataRegion = new HashSet<>();
+ for (PartitionMemberInfo memberInfo : info.getPartitionMemberInfo()) {
+ membersWithDataRegion.add(memberInfo.getDistributedMember());
+ }
- Execution<Void, MemberBuckets, List<MemberBuckets>> execution =
- FunctionService.onRegion(region);
ResultCollector<MemberBuckets, List<MemberBuckets>> resultCollector =
- execution.execute(new BucketRetrievalFunction());
+ FunctionService.onMembers(membersWithDataRegion).execute(BucketRetrievalFunction.ID);
SortedMap<Integer, String> bucketToMemberMap = new TreeMap<>();
+ Map<String, Pair<String, Integer>> memberToHostPortMap = new TreeMap<>();
int retrievedBucketCount = 0;
for (MemberBuckets m : resultCollector.getResult()) {
+ memberToHostPortMap.put(m.getMemberId(), Pair.of(m.getHostAddress(), m.getPort()));
for (Integer id : m.getBucketIds()) {
- bucketToMemberMap.put(id, m.getHostAddress());
+ bucketToMemberMap.put(id, m.getMemberId());
retrievedBucketCount++;
}
}
@@ -96,10 +110,11 @@ public class ClusterExecutor extends AbstractExecutor {
List<Object> slots = new ArrayList<>();
for (String member : bucketToMemberMap.values()) {
+ Pair<String, Integer> hostAndPort = memberToHostPortMap.get(member);
List<?> entry = Arrays.asList(
index * slotsPerBucket,
((index + 1) * slotsPerBucket) - 1,
- Arrays.asList(member, ctx.getServerPort()));
+ Arrays.asList(hostAndPort.getLeft(), hostAndPort.getRight()));
slots.add(entry);
index++;