You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2021/03/16 18:54:34 UTC
[geode] 01/36: WIP add clustering support
This is an automated email from the ASF dual-hosted git repository.
upthewaterspout pushed a commit to branch feature/redis-performance-testing
in repository https://gitbox.apache.org/repos/asf/geode.git
commit cbe19e467f3d854a471e5b9857db6c5c9f1bebe0
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Tue Feb 16 11:01:54 2021 -0800
WIP add clustering support
---
.../org/apache/geode/cache/PartitionResolver.java | 4 +-
.../geode/internal/cache/PartitionedRegion.java | 8 +-
.../internal/cache/PartitionedRegionDataStore.java | 28 ++---
.../cluster/RedisPartitionResolverDUnitTest.java | 123 +++++++++++++++++++++
.../executor/cluster/ClusterIntegrationTest.java | 48 ++++++++
.../geode/redis/internal/RedisCommandType.java | 5 +-
.../geode/redis/internal/RegionProvider.java | 15 ++-
.../internal/cluster/BucketRetrievalFunction.java | 69 ++++++++++++
.../redis/internal/data/ByteArrayWrapper.java | 18 +++
.../redis/internal/executor/cluster/CRC16.java | 59 ++++++++++
.../internal/executor/cluster/ClusterExecutor.java | 110 ++++++++++++++++++
.../executor/cluster/RedisPartitionResolver.java | 30 +++++
.../sanctioned-geode-redis-serializables.txt | 2 +
13 files changed, 493 insertions(+), 26 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/PartitionResolver.java b/geode-core/src/main/java/org/apache/geode/cache/PartitionResolver.java
index 3e9981c..7ad3cb9 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/PartitionResolver.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/PartitionResolver.java
@@ -76,5 +76,7 @@ public interface PartitionResolver<K, V> extends CacheCallback {
*
* @return String name
*/
- String getName();
+ default String getName() {
+ return getClass().getName();
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index cd89363..68a039c 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -8149,8 +8149,8 @@ public class PartitionedRegion extends LocalRegion
* A test method to get the list of all the bucket ids for the partitioned region in the data
* Store.
*/
- public List getLocalBucketsListTestOnly() {
- List localBucketList = null;
+ public List<Integer> getLocalBucketsListTestOnly() {
+ List<Integer> localBucketList = null;
if (this.dataStore != null) {
localBucketList = this.dataStore.getLocalBucketsListTestOnly();
}
@@ -8161,8 +8161,8 @@ public class PartitionedRegion extends LocalRegion
* A test method to get the list of all the primary bucket ids for the partitioned region in the
* data Store.
*/
- public List getLocalPrimaryBucketsListTestOnly() {
- List localPrimaryList = null;
+ public List<Integer> getLocalPrimaryBucketsListTestOnly() {
+ List<Integer> localPrimaryList = null;
if (this.dataStore != null) {
localPrimaryList = this.dataStore.getLocalPrimaryBucketsListTestOnly();
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index d1d3dba..d1cb4e6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -2604,14 +2604,9 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
* <i>Test Method</i> Return the list of all the bucket names in this data store.
*
*/
- public List getLocalBucketsListTestOnly() {
- final List bucketList = new ArrayList();
- visitBuckets(new BucketVisitor() {
- @Override
- public void visit(Integer bucketId, Region r) {
- bucketList.add(bucketId);
- }
- });
+ public List<Integer> getLocalBucketsListTestOnly() {
+ final List<Integer> bucketList = new ArrayList<>();
+ visitBuckets((bucketId, r) -> bucketList.add(bucketId));
return bucketList;
}
@@ -2635,16 +2630,13 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
* <i>Test Method</i> Return the list of all the non primary bucket ids in this data store.
*
*/
- public List getLocalNonPrimaryBucketsListTestOnly() {
- final List nonPrimaryBucketList = new ArrayList();
- visitBuckets(new BucketVisitor() {
- @Override
- public void visit(Integer bucketId, Region r) {
- BucketRegion br = (BucketRegion) r;
- BucketAdvisor ba = (BucketAdvisor) br.getDistributionAdvisor();
- if (!ba.isPrimary()) {
- nonPrimaryBucketList.add(bucketId);
- }
+ public List<Integer> getLocalNonPrimaryBucketsListTestOnly() {
+ final List<Integer> nonPrimaryBucketList = new ArrayList<>();
+ visitBuckets((bucketId, r) -> {
+ BucketRegion br = (BucketRegion) r;
+ BucketAdvisor ba = (BucketAdvisor) br.getDistributionAdvisor();
+ if (!ba.isPrimary()) {
+ nonPrimaryBucketList.add(bucketId);
}
});
return nonPrimaryBucketList;
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
new file mode 100644
index 0000000..3304932
--- /dev/null
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.cluster;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.internal.cache.LocalDataSet;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+
+public class RedisPartitionResolverDUnitTest {
+
+ @ClassRule
+ public static RedisClusterStartupRule cluster = new RedisClusterStartupRule(4);
+
+ private static final String LOCAL_HOST = "127.0.0.1";
+ private static final int JEDIS_TIMEOUT =
+ Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+ private static Jedis jedis1;
+
+ private static MemberVM locator;
+ private static MemberVM server1;
+ private static MemberVM server2;
+ private static MemberVM server3;
+
+ private static int redisServerPort1;
+ private static int redisServerPort2;
+
+ @BeforeClass
+ public static void classSetup() {
+ locator = cluster.startLocatorVM(0);
+ server1 = cluster.startRedisVM(1, locator.getPort());
+ server2 = cluster.startRedisVM(2, locator.getPort());
+ server3 = cluster.startRedisVM(3, locator.getPort());
+
+ redisServerPort1 = cluster.getRedisPort(1);
+ redisServerPort2 = cluster.getRedisPort(2);
+
+ jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT);
+ }
+
+ @Before
+ public void testSetup() {
+ jedis1.flushAll();
+ }
+
+ @Test
+ public void testRedisHashesMapToCorrectBuckets() {
+ int numKeys = 1000;
+ for (int i = 0; i < numKeys; i++) {
+ String key = "key-" + i;
+ jedis1.set(key, "value-" + i);
+ }
+
+ Map<ByteArrayWrapper, Integer> keyToBucketMap1 = getKeyToBucketMap(server1);
+ Map<ByteArrayWrapper, Integer> keyToBucketMap2 = getKeyToBucketMap(server2);
+ Map<ByteArrayWrapper, Integer> keyToBucketMap3 = getKeyToBucketMap(server3);
+
+ Set<Integer> buckets1 = new HashSet<>(keyToBucketMap1.values());
+ Set<Integer> buckets2 = new HashSet<>(keyToBucketMap2.values());
+ Set<Integer> buckets3 = new HashSet<>(keyToBucketMap3.values());
+
+ assertThat(buckets1).doesNotContainAnyElementsOf(buckets2);
+ assertThat(buckets1).doesNotContainAnyElementsOf(buckets3);
+ assertThat(buckets2).doesNotContainAnyElementsOf(buckets3);
+
+ assertThat(buckets1.size() + buckets2.size() + buckets3.size())
+ .isEqualTo(RegionProvider.REDIS_REGION_BUCKETS);
+
+
+ }
+
+ private Map<ByteArrayWrapper, Integer> getKeyToBucketMap(MemberVM vm) {
+ return vm.invoke(
+ (SerializableCallableIF<Map<ByteArrayWrapper, Integer>>) () -> {
+ Region<ByteArrayWrapper, RedisData> region =
+ RedisClusterStartupRule.getCache().getRegion(RegionProvider.REDIS_DATA_REGION);
+
+ LocalDataSet local = (LocalDataSet) PartitionRegionHelper.getLocalPrimaryData(region);
+ Map<ByteArrayWrapper, Integer> keyMap = new HashMap<>();
+
+ for (Object key : local.localKeys()) {
+ int id = local.getProxy().getKeyInfo(key).getBucketId();
+ keyMap.put((ByteArrayWrapper) key, id);
+ }
+
+ return keyMap;
+ });
+ }
+}
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/cluster/ClusterIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/cluster/ClusterIntegrationTest.java
new file mode 100644
index 0000000..6bbfa7e
--- /dev/null
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/cluster/ClusterIntegrationTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.cluster;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import org.apache.geode.redis.GeodeRedisServerRule;
+
+public class ClusterIntegrationTest {
+
+ @ClassRule
+ public static GeodeRedisServerRule server = new GeodeRedisServerRule();
+
+ private Jedis jedis;
+
+ @Before
+ public void setUp() {
+ jedis = new Jedis("localhost", server.getPort(), 10000000);
+ }
+
+ @After
+ public void tearDown() {
+ jedis.close();
+ }
+
+ @Test
+ public void testClusterSlots() {
+ System.err.println(server.getPort());
+ System.err.println(jedis.clusterSlots());
+ }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
index 914e4f6..33ca297 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RedisCommandType.java
@@ -33,6 +33,7 @@ import org.apache.geode.redis.internal.ParameterRequirements.UnspecifiedParamete
import org.apache.geode.redis.internal.executor.Executor;
import org.apache.geode.redis.internal.executor.RedisResponse;
import org.apache.geode.redis.internal.executor.UnknownExecutor;
+import org.apache.geode.redis.internal.executor.cluster.ClusterExecutor;
import org.apache.geode.redis.internal.executor.connection.AuthExecutor;
import org.apache.geode.redis.internal.executor.connection.EchoExecutor;
import org.apache.geode.redis.internal.executor.connection.PingExecutor;
@@ -276,6 +277,9 @@ public enum RedisCommandType {
SLOWLOG(new SlowlogExecutor(), UNSUPPORTED, new SlowlogParameterRequirements()),
TIME(new TimeExecutor(), UNSUPPORTED, new ExactParameterRequirements(1)),
+ /*********** CLUSTER **********/
+ CLUSTER(new ClusterExecutor(), UNSUPPORTED, new MinimumParameterRequirements(1)),
+
/////////// UNIMPLEMENTED /////////////////////
ACL(null, UNIMPLEMENTED),
@@ -288,7 +292,6 @@ public enum RedisCommandType {
BZPOPMIN(null, UNIMPLEMENTED),
BZPOPMAX(null, UNIMPLEMENTED),
CLIENT(null, UNIMPLEMENTED),
- CLUSTER(null, UNIMPLEMENTED),
COMMAND(null, UNIMPLEMENTED),
CONFIG(null, UNIMPLEMENTED),
DEBUG(null, UNIMPLEMENTED),
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
index c7be2af..a8cd6c8 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
@@ -14,19 +14,23 @@
*/
package org.apache.geode.redis.internal;
+import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegionFactory;
import org.apache.geode.redis.internal.data.ByteArrayWrapper;
import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.executor.cluster.RedisPartitionResolver;
public class RegionProvider {
/**
* The name of the region that holds data stored in redis.
*/
- private static final String REDIS_DATA_REGION = "__REDIS_DATA";
- private static final String REDIS_CONFIG_REGION = "__REDIS_CONFIG";
+ public static final String REDIS_DATA_REGION = "__REDIS_DATA";
+ public static final String REDIS_CONFIG_REGION = "__REDIS_CONFIG";
+ public static final int REDIS_REGION_BUCKETS = Integer.getInteger("redis.region.buckets", 128);
+ public static final int REDIS_SLOTS = Integer.getInteger("redis.slots", 16384);
private final Region<ByteArrayWrapper, RedisData> dataRegion;
private final Region<String, Object> configRegion;
@@ -36,6 +40,13 @@ public class RegionProvider {
InternalRegionFactory<ByteArrayWrapper, RedisData> redisDataRegionFactory =
cache.createInternalRegionFactory(RegionShortcut.PARTITION_REDUNDANT);
redisDataRegionFactory.setInternalRegion(true).setIsUsedForMetaRegion(true);
+
+ PartitionAttributesFactory<ByteArrayWrapper, RedisData> attributesFactory =
+ new PartitionAttributesFactory<>();
+ attributesFactory.setPartitionResolver(new RedisPartitionResolver());
+ attributesFactory.setTotalNumBuckets(REDIS_REGION_BUCKETS);
+ redisDataRegionFactory.setPartitionAttributes(attributesFactory.create());
+
dataRegion = redisDataRegionFactory.create(REDIS_DATA_REGION);
InternalRegionFactory<String, Object> redisConfigRegionFactory =
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java
new file mode 100644
index 0000000..ed9144c
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/cluster/BucketRetrievalFunction.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.cluster;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.util.Set;
+
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.internal.cache.LocalDataSet;
+import org.apache.geode.internal.inet.LocalHostUtil;
+
+public class BucketRetrievalFunction implements Function<Void> {
+
+ private static final String hostAddress;
+
+ static {
+ InetAddress localhost = null;
+ try {
+ localhost = LocalHostUtil.getLocalHost();
+ } catch (Exception ex) {
+ }
+
+ hostAddress = localhost == null ? "localhost" : localhost.getHostAddress();
+ }
+
+ @Override
+ public void execute(FunctionContext<Void> context) {
+ LocalDataSet local = (LocalDataSet) PartitionRegionHelper
+ .getLocalDataForContext((RegionFunctionContext) context);
+
+ MemberBuckets mb = new MemberBuckets(hostAddress, local.getBucketSet());
+ context.getResultSender().lastResult(mb);
+ }
+
+ public static class MemberBuckets implements Serializable {
+ private final String hostAddress;
+ private final Set<Integer> bucketIds;
+
+ public MemberBuckets(String hostAddress, Set<Integer> bucketIds) {
+ this.hostAddress = hostAddress;
+ this.bucketIds = bucketIds;
+ }
+
+ public String getHostAddress() {
+ return hostAddress;
+ }
+
+ public Set<Integer> getBucketIds() {
+ return bucketIds;
+ }
+ }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
index ae292ca..b24d5a6 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
@@ -15,6 +15,9 @@
*/
package org.apache.geode.redis.internal.data;
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_REGION_BUCKETS;
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS;
+
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@@ -26,6 +29,8 @@ import org.apache.geode.internal.serialization.DataSerializableFixedID;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.KnownVersion;
import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.redis.internal.executor.cluster.CRC16;
+import org.apache.geode.redis.internal.executor.cluster.RedisPartitionResolver;
import org.apache.geode.redis.internal.netty.Coder;
/**
@@ -40,6 +45,8 @@ public class ByteArrayWrapper
*/
protected byte[] value;
+ private transient Object routingId;
+
/**
* Empty constructor for serialization
*/
@@ -108,6 +115,17 @@ public class ByteArrayWrapper
}
/**
+ * Used by the {@link RedisPartitionResolver} to map slots to buckets
+ */
+ public synchronized Object getRoutingId() {
+ if (routingId == null) {
+ routingId = (CRC16.calculate(value) % REDIS_SLOTS) % REDIS_REGION_BUCKETS;
+ }
+
+ return routingId;
+ }
+
+ /**
* Private helper method to compare two byte arrays, A.compareTo(B). The comparison is basically
* numerical, for each byte index, the byte representing the greater value will be the greater
*
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java
new file mode 100644
index 0000000..7d1d049
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.cluster;
+
+public class CRC16 {
+
+ // CCITT/SDLC/HDLC x^16 + x^12 + x^5 + 1 (CRC-16-CCITT)
+ private static final int CCITT_POLY = 0x8408;
+ private static final short[] crcTable = new short[256];
+
+ // Create the table up front
+ static {
+ int poly = reverseInt16(CCITT_POLY);
+
+ for (int x = 0; x < 256; x++) {
+ int w = x << 8;
+ for (int i = 0; i < 8; i++) {
+ if ((w & 0x8000) != 0) {
+ w = (w << 1) ^ poly;
+ } else {
+ w = w << 1;
+ }
+ }
+ crcTable[x] = (short) w;
+ }
+ }
+
+ // Calculate CRC with MSB first
+ public static int calculate(byte[] data) {
+ int crc = 0;
+ for (byte datum : data) {
+ crc = ((crc << 8) & 0xFF00) ^ (crcTable[(crc >> 8) ^ (datum & 0xFF)] & 0xFFFF);
+ }
+ return crc;
+ }
+
+ // Reverses the bits of a 16 bit integer.
+ private static int reverseInt16(int i) {
+ i = (i & 0x5555) << 1 | (i >>> 1) & 0x5555;
+ i = (i & 0x3333) << 2 | (i >>> 2) & 0x3333;
+ i = (i & 0x0F0F) << 4 | (i >>> 4) & 0x0F0F;
+ i = (i & 0x00FF) << 8 | (i >>> 8);
+ return i;
+ }
+
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java
new file mode 100644
index 0000000..7fac868
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/ClusterExecutor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.cluster;
+
+import static org.apache.geode.redis.internal.RedisConstants.ERROR_UNKNOWN_COMMAND;
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_REGION_BUCKETS;
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS;
+import static org.apache.geode.redis.internal.cluster.BucketRetrievalFunction.MemberBuckets;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.execute.Execution;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.redis.internal.cluster.BucketRetrievalFunction;
+import org.apache.geode.redis.internal.executor.AbstractExecutor;
+import org.apache.geode.redis.internal.executor.RedisResponse;
+import org.apache.geode.redis.internal.netty.Command;
+import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
+
+public class ClusterExecutor extends AbstractExecutor {
+
+ private static final Logger logger = LogService.getLogger();
+
+ @Override
+ public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
+
+ List<byte[]> args = command.getProcessedCommand();
+ String subCommand = new String(args.get(1));
+
+ StringBuilder strArgs = new StringBuilder();
+ args.forEach(x -> strArgs.append(new String(x)).append(" "));
+
+ logger.info("CLUSTER args: {}", strArgs);
+
+ RedisResponse response;
+ switch (subCommand.toLowerCase()) {
+ case "slots": {
+ response = getSlots(context);
+ break;
+ }
+ default: {
+ response = RedisResponse.error(ERROR_UNKNOWN_COMMAND);
+ }
+ }
+
+ return response;
+ }
+
+ // @SuppressWarnings("unchecked")
+ private RedisResponse getSlots(ExecutionHandlerContext ctx) {
+ Region<?, ?> region = ctx.getRegionProvider().getDataRegion();
+
+ Execution<Void, MemberBuckets, List<MemberBuckets>> execution =
+ FunctionService.onRegion(region);
+ ResultCollector<MemberBuckets, List<MemberBuckets>> resultCollector =
+ execution.execute(new BucketRetrievalFunction());
+
+ SortedMap<Integer, String> bucketToMemberMap = new TreeMap<>();
+ int retrievedBucketCount = 0;
+ for (MemberBuckets m : resultCollector.getResult()) {
+ for (Integer id : m.getBucketIds()) {
+ bucketToMemberMap.put(id, m.getHostAddress());
+ retrievedBucketCount++;
+ }
+ }
+
+ if (retrievedBucketCount != REDIS_REGION_BUCKETS) {
+ return RedisResponse.error("Internal error: bucket count mismatch " + retrievedBucketCount
+ + " != " + REDIS_REGION_BUCKETS);
+ }
+
+ int slotsPerBucket = REDIS_SLOTS / REDIS_REGION_BUCKETS;
+ int index = 0;
+ List<Object> slots = new ArrayList<>();
+
+ for (String member : bucketToMemberMap.values()) {
+ List<?> entry = Arrays.asList(
+ index * slotsPerBucket,
+ ((index + 1) * slotsPerBucket) - 1,
+ Arrays.asList(member, ctx.getServerPort()));
+
+ slots.add(entry);
+ index++;
+ }
+
+ return RedisResponse.array(slots);
+ }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java
new file mode 100644
index 0000000..5ca304e
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.cluster;
+
+import org.apache.geode.cache.EntryOperation;
+import org.apache.geode.cache.PartitionResolver;
+import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisData;
+
+public class RedisPartitionResolver implements PartitionResolver<ByteArrayWrapper, RedisData> {
+
+ @Override
+ public Object getRoutingObject(EntryOperation<ByteArrayWrapper, RedisData> opDetails) {
+ return opDetails.getKey().getRoutingId();
+ }
+
+}
diff --git a/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt b/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt
index 3d91f76..d299a57 100755
--- a/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt
+++ b/geode-redis/src/main/resources/org/apache/geode/redis/internal/sanctioned-geode-redis-serializables.txt
@@ -1,6 +1,8 @@
org/apache/geode/redis/internal/ParameterRequirements/RedisParametersMismatchException,true,-643700717871858072
org/apache/geode/redis/internal/RedisCommandSupportLevel,false
org/apache/geode/redis/internal/RedisCommandType,false,deferredParameterRequirements:org/apache/geode/redis/internal/ParameterRequirements/ParameterRequirements,executor:org/apache/geode/redis/internal/executor/Executor,parameterRequirements:org/apache/geode/redis/internal/ParameterRequirements/ParameterRequirements,supportLevel:org/apache/geode/redis/internal/RedisCommandSupportLevel
+org/apache/geode/redis/internal/cluster/BucketRetrievalFunction,false
+org/apache/geode/redis/internal/cluster/BucketRetrievalFunction$MemberBuckets,false
org/apache/geode/redis/internal/data/NullRedisSet$SetOp,false
org/apache/geode/redis/internal/data/NullRedisString$BitOp,false
org/apache/geode/redis/internal/data/RedisDataType,false,toStringValue:java/lang/String