You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2021/03/16 18:54:34 UTC

[geode] 01/36: WIP add clustering support

This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch feature/redis-performance-testing
in repository https://gitbox.apache.org/repos/asf/geode.git

commit cbe19e467f3d854a471e5b9857db6c5c9f1bebe0
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Tue Feb 16 11:01:54 2021 -0800

    WIP add clustering support
---
 .../org/apache/geode/cache/PartitionResolver.java  |   4 +-
 .../geode/internal/cache/PartitionedRegion.java    |   8 +-
 .../internal/cache/PartitionedRegionDataStore.java |  28 ++---
 .../cluster/RedisPartitionResolverDUnitTest.java   | 123 +++++++++++++++++++++
 .../executor/cluster/ClusterIntegrationTest.java   |  48 ++++++++
 .../geode/redis/internal/RedisCommandType.java     |   5 +-
 .../geode/redis/internal/RegionProvider.java       |  15 ++-
 .../internal/cluster/BucketRetrievalFunction.java  |  69 ++++++++++++
 .../redis/internal/data/ByteArrayWrapper.java      |  18 +++
 .../redis/internal/executor/cluster/CRC16.java     |  59 ++++++++++
 .../internal/executor/cluster/ClusterExecutor.java | 110 ++++++++++++++++++
 .../executor/cluster/RedisPartitionResolver.java   |  30 +++++
 .../sanctioned-geode-redis-serializables.txt       |   2 +
 13 files changed, 493 insertions(+), 26 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/PartitionResolver.java b/geode-core/src/main/java/org/apache/geode/cache/PartitionResolver.java
