Posted to commits@geode.apache.org by up...@apache.org on 2021/03/16 18:54:35 UTC

[geode] 02/36: Fix reporting of hosts and ports in the CLUSTER SLOTS response

This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch feature/redis-performance-testing
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 3df53f9b56898bb0d5f538425f0de725fea67806
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Wed Feb 17 11:57:30 2021 -0800

    Fix reporting of hosts and ports in the CLUSTER SLOTS response
---
 .../cluster/RedisPartitionResolverDUnitTest.java   | 23 ++++++++++-
 .../geode/redis/internal/GeodeRedisServer.java     |  3 ++
 .../geode/redis/internal/RedisCommandType.java     |  2 +-
 .../internal/cluster/BucketRetrievalFunction.java  | 47 ++++++++++++++++++----
 .../internal/executor/cluster/ClusterExecutor.java | 31 ++++++++++----
 5 files changed, 89 insertions(+), 17 deletions(-)
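
Note on the new test below: each element of the CLUSTER SLOTS reply is itself a list of the
form [startSlot, endSlot, [host, port]], so the owning server's port sits at index 1 of the
nested node entry. The following is a minimal standalone sketch (not part of this commit; the
localhost address and port 6379 are placeholders) that collects the advertised ports from that
reply with Jedis, the same way the new test does. Before this change, every slot range carried
ctx.getServerPort() - the port of whichever server happened to handle the command - so a probe
like this would have reported a single port no matter how many servers host buckets.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import redis.clients.jedis.Jedis;

public class ClusterSlotsPortsSketch {
  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    // Placeholder address; any redis-enabled server in the cluster will do.
    try (Jedis jedis = new Jedis("localhost", 6379)) {
      Set<Long> ports = new HashSet<>();
      for (Object slotObj : jedis.clusterSlots()) {
        // Each reply element: [startSlot, endSlot, [host, port]]
        List<Object> node = (List<Object>) ((List<Object>) slotObj).get(2);
        ports.add((Long) node.get(1));
      }
      System.out.println("Ports advertised by CLUSTER SLOTS: " + ports);
    }
  }
}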

diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
index 3304932..e4df831 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
@@ -17,7 +17,6 @@ package org.apache.geode.redis.internal.cluster;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -58,6 +57,7 @@ public class RedisPartitionResolverDUnitTest {
 
   private static int redisServerPort1;
   private static int redisServerPort2;
+  private static int redisServerPort3;
 
   @BeforeClass
   public static void classSetup() {
@@ -68,6 +68,7 @@ public class RedisPartitionResolverDUnitTest {
 
     redisServerPort1 = cluster.getRedisPort(1);
     redisServerPort2 = cluster.getRedisPort(2);
+    redisServerPort3 = cluster.getRedisPort(3);
 
     jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT);
   }
@@ -99,8 +100,28 @@ public class RedisPartitionResolverDUnitTest {
 
     assertThat(buckets1.size() + buckets2.size() + buckets3.size())
         .isEqualTo(RegionProvider.REDIS_REGION_BUCKETS);
+  }
 
+  @Test
+  public void testClusterSlotsReferencesAllServers() {
+    int numKeys = 1000;
+    for (int i = 0; i < numKeys; i++) {
+      String key = "key-" + i;
+      jedis1.set(key, "value-" + i);
+    }
+
+    List<Object> clusterSlots = jedis1.clusterSlots();
+
+    assertThat(clusterSlots).hasSize(RegionProvider.REDIS_REGION_BUCKETS);
+
+    // Gather all unique ports
+    Set<Long> ports = new HashSet<>();
+    for (Object slotObj : clusterSlots) {
+      ports.add((Long) (((List<Object>) ((List<Object>) slotObj).get(2))).get(1));
+    }
 
+    assertThat(ports).containsExactlyInAnyOrder((long) redisServerPort1, (long) redisServerPort2,
+        (long) redisServerPort3);
   }
 
   private Map<ByteArrayWrapper, Integer> getKeyToBucketMap(MemberVM vm) {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
index 06fa6d6..880fee2 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
@@ -27,6 +27,7 @@ import org.apache.geode.internal.statistics.StatisticsClock;
 import org.apache.geode.internal.statistics.StatisticsClockFactory;
 import org.apache.geode.logging.internal.executors.LoggingExecutors;
 import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.cluster.BucketRetrievalFunction;
 import org.apache.geode.redis.internal.executor.CommandFunction;
 import org.apache.geode.redis.internal.executor.StripedExecutor;
 import org.apache.geode.redis.internal.executor.SynchronizedStripedExecutor;
@@ -102,6 +103,8 @@ public class GeodeRedisServer {
         regionProvider, pubSub,
         this::allowUnsupportedCommands, this::shutdown, port, bindAddress, redisStats,
         redisCommandExecutor);
+
+    BucketRetrievalFunction.register(nettyRedisServer.getPort());
   }
 
   @VisibleForTesting
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
index 33ca297..74ac471 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
@@ -277,7 +277,7 @@ public enum RedisCommandType {
   SLOWLOG(new SlowlogExecutor(), UNSUPPORTED, new SlowlogParameterRequirements()),
   TIME(new TimeExecutor(), UNSUPPORTED, new ExactParameterRequirements(1)),
 
-  /***********  CLUSTER  **********/
+  /*********** CLUSTER **********/
   CLUSTER(new ClusterExecutor(), UNSUPPORTED, new MinimumParameterRequirements(1)),
 
   /////////// UNIMPLEMENTED /////////////////////
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java
index ed9144c..48c4fb6 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java
@@ -19,16 +19,21 @@ import java.io.Serializable;
 import java.net.InetAddress;
 import java.util.Set;
 
-import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.Region;
 import org.apache.geode.cache.execute.FunctionContext;
-import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.internal.cache.LocalDataSet;
+import org.apache.geode.internal.cache.execute.InternalFunction;
 import org.apache.geode.internal.inet.LocalHostUtil;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 
-public class BucketRetrievalFunction implements Function<Void> {
+public class BucketRetrievalFunction implements InternalFunction<Void> {
 
+  public static final String ID = "REDIS_BUCKET_SLOT_FUNCTION";
   private static final String hostAddress;
+  private final int redisPort;
 
   static {
     InetAddress localhost = null;
@@ -40,28 +45,56 @@ public class BucketRetrievalFunction implements Function<Void> {
     hostAddress = localhost == null ? "localhost" : localhost.getHostAddress();
   }
 
+  public BucketRetrievalFunction(int redisPort) {
+    this.redisPort = redisPort;
+  }
+
+  public static void register(int redisPort) {
+    FunctionService.registerFunction(new BucketRetrievalFunction(redisPort));
+  }
+
   @Override
   public void execute(FunctionContext<Void> context) {
-    LocalDataSet local = (LocalDataSet) PartitionRegionHelper
-        .getLocalDataForContext((RegionFunctionContext) context);
+    Region<ByteArrayWrapper, ByteArrayWrapper> region =
+        context.getCache().getRegion(RegionProvider.REDIS_DATA_REGION);
+
+    LocalDataSet local = (LocalDataSet) PartitionRegionHelper.getLocalPrimaryData(region);
 
-    MemberBuckets mb = new MemberBuckets(hostAddress, local.getBucketSet());
+    MemberBuckets mb =
+        new MemberBuckets(context.getMemberName(), hostAddress, redisPort, local.getBucketSet());
     context.getResultSender().lastResult(mb);
   }
 
+  @Override
+  public String getId() {
+    return ID;
+  }
+
   public static class MemberBuckets implements Serializable {
+    private final String memberId;
     private final String hostAddress;
+    private final int port;
     private final Set<Integer> bucketIds;
 
-    public MemberBuckets(String hostAddress, Set<Integer> bucketIds) {
+    public MemberBuckets(String memberId, String hostAddress, int port, Set<Integer> bucketIds) {
+      this.memberId = memberId;
       this.hostAddress = hostAddress;
+      this.port = port;
       this.bucketIds = bucketIds;
     }
 
+    public String getMemberId() {
+      return memberId;
+    }
+
     public String getHostAddress() {
       return hostAddress;
     }
 
+    public int getPort() {
+      return port;
+    }
+
     public Set<Integer> getBucketIds() {
       return bucketIds;
     }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java
index 7fac868..f7e8315 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java
@@ -22,18 +22,27 @@ import static org.apache.geode.redis.internal.cluster.BucketRetrievalFunction.Me
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.cache.Region;
-import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.cache.partition.PartitionMemberInfo;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.cache.partition.PartitionRegionInfo;
+import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.redis.internal.cluster.BucketRetrievalFunction;
+import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisData;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
@@ -68,20 +77,25 @@ public class ClusterExecutor extends AbstractExecutor {
     return response;
   }
 
-  // @SuppressWarnings("unchecked")
+  @SuppressWarnings("unchecked")
   private RedisResponse getSlots(ExecutionHandlerContext ctx) {
-    Region<?, ?> region = ctx.getRegionProvider().getDataRegion();
+    Region<ByteArrayWrapper, RedisData> dataRegion = ctx.getRegionProvider().getDataRegion();
+    PartitionRegionInfo info = PartitionRegionHelper.getPartitionRegionInfo(dataRegion);
+    Set<DistributedMember> membersWithDataRegion = new HashSet<>();
+    for (PartitionMemberInfo memberInfo : info.getPartitionMemberInfo()) {
+      membersWithDataRegion.add(memberInfo.getDistributedMember());
+    }
 
-    Execution<Void, MemberBuckets, List<MemberBuckets>> execution =
-        FunctionService.onRegion(region);
     ResultCollector<MemberBuckets, List<MemberBuckets>> resultCollector =
-        execution.execute(new BucketRetrievalFunction());
+        FunctionService.onMembers(membersWithDataRegion).execute(BucketRetrievalFunction.ID);
 
     SortedMap<Integer, String> bucketToMemberMap = new TreeMap<>();
+    Map<String, Pair<String, Integer>> memberToHostPortMap = new TreeMap<>();
     int retrievedBucketCount = 0;
     for (MemberBuckets m : resultCollector.getResult()) {
+      memberToHostPortMap.put(m.getMemberId(), Pair.of(m.getHostAddress(), m.getPort()));
       for (Integer id : m.getBucketIds()) {
-        bucketToMemberMap.put(id, m.getHostAddress());
+        bucketToMemberMap.put(id, m.getMemberId());
         retrievedBucketCount++;
       }
     }
@@ -96,10 +110,11 @@ public class ClusterExecutor extends AbstractExecutor {
     List<Object> slots = new ArrayList<>();
 
     for (String member : bucketToMemberMap.values()) {
+      Pair<String, Integer> hostAndPort = memberToHostPortMap.get(member);
       List<?> entry = Arrays.asList(
           index * slotsPerBucket,
           ((index + 1) * slotsPerBucket) - 1,
-          Arrays.asList(member, ctx.getServerPort()));
+          Arrays.asList(hostAndPort.getLeft(), hostAndPort.getRight()));
 
       slots.add(entry);
       index++;
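
Taken together, the change works in three steps: GeodeRedisServer registers
BucketRetrievalFunction with the local Netty port, ClusterExecutor runs that function on every
member hosting the data region, and each member replies with its member name, host address,
redis port, and primary bucket ids. The executor then maps buckets to member ids and member ids
to (host, port) pairs before emitting one slot range per bucket. The standalone sketch below
reproduces that final assembly step with hypothetical member data; the bucket count and the
16384-slots-per-region split are assumptions here, not values taken from this diff.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.commons.lang3.tuple.Pair;

public class SlotAssemblySketch {
  public static void main(String[] args) {
    // Assumption: 128 buckets over Redis' 16384 hash slots; the real values come
    // from RegionProvider and the existing slotsPerBucket computation.
    int redisRegionBuckets = 128;
    int slotsPerBucket = 16384 / redisRegionBuckets;

    // bucket id -> owning member id, as built from the MemberBuckets results
    SortedMap<Integer, String> bucketToMemberMap = new TreeMap<>();
    bucketToMemberMap.put(0, "server-1");
    bucketToMemberMap.put(1, "server-2");

    // member id -> (host, redis port), as reported by BucketRetrievalFunction
    Map<String, Pair<String, Integer>> memberToHostPortMap = new TreeMap<>();
    memberToHostPortMap.put("server-1", Pair.of("10.0.0.1", 6379));
    memberToHostPortMap.put("server-2", Pair.of("10.0.0.2", 6380));

    List<Object> slots = new ArrayList<>();
    int index = 0;
    for (String member : bucketToMemberMap.values()) {
      Pair<String, Integer> hostAndPort = memberToHostPortMap.get(member);
      slots.add(Arrays.asList(
          index * slotsPerBucket,
          ((index + 1) * slotsPerBucket) - 1,
          Arrays.asList(hostAndPort.getLeft(), hostAndPort.getRight())));
      index++;
    }

    // Prints e.g. [0, 127, [10.0.0.1, 6379]] and [128, 255, [10.0.0.2, 6380]]
    slots.forEach(System.out::println);
  }
}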