You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2021/03/15 16:37:52 UTC

[geode] branch support/1.14 updated (ef0fa4b -> fba0145)

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a change to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git.


    from ef0fa4b  GEODE-9000: Added logic to avoid NPE during processing Network Partition (#6129) (#6130)
     new 71c6eb6  GEODE-9021: Remove unnecessary buffer allocation in redis Coder (#6113)
     new fba0145  GEODE-9023: Add sharding support to redis region (#6117)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../geode/internal/cache/PartitionedRegion.java    |   8 +-
 .../internal/cache/PartitionedRegionDataStore.java |  28 +--
 geode-redis/build.gradle                           |   1 +
 .../cluster/RedisPartitionResolverDUnitTest.java   | 111 +++++++++
 .../geode/redis/internal/RegionProvider.java       |  37 ++-
 .../redis/internal/data/ByteArrayWrapper.java      |  41 ++++
 .../redis/internal/executor/RedisResponse.java     |  46 ++--
 .../redis/internal/executor/cluster/CRC16.java     |  71 ++++++
 .../executor/cluster/RedisPartitionResolver.java   |  17 +-
 .../apache/geode/redis/internal/netty/Coder.java   | 252 ++++++++++-----------
 .../redis/internal/RegionProviderJUnitTest.java    |  24 +-
 .../internal/data/ByteArrayWrapperJUnitTest.java   |  65 ++++++
 .../internal/executor/cluster/CRC16JUnitTest.java  |  52 +++++
 13 files changed, 548 insertions(+), 205 deletions(-)
 create mode 100644 geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
 create mode 100644 geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java
 copy geode-junit/src/main/java/org/apache/geode/management/internal/cli/commands/TestPartitionResolver.java => geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java (67%)
 copy geode-core/src/integrationTest/java/org/apache/geode/internal/cache/LocatorMisconfigurationTest.java => geode-redis/src/test/java/org/apache/geode/redis/internal/RegionProviderJUnitTest.java (60%)
 create mode 100644 geode-redis/src/test/java/org/apache/geode/redis/internal/data/ByteArrayWrapperJUnitTest.java
 create mode 100644 geode-redis/src/test/java/org/apache/geode/redis/internal/executor/cluster/CRC16JUnitTest.java


[geode] 01/02: GEODE-9021: Remove unnecessary buffer allocation in redis Coder (#6113)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 71c6eb68a6a79cef51433d5ab047a964181a9565
Author: Jens Deppe <jd...@vmware.com>
AuthorDate: Thu Mar 11 15:10:15 2021 -0800

    GEODE-9021: Remove unnecessary buffer allocation in redis Coder (#6113)
    
    (cherry picked from commit 00c0e7980bdead3995fc352c3e0f3f52cce4e4fb)
---
 .../redis/internal/executor/RedisResponse.java     |  46 ++--
 .../apache/geode/redis/internal/netty/Coder.java   | 252 ++++++++++-----------
 2 files changed, 138 insertions(+), 160 deletions(-)

diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
index 9723987..f6e909e 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
@@ -33,11 +33,11 @@ import org.apache.geode.redis.internal.netty.CoderException;
 
 public class RedisResponse {
 
-  private final Function<ByteBufAllocator, ByteBuf> coderCallback;
+  private final Function<ByteBuf, ByteBuf> coderCallback;
 
   private Runnable afterWriteCallback;
 
-  private RedisResponse(Function<ByteBufAllocator, ByteBuf> coderCallback) {
+  private RedisResponse(Function<ByteBuf, ByteBuf> coderCallback) {
     this.coderCallback = coderCallback;
   }
 
@@ -52,37 +52,37 @@ public class RedisResponse {
   }
 
   public ByteBuf encode(ByteBufAllocator allocator) {
-    return coderCallback.apply(allocator);
+    return coderCallback.apply(allocator.buffer());
   }
 
   public static RedisResponse integer(long numericValue) {
-    return new RedisResponse((bba) -> Coder.getIntegerResponse(bba, numericValue));
+    return new RedisResponse((buffer) -> Coder.getIntegerResponse(buffer, numericValue));
   }
 
   public static RedisResponse integer(boolean exists) {
-    return new RedisResponse((bba) -> Coder.getIntegerResponse(bba, exists ? 1 : 0));
+    return new RedisResponse((buffer) -> Coder.getIntegerResponse(buffer, exists ? 1 : 0));
   }
 
   public static RedisResponse string(String stringValue) {
-    return new RedisResponse((bba) -> Coder.getSimpleStringResponse(bba, stringValue));
+    return new RedisResponse((buffer) -> Coder.getSimpleStringResponse(buffer, stringValue));
   }
 
   public static RedisResponse string(byte[] byteArray) {
-    return new RedisResponse((bba) -> Coder.getSimpleStringResponse(bba, byteArray));
+    return new RedisResponse((buffer) -> Coder.getSimpleStringResponse(buffer, byteArray));
   }
 
   public static RedisResponse bulkString(Object value) {
-    return new RedisResponse((bba) -> {
+    return new RedisResponse((buffer) -> {
       try {
-        return Coder.getBulkStringResponse(bba, value);
+        return Coder.getBulkStringResponse(buffer, value);
       } catch (CoderException e) {
-        return Coder.getErrorResponse(bba, "Internal server error: " + e.getMessage());
+        return Coder.getErrorResponse(buffer, "Internal server error: " + e.getMessage());
       }
     });
   }
 
   public static RedisResponse ok() {
-    return new RedisResponse((bba) -> Coder.getSimpleStringResponse(bba, "OK"));
+    return new RedisResponse((buffer) -> Coder.getSimpleStringResponse(buffer, "OK"));
   }
 
   public static RedisResponse nil() {
@@ -90,21 +90,21 @@ public class RedisResponse {
   }
 
   public static RedisResponse flattenedArray(Collection<Collection<?>> nestedCollection) {
-    return new RedisResponse((bba) -> {
+    return new RedisResponse((buffer) -> {
       try {
-        return Coder.getFlattenedArrayResponse(bba, nestedCollection);
+        return Coder.getFlattenedArrayResponse(buffer, nestedCollection);
       } catch (CoderException e) {
-        return Coder.getErrorResponse(bba, "Internal server error: " + e.getMessage());
+        return Coder.getErrorResponse(buffer, "Internal server error: " + e.getMessage());
       }
     });
   }
 
   public static RedisResponse array(Collection<?> collection) {
-    return new RedisResponse((bba) -> {
+    return new RedisResponse((buffer) -> {
       try {
-        return Coder.getArrayResponse(bba, collection);
+        return Coder.getArrayResponse(buffer, collection);
       } catch (CoderException e) {
-        return Coder.getErrorResponse(bba, "Internal server error: " + e.getMessage());
+        return Coder.getErrorResponse(buffer, "Internal server error: " + e.getMessage());
       }
     });
   }
@@ -118,21 +118,19 @@ public class RedisResponse {
   }
 
   public static RedisResponse error(String error) {
-    return new RedisResponse((bba) -> Coder.getErrorResponse(bba, error));
+    return new RedisResponse((buffer) -> Coder.getErrorResponse(buffer, error));
   }
 
   public static RedisResponse customError(String error) {
-    return new RedisResponse((bba) -> Coder.getCustomErrorResponse(bba, error));
+    return new RedisResponse((buffer) -> Coder.getCustomErrorResponse(buffer, error));
   }
 
   public static RedisResponse wrongType(String error) {
-    return new RedisResponse((bba) -> Coder.getWrongTypeResponse(bba, error));
+    return new RedisResponse((buffer) -> Coder.getWrongTypeResponse(buffer, error));
   }
 
   public static RedisResponse scan(BigInteger cursor, List<Object> scanResult) {
-
-    return new RedisResponse(
-        (bba) -> Coder.getScanResponse(bba, cursor, scanResult));
+    return new RedisResponse((buffer) -> Coder.getScanResponse(buffer, cursor, scanResult));
   }
 
   public static RedisResponse emptyScan() {
@@ -147,7 +145,7 @@ public class RedisResponse {
   }
 
   public static RedisResponse bigDecimal(BigDecimal numericValue) {
-    return new RedisResponse((bba) -> Coder.getBigDecimalResponse(bba, numericValue));
+    return new RedisResponse((buffer) -> Coder.getBigDecimalResponse(buffer, numericValue));
   }
 
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
index 0f62e1f..19f709f 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.List;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
 
 import org.apache.geode.annotations.internal.MakeImmutable;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
@@ -32,13 +31,6 @@ import org.apache.geode.redis.internal.data.ByteArrayWrapper;
  */
 public class Coder {
 
-
-  /*
-   * Take no chances on char to byte conversions with default charsets on jvms, so we'll hard code
-   * the UTF-8 symbol values as bytes here
-   */
-
-
   /**
    * byte identifier of a bulk string
    */
@@ -87,6 +79,10 @@ public class Coder {
 
   @MakeImmutable
   public static final byte[] err = stringToBytes("ERR ");
+
+  @MakeImmutable
+  public static final byte[] oom = stringToBytes("OOM ");
+
   @MakeImmutable
   public static final byte[] wrongType = stringToBytes("WRONGTYPE ");
 
@@ -112,94 +108,78 @@ public class Coder {
    */
   public static final String N_INF = "-inf";
 
-  public static ByteBuf getBulkStringResponse(ByteBufAllocator alloc, Object v)
+  public static ByteBuf getBulkStringResponse(ByteBuf buffer, Object v)
       throws CoderException {
-    ByteBuf response;
     byte[] toWrite;
 
     if (v == null) {
-      response = alloc.buffer();
-      response.writeBytes(bNIL);
+      buffer.writeBytes(bNIL);
     } else if (v instanceof byte[]) {
-      byte[] value = (byte[]) v;
-      response = alloc.buffer(value.length + 20);
-      toWrite = value;
-      writeStringResponse(response, toWrite);
+      toWrite = (byte[]) v;
+      writeStringResponse(buffer, toWrite);
     } else if (v instanceof ByteArrayWrapper) {
-      byte[] value = ((ByteArrayWrapper) v).toBytes();
-      response = alloc.buffer(value.length + 20);
-      toWrite = value;
-      writeStringResponse(response, toWrite);
+      toWrite = ((ByteArrayWrapper) v).toBytes();
+      writeStringResponse(buffer, toWrite);
     } else if (v instanceof Double) {
-      response = alloc.buffer();
-      toWrite = doubleToBytes(((Double) v).doubleValue());
-      writeStringResponse(response, toWrite);
+      toWrite = doubleToBytes((Double) v);
+      writeStringResponse(buffer, toWrite);
     } else if (v instanceof String) {
       String value = (String) v;
-      response = alloc.buffer(value.length() + 20);
       toWrite = stringToBytes(value);
-      writeStringResponse(response, toWrite);
+      writeStringResponse(buffer, toWrite);
     } else if (v instanceof Integer) {
-      response = alloc.buffer(15);
-      response.writeByte(INTEGER_ID);
-      response.writeBytes(intToBytes((Integer) v));
-      response.writeBytes(CRLFar);
+      buffer.writeByte(INTEGER_ID);
+      buffer.writeBytes(intToBytes((Integer) v));
+      buffer.writeBytes(CRLFar);
     } else if (v instanceof Long) {
-      response = alloc.buffer(15);
-      response.writeByte(INTEGER_ID);
-      response.writeBytes(intToBytes(((Long) v).intValue()));
-      response.writeBytes(CRLFar);
+      buffer.writeByte(INTEGER_ID);
+      buffer.writeBytes(intToBytes(((Long) v).intValue()));
+      buffer.writeBytes(CRLFar);
     } else {
       throw new CoderException();
     }
 
-    return response;
+    return buffer;
   }
 
-  private static void writeStringResponse(ByteBuf response, byte[] toWrite) {
-    response.writeByte(BULK_STRING_ID);
-    response.writeBytes(intToBytes(toWrite.length));
-    response.writeBytes(CRLFar);
-    response.writeBytes(toWrite);
-    response.writeBytes(CRLFar);
+  private static void writeStringResponse(ByteBuf buffer, byte[] toWrite) {
+    buffer.writeByte(BULK_STRING_ID);
+    buffer.writeBytes(intToBytes(toWrite.length));
+    buffer.writeBytes(CRLFar);
+    buffer.writeBytes(toWrite);
+    buffer.writeBytes(CRLFar);
   }
 
-  public static ByteBuf getFlattenedArrayResponse(ByteBufAllocator alloc,
-      Collection<Collection<?>> items)
+  public static ByteBuf getFlattenedArrayResponse(ByteBuf buffer, Collection<Collection<?>> items)
       throws CoderException {
-    ByteBuf response = alloc.buffer();
-
     for (Object next : items) {
-      writeCollectionOrString(alloc, response, next);
+      writeCollectionOrString(buffer, next);
     }
 
-    return response;
+    return buffer;
   }
 
-  public static ByteBuf getArrayResponse(ByteBufAllocator alloc, Collection<?> items)
+  public static ByteBuf getArrayResponse(ByteBuf buffer, Collection<?> items)
       throws CoderException {
-    ByteBuf response = alloc.buffer();
-    response.writeByte(ARRAY_ID);
-    response.writeBytes(intToBytes(items.size()));
-    response.writeBytes(CRLFar);
+    buffer.writeByte(ARRAY_ID);
+    buffer.writeBytes(intToBytes(items.size()));
+    buffer.writeBytes(CRLFar);
     for (Object next : items) {
-      writeCollectionOrString(alloc, response, next);
+      writeCollectionOrString(buffer, next);
     }
 
-    return response;
+    return buffer;
   }
 
-  private static void writeCollectionOrString(ByteBufAllocator alloc, ByteBuf response, Object next)
+  private static void writeCollectionOrString(ByteBuf buffer, Object next)
       throws CoderException {
     ByteBuf tmp = null;
     try {
       if (next instanceof Collection) {
         Collection<?> nextItems = (Collection<?>) next;
-        tmp = getArrayResponse(alloc, nextItems);
-        response.writeBytes(tmp);
+        getArrayResponse(buffer, nextItems);
       } else {
-        tmp = getBulkStringResponse(alloc, next);
-        response.writeBytes(tmp);
+        getBulkStringResponse(buffer, next);
       }
     } finally {
       if (tmp != null) {
@@ -208,120 +188,120 @@ public class Coder {
     }
   }
 
-  public static ByteBuf getScanResponse(ByteBufAllocator alloc, BigInteger cursor,
+  public static ByteBuf getScanResponse(ByteBuf buffer, BigInteger cursor,
       List<Object> scanResult) {
-    ByteBuf response = alloc.buffer();
-
-    response.writeByte(ARRAY_ID);
-    response.writeBytes(intToBytes(2));
-    response.writeBytes(CRLFar);
-    response.writeByte(BULK_STRING_ID);
+    buffer.writeByte(ARRAY_ID);
+    buffer.writeBytes(intToBytes(2));
+    buffer.writeBytes(CRLFar);
+    buffer.writeByte(BULK_STRING_ID);
     byte[] cursorBytes = stringToBytes(cursor.toString());
-    response.writeBytes(intToBytes(cursorBytes.length));
-    response.writeBytes(CRLFar);
-    response.writeBytes(cursorBytes);
-    response.writeBytes(CRLFar);
-    response.writeByte(ARRAY_ID);
-    response.writeBytes(intToBytes(scanResult.size()));
-    response.writeBytes(CRLFar);
+    buffer.writeBytes(intToBytes(cursorBytes.length));
+    buffer.writeBytes(CRLFar);
+    buffer.writeBytes(cursorBytes);
+    buffer.writeBytes(CRLFar);
+    buffer.writeByte(ARRAY_ID);
+    buffer.writeBytes(intToBytes(scanResult.size()));
+    buffer.writeBytes(CRLFar);
 
     for (Object nextObject : scanResult) {
       if (nextObject instanceof String) {
         String next = (String) nextObject;
-        response.writeByte(BULK_STRING_ID);
-        response.writeBytes(intToBytes(next.length()));
-        response.writeBytes(CRLFar);
-        response.writeBytes(stringToBytes(next));
-        response.writeBytes(CRLFar);
+        buffer.writeByte(BULK_STRING_ID);
+        buffer.writeBytes(intToBytes(next.length()));
+        buffer.writeBytes(CRLFar);
+        buffer.writeBytes(stringToBytes(next));
+        buffer.writeBytes(CRLFar);
       } else if (nextObject instanceof ByteArrayWrapper) {
         byte[] next = ((ByteArrayWrapper) nextObject).toBytes();
-        response.writeByte(BULK_STRING_ID);
-        response.writeBytes(intToBytes(next.length));
-        response.writeBytes(CRLFar);
-        response.writeBytes(next);
-        response.writeBytes(CRLFar);
+        buffer.writeByte(BULK_STRING_ID);
+        buffer.writeBytes(intToBytes(next.length));
+        buffer.writeBytes(CRLFar);
+        buffer.writeBytes(next);
+        buffer.writeBytes(CRLFar);
       }
     }
-    return response;
+    return buffer;
   }
 
-  public static ByteBuf getEmptyArrayResponse(ByteBufAllocator alloc) {
-    ByteBuf buf = alloc.buffer().writeBytes(bEMPTY_ARRAY);
-    return buf;
+  public static ByteBuf getEmptyArrayResponse(ByteBuf buffer) {
+    buffer.writeBytes(bEMPTY_ARRAY);
+    return buffer;
   }
 
-  public static ByteBuf getEmptyStringResponse(ByteBufAllocator alloc) {
-    ByteBuf buf = alloc.buffer().writeBytes(bEMPTY_STRING);
-    return buf;
+  public static ByteBuf getEmptyStringResponse(ByteBuf buffer) {
+    buffer.writeBytes(bEMPTY_STRING);
+    return buffer;
   }
 
-  public static ByteBuf getSimpleStringResponse(ByteBufAllocator alloc, String string) {
+  public static ByteBuf getSimpleStringResponse(ByteBuf buffer, String string) {
     byte[] simpAr = stringToBytes(string);
-    return getSimpleStringResponse(alloc, simpAr);
+    return getSimpleStringResponse(buffer, simpAr);
   }
 
-  public static ByteBuf getSimpleStringResponse(ByteBufAllocator alloc, byte[] byteArray) {
-    ByteBuf response = alloc.buffer(byteArray.length + 20);
-    response.writeByte(SIMPLE_STRING_ID);
-    response.writeBytes(byteArray);
-    response.writeBytes(CRLFar);
-    return response;
+  public static ByteBuf getSimpleStringResponse(ByteBuf buffer, byte[] byteArray) {
+    buffer.writeByte(SIMPLE_STRING_ID);
+    buffer.writeBytes(byteArray);
+    buffer.writeBytes(CRLFar);
+    return buffer;
+  }
+
+  public static ByteBuf getErrorResponse(ByteBuf buffer, String error) {
+    byte[] errorAr = stringToBytes(error);
+    buffer.writeByte(ERROR_ID);
+    buffer.writeBytes(err);
+    buffer.writeBytes(errorAr);
+    buffer.writeBytes(CRLFar);
+    return buffer;
   }
 
-  public static ByteBuf getErrorResponse(ByteBufAllocator alloc, String error) {
+  public static ByteBuf getOOMResponse(ByteBuf buffer, String error) {
     byte[] errorAr = stringToBytes(error);
-    ByteBuf response = alloc.buffer(errorAr.length + 25);
-    response.writeByte(ERROR_ID);
-    response.writeBytes(err);
-    response.writeBytes(errorAr);
-    response.writeBytes(CRLFar);
-    return response;
+    buffer.writeByte(ERROR_ID);
+    buffer.writeBytes(oom);
+    buffer.writeBytes(errorAr);
+    buffer.writeBytes(CRLFar);
+    return buffer;
   }
 
-  public static ByteBuf getCustomErrorResponse(ByteBufAllocator alloc, String error) {
+  public static ByteBuf getCustomErrorResponse(ByteBuf buffer, String error) {
     byte[] errorAr = stringToBytes(error);
-    ByteBuf response = alloc.buffer(errorAr.length + 25);
-    response.writeByte(ERROR_ID);
-    response.writeBytes(errorAr);
-    response.writeBytes(CRLFar);
-    return response;
+    buffer.writeByte(ERROR_ID);
+    buffer.writeBytes(errorAr);
+    buffer.writeBytes(CRLFar);
+    return buffer;
   }
 
-  public static ByteBuf getWrongTypeResponse(ByteBufAllocator alloc, String error) {
+  public static ByteBuf getWrongTypeResponse(ByteBuf buffer, String error) {
     byte[] errorAr = stringToBytes(error);
-    ByteBuf response = alloc.buffer(errorAr.length + 31);
-    response.writeByte(ERROR_ID);
-    response.writeBytes(wrongType);
-    response.writeBytes(errorAr);
-    response.writeBytes(CRLFar);
-    return response;
+    buffer.writeByte(ERROR_ID);
+    buffer.writeBytes(wrongType);
+    buffer.writeBytes(errorAr);
+    buffer.writeBytes(CRLFar);
+    return buffer;
   }
 
-  public static ByteBuf getIntegerResponse(ByteBufAllocator alloc, int integer) {
-    ByteBuf response = alloc.buffer(15);
-    response.writeByte(INTEGER_ID);
-    response.writeBytes(intToBytes(integer));
-    response.writeBytes(CRLFar);
-    return response;
+  public static ByteBuf getIntegerResponse(ByteBuf buffer, int integer) {
+    buffer.writeByte(INTEGER_ID);
+    buffer.writeBytes(intToBytes(integer));
+    buffer.writeBytes(CRLFar);
+    return buffer;
   }
 
-  public static ByteBuf getIntegerResponse(ByteBufAllocator alloc, long l) {
-    ByteBuf response = alloc.buffer(25);
-    response.writeByte(INTEGER_ID);
-    response.writeBytes(longToBytes(l));
-    response.writeBytes(CRLFar);
-    return response;
+  public static ByteBuf getIntegerResponse(ByteBuf buffer, long l) {
+    buffer.writeByte(INTEGER_ID);
+    buffer.writeBytes(longToBytes(l));
+    buffer.writeBytes(CRLFar);
+    return buffer;
   }
 
-  public static ByteBuf getBigDecimalResponse(ByteBufAllocator alloc, BigDecimal b) {
-    ByteBuf response = alloc.buffer();
-    writeStringResponse(response, bigDecimalToBytes(b));
-    return response;
+  public static ByteBuf getBigDecimalResponse(ByteBuf buffer, BigDecimal b) {
+    writeStringResponse(buffer, bigDecimalToBytes(b));
+    return buffer;
   }
 
-  public static ByteBuf getNilResponse(ByteBufAllocator alloc) {
-    ByteBuf buf = alloc.buffer().writeBytes(bNIL);
-    return buf;
+  public static ByteBuf getNilResponse(ByteBuf buffer) {
+    buffer.writeBytes(bNIL);
+    return buffer;
   }
 
 


[geode] 02/02: GEODE-9023: Add sharding support to redis region (#6117)

Posted by je...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git

commit fba01459a4745fe49da26c941e9919cb18add5af
Author: Jens Deppe <jd...@vmware.com>
AuthorDate: Mon Mar 15 09:14:23 2021 -0700

    GEODE-9023: Add sharding support to redis region (#6117)
    
    This ability allows for the future mapping of cluster slots to geode
    buckets which is preemptive to enabling redis clustering support.
    
    - Bucket count defaults to 128 and must be a a power of 2 and a factor
      of 16384 (the number of redis slots).
    - Supports using hashtags in the key.
    
    (cherry picked from commit 37358f58e45a49f36d985161dc2eaee95c677a67)
---
 .../geode/internal/cache/PartitionedRegion.java    |   8 +-
 .../internal/cache/PartitionedRegionDataStore.java |  28 ++----
 geode-redis/build.gradle                           |   1 +
 .../cluster/RedisPartitionResolverDUnitTest.java   | 111 +++++++++++++++++++++
 .../geode/redis/internal/RegionProvider.java       |  37 ++++++-
 .../redis/internal/data/ByteArrayWrapper.java      |  41 ++++++++
 .../redis/internal/executor/cluster/CRC16.java     |  71 +++++++++++++
 .../executor/cluster/RedisPartitionResolver.java   |  35 +++++++
 .../redis/internal/RegionProviderJUnitTest.java    |  36 +++++++
 .../internal/data/ByteArrayWrapperJUnitTest.java   |  65 ++++++++++++
 .../internal/executor/cluster/CRC16JUnitTest.java  |  52 ++++++++++
 11 files changed, 461 insertions(+), 24 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index cd89363..68a039c 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -8149,8 +8149,8 @@ public class PartitionedRegion extends LocalRegion
    * A test method to get the list of all the bucket ids for the partitioned region in the data
    * Store.
    */
-  public List getLocalBucketsListTestOnly() {
-    List localBucketList = null;
+  public List<Integer> getLocalBucketsListTestOnly() {
+    List<Integer> localBucketList = null;
     if (this.dataStore != null) {
       localBucketList = this.dataStore.getLocalBucketsListTestOnly();
     }
@@ -8161,8 +8161,8 @@ public class PartitionedRegion extends LocalRegion
    * A test method to get the list of all the primary bucket ids for the partitioned region in the
    * data Store.
    */
-  public List getLocalPrimaryBucketsListTestOnly() {
-    List localPrimaryList = null;
+  public List<Integer> getLocalPrimaryBucketsListTestOnly() {
+    List<Integer> localPrimaryList = null;
     if (this.dataStore != null) {
       localPrimaryList = this.dataStore.getLocalPrimaryBucketsListTestOnly();
     }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
index d1d3dba..d1cb4e6 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegionDataStore.java
@@ -2604,14 +2604,9 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
    * <i>Test Method</i> Return the list of all the bucket names in this data store.
    *
    */
-  public List getLocalBucketsListTestOnly() {
-    final List bucketList = new ArrayList();
-    visitBuckets(new BucketVisitor() {
-      @Override
-      public void visit(Integer bucketId, Region r) {
-        bucketList.add(bucketId);
-      }
-    });
+  public List<Integer> getLocalBucketsListTestOnly() {
+    final List<Integer> bucketList = new ArrayList<>();
+    visitBuckets((bucketId, r) -> bucketList.add(bucketId));
     return bucketList;
   }
 
@@ -2635,16 +2630,13 @@ public class PartitionedRegionDataStore implements HasCachePerfStats {
    * <i>Test Method</i> Return the list of all the non primary bucket ids in this data store.
    *
    */
-  public List getLocalNonPrimaryBucketsListTestOnly() {
-    final List nonPrimaryBucketList = new ArrayList();
-    visitBuckets(new BucketVisitor() {
-      @Override
-      public void visit(Integer bucketId, Region r) {
-        BucketRegion br = (BucketRegion) r;
-        BucketAdvisor ba = (BucketAdvisor) br.getDistributionAdvisor();
-        if (!ba.isPrimary()) {
-          nonPrimaryBucketList.add(bucketId);
-        }
+  public List<Integer> getLocalNonPrimaryBucketsListTestOnly() {
+    final List<Integer> nonPrimaryBucketList = new ArrayList<>();
+    visitBuckets((bucketId, r) -> {
+      BucketRegion br = (BucketRegion) r;
+      BucketAdvisor ba = (BucketAdvisor) br.getDistributionAdvisor();
+      if (!ba.isPrimary()) {
+        nonPrimaryBucketList.add(bucketId);
       }
     });
     return nonPrimaryBucketList;
diff --git a/geode-redis/build.gradle b/geode-redis/build.gradle
index 11b61f9..59874ad 100644
--- a/geode-redis/build.gradle
+++ b/geode-redis/build.gradle
@@ -42,6 +42,7 @@ dependencies {
 
   testImplementation(project(':geode-junit'))
   testImplementation('org.mockito:mockito-core')
+  testImplementation('redis.clients:jedis')
 
   commonTestImplementation(project(':geode-junit'))
   commonTestImplementation(project(':geode-dunit'))
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
new file mode 100644
index 0000000..2574956
--- /dev/null
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.cluster;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.partition.PartitionRegionHelper;
+import org.apache.geode.internal.cache.LocalDataSet;
+import org.apache.geode.redis.internal.RegionProvider;
+import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+
+public class RedisPartitionResolverDUnitTest {
+
+  @ClassRule
+  public static RedisClusterStartupRule cluster = new RedisClusterStartupRule(4);
+
+  private static final String LOCAL_HOST = "127.0.0.1";
+  private static final int JEDIS_TIMEOUT =
+      Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static Jedis jedis1;
+
+  private static MemberVM server1;
+  private static MemberVM server2;
+  private static MemberVM server3;
+
+  @BeforeClass
+  public static void classSetup() {
+    MemberVM locator = cluster.startLocatorVM(0);
+    server1 = cluster.startRedisVM(1, locator.getPort());
+    server2 = cluster.startRedisVM(2, locator.getPort());
+    server3 = cluster.startRedisVM(3, locator.getPort());
+
+    jedis1 = new Jedis(LOCAL_HOST, cluster.getRedisPort(1), JEDIS_TIMEOUT);
+  }
+
+  @Before
+  public void testSetup() {
+    jedis1.flushAll();
+  }
+
+  @Test
+  public void testRedisHashesMapToCorrectBuckets() {
+    int numKeys = 1000;
+    for (int i = 0; i < numKeys; i++) {
+      String key = "key-" + i;
+      jedis1.set(key, "value-" + i);
+    }
+
+    Map<String, Integer> keyToBucketMap1 = getKeyToBucketMap(server1);
+    Map<String, Integer> keyToBucketMap2 = getKeyToBucketMap(server2);
+    Map<String, Integer> keyToBucketMap3 = getKeyToBucketMap(server3);
+
+    Set<Integer> buckets1 = new HashSet<>(keyToBucketMap1.values());
+    Set<Integer> buckets2 = new HashSet<>(keyToBucketMap2.values());
+    Set<Integer> buckets3 = new HashSet<>(keyToBucketMap3.values());
+
+    assertThat(buckets1).doesNotContainAnyElementsOf(buckets2);
+    assertThat(buckets1).doesNotContainAnyElementsOf(buckets3);
+    assertThat(buckets2).doesNotContainAnyElementsOf(buckets3);
+
+    assertThat(buckets1.size() + buckets2.size() + buckets3.size())
+        .isEqualTo(RegionProvider.REDIS_REGION_BUCKETS);
+  }
+
+  private Map<String, Integer> getKeyToBucketMap(MemberVM vm) {
+    return vm.invoke((SerializableCallableIF<Map<String, Integer>>) () -> {
+      Region<ByteArrayWrapper, RedisData> region =
+          RedisClusterStartupRule.getCache().getRegion(RegionProvider.REDIS_DATA_REGION);
+
+      LocalDataSet local = (LocalDataSet) PartitionRegionHelper.getLocalPrimaryData(region);
+      Map<String, Integer> keyMap = new HashMap<>();
+
+      for (Object key : local.localKeys()) {
+        int id = local.getProxy().getKeyInfo(key).getBucketId();
+        keyMap.put(new String(((ByteArrayWrapper) key).toBytes()), id);
+      }
+
+      return keyMap;
+    });
+  }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
index c7be2af..a9f6c6f 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
@@ -14,28 +14,48 @@
  */
 package org.apache.geode.redis.internal;
 
+import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalRegionFactory;
+import org.apache.geode.management.ManagementException;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.executor.cluster.RedisPartitionResolver;
 
 public class RegionProvider {
   /**
    * The name of the region that holds data stored in redis.
    */
-  private static final String REDIS_DATA_REGION = "__REDIS_DATA";
-  private static final String REDIS_CONFIG_REGION = "__REDIS_CONFIG";
+  public static final String REDIS_DATA_REGION = "__REDIS_DATA";
+  public static final String REDIS_CONFIG_REGION = "__REDIS_CONFIG";
+  public static final String REDIS_REGION_BUCKETS_PARAM = "redis.region.buckets";
+
+  // Ideally the bucket count should be a power of 2, but technically it is not required.
+  public static final int REDIS_REGION_BUCKETS =
+      Integer.getInteger(REDIS_REGION_BUCKETS_PARAM, 128);
+
+  public static final int REDIS_SLOTS = 16384;
+
+  public static final int REDIS_SLOTS_PER_BUCKET = REDIS_SLOTS / REDIS_REGION_BUCKETS;
 
   private final Region<ByteArrayWrapper, RedisData> dataRegion;
   private final Region<String, Object> configRegion;
 
   public RegionProvider(InternalCache cache) {
+    validateBucketCount(REDIS_REGION_BUCKETS);
 
     InternalRegionFactory<ByteArrayWrapper, RedisData> redisDataRegionFactory =
         cache.createInternalRegionFactory(RegionShortcut.PARTITION_REDUNDANT);
     redisDataRegionFactory.setInternalRegion(true).setIsUsedForMetaRegion(true);
+
+    PartitionAttributesFactory<ByteArrayWrapper, RedisData> attributesFactory =
+        new PartitionAttributesFactory<>();
+    attributesFactory.setPartitionResolver(new RedisPartitionResolver());
+    attributesFactory.setTotalNumBuckets(REDIS_REGION_BUCKETS);
+    redisDataRegionFactory.setPartitionAttributes(attributesFactory.create());
+
     dataRegion = redisDataRegionFactory.create(REDIS_DATA_REGION);
 
     InternalRegionFactory<String, Object> redisConfigRegionFactory =
@@ -51,4 +71,17 @@ public class RegionProvider {
   public Region<String, Object> getConfigRegion() {
     return configRegion;
   }
+
+  /**
+   * Validates that the value passed in is not greater than {@link #REDIS_SLOTS}.
+   *
+   * @throws ManagementException if there is a problem with the value
+   */
+  protected static void validateBucketCount(int buckets) {
+    if (buckets > REDIS_SLOTS) {
+      throw new ManagementException(String.format(
+          "Could not start Redis Server - System property '%s' must be <= %d",
+          REDIS_REGION_BUCKETS_PARAM, REDIS_SLOTS));
+    }
+  }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
index ae292ca..a225e66 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
@@ -13,8 +13,12 @@
  * the License.
  *
  */
+
 package org.apache.geode.redis.internal.data;
 
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS;
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS_PER_BUCKET;
+
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -26,6 +30,8 @@ import org.apache.geode.internal.serialization.DataSerializableFixedID;
 import org.apache.geode.internal.serialization.DeserializationContext;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.redis.internal.executor.cluster.CRC16;
+import org.apache.geode.redis.internal.executor.cluster.RedisPartitionResolver;
 import org.apache.geode.redis.internal.netty.Coder;
 
 /**
@@ -40,6 +46,8 @@ public class ByteArrayWrapper
    */
   protected byte[] value;
 
+  private transient Object routingId;
+
   /**
    * Empty constructor for serialization
    */
@@ -108,6 +116,39 @@ public class ByteArrayWrapper
   }
 
   /**
+   * Used by the {@link RedisPartitionResolver} to map slots to buckets. Supports using hashtags
+   * in the same way that redis does.
+   *
+   * @see <a href="https://redis.io/topics/cluster-spec">Redis Cluster Spec</a>
+   */
+  public synchronized Object getRoutingId() {
+    if (routingId == null && value != null) {
+      int startHashtag = Integer.MAX_VALUE;
+      int endHashtag = 0;
+
+      for (int i = 0; i < value.length; i++) {
+        if (value[i] == '{' && startHashtag == Integer.MAX_VALUE) {
+          startHashtag = i;
+        } else if (value[i] == '}') {
+          endHashtag = i;
+          break;
+        }
+      }
+
+      if (endHashtag - startHashtag <= 1) {
+        startHashtag = -1;
+        endHashtag = value.length;
+      }
+
+      // & (REDIS_SLOTS - 1) is equivalent to % REDIS_SLOTS but supposedly faster
+      routingId = (CRC16.calculate(value, startHashtag + 1, endHashtag) & (REDIS_SLOTS - 1))
+          / REDIS_SLOTS_PER_BUCKET;
+    }
+
+    return routingId;
+  }
+
+  /**
    * Private helper method to compare two byte arrays, A.compareTo(B). The comparison is basically
    * numerical, for each byte index, the byte representing the greater value will be the greater
    *
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java
new file mode 100644
index 0000000..2745aee
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.cluster;
+
+/**
+ * Helper class to calculate the CRC-16/XMODEM value of a byte array. This is the same algorithm
+ * that Redis uses.
+ *
+ * Derived from https://www.source-code.biz/snippets/java/crc16/
+ */
+public class CRC16 {
+
+  // CCITT/SDLC/HDLC x^16 + x^12 + x^5 + 1 (CRC-16-CCITT)
+  private static final int CCITT_POLY = 0x8408;
+  private static final short[] crcTable = new short[256];
+
+  // Create the table up front
+  static {
+    int poly = reverseInt16(CCITT_POLY);
+
+    for (int x = 0; x < 256; x++) {
+      int w = x << 8;
+      for (int i = 0; i < 8; i++) {
+        if ((w & 0x8000) != 0) {
+          w = (w << 1) ^ poly;
+        } else {
+          w = w << 1;
+        }
+      }
+      crcTable[x] = (short) w;
+    }
+  }
+
+  /**
+   * Calculate CRC with most significant byte first. Assume all inputs are valid.
+   *
+   * @param data the byte array to use
+   * @param start starting index into the byte array
+   * @param end ending index (exclusive) into the byte array
+   */
+  public static int calculate(byte[] data, int start, int end) {
+    int crc = 0;
+    for (int i = start; i < end; i++) {
+      crc = ((crc << 8) & 0xFF00) ^ (crcTable[(crc >> 8) ^ (data[i] & 0xFF)] & 0xFFFF);
+    }
+    return crc;
+  }
+
+  // Reverses the bits of a 16 bit integer.
+  private static int reverseInt16(int i) {
+    i = (i & 0x5555) << 1 | (i >>> 1) & 0x5555;
+    i = (i & 0x3333) << 2 | (i >>> 2) & 0x3333;
+    i = (i & 0x0F0F) << 4 | (i >>> 4) & 0x0F0F;
+    i = (i & 0x00FF) << 8 | (i >>> 8);
+    return i;
+  }
+
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java
new file mode 100644
index 0000000..3e9c211
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.cluster;
+
+import org.apache.geode.cache.EntryOperation;
+import org.apache.geode.cache.PartitionResolver;
+import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisData;
+
+public class RedisPartitionResolver implements PartitionResolver<ByteArrayWrapper, RedisData> {
+
+  @Override
+  public Object getRoutingObject(EntryOperation<ByteArrayWrapper, RedisData> opDetails) {
+    return opDetails.getKey().getRoutingId();
+  }
+
+  @Override
+  public String getName() {
+    return RedisPartitionResolver.class.getName();
+  }
+
+}
diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/RegionProviderJUnitTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/RegionProviderJUnitTest.java
new file mode 100644
index 0000000..b493218
--- /dev/null
+++ b/geode-redis/src/test/java/org/apache/geode/redis/internal/RegionProviderJUnitTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal;
+
+import static org.assertj.core.api.Assertions.assertThatNoException;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import org.junit.Test;
+
+public class RegionProviderJUnitTest {
+
+  @Test
+  public void testBucket_whenPowerOfTwo() {
+    assertThatNoException().isThrownBy(() -> RegionProvider.validateBucketCount(128));
+  }
+
+  @Test
+  public void testException_whenGreaterThanSlots() {
+    assertThatThrownBy(() -> RegionProvider.validateBucketCount(32768))
+        .hasMessageContaining("System property 'redis.region.buckets' must be <= 16384");
+  }
+
+}
diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/data/ByteArrayWrapperJUnitTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/data/ByteArrayWrapperJUnitTest.java
new file mode 100644
index 0000000..8937f70
--- /dev/null
+++ b/geode-redis/src/test/java/org/apache/geode/redis/internal/data/ByteArrayWrapperJUnitTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.data;
+
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS;
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS_PER_BUCKET;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+
+import org.apache.geode.redis.internal.executor.cluster.CRC16;
+
+public class ByteArrayWrapperJUnitTest {
+
+  @Test
+  public void testRoutingId_withHashtags() {
+    ByteArrayWrapper baw = new ByteArrayWrapper("name{user1000}".getBytes());
+    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("user1000"));
+
+    baw = new ByteArrayWrapper("{user1000".getBytes());
+    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("{user1000"));
+
+    baw = new ByteArrayWrapper("}user1000{".getBytes());
+    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("}user1000{"));
+
+    baw = new ByteArrayWrapper("user{}1000".getBytes());
+    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("user{}1000"));
+
+    baw = new ByteArrayWrapper("user}{1000".getBytes());
+    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("user}{1000"));
+
+    baw = new ByteArrayWrapper("{user1000}}bar".getBytes());
+    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("user1000"));
+
+    baw = new ByteArrayWrapper("foo{user1000}{bar}".getBytes());
+    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("user1000"));
+
+    baw = new ByteArrayWrapper("foo{}{user1000}".getBytes());
+    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("foo{}{user1000}"));
+
+    baw = new ByteArrayWrapper("{}{user1000}".getBytes());
+    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("{}{user1000}"));
+
+    baw = new ByteArrayWrapper("foo{{user1000}}bar".getBytes());
+    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("{user1000"));
+  }
+
+  private int calculateRoutingId(String data) {
+    return (CRC16.calculate(data.getBytes(), 0, data.length()) % REDIS_SLOTS)
+        / REDIS_SLOTS_PER_BUCKET;
+  }
+}
diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/executor/cluster/CRC16JUnitTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/executor/cluster/CRC16JUnitTest.java
new file mode 100644
index 0000000..c2dfaf3
--- /dev/null
+++ b/geode-redis/src/test/java/org/apache/geode/redis/internal/executor/cluster/CRC16JUnitTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.executor.cluster;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Test;
+import redis.clients.jedis.util.JedisClusterCRC16;
+
+public class CRC16JUnitTest {
+
+  @Test
+  public void testBasicCRC16_sameAsRedis() {
+    byte[] data = new byte[] {0};
+    assertThat(CRC16.calculate(data, 0, 1))
+        .isEqualTo((short) JedisClusterCRC16.getCRC16(data));
+
+    data = new byte[] {1};
+    assertThat(CRC16.calculate(new byte[] {1}, 0, 1))
+        .isEqualTo((short) JedisClusterCRC16.getCRC16(data));
+
+    data = "123456789".getBytes();
+    assertThat(CRC16.calculate(data, 0, data.length))
+        .isEqualTo((short) JedisClusterCRC16.getCRC16(data));
+
+    data = "---123456789---".getBytes();
+    assertThat(CRC16.calculate(data, 3, 12))
+        .isEqualTo((short) JedisClusterCRC16.getCRC16(data, 3, 12));
+
+    data = "abcdefghijklmnopqrstuvwxyz".getBytes();
+    assertThat(CRC16.calculate(data, 0, data.length))
+        .isEqualTo((short) JedisClusterCRC16.getCRC16(data));
+
+    data = "user1000".getBytes();
+    assertThat(CRC16.calculate(data, 0, data.length))
+        .isEqualTo((short) JedisClusterCRC16.getCRC16(data));
+  }
+
+}