index 3e9981c..7ad3cb9 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/PartitionResolver.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/PartitionResolver.java
@@ -76,5 +76,7 @@ public interface PartitionResolver<K, V> extends CacheCallback {
    *
    * @return String name
    */
-  String getName();
+  default String getName() {
+    return getClass().getName();
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index cd89363..68a039c 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -8149,8 +8149,8 @@ public class PartitionedRegion extends LocalRegion
    * A test method to get the list of all the bucket ids for the partitioned region in the data
    * Store.
    */
-  public List getLocalBucketsListTestOnly() {
-    List localBucketList = null;
+  public List<Integer> getLocalBucketsListTestOnly() {
+    List<Integer> localBucketList = null;
     if (this.dataStore != null) {
       localBucketList = this.dataStore.getLocalBucketsListTestOnly();
     }
@@ -8161,8 +8161,8 @@ public class PartitionedRegion extends LocalRegion
    * A test method to get the list of all the primary bucket ids for the partitioned region in the
    * data Store.
    */
-  public List getLocalPrimaryBucketsListTestOnly() {
-    List localPrimaryList = null;
+  public List<Integer> getLocalPrimaryBucketsListTestOnly() {
+    List<Integer> localPrimaryList = null;
     if (this.dataStore != null) {
       localPrimaryList = this.dataStore.getLocalPrimaryBucketsListTestOnly();
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index d1d3dba..d1cb4e6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -2604,14 +2604,9 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
    * <i>Test Method</i> Return the list of all the bucket names in this data store.
    *
    */
-  public List getLocalBucketsListTestOnly() {
-    final List bucketList = new ArrayList();
-    visitBuckets(new BucketVisitor() {
-      @Override
-      public void visit(Integer bucketId, Region r) {
-        bucketList.add(bucketId);
-      }
-    });
+  public List<Integer> getLocalBucketsListTestOnly() {
+    final List<Integer> bucketList = new ArrayList<>();
+    visitBuckets((bucketId, r) -> bucketList.add(bucketId));
     return bucketList;
   }
 
@@ -2635,16 +2630,13 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
    * <i>Test Method</i> Return the list of all the non primary bucket ids in this data store.
    *
    */
-  public List getLocalNonPrimaryBucketsListTestOnly() {
-    final List nonPrimaryBucketList = new ArrayList();
-    visitBuckets(new BucketVisitor() {
-      @Override
-      public void visit(Integer bucketId, Region r) {
-        BucketRegion br = (BucketRegion) r;
-        BucketAdvisor ba = (BucketAdvisor) br.getDistributionAdvisor();
-        if (!ba.isPrimary()) {
-          nonPrimaryBucketList.add(bucketId);
-        }
+  public List<Integer> getLocalNonPrimaryBucketsListTestOnly() {
+    final List<Integer> nonPrimaryBucketList = new ArrayList<>();
+    visitBuckets((bucketId, r) -> {
+      BucketRegion br = (BucketRegion) r;
+      BucketAdvisor ba = (BucketAdvisor) br.getDistributionAdvisor();
+      if (!ba.isPrimary()) {
+        nonPrimaryBucketList.add(bucketId);
       }
     });
     return nonPrimaryBucketList;
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
new file mode 100644
index 0000000..3304932
--- /dev/null
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.cluster;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.internal.cache.LocalDataSet;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+
+public class RedisPartitionResolverDUnitTest {
+
+  @ClassRule
+  public static RedisClusterStartupRule cluster = new RedisClusterStartupRule(4);
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static Jedis jedis1;
+
+  private static MemberVM locator;
+  private static MemberVM server1;
+  private static MemberVM server2;
+  private static MemberVM server3;
+
+  private static int redisServerPort1;
+  private static int redisServerPort2;
+
+  @BeforeClass
+  public static void classSetup() {
+    locator = cluster.startLocatorVM(0);
+    server1 = cluster.startRedisVM(1, locator.getPort());
+    server2 = cluster.startRedisVM(2, locator.getPort());
+    server3 = cluster.startRedisVM(3, locator.getPort());
+
+    redisServerPort1 = cluster.getRedisPort(1);
+    redisServerPort2 = cluster.getRedisPort(2);
+
+    jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT);
+  }
+
+  @Before
+  public void testSetup() {
+    jedis1.flushAll();
+  }
+
+  @Test
+  public void testRedisHashesMapToCorrectBuckets() {
+    int numKeys = 1000;
+    for (int i = 0; i < numKeys; i++) {
+      String key = "key-" + i;
+      jedis1.set(key, "value-" + i);
+    }
+
+    Map<ByteArrayWrapper, Integer> keyToBucketMap1 = getKeyToBucketMap(server1);
+    Map<ByteArrayWrapper, Integer> keyToBucketMap2 = getKeyToBucketMap(server2);
+    Map<ByteArrayWrapper, Integer> keyToBucketMap3 = getKeyToBucketMap(server3);
+
+    Set<Integer> buckets1 = new HashSet<>(keyToBucketMap1.values());
+    Set<Integer> buckets2 = new HashSet<>(keyToBucketMap2.values());
+    Set<Integer> buckets3 = new HashSet<>(keyToBucketMap3.values());
+
+    assertThat(buckets1).doesNotContainAnyElementsOf(buckets2);
+    assertThat(buckets1).doesNotContainAnyElementsOf(buckets3);
+    assertThat(buckets2).doesNotContainAnyElementsOf(buckets3);
+
+    assertThat(buckets1.size() + buckets2.size() + buckets3.size())
+        .isEqualTo(RegionProvider.REDIS_REGION_BUCKETS);
+
+
+  }
+
+  private Map<ByteArrayWrapper, Integer> getKeyToBucketMap(MemberVM vm) {
+    return vm.invoke(
+        (SerializableCallableIF<Map<ByteArrayWrapper, Integer>>) () -> {
+          Region<ByteArrayWrapper, RedisData> region =
+              RedisClusterStartupRule.getCache().getRegion(RegionProvider.REDIS_DATA_REGION);
+
+          LocalDataSet local = (LocalDataSet) PartitionRegionHelper.getLocalPrimaryData(region);
+          Map<ByteArrayWrapper, Integer> keyMap = new HashMap<>();
+
+          for (Object key : local.localKeys()) {
+            int id = local.getProxy().getKeyInfo(key).getBucketId();
+            keyMap.put((ByteArrayWrapper) key, id);
+          }
+
+          return keyMap;
+        });
+  }
+}
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/cluster/ClusterIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/cluster/ClusterIntegrationTest.java
new file mode 100644
index 0000000..6bbfa7e
--- /dev/null
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/cluster/ClusterIntegrationTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.cluster;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import org.apache.geode.redis.GeodeRedisServerRule;
+
+public class ClusterIntegrationTest {
+
+  @ClassRule
+  public static GeodeRedisServerRule server = new GeodeRedisServerRule();
+
+  private Jedis jedis;
+
+  @Before
+  public void setUp() {
+    jedis = new Jedis("localhost", server.getPort(), 10000000);
+  }
+
+  @After
+  public void tearDown() {
+    jedis.close();
+  }
+
+  @Test
+  public void testClusterSlots() {
+    System.err.println(server.getPort());
+    System.err.println(jedis.clusterSlots());
+  }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
index 914e4f6..33ca297 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
@@ -33,6 +33,7 @@ import org.apache.geode.redis.internal.ParameterRequirements.UnspecifiedParamete
 import org.apache.geode.redis.internal.executor.Executor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.executor.UnknownExecutor;
+import org.apache.geode.redis.internal.executor.cluster.ClusterExecutor;
 import org.apache.geode.redis.internal.executor.connection.AuthExecutor;
 import org.apache.geode.redis.internal.executor.connection.EchoExecutor;
 import org.apache.geode.redis.internal.executor.connection.PingExecutor;
@@ -276,6 +277,9 @@ public enum RedisCommandType {
   SLOWLOG(new SlowlogExecutor(), UNSUPPORTED, new SlowlogParameterRequirements()),
   TIME(new TimeExecutor(), UNSUPPORTED, new ExactParameterRequirements(1)),
 
+  /***********  CLUSTER  **********/
+  CLUSTER(new ClusterExecutor(), UNSUPPORTED, new MinimumParameterRequirements(1)),
+
   /////////// UNIMPLEMENTED /////////////////////
 
   ACL(null, UNIMPLEMENTED),
@@ -288,7 +292,6 @@ public enum RedisCommandType {
   BZPOPMIN(null, UNIMPLEMENTED),
   BZPOPMAX(null, UNIMPLEMENTED),
   CLIENT(null, UNIMPLEMENTED),
-  CLUSTER(null, UNIMPLEMENTED),
   COMMAND(null, UNIMPLEMENTED),
   CONFIG(null, UNIMPLEMENTED),
   DEBUG(null, UNIMPLEMENTED),
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
index c7be2af..a8cd6c8 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
@@ -14,19 +14,23 @@
  */
 package org.apache.geode.redis.internal;
 
+import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalRegionFactory;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.executor.cluster.RedisPartitionResolver;
 
 public class RegionProvider {
   /**
    * The name of the region that holds data stored in redis.
    */
-  private static final String REDIS_DATA_REGION = "__REDIS_DATA";
-  private static final String REDIS_CONFIG_REGION = "__REDIS_CONFIG";
+  public static final String REDIS_DATA_REGION = "__REDIS_DATA";
+  public static final String REDIS_CONFIG_REGION = "__REDIS_CONFIG";
+  public static final int REDIS_REGION_BUCKETS = Integer.getInteger("redis.region.buckets", 128);
+  public static final int REDIS_SLOTS = Integer.getInteger("redis.slots", 16384);
 
   private final Region<ByteArrayWrapper, RedisData> dataRegion;
   private final Region<String, Object> configRegion;
@@ -36,6 +40,13 @@ public class RegionProvider {
     InternalRegionFactory<ByteArrayWrapper, RedisData> redisDataRegionFactory =
         cache.createInternalRegionFactory(RegionShortcut.PARTITION_REDUNDANT);
     redisDataRegionFactory.setInternalRegion(true).setIsUsedForMetaRegion(true);
+
+    PartitionAttributesFactory<ByteArrayWrapper, RedisData> attributesFactory =
+        new PartitionAttributesFactory<>();
+    attributesFactory.setPartitionResolver(new RedisPartitionResolver());
+    attributesFactory.setTotalNumBuckets(REDIS_REGION_BUCKETS);
+    redisDataRegionFactory.setPartitionAttributes(attributesFactory.create());
+
     dataRegion = redisDataRegionFactory.create(REDIS_DATA_REGION);
 
     InternalRegionFactory<String, Object> redisConfigRegionFactory =
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java
new file mode 100644
index 0000000..ed9144c
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.cluster;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.Set;
+
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.internal.cache.LocalDataSet;
+import org.apache.geode.internal.inet.LocalHostUtil;
+
+public class BucketRetrievalFunction implements Function<Void> {
+
+  private static final String hostAddress;
+
+  static {
+    InetAddress localhost = null;
+    try {
+      localhost = LocalHostUtil.getLocalHost();
+    } catch (Exception ex) {
+    }
+
+    hostAddress = localhost == null ? "localhost" : localhost.getHostAddress();
+  }
+
+  @Override
+  public void execute(FunctionContext<Void> context) {
+    LocalDataSet local = (LocalDataSet) PartitionRegionHelper
+        .getLocalDataForContext((RegionFunctionContext) context);
+
+    MemberBuckets mb = new MemberBuckets(hostAddress, local.getBucketSet());
+    context.getResultSender().lastResult(mb);
+  }
+
+  public static class MemberBuckets implements Serializable {
+    private final String hostAddress;
+    private final Set<Integer> bucketIds;
+
+    public MemberBuckets(String hostAddress, Set<Integer> bucketIds) {
+      this.hostAddress = hostAddress;
+      this.bucketIds = bucketIds;
+    }
+
+    public String getHostAddress() {
+      return hostAddress;
+    }
+
+    public Set<Integer> getBucketIds() {
+      return bucketIds;
+    }
+  }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
index ae292ca..b24d5a6 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
@@ -15,6 +15,9 @@
  */
 package org.apache.geode.redis.internal.data;
 
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_REGION_BUCKETS;
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -26,6 +29,8 @@ import org.apache.geode.internal.serialization.DataSerializableFixedID;
 import org.apache.geode.internal.serialization.DeserializationContext;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.redis.internal.executor.cluster.CRC16;
+import org.apache.geode.redis.internal.executor.cluster.RedisPartitionResolver;
 import org.apache.geode.redis.internal.netty.Coder;
 
 /**
@@ -40,6 +45,8 @@ public class ByteArrayWrapper
    */
   protected byte[] value;
 
+  private transient Object routingId;
+
   /**
    * Empty constructor for serialization
    */
@@ -108,6 +115,17 @@ public class ByteArrayWrapper
   }
 
   /**
+   * Used by the {@link RedisPartitionResolver} to map slots to buckets
+   */
+  public synchronized Object getRoutingId() {
+    if (routingId == null) {
+      routingId = (CRC16.calculate(value) % REDIS_SLOTS) % REDIS_REGION_BUCKETS;
+    }
+
+    return routingId;
+  }
+
+  /**
    * Private helper method to compare two byte arrays, A.compareTo(B). The comparison is basically
    * numerical, for each byte index, the byte representing the greater value will be the greater
    *
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java
new file mode 100644
index 0000000..7d1d049
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.cluster;
+
+public class CRC16 {
+
+  // CCITT/SDLC/HDLC x^16 + x^12 + x^5 + 1 (CRC-16-CCITT)
+  private static final int CCITT_POLY = 0x8408;
+  private static final short[] crcTable = new short[256];
+
+  // Create the table up front
+  static {
+    int poly = reverseInt16(CCITT_POLY);
+
+    for (int x = 0; x < 256; x++) {
+      int w = x << 8;
+      for (int i = 0; i < 8; i++) {
+        if ((w & 0x8000) != 0) {
+          w = (w << 1) ^ poly;
+        } else {
+          w = w << 1;
+        }
+      }
+      crcTable[x] = (short) w;
+    }
+  }
+
+  // Calculate CRC with MSB first
+  public static int calculate(byte[] data) {
+    int crc = 0;
+    for (byte datum : data) {
+      crc = ((crc << 8) & 0xFF00) ^ (crcTable[(crc >> 8) ^ (datum & 0xFF)] & 0xFFFF);
+    }
+    return crc;
+  }
+
+  // Reverses the bits of a 16 bit integer.
+  private static int reverseInt16(int i) {
+    i = (i & 0x5555) << 1 | (i >>> 1) & 0x5555;
+    i = (i & 0x3333) << 2 | (i >>> 2) & 0x3333;
+    i = (i & 0x0F0F) << 4 | (i >>> 4) & 0x0F0F;
+    i = (i & 0x00FF) << 8 | (i >>> 8);
+    return i;
+  }
+
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java
new file mode 100644
index 0000000..7fac868
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.cluster;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_UNKNOWN_COMMAND;
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_REGION_BUCKETS;
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS;
+import static org.apache.geode.redis.internal.cluster.BucketRetrievalFunction.MemberBuckets;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.Execution;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.cluster.BucketRetrievalFunction;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ClusterExecutor extends AbstractExecutor {
+
+  private static final Logger logger = LogService.getLogger();
+
+  @Override
+  public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+
+    List<byte[]> args = command.getProcessedCommand();
+    String subCommand = new String(args.get(1));
+
+    StringBuilder strArgs = new StringBuilder();
+    args.forEach(x -> strArgs.append(new String(x)).append(" "));
+
+    logger.info("CLUSTER args: {}", strArgs);
+
+    RedisResponse response;
+    switch (subCommand.toLowerCase()) {
+      case "slots": {
+        response = getSlots(context);
+        break;
+      }
+      default: {
+        response = RedisResponse.error(ERROR_UNKNOWN_COMMAND);
+      }
+    }
+
+    return response;
+  }
+
+  // @SuppressWarnings("unchecked")
+  private RedisResponse getSlots(ExecutionHandlerContext ctx) {
+    Region<?, ?> region = ctx.getRegionProvider().getDataRegion();
+
+    Execution<Void, MemberBuckets, List<MemberBuckets>> execution =
+        FunctionService.onRegion(region);
+    ResultCollector<MemberBuckets, List<MemberBuckets>> resultCollector =
+        execution.execute(new BucketRetrievalFunction());
+
+    SortedMap<Integer, String> bucketToMemberMap = new TreeMap<>();
+    int retrievedBucketCount = 0;
+    for (MemberBuckets m : resultCollector.getResult()) {
+      for (Integer id : m.getBucketIds()) {
+        bucketToMemberMap.put(id, m.getHostAddress());
+        retrievedBucketCount++;
+      }
+    }
+
+    if (retrievedBucketCount != REDIS_REGION_BUCKETS) {
+      return RedisResponse.error("Internal error: bucket count mismatch " + retrievedBucketCount
+          + " != " + REDIS_REGION_BUCKETS);
+    }
+
+    int slotsPerBucket = REDIS_SLOTS / REDIS_REGION_BUCKETS;
+    int index = 0;
+    List<Object> slots = new ArrayList<>();
+
+    for (String member : bucketToMemberMap.values()) {
+      List<?> entry = Arrays.asList(
+          index * slotsPerBucket,
+          ((index + 1) * slotsPerBucket) - 1,
+          Arrays.asList(member, ctx.getServerPort()));
+
+      slots.add(entry);
+      index++;
+    }
+
+    return RedisResponse.array(slots);
+  }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java
new file mode 100644
index 0000000..5ca304e
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.cluster;
+
+import org.apache.geode.cache.EntryOperation;
+import org.apache.geode.cache.PartitionResolver;
+import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisData;
+
+public class RedisPartitionResolver implements PartitionResolver<ByteArrayWrapper, RedisData> {
+
+  @Override
+  public Object getRoutingObject(EntryOperation<ByteArrayWrapper, RedisData> opDetails) {
+    return opDetails.getKey().getRoutingId();
+  }
+
+}
diff --git a/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt b/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt
index 3d91f76..d299a57 100755
--- a/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt
+++ b/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt
@@ -1,6 +1,8 @@
 org/apache/geode/redis/internal/ParameterRequirements/RedisParametersMismatchException,true,-643700717871858072
 org/apache/geode/redis/internal/RedisCommandSupportLevel,false
 org/apache/geode/redis/internal/RedisCommandType,false,deferredParameterRequirements:org/apache/geode/redis/internal/ParameterRequirements/ParameterRequirements,executor:org/apache/geode/redis/internal/executor/Executor,parameterRequirements:org/apache/geode/redis/internal/ParameterRequirements/ParameterRequirements,supportLevel:org/apache/geode/redis/internal/RedisCommandSupportLevel
+org/apache/geode/redis/internal/cluster/BucketRetrievalFunction,false
+org/apache/geode/redis/internal/cluster/BucketRetrievalFunction$MemberBuckets,false
 org/apache/geode/redis/internal/data/NullRedisSet$SetOp,false
 org/apache/geode/redis/internal/data/NullRedisString$BitOp,false
 org/apache/geode/redis/internal/data/RedisDataType,false,toStringValue:java/lang/String