You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2021/03/23 18:54:01 UTC

[geode] branch support/1.14 updated: GEODE-9044: Introduce RedisKey as key object for RedisData entries (#6146) (#6173)

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/support/1.14 by this push:
     new f91c577  GEODE-9044: Introduce RedisKey as key object for RedisData entries (#6146) (#6173)
f91c577 is described below

commit f91c577f1751d98ed4bc8e8569fd7d88a01c224e
Author: Jens Deppe <jd...@vmware.com>
AuthorDate: Tue Mar 23 11:53:06 2021 -0700

    GEODE-9044: Introduce RedisKey as key object for RedisData entries (#6146) (#6173)
    
    - Refactor all redis key references to use RedisKey
    - RedisKey extends ByteArrayWrapper and adds a routingId integer value.
      This is used by the RedisPartitionResolver to map entries to buckets.
    
    (cherry picked from commit 3a9baeff35c362cb6ec5df949553764eefcdc6a2)
---
 .../cluster/RedisPartitionResolverDUnitTest.java   |   6 +-
 .../internal/executor/key/RenameDUnitTest.java     |   3 +-
 .../key/AbstractRenameIntegrationTest.java         |   9 +-
 .../codeAnalysis/sanctionedDataSerializables.txt   |   4 +
 .../geode/redis/internal/GeodeRedisService.java    |   4 +
 .../redis/internal/PassiveExpirationManager.java   |  14 ++-
 .../geode/redis/internal/RegionProvider.java       |  10 +--
 .../redis/internal/data/AbstractRedisData.java     |  17 ++--
 .../redis/internal/data/ByteArrayWrapper.java      |  42 +--------
 .../geode/redis/internal/data/CommandHelper.java   |  20 ++---
 .../geode/redis/internal/data/NullRedisData.java   |  17 ++--
 .../geode/redis/internal/data/NullRedisHash.java   |   6 +-
 .../geode/redis/internal/data/NullRedisSet.java    |  38 ++++----
 .../geode/redis/internal/data/NullRedisString.java |  46 ++++------
 .../geode/redis/internal/data/RedisData.java       |  15 ++--
 .../data/RedisDataCommandsFunctionExecutor.java    |   4 +-
 .../geode/redis/internal/data/RedisHash.java       |  35 ++++----
 .../data/RedisHashCommandsFunctionExecutor.java    |  29 +++---
 .../apache/geode/redis/internal/data/RedisKey.java |  88 ++++++++++++++++++
 .../data/RedisKeyCommandsFunctionExecutor.java     |  18 ++--
 .../apache/geode/redis/internal/data/RedisSet.java |  12 +--
 .../data/RedisSetCommandsFunctionExecutor.java     |  41 +++------
 .../geode/redis/internal/data/RedisString.java     |  28 +++---
 .../data/RedisStringCommandsFunctionExecutor.java  |  44 ++++-----
 .../redis/internal/executor/AbstractExecutor.java  |   4 +-
 .../redis/internal/executor/CommandFunction.java   |  15 ++--
 .../executor/RedisCommandsFunctionInvoker.java     |   9 +-
 .../executor/SingleResultRedisFunction.java        |  12 ++-
 .../redis/internal/executor/cluster/CRC16.java     |   5 ++
 .../executor/cluster/RedisPartitionResolver.java   |  12 ++-
 .../redis/internal/executor/hash/HDelExecutor.java |   3 +-
 .../internal/executor/hash/HExistsExecutor.java    |   3 +-
 .../internal/executor/hash/HGetAllExecutor.java    |   3 +-
 .../redis/internal/executor/hash/HGetExecutor.java |   3 +-
 .../internal/executor/hash/HIncrByExecutor.java    |   3 +-
 .../executor/hash/HIncrByFloatExecutor.java        |   3 +-
 .../internal/executor/hash/HKeysExecutor.java      |   3 +-
 .../redis/internal/executor/hash/HLenExecutor.java |   4 +-
 .../internal/executor/hash/HMGetExecutor.java      |   3 +-
 .../internal/executor/hash/HMSetExecutor.java      |   3 +-
 .../internal/executor/hash/HScanExecutor.java      |   4 +-
 .../redis/internal/executor/hash/HSetExecutor.java |   3 +-
 .../internal/executor/hash/HStrLenExecutor.java    |   3 +-
 .../internal/executor/hash/HValsExecutor.java      |   3 +-
 .../internal/executor/hash/RedisHashCommands.java  |  27 +++---
 .../hash/RedisHashCommandsFunctionInvoker.java     |  31 ++++---
 .../redis/internal/executor/key/DelExecutor.java   |   4 +-
 .../internal/executor/key/ExistsExecutor.java      |   4 +-
 .../internal/executor/key/ExpireAtExecutor.java    |   4 +-
 .../internal/executor/key/ExpireExecutor.java      |   4 +-
 .../redis/internal/executor/key/KeysExecutor.java  |   3 +-
 .../internal/executor/key/PersistExecutor.java     |   4 +-
 .../internal/executor/key/RedisKeyCommands.java    |  20 ++---
 .../key/RedisKeyCommandsFunctionInvoker.java       |  21 ++---
 .../internal/executor/key/RenameExecutor.java      |   8 +-
 .../internal/executor/key/RenameFunction.java      |  42 ++++-----
 .../redis/internal/executor/key/ScanExecutor.java  |   6 +-
 .../redis/internal/executor/key/TTLExecutor.java   |   4 +-
 .../redis/internal/executor/key/TypeExecutor.java  |   4 +-
 .../internal/executor/server/FlushAllExecutor.java |   4 +-
 .../internal/executor/set/RedisSetCommands.java    |  25 +++---
 .../set/RedisSetCommandsFunctionInvoker.java       |  28 +++---
 .../redis/internal/executor/set/SCardExecutor.java |   4 +-
 .../internal/executor/set/SIsMemberExecutor.java   |   3 +-
 .../internal/executor/set/SMembersExecutor.java    |   3 +-
 .../redis/internal/executor/set/SMoveExecutor.java |   5 +-
 .../redis/internal/executor/set/SPopExecutor.java  |   3 +-
 .../internal/executor/set/SRandMemberExecutor.java |   3 +-
 .../redis/internal/executor/set/SRemExecutor.java  |   3 +-
 .../redis/internal/executor/set/SScanExecutor.java |   4 +-
 .../redis/internal/executor/set/SetOpExecutor.java |  13 +--
 .../internal/executor/string/AppendExecutor.java   |   3 +-
 .../internal/executor/string/BitCountExecutor.java |   4 +-
 .../internal/executor/string/BitOpExecutor.java    |   8 +-
 .../internal/executor/string/BitPosExecutor.java   |   4 +-
 .../internal/executor/string/DecrByExecutor.java   |   4 +-
 .../internal/executor/string/DecrExecutor.java     |   4 +-
 .../internal/executor/string/GetBitExecutor.java   |   4 +-
 .../internal/executor/string/GetExecutor.java      |   3 +-
 .../internal/executor/string/GetRangeExecutor.java |   3 +-
 .../internal/executor/string/GetSetExecutor.java   |   3 +-
 .../internal/executor/string/IncrByExecutor.java   |   4 +-
 .../executor/string/IncrByFloatExecutor.java       |   4 +-
 .../internal/executor/string/IncrExecutor.java     |   4 +-
 .../internal/executor/string/MGetExecutor.java     |   3 +-
 .../internal/executor/string/MSetExecutor.java     |   3 +-
 .../internal/executor/string/MSetNXExecutor.java   |   5 +-
 .../executor/string/RedisStringCommands.java       |  39 ++++----
 .../string/RedisStringCommandsFunctionInvoker.java |  41 ++++-----
 .../internal/executor/string/SetBitExecutor.java   |   4 +-
 .../internal/executor/string/SetEXExecutor.java    |   3 +-
 .../internal/executor/string/SetExecutor.java      |   5 +-
 .../internal/executor/string/SetNXExecutor.java    |   3 +-
 .../internal/executor/string/SetRangeExecutor.java |   4 +-
 .../internal/executor/string/StrlenExecutor.java   |   4 +-
 .../apache/geode/redis/internal/netty/Command.java |  16 +++-
 .../apache/geode/redis/internal/pubsub/PubSub.java |   4 +-
 .../geode/redis/internal/pubsub/PubSubImpl.java    |   6 +-
 .../internal/data/ByteArrayWrapperJUnitTest.java   |  65 --------------
 .../geode/redis/internal/data/RedisHashTest.java   |   6 +-
 .../redis/internal/data/RedisKeyJUnitTest.java     | 100 +++++++++++++++++++++
 .../geode/redis/internal/data/RedisSetTest.java    |   6 +-
 .../geode/redis/internal/data/RedisStringTest.java |  94 +++++++++----------
 .../serialization/DataSerializableFixedID.java     |   4 +-
 104 files changed, 752 insertions(+), 650 deletions(-)

diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
index 2574956..9f8b81d 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/cluster/RedisPartitionResolverDUnitTest.java
@@ -32,8 +32,8 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.internal.cache.LocalDataSet;
 import org.apache.geode.redis.internal.RegionProvider;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 import org.apache.geode.test.dunit.SerializableCallableIF;
 import org.apache.geode.test.dunit.rules.MemberVM;
@@ -94,7 +94,7 @@ public class RedisPartitionResolverDUnitTest {
 
   private Map<String, Integer> getKeyToBucketMap(MemberVM vm) {
     return vm.invoke((SerializableCallableIF<Map<String, Integer>>) () -> {
-      Region<ByteArrayWrapper, RedisData> region =
+      Region<RedisKey, RedisData> region =
           RedisClusterStartupRule.getCache().getRegion(RegionProvider.REDIS_DATA_REGION);
 
       LocalDataSet local = (LocalDataSet) PartitionRegionHelper.getLocalPrimaryData(region);
@@ -102,7 +102,7 @@ public class RedisPartitionResolverDUnitTest {
 
       for (Object key : local.localKeys()) {
         int id = local.getProxy().getKeyInfo(key).getBucketId();
-        keyMap.put(new String(((ByteArrayWrapper) key).toBytes()), id);
+        keyMap.put(key.toString(), id);
       }
 
       return keyMap;
diff --git a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/key/RenameDUnitTest.java b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/key/RenameDUnitTest.java
index d1140b0..0fe6e32 100644
--- a/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/key/RenameDUnitTest.java
+++ b/geode-redis/src/distributedTest/java/org/apache/geode/redis/internal/executor/key/RenameDUnitTest.java
@@ -37,6 +37,7 @@ import org.junit.Test;
 import redis.clients.jedis.Jedis;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.StripedExecutor;
 import org.apache.geode.redis.internal.executor.SynchronizedStripedExecutor;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
@@ -136,7 +137,7 @@ public class RenameDUnitTest {
   private Set<String> getKeysOnSameRandomStripe(int numKeysNeeded) {
     Random random = new Random();
     String key1 = "keyz" + random.nextInt();
-    ByteArrayWrapper key1ByteArrayWrapper = new ByteArrayWrapper(key1.getBytes());
+    RedisKey key1ByteArrayWrapper = new RedisKey(key1.getBytes());
     StripedExecutor stripedExecutor = new SynchronizedStripedExecutor();
     Set<String> keys = new HashSet<>();
     keys.add(key1);
diff --git a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/key/AbstractRenameIntegrationTest.java b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/key/AbstractRenameIntegrationTest.java
index 9397a80..d9f7e60 100644
--- a/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/key/AbstractRenameIntegrationTest.java
+++ b/geode-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/key/AbstractRenameIntegrationTest.java
@@ -41,6 +41,7 @@ import redis.clients.jedis.exceptions.JedisDataException;
 
 import org.apache.geode.redis.internal.RedisConstants;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.StripedExecutor;
 import org.apache.geode.redis.internal.executor.SynchronizedStripedExecutor;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
@@ -246,7 +247,7 @@ public abstract class AbstractRenameIntegrationTest implements RedisPortSupplier
   private List<String> getKeysOnDifferentStripes() {
     String key1 = "keyz" + new Random().nextInt();
 
-    ByteArrayWrapper key1ByteArrayWrapper = new ByteArrayWrapper(key1.getBytes());
+    RedisKey key1ByteArrayWrapper = new RedisKey(key1.getBytes());
     StripedExecutor stripedExecutor = new SynchronizedStripedExecutor();
     int iterator = 0;
     String key2;
@@ -262,7 +263,7 @@ public abstract class AbstractRenameIntegrationTest implements RedisPortSupplier
   private Set<String> getKeysOnSameRandomStripe(int numKeysNeeded) {
     Random random = new Random();
     String key1 = "keyz" + random.nextInt();
-    ByteArrayWrapper key1ByteArrayWrapper = new ByteArrayWrapper(key1.getBytes());
+    RedisKey key1ByteArrayWrapper = new RedisKey(key1.getBytes());
     StripedExecutor stripedExecutor = new SynchronizedStripedExecutor();
     Set<String> keys = new HashSet<>();
     keys.add(key1);
@@ -326,10 +327,10 @@ public abstract class AbstractRenameIntegrationTest implements RedisPortSupplier
     List<String> keys = new ArrayList<>();
 
     String key1;
-    ByteArrayWrapper key1ByteArrayWrapper;
+    RedisKey key1ByteArrayWrapper;
     do {
       key1 = "keyz" + new Random().nextInt();
-      key1ByteArrayWrapper = new ByteArrayWrapper(key1.getBytes());
+      key1ByteArrayWrapper = new RedisKey(key1.getBytes());
     } while (stripedExecutor.compareStripes(key1ByteArrayWrapper, toAvoid) == 0 && keys.add(key1));
 
     do {
diff --git a/geode-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 6e96a2c..0512760 100644
--- a/geode-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -14,6 +14,10 @@ org/apache/geode/redis/internal/data/RedisHash,2
 toData,15
 fromData,15
 
+org/apache/geode/redis/internal/data/RedisKey,2
+fromData,20
+toData,17
+
 org/apache/geode/redis/internal/data/RedisSet,2
 toData,15
 fromData,15
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisService.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisService.java
index 8957731..2db0cfd 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisService.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisService.java
@@ -31,6 +31,7 @@ import org.apache.geode.management.internal.beans.CacheServiceMBeanBase;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.NullRedisData;
 import org.apache.geode.redis.internal.data.RedisHash;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.data.RedisSet;
 import org.apache.geode.redis.internal.data.RedisString;
 import org.apache.geode.redis.internal.executor.string.SetOptions;
@@ -55,6 +56,9 @@ public class GeodeRedisService implements CacheService, ResourceEventsListener {
 
   private void registerDataSerializables() {
     InternalDataSerializer.getDSFIDSerializer().registerDSFID(
+        DataSerializableFixedID.REDIS_KEY,
+        RedisKey.class);
+    InternalDataSerializer.getDSFIDSerializer().registerDSFID(
         DataSerializableFixedID.REDIS_BYTE_ARRAY_WRAPPER,
         ByteArrayWrapper.class);
     InternalDataSerializer.getDSFIDSerializer().registerDSFID(
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/PassiveExpirationManager.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/PassiveExpirationManager.java
index 6094fe9..c4b261a 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/PassiveExpirationManager.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/PassiveExpirationManager.java
@@ -29,8 +29,8 @@ import org.apache.geode.cache.EntryDestroyedException;
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.partition.PartitionRegionHelper;
 import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.key.RedisKeyCommands;
 import org.apache.geode.redis.internal.executor.key.RedisKeyCommandsFunctionInvoker;
 import org.apache.geode.redis.internal.statistics.RedisStats;
@@ -38,13 +38,12 @@ import org.apache.geode.redis.internal.statistics.RedisStats;
 public class PassiveExpirationManager {
   private static final Logger logger = LogService.getLogger();
 
-  private final Region<ByteArrayWrapper, RedisData> dataRegion;
+  private final Region<RedisKey, RedisData> dataRegion;
   private final ScheduledExecutorService expirationExecutor;
   private final RedisStats redisStats;
 
 
-  public PassiveExpirationManager(Region<ByteArrayWrapper, RedisData> dataRegion,
-      RedisStats redisStats) {
+  public PassiveExpirationManager(Region<RedisKey, RedisData> dataRegion, RedisStats redisStats) {
     this.dataRegion = dataRegion;
     this.redisStats = redisStats;
     expirationExecutor = newSingleThreadScheduledExecutor("GemFireRedis-PassiveExpiration-");
@@ -58,16 +57,15 @@ public class PassiveExpirationManager {
     expirationExecutor.shutdownNow();
   }
 
-  private void doDataExpiration(
-      Region<ByteArrayWrapper, RedisData> redisData) {
+  private void doDataExpiration(Region<RedisKey, RedisData> redisData) {
     final long start = redisStats.startPassiveExpirationCheck();
     long expireCount = 0;
     try {
       final long now = System.currentTimeMillis();
-      Region<ByteArrayWrapper, RedisData> localPrimaryData =
+      Region<RedisKey, RedisData> localPrimaryData =
           PartitionRegionHelper.getLocalPrimaryData(redisData);
       RedisKeyCommands redisKeyCommands = new RedisKeyCommandsFunctionInvoker(redisData);
-      for (Map.Entry<ByteArrayWrapper, RedisData> entry : localPrimaryData.entrySet()) {
+      for (Map.Entry<RedisKey, RedisData> entry : localPrimaryData.entrySet()) {
         try {
           if (entry.getValue().hasExpired(now)) {
             // pttl will do its own check using active expiration and expire the key if needed
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
index a9f6c6f..17a4544 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/RegionProvider.java
@@ -20,8 +20,8 @@ import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.InternalRegionFactory;
 import org.apache.geode.management.ManagementException;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.cluster.RedisPartitionResolver;
 
 public class RegionProvider {
@@ -40,17 +40,17 @@ public class RegionProvider {
 
   public static final int REDIS_SLOTS_PER_BUCKET = REDIS_SLOTS / REDIS_REGION_BUCKETS;
 
-  private final Region<ByteArrayWrapper, RedisData> dataRegion;
+  private final Region<RedisKey, RedisData> dataRegion;
   private final Region<String, Object> configRegion;
 
   public RegionProvider(InternalCache cache) {
     validateBucketCount(REDIS_REGION_BUCKETS);
 
-    InternalRegionFactory<ByteArrayWrapper, RedisData> redisDataRegionFactory =
+    InternalRegionFactory<RedisKey, RedisData> redisDataRegionFactory =
         cache.createInternalRegionFactory(RegionShortcut.PARTITION_REDUNDANT);
     redisDataRegionFactory.setInternalRegion(true).setIsUsedForMetaRegion(true);
 
-    PartitionAttributesFactory<ByteArrayWrapper, RedisData> attributesFactory =
+    PartitionAttributesFactory<RedisKey, RedisData> attributesFactory =
         new PartitionAttributesFactory<>();
     attributesFactory.setPartitionResolver(new RedisPartitionResolver());
     attributesFactory.setTotalNumBuckets(REDIS_REGION_BUCKETS);
@@ -64,7 +64,7 @@ public class RegionProvider {
     configRegion = redisConfigRegionFactory.create(REDIS_CONFIG_REGION);
   }
 
-  public Region<ByteArrayWrapper, RedisData> getDataRegion() {
+  public Region<RedisKey, RedisData> getDataRegion() {
     return dataRegion;
   }
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
index 99fbb0f..ee8acac 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
@@ -56,8 +56,7 @@ public abstract class AbstractRedisData implements RedisData {
   private transient DeltaInfo deltaInfo;
 
   @Override
-  public void setExpirationTimestamp(Region<ByteArrayWrapper, RedisData> region,
-      ByteArrayWrapper key, long value) {
+  public void setExpirationTimestamp(Region<RedisKey, RedisData> region, RedisKey key, long value) {
     expirationTimestamp = value;
     storeChanges(region, key, new TimestampDeltaInfo(value));
   }
@@ -67,7 +66,7 @@ public abstract class AbstractRedisData implements RedisData {
   }
 
   @Override
-  public int pexpireat(CommandHelper helper, ByteArrayWrapper key, long timestamp) {
+  public int pexpireat(CommandHelper helper, RedisKey key, long timestamp) {
     long now = System.currentTimeMillis();
     if (now >= timestamp) {
       // already expired
@@ -79,15 +78,14 @@ public abstract class AbstractRedisData implements RedisData {
   }
 
   @Override
-  public void doExpiration(CommandHelper helper, ByteArrayWrapper key) {
+  public void doExpiration(CommandHelper helper, RedisKey key) {
     long start = helper.getRedisStats().startExpiration();
     helper.getRegion().remove(key);
     helper.getRedisStats().endExpiration(start);
   }
 
   @Override
-  public boolean rename(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper oldKey,
-      ByteArrayWrapper newKey) {
+  public boolean rename(Region<RedisKey, RedisData> region, RedisKey oldKey, RedisKey newKey) {
     region.put(newKey, this, primaryMoveReadLockAcquired);
     try {
       region.destroy(oldKey, primaryMoveReadLockAcquired);
@@ -102,7 +100,7 @@ public abstract class AbstractRedisData implements RedisData {
   }
 
   @Override
-  public long pttl(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key) {
+  public long pttl(Region<RedisKey, RedisData> region, RedisKey key) {
     long expireTimestamp = getExpirationTimestamp();
     if (expireTimestamp == NO_EXPIRATION) {
       return -1;
@@ -116,8 +114,7 @@ public abstract class AbstractRedisData implements RedisData {
   }
 
   @Override
-  public int persist(Region<ByteArrayWrapper, RedisData> region,
-      ByteArrayWrapper key) {
+  public int persist(Region<RedisKey, RedisData> region, RedisKey key) {
     if (getExpirationTimestamp() == NO_EXPIRATION) {
       return 0;
     }
@@ -217,7 +214,7 @@ public abstract class AbstractRedisData implements RedisData {
     }
   }
 
-  protected void storeChanges(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
+  protected void storeChanges(Region<RedisKey, RedisData> region, RedisKey key,
       DeltaInfo deltaInfo) {
     if (deltaInfo != null) {
       if (removeFromRegion()) {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
index 162cf46..79d4f4f 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/ByteArrayWrapper.java
@@ -16,9 +16,6 @@
 
 package org.apache.geode.redis.internal.data;
 
-import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS;
-import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS_PER_BUCKET;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
@@ -29,8 +26,6 @@ import org.apache.geode.internal.serialization.DataSerializableFixedID;
 import org.apache.geode.internal.serialization.DeserializationContext;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.serialization.SerializationContext;
-import org.apache.geode.redis.internal.executor.cluster.CRC16;
-import org.apache.geode.redis.internal.executor.cluster.RedisPartitionResolver;
 import org.apache.geode.redis.internal.netty.Coder;
 
 /**
@@ -45,8 +40,6 @@ public class ByteArrayWrapper
    */
   protected byte[] value;
 
-  private transient Object routingId;
-
   /**
    * Empty constructor for serialization
    */
@@ -115,39 +108,6 @@ public class ByteArrayWrapper
   }
 
   /**
-   * Used by the {@link RedisPartitionResolver} to map slots to buckets. Supports using hashtags
-   * in the same way that redis does.
-   *
-   * @see <a href="https://redis.io/topics/cluster-spec">Redis Cluster Spec</a>
-   */
-  public synchronized Object getRoutingId() {
-    if (routingId == null && value != null) {
-      int startHashtag = Integer.MAX_VALUE;
-      int endHashtag = 0;
-
-      for (int i = 0; i < value.length; i++) {
-        if (value[i] == '{' && startHashtag == Integer.MAX_VALUE) {
-          startHashtag = i;
-        } else if (value[i] == '}') {
-          endHashtag = i;
-          break;
-        }
-      }
-
-      if (endHashtag - startHashtag <= 1) {
-        startHashtag = -1;
-        endHashtag = value.length;
-      }
-
-      // & (REDIS_SLOTS - 1) is equivalent to % REDIS_SLOTS but supposedly faster
-      routingId = (CRC16.calculate(value, startHashtag + 1, endHashtag) & (REDIS_SLOTS - 1))
-          / REDIS_SLOTS_PER_BUCKET;
-    }
-
-    return routingId;
-  }
-
-  /**
    * Private helper method to compare two byte arrays, A.compareTo(B). The comparison is basically
    * numerical, for each byte index, the byte representing the greater value will be the greater
    *
@@ -213,7 +173,7 @@ public class ByteArrayWrapper
 
   @Override
   public void fromData(DataInput in, DeserializationContext context)
-      throws IOException {
+      throws IOException, ClassNotFoundException {
     value = DataSerializer.readByteArray(in);
   }
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/CommandHelper.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/CommandHelper.java
index 8f228b0..e78c963 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/CommandHelper.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/CommandHelper.java
@@ -39,11 +39,11 @@ import org.apache.geode.redis.internal.statistics.RedisStats;
  * to prevent garbage creation.
  */
 public class CommandHelper {
-  private final Region<ByteArrayWrapper, RedisData> region;
+  private final Region<RedisKey, RedisData> region;
   private final RedisStats redisStats;
   private final StripedExecutor stripedExecutor;
 
-  public Region<ByteArrayWrapper, RedisData> getRegion() {
+  public Region<RedisKey, RedisData> getRegion() {
     return region;
   }
 
@@ -56,7 +56,7 @@ public class CommandHelper {
   }
 
   public CommandHelper(
-      Region<ByteArrayWrapper, RedisData> region,
+      Region<RedisKey, RedisData> region,
       RedisStats redisStats,
       StripedExecutor stripedExecutor) {
     this.region = region;
@@ -64,11 +64,11 @@ public class CommandHelper {
     this.stripedExecutor = stripedExecutor;
   }
 
-  RedisData getRedisData(ByteArrayWrapper key) {
+  RedisData getRedisData(RedisKey key) {
     return getRedisData(key, NullRedisDataStructures.NULL_REDIS_DATA);
   }
 
-  RedisData getRedisData(ByteArrayWrapper key, RedisData notFoundValue) {
+  RedisData getRedisData(RedisKey key, RedisData notFoundValue) {
     RedisData result = region.get(key);
     if (result != null) {
       if (result.hasExpired()) {
@@ -83,7 +83,7 @@ public class CommandHelper {
     }
   }
 
-  RedisSet getRedisSet(ByteArrayWrapper key, boolean updateStats) {
+  RedisSet getRedisSet(RedisKey key, boolean updateStats) {
     RedisData redisData = getRedisData(key, NULL_REDIS_SET);
     if (updateStats) {
       if (redisData == NULL_REDIS_SET) {
@@ -105,7 +105,7 @@ public class CommandHelper {
     return (RedisSet) redisData;
   }
 
-  RedisHash getRedisHash(ByteArrayWrapper key, boolean updateStats) {
+  RedisHash getRedisHash(RedisKey key, boolean updateStats) {
     RedisData redisData = getRedisData(key, NULL_REDIS_HASH);
     if (updateStats) {
       if (redisData == NULL_REDIS_HASH) {
@@ -140,7 +140,7 @@ public class CommandHelper {
     return (RedisString) redisData;
   }
 
-  RedisString getRedisString(ByteArrayWrapper key, boolean updateStats) {
+  RedisString getRedisString(RedisKey key, boolean updateStats) {
     RedisData redisData = getRedisData(key, NULL_REDIS_STRING);
     if (updateStats) {
       if (redisData == NULL_REDIS_STRING) {
@@ -153,7 +153,7 @@ public class CommandHelper {
     return checkStringType(redisData, false);
   }
 
-  RedisString getRedisStringIgnoringType(ByteArrayWrapper key, boolean updateStats) {
+  RedisString getRedisStringIgnoringType(RedisKey key, boolean updateStats) {
     RedisData redisData = getRedisData(key, NULL_REDIS_STRING);
     if (updateStats) {
       if (redisData == NULL_REDIS_STRING) {
@@ -166,7 +166,7 @@ public class CommandHelper {
     return checkStringType(redisData, true);
   }
 
-  RedisString setRedisString(ByteArrayWrapper key, ByteArrayWrapper value) {
+  RedisString setRedisString(RedisKey key, ByteArrayWrapper value) {
     RedisString result;
     RedisData redisData = getRedisData(key);
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java
index e810cb9..b31e912 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java
@@ -40,10 +40,8 @@ public class NullRedisData implements RedisData {
   }
 
   @Override
-  public void setExpirationTimestamp(Region<ByteArrayWrapper, RedisData> region,
-      ByteArrayWrapper key, long value) {
-
-  }
+  public void setExpirationTimestamp(Region<RedisKey, RedisData> region,
+      RedisKey key, long value) {}
 
   @Override
   public long getExpirationTimestamp() {
@@ -51,7 +49,7 @@ public class NullRedisData implements RedisData {
   }
 
   @Override
-  public int persist(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key) {
+  public int persist(Region<RedisKey, RedisData> region, RedisKey key) {
     return 0;
   }
 
@@ -66,17 +64,17 @@ public class NullRedisData implements RedisData {
   }
 
   @Override
-  public long pttl(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key) {
+  public long pttl(Region<RedisKey, RedisData> region, RedisKey key) {
     return -2;
   }
 
   @Override
-  public int pexpireat(CommandHelper helper, ByteArrayWrapper key, long timestamp) {
+  public int pexpireat(CommandHelper helper, RedisKey key, long timestamp) {
     return 0;
   }
 
   @Override
-  public void doExpiration(CommandHelper helper, ByteArrayWrapper key) {
+  public void doExpiration(CommandHelper helper, RedisKey key) {
     // nothing needed
   }
 
@@ -86,8 +84,7 @@ public class NullRedisData implements RedisData {
   }
 
   @Override
-  public boolean rename(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper oldKey,
-      ByteArrayWrapper newKey) {
+  public boolean rename(Region<RedisKey, RedisData> region, RedisKey oldKey, RedisKey newKey) {
     return false;
   }
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisHash.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisHash.java
index 4e20ad9..3a453ec 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisHash.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisHash.java
@@ -36,14 +36,14 @@ public class NullRedisHash extends RedisHash {
   }
 
   @Override
-  public int hset(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
+  public int hset(Region<RedisKey, RedisData> region, RedisKey key,
       List<ByteArrayWrapper> fieldsToSet, boolean nx) {
     region.put(key, new RedisHash(fieldsToSet));
     return fieldsToSet.size() / 2;
   }
 
   @Override
-  public long hincrby(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
+  public long hincrby(Region<RedisKey, RedisData> region, RedisKey key,
       ByteArrayWrapper field, long increment)
       throws NumberFormatException, ArithmeticException {
     region.put(key,
@@ -52,7 +52,7 @@ public class NullRedisHash extends RedisHash {
   }
 
   @Override
-  public BigDecimal hincrbyfloat(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
+  public BigDecimal hincrbyfloat(Region<RedisKey, RedisData> region, RedisKey key,
       ByteArrayWrapper field, BigDecimal increment) throws NumberFormatException {
     region.put(key,
         new RedisHash(
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSet.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSet.java
index f2b5446..9ef7658 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSet.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisSet.java
@@ -41,8 +41,8 @@ class NullRedisSet extends RedisSet {
   }
 
   @Override
-  Collection<ByteArrayWrapper> spop(Region<ByteArrayWrapper, RedisData> region,
-      ByteArrayWrapper key, int popCount) {
+  Collection<ByteArrayWrapper> spop(Region<RedisKey, RedisData> region,
+      RedisKey key, int popCount) {
     return emptyList();
   }
 
@@ -62,15 +62,15 @@ class NullRedisSet extends RedisSet {
   }
 
   @Override
-  long sadd(ArrayList<ByteArrayWrapper> membersToAdd,
-      Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key) {
+  long sadd(ArrayList<ByteArrayWrapper> membersToAdd, Region<RedisKey, RedisData> region,
+      RedisKey key) {
     region.create(key, new RedisSet(membersToAdd));
     return membersToAdd.size();
   }
 
   @Override
-  long srem(ArrayList<ByteArrayWrapper> membersToRemove,
-      Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key) {
+  long srem(ArrayList<ByteArrayWrapper> membersToRemove, Region<RedisKey, RedisData> region,
+      RedisKey key) {
     return 0;
   }
 
@@ -85,23 +85,23 @@ class NullRedisSet extends RedisSet {
     UNION, INTERSECTION, DIFF
   }
 
-  public int sunionstore(CommandHelper helper, ByteArrayWrapper destination,
-      ArrayList<ByteArrayWrapper> setKeys) {
+  public int sunionstore(CommandHelper helper, RedisKey destination,
+      ArrayList<RedisKey> setKeys) {
     return doSetOp(SetOp.UNION, helper, destination, setKeys);
   }
 
-  public int sinterstore(CommandHelper helper, ByteArrayWrapper destination,
-      ArrayList<ByteArrayWrapper> setKeys) {
+  public int sinterstore(CommandHelper helper, RedisKey destination,
+      ArrayList<RedisKey> setKeys) {
     return doSetOp(SetOp.INTERSECTION, helper, destination, setKeys);
   }
 
-  public int sdiffstore(CommandHelper helper, ByteArrayWrapper destination,
-      ArrayList<ByteArrayWrapper> setKeys) {
+  public int sdiffstore(CommandHelper helper, RedisKey destination,
+      ArrayList<RedisKey> setKeys) {
     return doSetOp(SetOp.DIFF, helper, destination, setKeys);
   }
 
   private int doSetOp(SetOp setOp, CommandHelper helper,
-      ByteArrayWrapper destination, ArrayList<ByteArrayWrapper> setKeys) {
+      RedisKey destination, ArrayList<RedisKey> setKeys) {
     ArrayList<Set<ByteArrayWrapper>> nonDestinationSets =
         fetchSets(helper.getRegion(), setKeys, destination);
     return helper.getStripedExecutor()
@@ -110,7 +110,7 @@ class NullRedisSet extends RedisSet {
   }
 
   private int doSetOpWhileLocked(SetOp setOp, CommandHelper helper,
-      ByteArrayWrapper destination,
+      RedisKey destination,
       ArrayList<Set<ByteArrayWrapper>> nonDestinationSets) {
     Set<ByteArrayWrapper> result = computeSetOp(setOp, nonDestinationSets, helper, destination);
     if (result.isEmpty()) {
@@ -125,7 +125,7 @@ class NullRedisSet extends RedisSet {
   private Set<ByteArrayWrapper> computeSetOp(SetOp setOp,
       ArrayList<Set<ByteArrayWrapper>> nonDestinationSets,
       CommandHelper helper,
-      ByteArrayWrapper destination) {
+      RedisKey destination) {
     Set<ByteArrayWrapper> result = null;
     if (nonDestinationSets.isEmpty()) {
       return emptySet();
@@ -160,12 +160,12 @@ class NullRedisSet extends RedisSet {
    * This is all done outside the striped executor to prevent a deadlock.
    */
   private ArrayList<Set<ByteArrayWrapper>> fetchSets(
-      Region<ByteArrayWrapper, RedisData> region,
-      ArrayList<ByteArrayWrapper> setKeys,
-      ByteArrayWrapper destination) {
+      Region<RedisKey, RedisData> region,
+      ArrayList<RedisKey> setKeys,
+      RedisKey destination) {
     ArrayList<Set<ByteArrayWrapper>> result = new ArrayList<>(setKeys.size());
     RedisSetCommands redisSetCommands = new RedisSetCommandsFunctionInvoker(region);
-    for (ByteArrayWrapper key : setKeys) {
+    for (RedisKey key : setKeys) {
       if (key.equals(destination)) {
         result.add(null);
       } else {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisString.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisString.java
index 89e3fa6..684778a 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisString.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisString.java
@@ -67,7 +67,7 @@ public class NullRedisString extends RedisString {
   }
 
   @Override
-  public int bitpos(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key, int bit,
+  public int bitpos(Region<RedisKey, RedisData> region, RedisKey key, int bit,
       int start, Integer end) {
     if (bit == 0) {
       return 0;
@@ -77,9 +77,8 @@ public class NullRedisString extends RedisString {
   }
 
   @Override
-  public int setbit(
-      Region<ByteArrayWrapper, RedisData> region,
-      ByteArrayWrapper key, int bitValue, int byteIndex, byte bitIndex) {
+  public int setbit(Region<RedisKey, RedisData> region, RedisKey key,
+      int bitValue, int byteIndex, byte bitIndex) {
     RedisString newValue;
     if (bitValue == 1) {
       byte[] bytes = new byte[byteIndex + 1];
@@ -94,7 +93,7 @@ public class NullRedisString extends RedisString {
   }
 
   @Override
-  public long incr(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key)
+  public long incr(Region<RedisKey, RedisData> region, RedisKey key)
       throws NumberFormatException, ArithmeticException {
     byte[] newValue = {Coder.NUMBER_1_BYTE};
     region.put(key, new RedisString(new ByteArrayWrapper(newValue)));
@@ -102,7 +101,7 @@ public class NullRedisString extends RedisString {
   }
 
   @Override
-  public long incrby(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
+  public long incrby(Region<RedisKey, RedisData> region, RedisKey key,
       long increment) throws NumberFormatException, ArithmeticException {
     byte[] newValue = Coder.longToBytes(increment);
     region.put(key, new RedisString(new ByteArrayWrapper(newValue)));
@@ -110,7 +109,7 @@ public class NullRedisString extends RedisString {
   }
 
   @Override
-  public BigDecimal incrbyfloat(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
+  public BigDecimal incrbyfloat(Region<RedisKey, RedisData> region, RedisKey key,
       BigDecimal increment) throws NumberFormatException, ArithmeticException {
     byte[] newValue = Coder.bigDecimalToBytes(increment);
     region.put(key, new RedisString(new ByteArrayWrapper(newValue)));
@@ -118,37 +117,35 @@ public class NullRedisString extends RedisString {
   }
 
   @Override
-  public long decr(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key)
+  public long decr(Region<RedisKey, RedisData> region, RedisKey key)
       throws NumberFormatException, ArithmeticException {
     region.put(key, new RedisString(new ByteArrayWrapper(Coder.stringToBytes("-1"))));
     return -1;
   }
 
   @Override
-  public long decrby(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
-      long decrement) {
+  public long decrby(Region<RedisKey, RedisData> region, RedisKey key, long decrement) {
     byte[] newValue = Coder.longToBytes(-decrement);
     region.put(key, new RedisString(new ByteArrayWrapper(newValue)));
     return -decrement;
   }
 
   @Override
-  public int append(ByteArrayWrapper appendValue,
-      Region<ByteArrayWrapper, RedisData> region,
-      ByteArrayWrapper key) {
+  public int append(ByteArrayWrapper appendValue, Region<RedisKey, RedisData> region,
+      RedisKey key) {
     region.put(key, new RedisString(appendValue));
     return appendValue.length();
   }
 
   @Override
-  public ByteArrayWrapper getset(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
+  public ByteArrayWrapper getset(Region<RedisKey, RedisData> region, RedisKey key,
       ByteArrayWrapper value) {
     region.put(key, new RedisString(value));
     return null;
   }
 
   @Override
-  public int setrange(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key, int offset,
+  public int setrange(Region<RedisKey, RedisData> region, RedisKey key, int offset,
       byte[] valueToAdd) {
     byte[] newBytes = valueToAdd;
     if (valueToAdd.length != 0) {
@@ -165,8 +162,8 @@ public class NullRedisString extends RedisString {
    * SET is currently mostly implemented here. It does not have an implementation on
    * RedisString which is a bit odd.
    */
-  public boolean set(CommandHelper helper, ByteArrayWrapper key,
-      ByteArrayWrapper value, SetOptions options) {
+  public boolean set(CommandHelper helper, RedisKey key, ByteArrayWrapper value,
+      SetOptions options) {
     if (options != null) {
       if (options.isNX()) {
         return setnx(helper, key, value, options);
@@ -182,8 +179,8 @@ public class NullRedisString extends RedisString {
     return true;
   }
 
-  private boolean setnx(CommandHelper helper, ByteArrayWrapper key,
-      ByteArrayWrapper value, SetOptions options) {
+  private boolean setnx(CommandHelper helper, RedisKey key, ByteArrayWrapper value,
+      SetOptions options) {
     if (helper.getRedisData(key).exists()) {
       return false;
     }
@@ -198,15 +195,13 @@ public class NullRedisString extends RedisString {
    * RedisString which is a bit odd. This implementation only has a couple of places
    * that care if a RedisString for "key" exists.
    */
-  public int bitop(CommandHelper helper,
-      String operation,
-      ByteArrayWrapper key, List<ByteArrayWrapper> sources) {
+  public int bitop(CommandHelper helper, String operation, RedisKey key, List<RedisKey> sources) {
     List<ByteArrayWrapper> sourceValues = new ArrayList<>();
     int selfIndex = -1;
     // Read all the source values, except for self, before locking the stripe.
     RedisStringCommands commander =
         new RedisStringCommandsFunctionInvoker(helper.getRegion());
-    for (ByteArrayWrapper sourceKey : sources) {
+    for (RedisKey sourceKey : sources) {
       if (sourceKey.equals(key)) {
         // get self later after the stripe is locked
         selfIndex = sourceValues.size();
@@ -225,10 +220,7 @@ public class NullRedisString extends RedisString {
     AND, OR, XOR
   }
 
-  private int doBitOp(CommandHelper helper,
-      String operation,
-      ByteArrayWrapper key,
-      int selfIndex,
+  private int doBitOp(CommandHelper helper, String operation, RedisKey key, int selfIndex,
       List<ByteArrayWrapper> sourceValues) {
     if (selfIndex != -1) {
       RedisString redisString = helper.getRedisString(key, true);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
index 396d2a8..ef1aab0 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
@@ -37,28 +37,25 @@ public interface RedisData extends Delta, DataSerializableFixedID {
 
   RedisDataType getType();
 
-  void setExpirationTimestamp(Region<ByteArrayWrapper, RedisData> region,
-      ByteArrayWrapper key, long value);
+  void setExpirationTimestamp(Region<RedisKey, RedisData> region, RedisKey key, long value);
 
   long getExpirationTimestamp();
 
-  int persist(Region<ByteArrayWrapper, RedisData> region,
-      ByteArrayWrapper key);
+  int persist(Region<RedisKey, RedisData> region, RedisKey key);
 
   boolean hasExpired();
 
   boolean hasExpired(long now);
 
-  long pttl(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key);
+  long pttl(Region<RedisKey, RedisData> region, RedisKey key);
 
-  int pexpireat(CommandHelper helper, ByteArrayWrapper key, long timestamp);
+  int pexpireat(CommandHelper helper, RedisKey key, long timestamp);
 
-  void doExpiration(CommandHelper helper, ByteArrayWrapper key);
+  void doExpiration(CommandHelper helper, RedisKey key);
 
   String type();
 
-  boolean rename(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper oldKey,
-      ByteArrayWrapper newKey);
+  boolean rename(Region<RedisKey, RedisData> region, RedisKey oldKey, RedisKey newKey);
 
   default boolean getForceRecalculateSize() {
     return true;
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataCommandsFunctionExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataCommandsFunctionExecutor.java
index e918324..056269b 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataCommandsFunctionExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisDataCommandsFunctionExecutor.java
@@ -34,7 +34,7 @@ public abstract class RedisDataCommandsFunctionExecutor {
     this.helper = helper;
   }
 
-  protected Region<ByteArrayWrapper, RedisData> getRegion() {
+  protected Region<RedisKey, RedisData> getRegion() {
     return helper.getRegion();
   }
 
@@ -43,7 +43,7 @@ public abstract class RedisDataCommandsFunctionExecutor {
     return helper.getStripedExecutor().execute(key, callable);
   }
 
-  protected RedisData getRedisData(ByteArrayWrapper key) {
+  protected RedisData getRedisData(RedisKey key) {
     return helper.getRedisData(key);
   }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
index 39f923e..9a8f81a 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
@@ -136,7 +136,6 @@ public class RedisHash extends AbstractRedisData {
     this.HSCANSnapshotExpirationExecutor = null;
   }
 
-
   /**
    * Since GII (getInitialImage) can come in and call toData while other threads are modifying this
    * object, the striped executor will not protect toData. So any methods that modify "hash" needs
@@ -148,6 +147,18 @@ public class RedisHash extends AbstractRedisData {
     DataSerializer.writeHashMap(hash, out);
   }
 
+  @Override
+  public void fromData(DataInput in, DeserializationContext context)
+      throws IOException, ClassNotFoundException {
+    super.fromData(in, context);
+    hash = DataSerializer.readHashMap(in);
+  }
+
+  @Override
+  public int getDSFID() {
+    return REDIS_HASH_ID;
+  }
+
   private synchronized ByteArrayWrapper hashPut(ByteArrayWrapper field, ByteArrayWrapper value) {
     return hash.put(field, value);
   }
@@ -162,13 +173,6 @@ public class RedisHash extends AbstractRedisData {
   }
 
   @Override
-  public void fromData(DataInput in, DeserializationContext context)
-      throws IOException, ClassNotFoundException {
-    super.fromData(in, context);
-    hash = DataSerializer.readHashMap(in);
-  }
-
-  @Override
   protected void applyDelta(DeltaInfo deltaInfo) {
     if (deltaInfo instanceof AddsDeltaInfo) {
       AddsDeltaInfo addsDeltaInfo = (AddsDeltaInfo) deltaInfo;
@@ -186,7 +190,7 @@ public class RedisHash extends AbstractRedisData {
     }
   }
 
-  public int hset(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
+  public int hset(Region<RedisKey, RedisData> region, RedisKey key,
       List<ByteArrayWrapper> fieldsToSet, boolean nx) {
     int fieldsAdded = 0;
     AddsDeltaInfo deltaInfo = null;
@@ -219,7 +223,7 @@ public class RedisHash extends AbstractRedisData {
     return fieldsAdded;
   }
 
-  public int hdel(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
+  public int hdel(Region<RedisKey, RedisData> region, RedisKey key,
       List<ByteArrayWrapper> fieldsToRemove) {
     int fieldsRemoved = 0;
     RemsDeltaInfo deltaInfo = null;
@@ -378,7 +382,6 @@ public class RedisHash extends AbstractRedisData {
     return keySnapShot;
   }
 
-
   @SuppressWarnings("unchecked")
   private List<ByteArrayWrapper> createKeySnapShot(UUID clientID) {
 
@@ -394,8 +397,7 @@ public class RedisHash extends AbstractRedisData {
     return keySnapShot;
   }
 
-
-  public long hincrby(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
+  public long hincrby(Region<RedisKey, RedisData> region, RedisKey key,
       ByteArrayWrapper field, long increment)
       throws NumberFormatException, ArithmeticException {
     ByteArrayWrapper oldValue = hash.get(field);
@@ -431,7 +433,7 @@ public class RedisHash extends AbstractRedisData {
     return value;
   }
 
-  public BigDecimal hincrbyfloat(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
+  public BigDecimal hincrbyfloat(Region<RedisKey, RedisData> region, RedisKey key,
       ByteArrayWrapper field, BigDecimal increment)
       throws NumberFormatException {
     ByteArrayWrapper oldValue = hash.get(field);
@@ -504,11 +506,6 @@ public class RedisHash extends AbstractRedisData {
   }
 
   @Override
-  public int getDSFID() {
-    return REDIS_HASH_ID;
-  }
-
-  @Override
   public KnownVersion[] getSerializationVersions() {
     return null;
   }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHashCommandsFunctionExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHashCommandsFunctionExecutor.java
index 45c3847..621ac7d 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHashCommandsFunctionExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHashCommandsFunctionExecutor.java
@@ -34,66 +34,66 @@ public class RedisHashCommandsFunctionExecutor extends RedisDataCommandsFunction
     super(helper);
   }
 
-  private RedisHash getRedisHash(ByteArrayWrapper key, boolean updateStats) {
+  private RedisHash getRedisHash(RedisKey key, boolean updateStats) {
     return helper.getRedisHash(key, updateStats);
   }
 
   @Override
-  public int hset(ByteArrayWrapper key, List<ByteArrayWrapper> fieldsToSet, boolean NX) {
+  public int hset(RedisKey key, List<ByteArrayWrapper> fieldsToSet, boolean NX) {
     return stripedExecute(key,
         () -> getRedisHash(key, false)
             .hset(getRegion(), key, fieldsToSet, NX));
   }
 
   @Override
-  public int hdel(ByteArrayWrapper key, List<ByteArrayWrapper> fieldsToRemove) {
+  public int hdel(RedisKey key, List<ByteArrayWrapper> fieldsToRemove) {
     return stripedExecute(key,
         () -> getRedisHash(key, false)
             .hdel(getRegion(), key, fieldsToRemove));
   }
 
   @Override
-  public Collection<ByteArrayWrapper> hgetall(ByteArrayWrapper key) {
+  public Collection<ByteArrayWrapper> hgetall(RedisKey key) {
     return stripedExecute(key, () -> getRedisHash(key, true).hgetall());
   }
 
   @Override
-  public int hexists(ByteArrayWrapper key, ByteArrayWrapper field) {
+  public int hexists(RedisKey key, ByteArrayWrapper field) {
     return stripedExecute(key, () -> getRedisHash(key, true).hexists(field));
   }
 
   @Override
-  public ByteArrayWrapper hget(ByteArrayWrapper key, ByteArrayWrapper field) {
+  public ByteArrayWrapper hget(RedisKey key, ByteArrayWrapper field) {
     return stripedExecute(key, () -> getRedisHash(key, true).hget(field));
   }
 
   @Override
-  public int hlen(ByteArrayWrapper key) {
+  public int hlen(RedisKey key) {
     return stripedExecute(key, () -> getRedisHash(key, true).hlen());
   }
 
   @Override
-  public int hstrlen(ByteArrayWrapper key, ByteArrayWrapper field) {
+  public int hstrlen(RedisKey key, ByteArrayWrapper field) {
     return stripedExecute(key, () -> getRedisHash(key, true).hstrlen(field));
   }
 
   @Override
-  public List<ByteArrayWrapper> hmget(ByteArrayWrapper key, List<ByteArrayWrapper> fields) {
+  public List<ByteArrayWrapper> hmget(RedisKey key, List<ByteArrayWrapper> fields) {
     return stripedExecute(key, () -> getRedisHash(key, true).hmget(fields));
   }
 
   @Override
-  public Collection<ByteArrayWrapper> hvals(ByteArrayWrapper key) {
+  public Collection<ByteArrayWrapper> hvals(RedisKey key) {
     return stripedExecute(key, () -> getRedisHash(key, true).hvals());
   }
 
   @Override
-  public Collection<ByteArrayWrapper> hkeys(ByteArrayWrapper key) {
+  public Collection<ByteArrayWrapper> hkeys(RedisKey key) {
     return stripedExecute(key, () -> getRedisHash(key, true).hkeys());
   }
 
   @Override
-  public Pair<Integer, List<Object>> hscan(ByteArrayWrapper key, Pattern matchPattern,
+  public Pair<Integer, List<Object>> hscan(RedisKey key, Pattern matchPattern,
       int count, int cursor, UUID clientID) {
     return stripedExecute(key,
         () -> getRedisHash(key, true)
@@ -101,15 +101,14 @@ public class RedisHashCommandsFunctionExecutor extends RedisDataCommandsFunction
   }
 
   @Override
-  public long hincrby(ByteArrayWrapper key, ByteArrayWrapper field, long increment) {
+  public long hincrby(RedisKey key, ByteArrayWrapper field, long increment) {
     return stripedExecute(key,
         () -> getRedisHash(key, false)
             .hincrby(getRegion(), key, field, increment));
   }
 
   @Override
-  public BigDecimal hincrbyfloat(ByteArrayWrapper key, ByteArrayWrapper field,
-      BigDecimal increment) {
+  public BigDecimal hincrbyfloat(RedisKey key, ByteArrayWrapper field, BigDecimal increment) {
     return stripedExecute(key,
         () -> getRedisHash(key, false)
             .hincrbyfloat(getRegion(), key, field, increment));
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKey.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKey.java
new file mode 100644
index 0000000..95c6d83
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKey.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.data;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.geode.internal.serialization.DataSerializableFixedID;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.redis.internal.executor.cluster.CRC16;
+import org.apache.geode.redis.internal.executor.cluster.RedisPartitionResolver;
+
+public class RedisKey extends ByteArrayWrapper implements DataSerializableFixedID {
+
+  private int crc16;
+
+  public RedisKey() {}
+
+  public RedisKey(byte[] value) {
+    super(value);
+
+    int startHashtag = Integer.MAX_VALUE;
+    int endHashtag = 0;
+
+    for (int i = 0; i < value.length; i++) {
+      if (value[i] == '{' && startHashtag == Integer.MAX_VALUE) {
+        startHashtag = i;
+      } else if (value[i] == '}') {
+        endHashtag = i;
+        break;
+      }
+    }
+
+    if (endHashtag - startHashtag <= 1) {
+      startHashtag = -1;
+      endHashtag = value.length;
+    }
+
+    crc16 = CRC16.calculate(value, startHashtag + 1, endHashtag);
+  }
+
+  @Override
+  public int getDSFID() {
+    return DataSerializableFixedID.REDIS_KEY;
+  }
+
+  @Override
+  public void toData(DataOutput out, SerializationContext context) throws IOException {
+    out.writeShort(crc16);
+    super.toData(out, context);
+  }
+
+  @Override
+  public void fromData(DataInput in, DeserializationContext context)
+      throws IOException, ClassNotFoundException {
+    // Need to convert a signed short to unsigned
+    crc16 = in.readShort() & 0xffff;
+    super.fromData(in, context);
+  }
+
+  @Override
+  public KnownVersion[] getSerializationVersions() {
+    return null;
+  }
+
+  /**
+   * Used by the {@link RedisPartitionResolver} to map slots to buckets
+   */
+  public int getCrc16() {
+    return crc16;
+  }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKeyCommandsFunctionExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKeyCommandsFunctionExecutor.java
index 0b6f355..82dfcba 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKeyCommandsFunctionExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisKeyCommandsFunctionExecutor.java
@@ -27,12 +27,12 @@ public class RedisKeyCommandsFunctionExecutor extends RedisDataCommandsFunctionE
   }
 
   @Override
-  public boolean del(ByteArrayWrapper key) {
+  public boolean del(RedisKey key) {
     return stripedExecute(key, () -> getRegion().remove(key) != null);
   }
 
   @Override
-  public boolean exists(ByteArrayWrapper key) {
+  public boolean exists(RedisKey key) {
     boolean keyExists = stripedExecute(key, () -> getRedisData(key).exists());
 
     if (keyExists) {
@@ -45,7 +45,7 @@ public class RedisKeyCommandsFunctionExecutor extends RedisDataCommandsFunctionE
   }
 
   @Override
-  public long pttl(ByteArrayWrapper key) {
+  public long pttl(RedisKey key) {
     long result = stripedExecute(key, () -> getRedisData(key).pttl(getRegion(), key));
 
     if (result == -2) {
@@ -58,23 +58,23 @@ public class RedisKeyCommandsFunctionExecutor extends RedisDataCommandsFunctionE
   }
 
   @Override
-  public long internalPttl(ByteArrayWrapper key) {
+  public long internalPttl(RedisKey key) {
     return stripedExecute(key, () -> getRedisData(key).pttl(getRegion(), key));
   }
 
   @Override
-  public int pexpireat(ByteArrayWrapper key, long timestamp) {
+  public int pexpireat(RedisKey key, long timestamp) {
     return stripedExecute(key,
         () -> getRedisData(key).pexpireat(helper, key, timestamp));
   }
 
   @Override
-  public int persist(ByteArrayWrapper key) {
+  public int persist(RedisKey key) {
     return stripedExecute(key, () -> getRedisData(key).persist(getRegion(), key));
   }
 
   @Override
-  public String type(ByteArrayWrapper key) {
+  public String type(RedisKey key) {
     String type = stripedExecute(key, () -> getRedisData(key).type());
 
     if (type.equalsIgnoreCase("none")) {
@@ -87,12 +87,12 @@ public class RedisKeyCommandsFunctionExecutor extends RedisDataCommandsFunctionE
   }
 
   @Override
-  public String internalType(ByteArrayWrapper key) {
+  public String internalType(RedisKey key) {
     return stripedExecute(key, () -> getRedisData(key).type());
   }
 
   @Override
-  public boolean rename(ByteArrayWrapper oldKey, ByteArrayWrapper newKey) {
+  public boolean rename(RedisKey oldKey, RedisKey newKey) {
     // caller has already done all the stripedExecutor locking
     return getRedisData(oldKey).rename(getRegion(), oldKey, newKey);
   }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
index f16e11e..a7844d5 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
@@ -99,8 +99,8 @@ public class RedisSet extends AbstractRedisData {
     return scanResult;
   }
 
-  Collection<ByteArrayWrapper> spop(Region<ByteArrayWrapper, RedisData> region,
-      ByteArrayWrapper key, int popCount) {
+  Collection<ByteArrayWrapper> spop(Region<RedisKey, RedisData> region,
+      RedisKey key, int popCount) {
     int originalSize = scard();
     if (originalSize == 0) {
       return emptyList();
@@ -238,8 +238,8 @@ public class RedisSet extends AbstractRedisData {
    * @param key the name of the set to add to
    * @return the number of members actually added
    */
-  long sadd(ArrayList<ByteArrayWrapper> membersToAdd, Region<ByteArrayWrapper, RedisData> region,
-      ByteArrayWrapper key) {
+  long sadd(ArrayList<ByteArrayWrapper> membersToAdd, Region<RedisKey, RedisData> region,
+      RedisKey key) {
 
     membersToAdd.removeIf(memberToAdd -> !membersAdd(memberToAdd));
     int membersAdded = membersToAdd.size();
@@ -256,8 +256,8 @@ public class RedisSet extends AbstractRedisData {
    * @param key the name of the set to remove from
    * @return the number of members actually removed
    */
-  long srem(ArrayList<ByteArrayWrapper> membersToRemove, Region<ByteArrayWrapper, RedisData> region,
-      ByteArrayWrapper key) {
+  long srem(ArrayList<ByteArrayWrapper> membersToRemove, Region<RedisKey, RedisData> region,
+      RedisKey key) {
 
     membersToRemove.removeIf(memberToRemove -> !membersRemove(memberToRemove));
     int membersRemoved = membersToRemove.size();
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSetCommandsFunctionExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSetCommandsFunctionExecutor.java
index 01ba706..65b7dcf 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSetCommandsFunctionExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSetCommandsFunctionExecutor.java
@@ -37,86 +37,73 @@ public class RedisSetCommandsFunctionExecutor extends RedisDataCommandsFunctionE
     super(helper);
   }
 
-  private RedisSet getRedisSet(ByteArrayWrapper key, boolean updateStats) {
+  private RedisSet getRedisSet(RedisKey key, boolean updateStats) {
     return helper.getRedisSet(key, updateStats);
   }
 
   @Override
-  public long sadd(
-      ByteArrayWrapper key,
-      ArrayList<ByteArrayWrapper> membersToAdd) {
+  public long sadd(RedisKey key, ArrayList<ByteArrayWrapper> membersToAdd) {
     return stripedExecute(key,
         () -> getRedisSet(key, false)
             .sadd(membersToAdd,
                 getRegion(), key));
   }
 
-
-
   @Override
-  public int sunionstore(ByteArrayWrapper destination,
-      ArrayList<ByteArrayWrapper> setKeys) {
+  public int sunionstore(RedisKey destination, ArrayList<RedisKey> setKeys) {
     return NULL_REDIS_SET.sunionstore(helper, destination, setKeys);
   }
 
   @Override
-  public int sinterstore(ByteArrayWrapper destination,
-      ArrayList<ByteArrayWrapper> setKeys) {
+  public int sinterstore(RedisKey destination, ArrayList<RedisKey> setKeys) {
     return NULL_REDIS_SET.sinterstore(helper, destination, setKeys);
   }
 
   @Override
-  public int sdiffstore(ByteArrayWrapper destination,
-      ArrayList<ByteArrayWrapper> setKeys) {
+  public int sdiffstore(RedisKey destination, ArrayList<RedisKey> setKeys) {
     return NULL_REDIS_SET.sdiffstore(helper, destination, setKeys);
   }
 
   @Override
-  public long srem(
-      ByteArrayWrapper key,
-      ArrayList<ByteArrayWrapper> membersToRemove) {
+  public long srem(RedisKey key, ArrayList<ByteArrayWrapper> membersToRemove) {
     return stripedExecute(key, () -> getRedisSet(key, false).srem(membersToRemove,
         getRegion(), key));
   }
 
   @Override
-  public Set<ByteArrayWrapper> smembers(
-      ByteArrayWrapper key) {
+  public Set<ByteArrayWrapper> smembers(RedisKey key) {
     return stripedExecute(key, () -> getRedisSet(key, true).smembers());
   }
 
   @Override
-  public Set<ByteArrayWrapper> internalsmembers(
-      ByteArrayWrapper key) {
+  public Set<ByteArrayWrapper> internalsmembers(RedisKey key) {
     return stripedExecute(key, () -> getRedisSet(key, false).smembers());
   }
 
   @Override
-  public int scard(ByteArrayWrapper key) {
+  public int scard(RedisKey key) {
     return stripedExecute(key, () -> getRedisSet(key, true).scard());
   }
 
   @Override
-  public boolean sismember(
-      ByteArrayWrapper key, ByteArrayWrapper member) {
+  public boolean sismember(RedisKey key, ByteArrayWrapper member) {
     return stripedExecute(key, () -> getRedisSet(key, true).sismember(member));
   }
 
   @Override
-  public Collection<ByteArrayWrapper> srandmember(
-      ByteArrayWrapper key, int count) {
+  public Collection<ByteArrayWrapper> srandmember(RedisKey key, int count) {
     return stripedExecute(key, () -> getRedisSet(key, true).srandmember(count));
   }
 
   @Override
-  public Collection<ByteArrayWrapper> spop(
-      ByteArrayWrapper key, int popCount) {
+  public Collection<ByteArrayWrapper> spop(RedisKey key, int popCount) {
     return stripedExecute(key, () -> getRedisSet(key, false)
         .spop(getRegion(), key, popCount));
   }
 
   @Override
-  public Pair<BigInteger, List<Object>> sscan(ByteArrayWrapper key, Pattern matchPattern, int count,
+  public Pair<BigInteger, List<Object>> sscan(RedisKey key, Pattern matchPattern,
+      int count,
       BigInteger cursor) {
     return stripedExecute(key, () -> getRedisSet(key, true).sscan(matchPattern, count, cursor));
   }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisString.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisString.java
index 07c3c05..f10fc36 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisString.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisString.java
@@ -55,16 +55,15 @@ public class RedisString extends AbstractRedisData {
     valueSet(value);
   }
 
-  public int append(ByteArrayWrapper appendValue,
-      Region<ByteArrayWrapper, RedisData> region,
-      ByteArrayWrapper key) {
+  public int append(ByteArrayWrapper appendValue, Region<RedisKey, RedisData> region,
+      RedisKey key) {
     valueAppend(appendValue.toBytes());
     appendSequence++;
     storeChanges(region, key, new AppendDeltaInfo(appendValue.toBytes(), appendSequence));
     return value.length();
   }
 
-  public long incr(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key)
+  public long incr(Region<RedisKey, RedisData> region, RedisKey key)
       throws NumberFormatException, ArithmeticException {
     long longValue = parseValueAsLong();
     if (longValue == Long.MAX_VALUE) {
@@ -77,8 +76,7 @@ public class RedisString extends AbstractRedisData {
     return longValue;
   }
 
-  public long incrby(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
-      long increment)
+  public long incrby(Region<RedisKey, RedisData> region, RedisKey key, long increment)
       throws NumberFormatException, ArithmeticException {
     long longValue = parseValueAsLong();
     if (longValue >= 0 && increment > (Long.MAX_VALUE - longValue)) {
@@ -91,7 +89,7 @@ public class RedisString extends AbstractRedisData {
     return longValue;
   }
 
-  public BigDecimal incrbyfloat(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
+  public BigDecimal incrbyfloat(Region<RedisKey, RedisData> region, RedisKey key,
       BigDecimal increment)
       throws NumberFormatException, ArithmeticException {
     BigDecimal bigDecimalValue = parseValueAsBigDecimal();
@@ -103,8 +101,7 @@ public class RedisString extends AbstractRedisData {
     return bigDecimalValue;
   }
 
-  public long decrby(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
-      long decrement) {
+  public long decrby(Region<RedisKey, RedisData> region, RedisKey key, long decrement) {
     long longValue = parseValueAsLong();
     if (longValue <= 0 && -decrement < (Long.MIN_VALUE - longValue)) {
       throw new ArithmeticException(RedisConstants.ERROR_OVERFLOW);
@@ -116,7 +113,7 @@ public class RedisString extends AbstractRedisData {
     return longValue;
   }
 
-  public long decr(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key)
+  public long decr(Region<RedisKey, RedisData> region, RedisKey key)
       throws NumberFormatException, ArithmeticException {
     long longValue = parseValueAsLong();
     if (longValue == Long.MIN_VALUE) {
@@ -170,7 +167,7 @@ public class RedisString extends AbstractRedisData {
     return new ByteArrayWrapper(returnRange);
   }
 
-  public int setrange(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key, int offset,
+  public int setrange(Region<RedisKey, RedisData> region, RedisKey key, int offset,
       byte[] valueToAdd) {
     if (valueToAdd.length == 0) {
       return value.length();
@@ -205,7 +202,7 @@ public class RedisString extends AbstractRedisData {
     }
   }
 
-  public int bitpos(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key, int bit,
+  public int bitpos(Region<RedisKey, RedisData> region, RedisKey key, int bit,
       int start, Integer end) {
     int length = value.length();
     if (length == 0) {
@@ -577,9 +574,8 @@ public class RedisString extends AbstractRedisData {
     return (value.toBytes()[byteIndex] & (0x80 >> offset)) >> (7 - offset);
   }
 
-  public int setbit(
-      Region<ByteArrayWrapper, RedisData> region,
-      ByteArrayWrapper key, int bitValue, int byteIndex, byte bitIndex) {
+  public int setbit(Region<RedisKey, RedisData> region, RedisKey key,
+      int bitValue, int byteIndex, byte bitIndex) {
     int returnBit;
     byte[] bytes = value.toBytes();
     if (byteIndex < bytes.length) {
@@ -655,7 +651,7 @@ public class RedisString extends AbstractRedisData {
     return RedisDataType.REDIS_STRING;
   }
 
-  public ByteArrayWrapper getset(Region<ByteArrayWrapper, RedisData> region, ByteArrayWrapper key,
+  public ByteArrayWrapper getset(Region<RedisKey, RedisData> region, RedisKey key,
       ByteArrayWrapper newValue) {
     // No need to copy "value" since we are locked and will be calling set which replaces
     // "value" with a new instance.
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisStringCommandsFunctionExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisStringCommandsFunctionExecutor.java
index f31c132..f9e6595 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisStringCommandsFunctionExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/data/RedisStringCommandsFunctionExecutor.java
@@ -32,119 +32,119 @@ public class RedisStringCommandsFunctionExecutor extends RedisDataCommandsFuncti
     super(helper);
   }
 
-  private RedisString getRedisString(ByteArrayWrapper key, boolean updateStats) {
+  private RedisString getRedisString(RedisKey key, boolean updateStats) {
     return helper.getRedisString(key, updateStats);
   }
 
-  private RedisString getRedisStringIgnoringType(ByteArrayWrapper key, boolean updateStats) {
+  private RedisString getRedisStringIgnoringType(RedisKey key, boolean updateStats) {
     return helper.getRedisStringIgnoringType(key, updateStats);
   }
 
   @Override
-  public long append(ByteArrayWrapper key, ByteArrayWrapper valueToAppend) {
+  public long append(RedisKey key, ByteArrayWrapper valueToAppend) {
     return stripedExecute(key,
         () -> getRedisString(key, false)
             .append(valueToAppend, getRegion(), key));
   }
 
   @Override
-  public ByteArrayWrapper get(ByteArrayWrapper key) {
+  public ByteArrayWrapper get(RedisKey key) {
     return stripedExecute(key, () -> getRedisString(key, true).get());
   }
 
   @Override
-  public ByteArrayWrapper mget(ByteArrayWrapper key) {
+  public ByteArrayWrapper mget(RedisKey key) {
     return stripedExecute(key, () -> getRedisStringIgnoringType(key, true).get());
   }
 
   @Override
-  public boolean set(ByteArrayWrapper key, ByteArrayWrapper value, SetOptions options) {
+  public boolean set(RedisKey key, ByteArrayWrapper value, SetOptions options) {
     return stripedExecute(key, () -> NULL_REDIS_STRING
         .set(helper, key, value, options));
   }
 
   @Override
-  public long incr(ByteArrayWrapper key) {
+  public long incr(RedisKey key) {
     return stripedExecute(key, () -> getRedisString(key, false).incr(getRegion(), key));
   }
 
   @Override
-  public long decr(ByteArrayWrapper key) {
+  public long decr(RedisKey key) {
     return stripedExecute(key, () -> getRedisString(key, false).decr(getRegion(), key));
   }
 
   @Override
-  public ByteArrayWrapper getset(ByteArrayWrapper key, ByteArrayWrapper value) {
+  public ByteArrayWrapper getset(RedisKey key, ByteArrayWrapper value) {
     return stripedExecute(key,
         () -> getRedisString(key, true).getset(getRegion(), key, value));
   }
 
   @Override
-  public long incrby(ByteArrayWrapper key, long increment) {
+  public long incrby(RedisKey key, long increment) {
     return stripedExecute(key,
         () -> getRedisString(key, false).incrby(getRegion(), key, increment));
   }
 
   @Override
-  public BigDecimal incrbyfloat(ByteArrayWrapper key, BigDecimal increment) {
+  public BigDecimal incrbyfloat(RedisKey key, BigDecimal increment) {
     return stripedExecute(key,
         () -> getRedisString(key, false)
             .incrbyfloat(getRegion(), key, increment));
   }
 
   @Override
-  public int bitop(String operation, ByteArrayWrapper key,
-      List<ByteArrayWrapper> sources) {
+  public int bitop(String operation, RedisKey key,
+      List<RedisKey> sources) {
     return NULL_REDIS_STRING.bitop(helper, operation, key, sources);
   }
 
   @Override
-  public long decrby(ByteArrayWrapper key, long decrement) {
+  public long decrby(RedisKey key, long decrement) {
     return stripedExecute(key,
         () -> getRedisString(key, false).decrby(getRegion(), key, decrement));
   }
 
   @Override
-  public ByteArrayWrapper getrange(ByteArrayWrapper key, long start, long end) {
+  public ByteArrayWrapper getrange(RedisKey key, long start, long end) {
     return stripedExecute(key, () -> getRedisString(key, true).getrange(start, end));
   }
 
   @Override
-  public int setrange(ByteArrayWrapper key, int offset, byte[] value) {
+  public int setrange(RedisKey key, int offset, byte[] value) {
     return stripedExecute(key,
         () -> getRedisString(key, false)
             .setrange(getRegion(), key, offset, value));
   }
 
   @Override
-  public int bitpos(ByteArrayWrapper key, int bit, int start, Integer end) {
+  public int bitpos(RedisKey key, int bit, int start, Integer end) {
     return stripedExecute(key,
         () -> getRedisString(key, true)
             .bitpos(getRegion(), key, bit, start, end));
   }
 
   @Override
-  public long bitcount(ByteArrayWrapper key, int start, int end) {
+  public long bitcount(RedisKey key, int start, int end) {
     return stripedExecute(key, () -> getRedisString(key, true).bitcount(start, end));
   }
 
   @Override
-  public long bitcount(ByteArrayWrapper key) {
+  public long bitcount(RedisKey key) {
     return stripedExecute(key, () -> getRedisString(key, true).bitcount());
   }
 
   @Override
-  public int strlen(ByteArrayWrapper key) {
+  public int strlen(RedisKey key) {
     return stripedExecute(key, () -> getRedisString(key, true).strlen());
   }
 
   @Override
-  public int getbit(ByteArrayWrapper key, int offset) {
+  public int getbit(RedisKey key, int offset) {
     return stripedExecute(key, () -> getRedisString(key, true).getbit(offset));
   }
 
   @Override
-  public int setbit(ByteArrayWrapper key, long offset, int value) {
+  public int setbit(RedisKey key, long offset, int value) {
     int byteIndex = (int) (offset / 8);
     byte bitIndex = (byte) (offset % 8);
     return stripedExecute(key,
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AbstractExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AbstractExecutor.java
index 01bf0ac..3ffa413 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AbstractExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/AbstractExecutor.java
@@ -18,8 +18,8 @@ import java.util.Collection;
 
 import org.apache.geode.cache.Region;
 import org.apache.geode.redis.internal.GeodeRedisServer;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.key.RedisKeyCommands;
 import org.apache.geode.redis.internal.executor.key.RedisKeyCommandsFunctionInvoker;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -41,7 +41,7 @@ public abstract class AbstractExecutor implements Executor {
     return new RedisKeyCommandsFunctionInvoker(context.getRegionProvider().getDataRegion());
   }
 
-  protected Region<ByteArrayWrapper, RedisData> getDataRegion(ExecutionHandlerContext context) {
+  protected Region<RedisKey, RedisData> getDataRegion(ExecutionHandlerContext context) {
     return context.getRegionProvider().getDataRegion();
   }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java
index c19b6eb..3588d58 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/CommandFunction.java
@@ -31,6 +31,7 @@ import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.CommandHelper;
 import org.apache.geode.redis.internal.data.RedisData;
 import org.apache.geode.redis.internal.data.RedisHashCommandsFunctionExecutor;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.data.RedisKeyCommandsFunctionExecutor;
 import org.apache.geode.redis.internal.data.RedisSetCommandsFunctionExecutor;
 import org.apache.geode.redis.internal.data.RedisStringCommandsFunctionExecutor;
@@ -47,7 +48,7 @@ public class CommandFunction extends SingleResultRedisFunction {
   private final transient RedisSetCommandsFunctionExecutor setCommands;
   private final transient RedisStringCommandsFunctionExecutor stringCommands;
 
-  public static void register(Region<ByteArrayWrapper, RedisData> dataRegion,
+  public static void register(Region<RedisKey, RedisData> dataRegion,
       StripedExecutor stripedExecutor,
       RedisStats redisStats) {
     FunctionService.registerFunction(new CommandFunction(dataRegion, stripedExecutor, redisStats));
@@ -66,7 +67,7 @@ public class CommandFunction extends SingleResultRedisFunction {
     return result;
   }
 
-  public CommandFunction(Region<ByteArrayWrapper, RedisData> dataRegion,
+  public CommandFunction(Region<RedisKey, RedisData> dataRegion,
       StripedExecutor stripedExecutor,
       RedisStats redisStats) {
     super(dataRegion);
@@ -84,7 +85,7 @@ public class CommandFunction extends SingleResultRedisFunction {
 
   @Override
   @SuppressWarnings("unchecked")
-  protected Object compute(ByteArrayWrapper key, Object[] args) {
+  protected Object compute(RedisKey key, Object[] args) {
     RedisCommandType command = (RedisCommandType) args[0];
     switch (command) {
       case DEL:
@@ -160,7 +161,7 @@ public class CommandFunction extends SingleResultRedisFunction {
       }
       case BITOP: {
         String operation = (String) args[1];
-        List<ByteArrayWrapper> sources = (List<ByteArrayWrapper>) args[2];
+        List<RedisKey> sources = (List<RedisKey>) args[2];
         return stringCommands.bitop(operation, key, sources);
       }
       case INCR:
@@ -212,15 +213,15 @@ public class CommandFunction extends SingleResultRedisFunction {
         return setCommands.sscan(key, matchPattern, count, cursor);
       }
       case SUNIONSTORE: {
-        ArrayList<ByteArrayWrapper> setKeys = (ArrayList<ByteArrayWrapper>) args[1];
+        ArrayList<RedisKey> setKeys = (ArrayList<RedisKey>) args[1];
         return setCommands.sunionstore(key, setKeys);
       }
       case SINTERSTORE: {
-        ArrayList<ByteArrayWrapper> setKeys = (ArrayList<ByteArrayWrapper>) args[1];
+        ArrayList<RedisKey> setKeys = (ArrayList<RedisKey>) args[1];
         return setCommands.sinterstore(key, setKeys);
       }
       case SDIFFSTORE: {
-        ArrayList<ByteArrayWrapper> setKeys = (ArrayList<ByteArrayWrapper>) args[1];
+        ArrayList<RedisKey> setKeys = (ArrayList<RedisKey>) args[1];
         return setCommands.sdiffstore(key, setKeys);
       }
       case HSET: {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisCommandsFunctionInvoker.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisCommandsFunctionInvoker.java
index 2866532..33f1355 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisCommandsFunctionInvoker.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisCommandsFunctionInvoker.java
@@ -22,13 +22,13 @@ import org.apache.geode.cache.Region;
 import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.internal.cache.PrimaryBucketLockException;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 
 public abstract class RedisCommandsFunctionInvoker {
-  protected final Region<ByteArrayWrapper, RedisData> region;
+  protected final Region<RedisKey, RedisData> region;
 
-  protected RedisCommandsFunctionInvoker(Region<ByteArrayWrapper, RedisData> region) {
+  protected RedisCommandsFunctionInvoker(Region<RedisKey, RedisData> region) {
     this.region = region;
   }
 
@@ -67,8 +67,7 @@ public abstract class RedisCommandsFunctionInvoker {
     } while (true);
   }
 
-  protected <T> T invokeCommandFunction(ByteArrayWrapper key,
-      Object... arguments) {
+  protected <T> T invokeCommandFunction(RedisKey key, Object... arguments) {
     return invoke(CommandFunction.ID, key, arguments);
   }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java
index fc18392..ed1ae1a 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/SingleResultRedisFunction.java
@@ -20,28 +20,26 @@ import org.apache.geode.cache.execute.FunctionContext;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.execute.InternalFunction;
 import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 
 public abstract class SingleResultRedisFunction implements InternalFunction<Object[]> {
 
   private static final long serialVersionUID = 3239452234149879302L;
   private final transient PartitionedRegion partitionedRegion;
 
-  public SingleResultRedisFunction(Region<ByteArrayWrapper, RedisData> dataRegion) {
+  public SingleResultRedisFunction(Region<RedisKey, RedisData> dataRegion) {
     this.partitionedRegion = (PartitionedRegion) dataRegion;
   }
 
-  protected abstract Object compute(ByteArrayWrapper key, Object[] args);
+  protected abstract Object compute(RedisKey key, Object[] args);
 
   @Override
   public void execute(FunctionContext<Object[]> context) {
 
-    RegionFunctionContextImpl regionFunctionContext =
-        (RegionFunctionContextImpl) context;
+    RegionFunctionContextImpl regionFunctionContext = (RegionFunctionContextImpl) context;
 
-    ByteArrayWrapper key =
-        (ByteArrayWrapper) regionFunctionContext.getFilter().iterator().next();
+    RedisKey key = (RedisKey) regionFunctionContext.getFilter().iterator().next();
 
     Object[] args = context.getArguments();
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java
index 2745aee..98f3b79 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/CRC16.java
@@ -59,6 +59,11 @@ public class CRC16 {
     return crc;
   }
 
+  public static int calculate(String data) {
+    byte[] bytes = data.getBytes();
+    return calculate(bytes, 0, bytes.length);
+  }
+
   // Reverses the bits of a 16 bit integer.
   private static int reverseInt16(int i) {
     i = (i & 0x5555) << 1 | (i >>> 1) & 0x5555;
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java
index 3e9c211..38f2034 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/cluster/RedisPartitionResolver.java
@@ -15,16 +15,20 @@
 
 package org.apache.geode.redis.internal.executor.cluster;
 
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS;
+import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS_PER_BUCKET;
+
 import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.PartitionResolver;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 
-public class RedisPartitionResolver implements PartitionResolver<ByteArrayWrapper, RedisData> {
+public class RedisPartitionResolver implements PartitionResolver<RedisKey, RedisData> {
 
   @Override
-  public Object getRoutingObject(EntryOperation<ByteArrayWrapper, RedisData> opDetails) {
-    return opDetails.getKey().getRoutingId();
+  public Object getRoutingObject(EntryOperation<RedisKey, RedisData> opDetails) {
+    // & (REDIS_SLOTS - 1) is equivalent to % REDIS_SLOTS but supposedly faster
+    return opDetails.getKey().getCrc16() & (REDIS_SLOTS - 1) / REDIS_SLOTS_PER_BUCKET;
   }
 
   @Override
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HDelExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HDelExecutor.java
index 6064221..e259b47 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HDelExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HDelExecutor.java
@@ -18,6 +18,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -46,7 +47,7 @@ public class HDelExecutor extends HashExecutor {
       ExecutionHandlerContext context) {
     List<ByteArrayWrapper> commandElems = command.getProcessedCommandWrappers();
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisHashCommands redisHashCommands = createRedisHashCommands(context);
     ArrayList<ByteArrayWrapper> fieldsToDelete =
         new ArrayList<>(commandElems.subList(2, commandElems.size()));
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HExistsExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HExistsExecutor.java
index d0170f8..a2d6a25 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HExistsExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HExistsExecutor.java
@@ -17,6 +17,7 @@ package org.apache.geode.redis.internal.executor.hash;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -46,7 +47,7 @@ public class HExistsExecutor extends HashExecutor {
 
     byte[] byteField = commandElems.get(FIELD_INDEX);
     ByteArrayWrapper field = new ByteArrayWrapper(byteField);
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisHashCommands redisHashCommands = createRedisHashCommands(context);
 
     return RedisResponse.integer(redisHashCommands.hexists(key, field));
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetAllExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetAllExecutor.java
index b6524dc..c0cc768 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetAllExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetAllExecutor.java
@@ -17,6 +17,7 @@ package org.apache.geode.redis.internal.executor.hash;
 import java.util.Collection;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -45,7 +46,7 @@ public class HGetAllExecutor extends HashExecutor {
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisHashCommands redisHashCommands = createRedisHashCommands(context);
     Collection<ByteArrayWrapper> fieldsAndValues = redisHashCommands.hgetall(key);
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetExecutor.java
index 413fb48..7bb5644 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HGetExecutor.java
@@ -17,6 +17,7 @@ package org.apache.geode.redis.internal.executor.hash;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -43,7 +44,7 @@ public class HGetExecutor extends HashExecutor {
 
     byte[] byteField = commandElems.get(FIELD_INDEX);
     ByteArrayWrapper field = new ByteArrayWrapper(byteField);
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisHashCommands redisHashCommands = createRedisHashCommands(context);
     ByteArrayWrapper valueWrapper = redisHashCommands.hget(key, field);
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByExecutor.java
index 42ddcca..ceee123 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByExecutor.java
@@ -17,6 +17,7 @@ package org.apache.geode.redis.internal.executor.hash;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.Command;
@@ -53,7 +54,7 @@ public class HIncrByExecutor extends HashExecutor {
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     byte[] byteField = commandElems.get(FIELD_INDEX);
     ByteArrayWrapper field = new ByteArrayWrapper(byteField);
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByFloatExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByFloatExecutor.java
index 455683f..148ee41 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByFloatExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HIncrByFloatExecutor.java
@@ -21,6 +21,7 @@ import java.util.List;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.executor.string.IncrByFloatExecutor;
 import org.apache.geode.redis.internal.netty.Command;
@@ -63,7 +64,7 @@ public class HIncrByFloatExecutor extends HashExecutor {
       return validated.getRight();
     }
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisHashCommands redisHashCommands = createRedisHashCommands(context);
     byte[] byteField = commandElems.get(FIELD_INDEX);
     ByteArrayWrapper field = new ByteArrayWrapper(byteField);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HKeysExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HKeysExecutor.java
index 64ef054..b54bb1f 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HKeysExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HKeysExecutor.java
@@ -18,6 +18,7 @@ package org.apache.geode.redis.internal.executor.hash;
 import java.util.Collection;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -45,7 +46,7 @@ public class HKeysExecutor extends HashExecutor {
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisHashCommands redisHashCommands = createRedisHashCommands(context);
     Collection<ByteArrayWrapper> keys = redisHashCommands.hkeys(key);
     if (keys.isEmpty()) {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HLenExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HLenExecutor.java
index 18c19cd..21a6390 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HLenExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HLenExecutor.java
@@ -15,7 +15,7 @@
 package org.apache.geode.redis.internal.executor.hash;
 
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -38,7 +38,7 @@ public class HLenExecutor extends HashExecutor {
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisHashCommands redisHashCommands = createRedisHashCommands(context);
     int len = redisHashCommands.hlen(key);
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMGetExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMGetExecutor.java
index 9939d60..09e3c63 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMGetExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMGetExecutor.java
@@ -18,6 +18,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -46,7 +47,7 @@ public class HMGetExecutor extends HashExecutor {
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     List<ByteArrayWrapper> commandElements = command.getProcessedCommandWrappers();
     ArrayList<ByteArrayWrapper> fields =
         new ArrayList<>(commandElements.subList(2, commandElements.size()));
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMSetExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMSetExecutor.java
index 298b5f6..7ce7cc0 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMSetExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HMSetExecutor.java
@@ -18,6 +18,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -51,7 +52,7 @@ public class HMSetExecutor extends HashExecutor {
       ExecutionHandlerContext context) {
     List<ByteArrayWrapper> commandElems = command.getProcessedCommandWrappers();
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisHashCommands redisHashCommands = createRedisHashCommands(context);
     ArrayList<ByteArrayWrapper> fieldsToSet =
         new ArrayList<>(commandElems.subList(2, commandElems.size()));
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
index 76fc5d1..9e31b35 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HScanExecutor.java
@@ -29,8 +29,8 @@ import java.util.regex.PatternSyntaxException;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisDataTypeMismatchException;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.executor.key.AbstractScanExecutor;
 import org.apache.geode.redis.internal.netty.Coder;
@@ -66,7 +66,7 @@ public class HScanExecutor extends AbstractScanExecutor {
       cursor = 0;
     }
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     if (!getDataRegion(context).containsKey(key)) {
       context.getRedisStats().incKeyspaceMisses();
       return RedisResponse.emptyScan();
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HSetExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HSetExecutor.java
index 9c6bb67..9823faf 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HSetExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HSetExecutor.java
@@ -18,6 +18,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -44,7 +45,7 @@ public class HSetExecutor extends HashExecutor {
       ExecutionHandlerContext context) {
     List<ByteArrayWrapper> commandElems = command.getProcessedCommandWrappers();
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
 
     RedisHashCommands redisHashCommands = createRedisHashCommands(context);
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HStrLenExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HStrLenExecutor.java
index 4b68a0a..bdc8ba5 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HStrLenExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HStrLenExecutor.java
@@ -18,6 +18,7 @@ package org.apache.geode.redis.internal.executor.hash;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -26,7 +27,7 @@ public class HStrLenExecutor extends HashExecutor {
 
   @Override
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     List<byte[]> commandElems = command.getProcessedCommand();
     byte[] byteField = commandElems.get(FIELD_INDEX);
     ByteArrayWrapper field = new ByteArrayWrapper(byteField);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HValsExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HValsExecutor.java
index 939622a..0156b6c 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HValsExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/HValsExecutor.java
@@ -18,6 +18,7 @@ package org.apache.geode.redis.internal.executor.hash;
 import java.util.Collection;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -51,7 +52,7 @@ public class HValsExecutor extends HashExecutor {
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
 
     RedisHashCommands redisHashCommands = createRedisHashCommands(context);
     Collection<ByteArrayWrapper> values = redisHashCommands.hvals(key);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommands.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommands.java
index 8bff977..09ea200 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommands.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommands.java
@@ -24,32 +24,33 @@ import java.util.regex.Pattern;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 
 public interface RedisHashCommands {
-  int hset(ByteArrayWrapper key, List<ByteArrayWrapper> fieldsToSet, boolean NX);
+  int hset(RedisKey key, List<ByteArrayWrapper> fieldsToSet, boolean NX);
 
-  int hdel(ByteArrayWrapper key, List<ByteArrayWrapper> fieldsToRemove);
+  int hdel(RedisKey key, List<ByteArrayWrapper> fieldsToRemove);
 
-  Collection<ByteArrayWrapper> hgetall(ByteArrayWrapper key);
+  Collection<ByteArrayWrapper> hgetall(RedisKey key);
 
-  int hexists(ByteArrayWrapper key, ByteArrayWrapper field);
+  int hexists(RedisKey key, ByteArrayWrapper field);
 
-  ByteArrayWrapper hget(ByteArrayWrapper key, ByteArrayWrapper field);
+  ByteArrayWrapper hget(RedisKey key, ByteArrayWrapper field);
 
-  int hlen(ByteArrayWrapper key);
+  int hlen(RedisKey key);
 
-  int hstrlen(ByteArrayWrapper key, ByteArrayWrapper field);
+  int hstrlen(RedisKey key, ByteArrayWrapper field);
 
-  List<ByteArrayWrapper> hmget(ByteArrayWrapper key, List<ByteArrayWrapper> fields);
+  List<ByteArrayWrapper> hmget(RedisKey key, List<ByteArrayWrapper> fields);
 
-  Collection<ByteArrayWrapper> hvals(ByteArrayWrapper key);
+  Collection<ByteArrayWrapper> hvals(RedisKey key);
 
-  Collection<ByteArrayWrapper> hkeys(ByteArrayWrapper key);
+  Collection<ByteArrayWrapper> hkeys(RedisKey key);
 
-  Pair<Integer, List<Object>> hscan(ByteArrayWrapper key, Pattern matchPattern, int count,
+  Pair<Integer, List<Object>> hscan(RedisKey key, Pattern matchPattern, int count,
       int cursor, UUID clientID);
 
-  long hincrby(ByteArrayWrapper key, ByteArrayWrapper field, long increment);
+  long hincrby(RedisKey key, ByteArrayWrapper field, long increment);
 
-  BigDecimal hincrbyfloat(ByteArrayWrapper key, ByteArrayWrapper field, BigDecimal increment);
+  BigDecimal hincrbyfloat(RedisKey key, ByteArrayWrapper field, BigDecimal increment);
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionInvoker.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionInvoker.java
index 7fae2f7..e8f30a0 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionInvoker.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/hash/RedisHashCommandsFunctionInvoker.java
@@ -40,6 +40,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.geode.cache.Region;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisCommandsFunctionInvoker;
 
 /**
@@ -50,76 +51,74 @@ import org.apache.geode.redis.internal.executor.RedisCommandsFunctionInvoker;
 public class RedisHashCommandsFunctionInvoker extends RedisCommandsFunctionInvoker
     implements RedisHashCommands {
 
-  public RedisHashCommandsFunctionInvoker(Region<ByteArrayWrapper, RedisData> region) {
+  public RedisHashCommandsFunctionInvoker(Region<RedisKey, RedisData> region) {
     super(region);
   }
 
   @Override
-  public int hset(ByteArrayWrapper key, List<ByteArrayWrapper> fieldsToSet, boolean NX) {
+  public int hset(RedisKey key, List<ByteArrayWrapper> fieldsToSet, boolean NX) {
     return invokeCommandFunction(key, HSET, fieldsToSet, NX);
   }
 
   @Override
-  public int hdel(ByteArrayWrapper key, List<ByteArrayWrapper> fieldsToRemove) {
+  public int hdel(RedisKey key, List<ByteArrayWrapper> fieldsToRemove) {
     return invokeCommandFunction(key, HDEL, fieldsToRemove);
   }
 
   @Override
-  public Collection<ByteArrayWrapper> hgetall(ByteArrayWrapper key) {
+  public Collection<ByteArrayWrapper> hgetall(RedisKey key) {
     return invokeCommandFunction(key, HGETALL);
   }
 
   @Override
-  public int hexists(ByteArrayWrapper key, ByteArrayWrapper field) {
+  public int hexists(RedisKey key, ByteArrayWrapper field) {
     return invokeCommandFunction(key, HEXISTS, field);
   }
 
   @Override
-  public ByteArrayWrapper hget(ByteArrayWrapper key, ByteArrayWrapper field) {
+  public ByteArrayWrapper hget(RedisKey key, ByteArrayWrapper field) {
     return invokeCommandFunction(key, HGET, field);
   }
 
   @Override
-  public int hlen(ByteArrayWrapper key) {
+  public int hlen(RedisKey key) {
     return invokeCommandFunction(key, HLEN);
   }
 
   @Override
-  public int hstrlen(ByteArrayWrapper key, ByteArrayWrapper field) {
+  public int hstrlen(RedisKey key, ByteArrayWrapper field) {
     return invokeCommandFunction(key, HSTRLEN, field);
   }
 
   @Override
-  public List<ByteArrayWrapper> hmget(ByteArrayWrapper key,
+  public List<ByteArrayWrapper> hmget(RedisKey key,
       List<ByteArrayWrapper> fields) {
     return invokeCommandFunction(key, HMGET, fields);
   }
 
   @Override
-  public Collection<ByteArrayWrapper> hvals(ByteArrayWrapper key) {
+  public Collection<ByteArrayWrapper> hvals(RedisKey key) {
     return invokeCommandFunction(key, HVALS);
   }
 
   @Override
-  public Collection<ByteArrayWrapper> hkeys(ByteArrayWrapper key) {
+  public Collection<ByteArrayWrapper> hkeys(RedisKey key) {
     return invokeCommandFunction(key, HKEYS);
   }
 
   @Override
-  public Pair<Integer, List<Object>> hscan(ByteArrayWrapper key, Pattern matchPattern,
+  public Pair<Integer, List<Object>> hscan(RedisKey key, Pattern matchPattern,
       int count, int cursor, UUID clientID) {
-
     return invokeCommandFunction(key, HSCAN, matchPattern, count, cursor, clientID);
   }
 
   @Override
-  public long hincrby(ByteArrayWrapper key, ByteArrayWrapper field, long increment) {
+  public long hincrby(RedisKey key, ByteArrayWrapper field, long increment) {
     return invokeCommandFunction(key, HINCRBY, field, increment);
   }
 
   @Override
-  public BigDecimal hincrbyfloat(ByteArrayWrapper key, ByteArrayWrapper field,
-      BigDecimal increment) {
+  public BigDecimal hincrbyfloat(RedisKey key, ByteArrayWrapper field, BigDecimal increment) {
     return invokeCommandFunction(key, HINCRBYFLOAT, field, increment);
   }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/DelExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/DelExecutor.java
index b234f1d..7867af4 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/DelExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/DelExecutor.java
@@ -17,7 +17,7 @@ package org.apache.geode.redis.internal.executor.key;
 
 import java.util.List;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
@@ -28,7 +28,7 @@ public class DelExecutor extends AbstractExecutor {
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    List<ByteArrayWrapper> commandElems = command.getProcessedCommandWrappers();
+    List<RedisKey> commandElems = command.getProcessedCommandWrapperKeys();
 
     long numRemoved = commandElems
         .subList(1, commandElems.size())
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ExistsExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ExistsExecutor.java
index 375991f..711c95c 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ExistsExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ExistsExecutor.java
@@ -17,7 +17,7 @@ package org.apache.geode.redis.internal.executor.key;
 
 import java.util.List;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
@@ -28,7 +28,7 @@ public class ExistsExecutor extends AbstractExecutor {
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    List<ByteArrayWrapper> commandElems = command.getProcessedCommandWrappers();
+    List<RedisKey> commandElems = command.getProcessedCommandWrapperKeys();
     RedisKeyCommands redisKeyCommands = getRedisKeyCommands(context);
 
     long existsCount = commandElems
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ExpireAtExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ExpireAtExecutor.java
index b9aee36..707ee15 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ExpireAtExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ExpireAtExecutor.java
@@ -20,7 +20,7 @@ import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
 
 import java.util.List;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
@@ -35,7 +35,7 @@ public class ExpireAtExecutor extends AbstractExecutor {
     List<byte[]> commandElems = command.getProcessedCommand();
     int TIMESTAMP_INDEX = 2;
 
-    ByteArrayWrapper wKey = command.getKey();
+    RedisKey wKey = command.getKey();
 
     byte[] timestampByteArray = commandElems.get(TIMESTAMP_INDEX);
     long timestamp;
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ExpireExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ExpireExecutor.java
index 1d472d5..96284f8 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ExpireExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ExpireExecutor.java
@@ -20,7 +20,7 @@ import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
 
 import java.util.List;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
@@ -35,7 +35,7 @@ public class ExpireExecutor extends AbstractExecutor {
     List<byte[]> commandElems = command.getProcessedCommand();
     int SECONDS_INDEX = 2;
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     byte[] delayByteArray = commandElems.get(SECONDS_INDEX);
     long delay;
     try {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/KeysExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/KeysExecutor.java
index 9556055..fcb2a06 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/KeysExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/KeysExecutor.java
@@ -25,6 +25,7 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.logging.internal.log4j.api.LogService;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.GlobPattern;
 import org.apache.geode.redis.internal.executor.RedisResponse;
@@ -40,7 +41,7 @@ public class KeysExecutor extends AbstractExecutor {
       ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
     String glob = Coder.bytesToString(commandElems.get(1));
-    Set<ByteArrayWrapper> allKeys = getDataRegion(context).keySet();
+    Set<RedisKey> allKeys = getDataRegion(context).keySet();
     List<ByteArrayWrapper> matchingKeys = new ArrayList<>();
 
     Pattern pattern;
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/PersistExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/PersistExecutor.java
index b9452f9..00784ec 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/PersistExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/PersistExecutor.java
@@ -16,7 +16,7 @@
 package org.apache.geode.redis.internal.executor.key;
 
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
@@ -26,7 +26,7 @@ public class PersistExecutor extends AbstractExecutor {
 
   @Override
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
 
     RedisKeyCommands redisKeyCommands = new RedisKeyCommandsFunctionInvoker(
         context.getRegionProvider().getDataRegion());
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommands.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommands.java
index ef42028..0a1bc67 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommands.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommands.java
@@ -16,24 +16,24 @@
 
 package org.apache.geode.redis.internal.executor.key;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 
 public interface RedisKeyCommands {
-  boolean del(ByteArrayWrapper key);
+  boolean del(RedisKey key);
 
-  boolean exists(ByteArrayWrapper key);
+  boolean exists(RedisKey key);
 
-  boolean rename(ByteArrayWrapper oldKey, ByteArrayWrapper newKey);
+  boolean rename(RedisKey oldKey, RedisKey newKey);
 
-  long pttl(ByteArrayWrapper key);
+  long pttl(RedisKey key);
 
-  long internalPttl(ByteArrayWrapper key);
+  long internalPttl(RedisKey key);
 
-  int pexpireat(ByteArrayWrapper key, long timestamp);
+  int pexpireat(RedisKey key, long timestamp);
 
-  int persist(ByteArrayWrapper key);
+  int persist(RedisKey key);
 
-  String type(ByteArrayWrapper key);
+  String type(RedisKey key);
 
-  String internalType(ByteArrayWrapper key);
+  String internalType(RedisKey key);
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java
index cc7fc2f..c30da4f 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RedisKeyCommandsFunctionInvoker.java
@@ -30,6 +30,7 @@ import java.util.List;
 import org.apache.geode.cache.Region;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisCommandsFunctionInvoker;
 
 /**
@@ -39,52 +40,52 @@ import org.apache.geode.redis.internal.executor.RedisCommandsFunctionInvoker;
 public class RedisKeyCommandsFunctionInvoker extends RedisCommandsFunctionInvoker
     implements RedisKeyCommands {
   public RedisKeyCommandsFunctionInvoker(
-      Region<ByteArrayWrapper, RedisData> region) {
+      Region<RedisKey, RedisData> region) {
     super(region);
   }
 
   @Override
-  public boolean del(ByteArrayWrapper key) {
+  public boolean del(RedisKey key) {
     return invokeCommandFunction(key, DEL);
   }
 
   @Override
-  public boolean exists(ByteArrayWrapper key) {
+  public boolean exists(RedisKey key) {
     return invokeCommandFunction(key, EXISTS);
   }
 
   @Override
-  public long pttl(ByteArrayWrapper key) {
+  public long pttl(RedisKey key) {
     return invokeCommandFunction(key, PTTL);
   }
 
   @Override
-  public long internalPttl(ByteArrayWrapper key) {
+  public long internalPttl(RedisKey key) {
     return invokeCommandFunction(key, INTERNALPTTL);
   }
 
   @Override
-  public int pexpireat(ByteArrayWrapper key, long timestamp) {
+  public int pexpireat(RedisKey key, long timestamp) {
     return invokeCommandFunction(key, PEXPIREAT, timestamp);
   }
 
   @Override
-  public int persist(ByteArrayWrapper key) {
+  public int persist(RedisKey key) {
     return invokeCommandFunction(key, PERSIST);
   }
 
   @Override
-  public String type(ByteArrayWrapper key) {
+  public String type(RedisKey key) {
     return invokeCommandFunction(key, TYPE);
   }
 
   @Override
-  public String internalType(ByteArrayWrapper key) {
+  public String internalType(RedisKey key) {
     return invokeCommandFunction(key, INTERNALTYPE);
   }
 
   @Override
-  public boolean rename(ByteArrayWrapper oldKey, ByteArrayWrapper newKey) {
+  public boolean rename(RedisKey oldKey, RedisKey newKey) {
     if (!region.containsKey(oldKey)) {
       return false;
     }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameExecutor.java
index abfc320..a5e7322 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameExecutor.java
@@ -20,7 +20,7 @@ import static org.apache.geode.redis.internal.RedisConstants.ERROR_NO_SUCH_KEY;
 
 import java.util.List;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
@@ -31,9 +31,9 @@ public class RenameExecutor extends AbstractExecutor {
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    List<ByteArrayWrapper> commandElems = command.getProcessedCommandWrappers();
-    ByteArrayWrapper key = command.getKey();
-    ByteArrayWrapper newKey = commandElems.get(2);
+    List<RedisKey> commandElems = command.getProcessedCommandWrapperKeys();
+    RedisKey key = command.getKey();
+    RedisKey newKey = commandElems.get(2);
     RedisKeyCommands redisKeyCommands = getRedisKeyCommands(context);
 
     if (key.equals(newKey)) {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java
index b11c095..722920b 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/RenameFunction.java
@@ -28,9 +28,9 @@ import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.execute.InternalFunction;
 import org.apache.geode.internal.cache.execute.RegionFunctionContextImpl;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.CommandHelper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.data.RedisKeyCommandsFunctionExecutor;
 import org.apache.geode.redis.internal.executor.SingleResultCollector;
 import org.apache.geode.redis.internal.executor.StripedExecutor;
@@ -46,14 +46,13 @@ public class RenameFunction implements InternalFunction {
   private final transient CommandHelper commandHelper;
   private final transient RedisKeyCommandsFunctionExecutor keyCommands;
 
-  public static void register(Region<ByteArrayWrapper, RedisData> dataRegion,
+  public static void register(Region<RedisKey, RedisData> dataRegion,
       StripedExecutor stripedExecutor,
       RedisStats redisStats) {
     FunctionService.registerFunction(new RenameFunction(dataRegion, stripedExecutor, redisStats));
   }
 
-  public RenameFunction(
-      Region<ByteArrayWrapper, RedisData> dataRegion,
+  public RenameFunction(Region<RedisKey, RedisData> dataRegion,
       StripedExecutor stripedExecutor,
       RedisStats redisStats) {
     partitionedRegion = (PartitionedRegion) dataRegion;
@@ -87,7 +86,7 @@ public class RenameFunction implements InternalFunction {
       return getLockForNextKey(context);
     }
 
-    List<ByteArrayWrapper> keysToOperateOn = new ArrayList<>(context.getKeysFixedOnPrimary());
+    List<RedisKey> keysToOperateOn = new ArrayList<>(context.getKeysFixedOnPrimary());
     context.getKeysToOperateOn().addAll(keysToOperateOn);
 
     context.getKeysToOperateOn().sort(((o1, o2) -> compare(o1, o2, context)));
@@ -150,14 +149,14 @@ public class RenameFunction implements InternalFunction {
   }
 
   private void markCurrentKeyAsLocked(RenameContext context) {
-    ByteArrayWrapper keyToMarkAsLocked = context.getKeyToLock();
+    RedisKey keyToMarkAsLocked = context.getKeyToLock();
 
     context.getLockedKeys().add(keyToMarkAsLocked);
     context.getKeysToOperateOn().remove(keyToMarkAsLocked);
   }
 
   private boolean alreadyHaveLockForCurrentKey(
-      ByteArrayWrapper lockedKey, RenameContext context) {
+      RedisKey lockedKey, RenameContext context) {
 
     boolean stripesAreTheSame =
         getStripedExecutor().compareStripes(lockedKey, context.getKeyToLock()) == 0;
@@ -175,10 +174,7 @@ public class RenameFunction implements InternalFunction {
     DistributedMember primaryMemberForLockedKey = PartitionRegionHelper
         .getPrimaryMemberForKey(region, lockedKey);
 
-    boolean primaryMembersAreTheSame =
-        primaryMemberForCurrentKey.equals(primaryMemberForLockedKey);
-
-    return primaryMembersAreTheSame;
+    return primaryMemberForCurrentKey.equals(primaryMemberForLockedKey);
   }
 
   private boolean getLockForNextKey(RenameContext context) {
@@ -225,29 +221,29 @@ public class RenameFunction implements InternalFunction {
       this.context = (RegionFunctionContextImpl) context;
     }
 
-    private ByteArrayWrapper getOldKey() {
-      return (ByteArrayWrapper) ((Object[]) context.getArguments())[0];
+    private RedisKey getOldKey() {
+      return (RedisKey) ((Object[]) context.getArguments())[0];
     }
 
-    private ByteArrayWrapper getNewKey() {
-      return (ByteArrayWrapper) ((Object[]) context.getArguments())[1];
+    private RedisKey getNewKey() {
+      return (RedisKey) ((Object[]) context.getArguments())[1];
     }
 
-    private List<ByteArrayWrapper> getKeysToOperateOn() {
-      return (List<ByteArrayWrapper>) ((Object[]) context.getArguments())[2];
+    private List<RedisKey> getKeysToOperateOn() {
+      return (List<RedisKey>) ((Object[]) context.getArguments())[2];
     }
 
-    private List<ByteArrayWrapper> getKeysFixedOnPrimary() {
-      return (List<ByteArrayWrapper>) ((Object[]) context.getArguments())[3];
+    private List<RedisKey> getKeysFixedOnPrimary() {
+      return (List<RedisKey>) ((Object[]) context.getArguments())[3];
     }
 
-    private List<ByteArrayWrapper> getLockedKeys() {
-      return (List<ByteArrayWrapper>) ((Object[]) context.getArguments())[4];
+    private List<RedisKey> getLockedKeys() {
+      return (List<RedisKey>) ((Object[]) context.getArguments())[4];
     }
 
 
-    private ByteArrayWrapper getKeyToLock() {
-      return (ByteArrayWrapper) context.getFilter().iterator().next();
+    private RedisKey getKeyToLock() {
+      return (RedisKey) context.getFilter().iterator().next();
     }
 
     private Region getDataRegion() {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ScanExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ScanExecutor.java
index a79cc63..cefc1dc 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ScanExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/ScanExecutor.java
@@ -30,7 +30,7 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.Command;
@@ -102,7 +102,7 @@ public class ScanExecutor extends AbstractScanExecutor {
     return RedisResponse.scan(scanResult.getLeft(), scanResult.getRight());
   }
 
-  private Pair<BigInteger, List<Object>> scan(Collection<ByteArrayWrapper> list,
+  private Pair<BigInteger, List<Object>> scan(Collection<RedisKey> list,
       Pattern matchPattern,
       int count, BigInteger cursor) {
     List<Object> returnList = new ArrayList<>();
@@ -110,7 +110,7 @@ public class ScanExecutor extends AbstractScanExecutor {
     BigInteger beforeCursor = new BigInteger("0");
     int numElements = 0;
     int i = -1;
-    for (ByteArrayWrapper key : list) {
+    for (RedisKey key : list) {
       i++;
       if (beforeCursor.compareTo(cursor) < 0) {
         beforeCursor = beforeCursor.add(new BigInteger("1"));
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/TTLExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/TTLExecutor.java
index 2f8cbec..4785a26 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/TTLExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/TTLExecutor.java
@@ -17,7 +17,7 @@ package org.apache.geode.redis.internal.executor.key;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
@@ -28,7 +28,7 @@ public class TTLExecutor extends AbstractExecutor {
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
 
     RedisKeyCommands redisKeyCommands = getRedisKeyCommands(context);
     long result = redisKeyCommands.pttl(key);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/TypeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/TypeExecutor.java
index 321f329..6a73e45 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/TypeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/key/TypeExecutor.java
@@ -16,7 +16,7 @@
 package org.apache.geode.redis.internal.executor.key;
 
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
@@ -28,7 +28,7 @@ public class TypeExecutor extends AbstractExecutor {
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     String result = getRedisKeyCommands(context).type(key);
 
     return respondBulkStrings(result);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/server/FlushAllExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/server/FlushAllExecutor.java
index 4aa704c..1c053f7 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/server/FlushAllExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/server/FlushAllExecutor.java
@@ -15,7 +15,7 @@
  */
 package org.apache.geode.redis.internal.executor.server;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.AbstractExecutor;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.executor.key.RedisKeyCommands;
@@ -29,7 +29,7 @@ public class FlushAllExecutor extends AbstractExecutor {
       ExecutionHandlerContext context) {
     RedisKeyCommands redisKeyCommands = getRedisKeyCommands(context);
 
-    for (ByteArrayWrapper skey : context.getRegionProvider().getDataRegion().keySet()) {
+    for (RedisKey skey : context.getRegionProvider().getDataRegion().keySet()) {
       redisKeyCommands.del(skey);
     }
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommands.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommands.java
index 2bb14fd..47c1998 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommands.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommands.java
@@ -25,31 +25,32 @@ import java.util.regex.Pattern;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 
 public interface RedisSetCommands {
 
-  long sadd(ByteArrayWrapper key, ArrayList<ByteArrayWrapper> membersToAdd);
+  long sadd(RedisKey key, ArrayList<ByteArrayWrapper> membersToAdd);
 
-  long srem(ByteArrayWrapper key, ArrayList<ByteArrayWrapper> membersToRemove);
+  long srem(RedisKey key, ArrayList<ByteArrayWrapper> membersToRemove);
 
-  Set<ByteArrayWrapper> smembers(ByteArrayWrapper key);
+  Set<ByteArrayWrapper> smembers(RedisKey key);
 
-  Set<ByteArrayWrapper> internalsmembers(ByteArrayWrapper key);
+  Set<ByteArrayWrapper> internalsmembers(RedisKey key);
 
-  int scard(ByteArrayWrapper key);
+  int scard(RedisKey key);
 
-  boolean sismember(ByteArrayWrapper key, ByteArrayWrapper member);
+  boolean sismember(RedisKey key, ByteArrayWrapper member);
 
-  Collection<ByteArrayWrapper> srandmember(ByteArrayWrapper key, int count);
+  Collection<ByteArrayWrapper> srandmember(RedisKey key, int count);
 
-  Collection<ByteArrayWrapper> spop(ByteArrayWrapper key, int popCount);
+  Collection<ByteArrayWrapper> spop(RedisKey key, int popCount);
 
-  Pair<BigInteger, List<Object>> sscan(ByteArrayWrapper key, Pattern matchPattern, int count,
+  Pair<BigInteger, List<Object>> sscan(RedisKey key, Pattern matchPattern, int count,
       BigInteger cursor);
 
-  int sunionstore(ByteArrayWrapper destination, ArrayList<ByteArrayWrapper> setKeys);
+  int sunionstore(RedisKey destination, ArrayList<RedisKey> setKeys);
 
-  int sinterstore(ByteArrayWrapper destination, ArrayList<ByteArrayWrapper> setKeys);
+  int sinterstore(RedisKey destination, ArrayList<RedisKey> setKeys);
 
-  int sdiffstore(ByteArrayWrapper destination, ArrayList<ByteArrayWrapper> setKeys);
+  int sdiffstore(RedisKey destination, ArrayList<RedisKey> setKeys);
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommandsFunctionInvoker.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommandsFunctionInvoker.java
index cc38577..93499ef 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommandsFunctionInvoker.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/RedisSetCommandsFunctionInvoker.java
@@ -40,6 +40,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.geode.cache.Region;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisCommandsFunctionInvoker;
 
 /**
@@ -50,68 +51,69 @@ import org.apache.geode.redis.internal.executor.RedisCommandsFunctionInvoker;
 public class RedisSetCommandsFunctionInvoker extends RedisCommandsFunctionInvoker
     implements RedisSetCommands {
 
-  public RedisSetCommandsFunctionInvoker(Region<ByteArrayWrapper, RedisData> region) {
+  public RedisSetCommandsFunctionInvoker(Region<RedisKey, RedisData> region) {
     super(region);
   }
 
   @Override
-  public long sadd(ByteArrayWrapper key, ArrayList<ByteArrayWrapper> membersToAdd) {
+  public long sadd(RedisKey key, ArrayList<ByteArrayWrapper> membersToAdd) {
     return invokeCommandFunction(key, SADD, membersToAdd);
   }
 
   @Override
-  public long srem(ByteArrayWrapper key, ArrayList<ByteArrayWrapper> membersToRemove) {
+  public long srem(RedisKey key, ArrayList<ByteArrayWrapper> membersToRemove) {
     return invokeCommandFunction(key, SREM, membersToRemove);
   }
 
   @Override
-  public Set<ByteArrayWrapper> smembers(ByteArrayWrapper key) {
+  public Set<ByteArrayWrapper> smembers(RedisKey key) {
     return invokeCommandFunction(key, SMEMBERS);
   }
 
   @Override
-  public Set<ByteArrayWrapper> internalsmembers(ByteArrayWrapper key) {
+  public Set<ByteArrayWrapper> internalsmembers(RedisKey key) {
     return invokeCommandFunction(key, INTERNALSMEMBERS);
   }
 
   @Override
-  public int scard(ByteArrayWrapper key) {
+  public int scard(RedisKey key) {
     return invokeCommandFunction(key, SCARD);
   }
 
   @Override
-  public boolean sismember(ByteArrayWrapper key, ByteArrayWrapper member) {
+  public boolean sismember(RedisKey key, ByteArrayWrapper member) {
     return invokeCommandFunction(key, SISMEMBER, member);
   }
 
   @Override
-  public Collection<ByteArrayWrapper> srandmember(ByteArrayWrapper key, int count) {
+  public Collection<ByteArrayWrapper> srandmember(RedisKey key, int count) {
     return invokeCommandFunction(key, SRANDMEMBER, count);
   }
 
   @Override
-  public Collection<ByteArrayWrapper> spop(ByteArrayWrapper key, int popCount) {
+  public Collection<ByteArrayWrapper> spop(RedisKey key, int popCount) {
     return invokeCommandFunction(key, SPOP, popCount);
   }
 
   @Override
-  public Pair<BigInteger, List<Object>> sscan(ByteArrayWrapper key, Pattern matchPattern, int count,
+  public Pair<BigInteger, List<Object>> sscan(RedisKey key, Pattern matchPattern,
+      int count,
       BigInteger cursor) {
     return invokeCommandFunction(key, SSCAN, matchPattern, count, cursor);
   }
 
   @Override
-  public int sunionstore(ByteArrayWrapper destination, ArrayList<ByteArrayWrapper> setKeys) {
+  public int sunionstore(RedisKey destination, ArrayList<RedisKey> setKeys) {
     return invokeCommandFunction(destination, SUNIONSTORE, setKeys);
   }
 
   @Override
-  public int sinterstore(ByteArrayWrapper destination, ArrayList<ByteArrayWrapper> setKeys) {
+  public int sinterstore(RedisKey destination, ArrayList<RedisKey> setKeys) {
     return invokeCommandFunction(destination, SINTERSTORE, setKeys);
   }
 
   @Override
-  public int sdiffstore(ByteArrayWrapper destination, ArrayList<ByteArrayWrapper> setKeys) {
+  public int sdiffstore(RedisKey destination, ArrayList<RedisKey> setKeys) {
     return invokeCommandFunction(destination, SDIFFSTORE, setKeys);
   }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java
index 54d79a7..5b75a7e 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SCardExecutor.java
@@ -15,7 +15,7 @@
 package org.apache.geode.redis.internal.executor.set;
 
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -25,7 +25,7 @@ public class SCardExecutor extends SetExecutor {
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisSetCommands redisSetCommands = createRedisSetCommands(context);
     return RedisResponse.integer(redisSetCommands.scard(key));
   }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java
index 8a72834..4da5737 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SIsMemberExecutor.java
@@ -17,6 +17,7 @@ package org.apache.geode.redis.internal.executor.set;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -26,7 +27,7 @@ public class SIsMemberExecutor extends SetExecutor {
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     ByteArrayWrapper member = new ByteArrayWrapper(commandElems.get(2));
     RedisSetCommands redisSetCommands = createRedisSetCommands(context);
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java
index 22ebf97..2b60b10 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMembersExecutor.java
@@ -17,6 +17,7 @@ package org.apache.geode.redis.internal.executor.set;
 import java.util.Set;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -26,7 +27,7 @@ public class SMembersExecutor extends SetExecutor {
   @Override
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisSetCommands redisSetCommands = createRedisSetCommands(context);
     Set<ByteArrayWrapper> members = redisSetCommands.smembers(key);
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java
index 147f510..77c57b4 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SMoveExecutor.java
@@ -21,6 +21,7 @@ import java.util.List;
 import org.apache.geode.redis.internal.RedisConstants;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisDataType;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -36,8 +37,8 @@ public class SMoveExecutor extends SetExecutor {
       ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
 
-    ByteArrayWrapper source = command.getKey();
-    ByteArrayWrapper destination = new ByteArrayWrapper(commandElems.get(2));
+    RedisKey source = command.getKey();
+    RedisKey destination = new RedisKey(commandElems.get(2));
     ByteArrayWrapper member = new ByteArrayWrapper(commandElems.get(3));
 
     String destinationType = getRedisKeyCommands(context).internalType(destination);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java
index d23c310..e629057 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SPopExecutor.java
@@ -18,6 +18,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -37,7 +38,7 @@ public class SPopExecutor extends SetExecutor {
       popCount = Integer.parseInt(new String(commandElems.get(2)));
     }
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisSetCommands redisSetCommands = createRedisSetCommands(context);
     Collection<ByteArrayWrapper> popped = redisSetCommands.spop(key, popCount);
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java
index 737d4f0..574ee96 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRandMemberExecutor.java
@@ -18,6 +18,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.Command;
@@ -32,7 +33,7 @@ public class SRandMemberExecutor extends SetExecutor {
       ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
 
     boolean countSpecified = false;
     int count = 1;
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java
index 3dc0395..ad4a5fd 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SRemExecutor.java
@@ -18,6 +18,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -28,7 +29,7 @@ public class SRemExecutor extends SetExecutor {
       ExecutionHandlerContext context) {
     List<ByteArrayWrapper> commandElements = command.getProcessedCommandWrappers();
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
 
     RedisSetCommands redisSetCommands = createRedisSetCommands(context);
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
index 909c955..9d78971 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SScanExecutor.java
@@ -29,8 +29,8 @@ import java.util.regex.PatternSyntaxException;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisDataTypeMismatchException;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.executor.key.AbstractScanExecutor;
 import org.apache.geode.redis.internal.netty.Coder;
@@ -60,7 +60,7 @@ public class SScanExecutor extends AbstractScanExecutor {
       return RedisResponse.error(ERROR_CURSOR);
     }
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
 
     if (!getDataRegion(context).containsKey(key)) {
       context.getRedisStats().incKeyspaceMisses();
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java
index 343e3f8..be0028b 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/set/SetOpExecutor.java
@@ -19,6 +19,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -34,11 +35,11 @@ public abstract class SetOpExecutor extends SetExecutor {
       setsStartIndex++;
     }
 
-    List<ByteArrayWrapper> commandElements = command.getProcessedCommandWrappers();
-    ArrayList<ByteArrayWrapper> setKeys =
+    List<RedisKey> commandElements = command.getProcessedCommandWrapperKeys();
+    ArrayList<RedisKey> setKeys =
         new ArrayList<>(commandElements.subList(setsStartIndex, commandElements.size()));
     if (isStorage()) {
-      ByteArrayWrapper destination = command.getKey();
+      RedisKey destination = command.getKey();
       RedisSetCommands redisSetCommands = createRedisSetCommands(context);
       int storeCount;
       switch (command.getCommandType()) {
@@ -62,12 +63,12 @@ public abstract class SetOpExecutor extends SetExecutor {
   }
 
   private RedisResponse doActualSetOperation(ExecutionHandlerContext context,
-      ArrayList<ByteArrayWrapper> setKeys) {
+      ArrayList<RedisKey> setKeys) {
     RedisSetCommands redisSetCommands = createRedisSetCommands(context);
-    ByteArrayWrapper firstSetKey = setKeys.remove(0);
+    RedisKey firstSetKey = setKeys.remove(0);
     Set<ByteArrayWrapper> resultSet = redisSetCommands.smembers(firstSetKey);
 
-    for (ByteArrayWrapper key : setKeys) {
+    for (RedisKey key : setKeys) {
       Set<ByteArrayWrapper> nextSet = redisSetCommands.smembers(key);
       if (doSetOp(resultSet, nextSet)) {
         break;
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/AppendExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/AppendExecutor.java
index c66029a..2cc14eb 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/AppendExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/AppendExecutor.java
@@ -18,6 +18,7 @@ package org.apache.geode.redis.internal.executor.string;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -30,7 +31,7 @@ public class AppendExecutor extends StringExecutor {
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     byte[] bytesToAppend = commandElems.get(VALUE_INDEX);
     ByteArrayWrapper valueToAppend = new ByteArrayWrapper(bytesToAppend);
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/BitCountExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/BitCountExecutor.java
index b02dd0e..5324d0c 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/BitCountExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/BitCountExecutor.java
@@ -17,7 +17,7 @@ package org.apache.geode.redis.internal.executor.string;
 import java.util.List;
 
 import org.apache.geode.redis.internal.RedisConstants;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.Command;
@@ -32,7 +32,7 @@ public class BitCountExecutor extends StringExecutor {
       ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisStringCommands stringCommands = getRedisStringCommands(context);
     long result;
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/BitOpExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/BitOpExecutor.java
index 87971b2..75b2318 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/BitOpExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/BitOpExecutor.java
@@ -19,7 +19,7 @@ import static org.apache.geode.redis.internal.RedisConstants.ERROR_SYNTAX;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -42,11 +42,11 @@ public class BitOpExecutor extends StringExecutor {
       return RedisResponse.error(ERROR_SYNTAX);
     }
 
-    ByteArrayWrapper destKey = new ByteArrayWrapper(commandElems.get(2));
+    RedisKey destKey = new RedisKey(commandElems.get(2));
 
-    List<ByteArrayWrapper> values = new ArrayList<>();
+    List<RedisKey> values = new ArrayList<>();
     for (int i = 3; i < commandElems.size(); i++) {
-      ByteArrayWrapper key = new ByteArrayWrapper(commandElems.get(i));
+      RedisKey key = new RedisKey(commandElems.get(i));
       values.add(key);
     }
     if (operation.equals("NOT") && values.size() != 1) {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/BitPosExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/BitPosExecutor.java
index 60fa1cc..bc66fcb 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/BitPosExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/BitPosExecutor.java
@@ -16,7 +16,7 @@ package org.apache.geode.redis.internal.executor.string;
 
 import java.util.List;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.Command;
@@ -32,7 +32,7 @@ public class BitPosExecutor extends StringExecutor {
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
 
     int bit;
     int start = 0;
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/DecrByExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/DecrByExecutor.java
index b2b48d1..8837b54 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/DecrByExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/DecrByExecutor.java
@@ -19,7 +19,7 @@ import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
 
 import java.util.List;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.Command;
@@ -32,7 +32,7 @@ public class DecrByExecutor extends StringExecutor {
   @Override
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
 
     byte[] decrArray = commandElems.get(DECREMENT_INDEX);
     String decrString = Coder.bytesToString(decrArray);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/DecrExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/DecrExecutor.java
index ac2366d..b74fd54 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/DecrExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/DecrExecutor.java
@@ -14,7 +14,7 @@
  */
 package org.apache.geode.redis.internal.executor.string;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -24,7 +24,7 @@ public class DecrExecutor extends StringExecutor {
   @Override
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisStringCommands stringCommands = getRedisStringCommands(context);
 
     long value = stringCommands.decr(key);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetBitExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetBitExecutor.java
index c5cc330..2b9f30e 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetBitExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetBitExecutor.java
@@ -16,7 +16,7 @@ package org.apache.geode.redis.internal.executor.string;
 
 import java.util.List;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.Command;
@@ -30,7 +30,7 @@ public class GetBitExecutor extends StringExecutor {
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
 
     List<byte[]> commandElems = command.getProcessedCommand();
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
 
     int offset;
     try {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetExecutor.java
index ce1a00a..5b60d90 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetExecutor.java
@@ -15,6 +15,7 @@
 package org.apache.geode.redis.internal.executor.string;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -25,7 +26,7 @@ public class GetExecutor extends StringExecutor {
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisStringCommands redisStringCommands = getRedisStringCommands(context);
     ByteArrayWrapper result = redisStringCommands.get(key);
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetRangeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetRangeExecutor.java
index 4101981..605904b 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetRangeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetRangeExecutor.java
@@ -19,6 +19,7 @@ import static org.apache.geode.redis.internal.RedisConstants.ERROR_NOT_INTEGER;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.Command;
@@ -46,7 +47,7 @@ public class GetRangeExecutor extends StringExecutor {
     }
 
     RedisStringCommands stringCommands = getRedisStringCommands(context);
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
 
     ByteArrayWrapper returnRange = stringCommands.getrange(key, start, end);
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetSetExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetSetExecutor.java
index a2b233f..c26d33d 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetSetExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/GetSetExecutor.java
@@ -18,6 +18,7 @@ package org.apache.geode.redis.internal.executor.string;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -30,7 +31,7 @@ public class GetSetExecutor extends StringExecutor {
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     byte[] newCharValue = commandElems.get(VALUE_INDEX);
     ByteArrayWrapper newValueWrapper = new ByteArrayWrapper(newCharValue);
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/IncrByExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/IncrByExecutor.java
index c7c5b76..b5a82c5 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/IncrByExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/IncrByExecutor.java
@@ -17,7 +17,7 @@ package org.apache.geode.redis.internal.executor.string;
 
 import java.util.List;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.Command;
@@ -33,7 +33,7 @@ public class IncrByExecutor extends StringExecutor {
   @Override
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisStringCommands stringCommands = getRedisStringCommands(context);
 
     byte[] incrArray = commandElems.get(INCREMENT_INDEX);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/IncrByFloatExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/IncrByFloatExecutor.java
index 470d93f..0bbbb9d 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/IncrByFloatExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/IncrByFloatExecutor.java
@@ -21,7 +21,7 @@ import java.util.regex.Pattern;
 import org.apache.commons.lang3.tuple.Pair;
 
 import org.apache.geode.redis.internal.RedisConstants;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.Command;
@@ -37,7 +37,7 @@ public class IncrByFloatExecutor extends StringExecutor {
   @Override
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
 
     Pair<BigDecimal, RedisResponse> validated =
         validateIncrByFloatArgument(commandElems.get(INCREMENT_INDEX));
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/IncrExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/IncrExecutor.java
index 92af648..7bad3cc 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/IncrExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/IncrExecutor.java
@@ -16,7 +16,7 @@ package org.apache.geode.redis.internal.executor.string;
 
 
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -25,7 +25,7 @@ public class IncrExecutor extends StringExecutor {
   @Override
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     RedisStringCommands stringCommands = getRedisStringCommands(context);
 
     long value = stringCommands.incr(key);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/MGetExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/MGetExecutor.java
index 3f86cb9..2447241 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/MGetExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/MGetExecutor.java
@@ -19,6 +19,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -33,7 +34,7 @@ public class MGetExecutor extends StringExecutor {
     Collection<ByteArrayWrapper> values = new ArrayList<>();
     for (int i = 1; i < commandElems.size(); i++) {
       byte[] keyArray = commandElems.get(i);
-      ByteArrayWrapper key = new ByteArrayWrapper(keyArray);
+      RedisKey key = new RedisKey(keyArray);
       values.add(stringCommands.mget(key));
     }
 
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/MSetExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/MSetExecutor.java
index 8013e56..98ce50a 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/MSetExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/MSetExecutor.java
@@ -17,6 +17,7 @@ package org.apache.geode.redis.internal.executor.string;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -34,7 +35,7 @@ public class MSetExecutor extends StringExecutor {
     // TODO: make this atomic
     for (int i = 1; i < commandElems.size(); i += 2) {
       byte[] keyArray = commandElems.get(i);
-      ByteArrayWrapper key = new ByteArrayWrapper(keyArray);
+      RedisKey key = new RedisKey(keyArray);
       byte[] valueArray = commandElems.get(i + 1);
       ByteArrayWrapper value = new ByteArrayWrapper(valueArray);
       stringCommands.set(key, value, null);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/MSetNXExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/MSetNXExecutor.java
index 4e508d7..d3ac56a 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/MSetNXExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/MSetNXExecutor.java
@@ -18,6 +18,7 @@ package org.apache.geode.redis.internal.executor.string;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.executor.key.RedisKeyCommands;
 import org.apache.geode.redis.internal.netty.Command;
@@ -39,7 +40,7 @@ public class MSetNXExecutor extends StringExecutor {
     // TODO: make this atomic
     for (int i = 1; i < commandElems.size(); i += 2) {
       byte[] keyArray = commandElems.get(i);
-      ByteArrayWrapper key = new ByteArrayWrapper(keyArray);
+      RedisKey key = new RedisKey(keyArray);
       if (keyCommands.exists(key)) {
         return RedisResponse.integer(NOT_SET);
       }
@@ -48,7 +49,7 @@ public class MSetNXExecutor extends StringExecutor {
     // none exist so now set them all
     for (int i = 1; i < commandElems.size(); i += 2) {
       byte[] keyArray = commandElems.get(i);
-      ByteArrayWrapper key = new ByteArrayWrapper(keyArray);
+      RedisKey key = new RedisKey(keyArray);
       byte[] valueArray = commandElems.get(i + 1);
       ByteArrayWrapper value = new ByteArrayWrapper(valueArray);
       stringCommands.set(key, value, null);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/RedisStringCommands.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/RedisStringCommands.java
index f69822d..f4f6f5c 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/RedisStringCommands.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/RedisStringCommands.java
@@ -18,43 +18,44 @@ import java.math.BigDecimal;
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 
 public interface RedisStringCommands {
-  long append(ByteArrayWrapper key, ByteArrayWrapper valueToAppend);
+  long append(RedisKey key, ByteArrayWrapper valueToAppend);
 
-  ByteArrayWrapper get(ByteArrayWrapper key);
+  ByteArrayWrapper get(RedisKey key);
 
-  boolean set(ByteArrayWrapper key, ByteArrayWrapper value, SetOptions options);
+  boolean set(RedisKey key, ByteArrayWrapper value, SetOptions options);
 
-  long incr(ByteArrayWrapper key);
+  long incr(RedisKey key);
 
-  long decr(ByteArrayWrapper key);
+  long decr(RedisKey key);
 
-  ByteArrayWrapper getset(ByteArrayWrapper key, ByteArrayWrapper value);
+  ByteArrayWrapper getset(RedisKey key, ByteArrayWrapper value);
 
-  long incrby(ByteArrayWrapper key, long increment);
+  long incrby(RedisKey key, long increment);
 
-  long decrby(ByteArrayWrapper key, long decrement);
+  long decrby(RedisKey key, long decrement);
 
-  ByteArrayWrapper getrange(ByteArrayWrapper key, long start, long end);
+  ByteArrayWrapper getrange(RedisKey key, long start, long end);
 
-  long bitcount(ByteArrayWrapper key, int start, int end);
+  long bitcount(RedisKey key, int start, int end);
 
-  long bitcount(ByteArrayWrapper key);
+  long bitcount(RedisKey key);
 
-  int strlen(ByteArrayWrapper key);
+  int strlen(RedisKey key);
 
-  int getbit(ByteArrayWrapper key, int offset);
+  int getbit(RedisKey key, int offset);
 
-  int setbit(ByteArrayWrapper key, long offset, int value);
+  int setbit(RedisKey key, long offset, int value);
 
-  BigDecimal incrbyfloat(ByteArrayWrapper key, BigDecimal increment);
+  BigDecimal incrbyfloat(RedisKey key, BigDecimal increment);
 
-  int bitop(String operation, ByteArrayWrapper destKey, List<ByteArrayWrapper> sources);
+  int bitop(String operation, RedisKey destKey, List<RedisKey> sources);
 
-  int bitpos(ByteArrayWrapper key, int bit, int start, Integer end);
+  int bitpos(RedisKey key, int bit, int start, Integer end);
 
-  int setrange(ByteArrayWrapper key, int offset, byte[] value);
+  int setrange(RedisKey key, int offset, byte[] value);
 
-  ByteArrayWrapper mget(ByteArrayWrapper key);
+  ByteArrayWrapper mget(RedisKey key);
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/RedisStringCommandsFunctionInvoker.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/RedisStringCommandsFunctionInvoker.java
index a588161..79b6cd2 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/RedisStringCommandsFunctionInvoker.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/RedisStringCommandsFunctionInvoker.java
@@ -39,6 +39,7 @@ import java.util.List;
 import org.apache.geode.cache.Region;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisCommandsFunctionInvoker;
 
 /**
@@ -49,102 +50,102 @@ import org.apache.geode.redis.internal.executor.RedisCommandsFunctionInvoker;
 public class RedisStringCommandsFunctionInvoker extends RedisCommandsFunctionInvoker
     implements RedisStringCommands {
 
-  public RedisStringCommandsFunctionInvoker(Region<ByteArrayWrapper, RedisData> region) {
+  public RedisStringCommandsFunctionInvoker(Region<RedisKey, RedisData> region) {
     super(region);
   }
 
   @Override
-  public long append(ByteArrayWrapper key, ByteArrayWrapper valueToAppend) {
+  public long append(RedisKey key, ByteArrayWrapper valueToAppend) {
     return invokeCommandFunction(key, APPEND, valueToAppend);
   }
 
   @Override
-  public ByteArrayWrapper get(ByteArrayWrapper key) {
+  public ByteArrayWrapper get(RedisKey key) {
     return invokeCommandFunction(key, GET);
   }
 
   @Override
-  public boolean set(ByteArrayWrapper key, ByteArrayWrapper value, SetOptions options) {
+  public boolean set(RedisKey key, ByteArrayWrapper value, SetOptions options) {
     return invokeCommandFunction(key, SET, value, options);
   }
 
   @Override
-  public long incr(ByteArrayWrapper key) {
+  public long incr(RedisKey key) {
     return invokeCommandFunction(key, INCR);
   }
 
   @Override
-  public long decr(ByteArrayWrapper key) {
+  public long decr(RedisKey key) {
     return invokeCommandFunction(key, DECR);
   }
 
   @Override
-  public ByteArrayWrapper getset(ByteArrayWrapper key, ByteArrayWrapper value) {
+  public ByteArrayWrapper getset(RedisKey key, ByteArrayWrapper value) {
     return invokeCommandFunction(key, GETSET, value);
   }
 
   @Override
-  public long incrby(ByteArrayWrapper key, long increment) {
+  public long incrby(RedisKey key, long increment) {
     return invokeCommandFunction(key, INCRBY, increment);
   }
 
   @Override
-  public long decrby(ByteArrayWrapper key, long decrement) {
+  public long decrby(RedisKey key, long decrement) {
     return invokeCommandFunction(key, DECRBY, decrement);
   }
 
   @Override
-  public ByteArrayWrapper getrange(ByteArrayWrapper key, long start, long end) {
+  public ByteArrayWrapper getrange(RedisKey key, long start, long end) {
     return invokeCommandFunction(key, GETRANGE, start, end);
   }
 
   @Override
-  public long bitcount(ByteArrayWrapper key, int start, int end) {
+  public long bitcount(RedisKey key, int start, int end) {
     return invokeCommandFunction(key, BITCOUNT, start, end);
   }
 
   @Override
-  public long bitcount(ByteArrayWrapper key) {
+  public long bitcount(RedisKey key) {
     return invokeCommandFunction(key, BITCOUNT);
   }
 
   @Override
-  public int strlen(ByteArrayWrapper key) {
+  public int strlen(RedisKey key) {
     return invokeCommandFunction(key, STRLEN);
   }
 
   @Override
-  public int getbit(ByteArrayWrapper key, int offset) {
+  public int getbit(RedisKey key, int offset) {
     return invokeCommandFunction(key, GETBIT, offset);
   }
 
   @Override
-  public int setbit(ByteArrayWrapper key, long offset, int value) {
+  public int setbit(RedisKey key, long offset, int value) {
     return invokeCommandFunction(key, SETBIT, offset, value);
   }
 
   @Override
-  public BigDecimal incrbyfloat(ByteArrayWrapper key, BigDecimal increment) {
+  public BigDecimal incrbyfloat(RedisKey key, BigDecimal increment) {
     return invokeCommandFunction(key, INCRBYFLOAT, increment);
   }
 
   @Override
-  public int bitop(String operation, ByteArrayWrapper destKey, List<ByteArrayWrapper> sources) {
+  public int bitop(String operation, RedisKey destKey, List<RedisKey> sources) {
     return invokeCommandFunction(destKey, BITOP, operation, sources);
   }
 
   @Override
-  public int bitpos(ByteArrayWrapper key, int bit, int start, Integer end) {
+  public int bitpos(RedisKey key, int bit, int start, Integer end) {
     return invokeCommandFunction(key, BITPOS, bit, start, end);
   }
 
   @Override
-  public int setrange(ByteArrayWrapper key, int offset, byte[] value) {
+  public int setrange(RedisKey key, int offset, byte[] value) {
     return invokeCommandFunction(key, SETRANGE, offset, value);
   }
 
   @Override
-  public ByteArrayWrapper mget(ByteArrayWrapper key) {
+  public ByteArrayWrapper mget(RedisKey key) {
     return invokeCommandFunction(key, MGET);
   }
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetBitExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetBitExecutor.java
index b56db58..654bd7d 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetBitExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetBitExecutor.java
@@ -17,7 +17,7 @@ package org.apache.geode.redis.internal.executor.string;
 
 import java.util.List;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.Command;
@@ -36,7 +36,7 @@ public class SetBitExecutor extends StringExecutor {
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
     RedisStringCommands stringCommands = getRedisStringCommands(context);
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
 
     long offset;
     int value;
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetEXExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetEXExecutor.java
index fca7d74..c4d2c53 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetEXExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetEXExecutor.java
@@ -20,6 +20,7 @@ import static org.apache.geode.redis.internal.executor.string.SetOptions.Exists.
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.Command;
@@ -43,7 +44,7 @@ public class SetEXExecutor extends StringExecutor {
     List<byte[]> commandElems = command.getProcessedCommand();
     RedisStringCommands stringCommands = getRedisStringCommands(context);
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     byte[] value = commandElems.get(VALUE_INDEX);
 
     byte[] expirationArray = commandElems.get(2);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetExecutor.java
index 353ad35..a2b71fa 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetExecutor.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.Command;
@@ -38,7 +39,7 @@ public class SetExecutor extends StringExecutor {
   public RedisResponse executeCommand(Command command,
       ExecutionHandlerContext context) {
 
-    ByteArrayWrapper keyToSet = command.getKey();
+    RedisKey keyToSet = command.getKey();
     List<byte[]> commandElementsBytes = command.getProcessedCommand();
     List<byte[]> optionalParameterBytes = getOptionalParameters(commandElementsBytes);
     ByteArrayWrapper valueToSet = getValueToSet(commandElementsBytes);
@@ -58,7 +59,7 @@ public class SetExecutor extends StringExecutor {
     return commandElementsBytes.subList(3, commandElementsBytes.size());
   }
 
-  private RedisResponse doSet(ByteArrayWrapper key, ByteArrayWrapper value,
+  private RedisResponse doSet(RedisKey key, ByteArrayWrapper value,
       RedisStringCommands redisStringCommands, SetOptions setOptions) {
 
     boolean setCompletedSuccessfully = redisStringCommands.set(key, value, setOptions);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetNXExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetNXExecutor.java
index 5943e40..8aa6fae 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetNXExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetNXExecutor.java
@@ -19,6 +19,7 @@ import static org.apache.geode.redis.internal.executor.string.SetOptions.Exists.
 import java.util.List;
 
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -30,7 +31,7 @@ public class SetNXExecutor extends StringExecutor {
   @Override
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     ByteArrayWrapper value = new ByteArrayWrapper(commandElems.get(VALUE_INDEX));
 
     RedisStringCommands stringCommands = getRedisStringCommands(context);
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetRangeExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetRangeExecutor.java
index 065535b..0e97175 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetRangeExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/SetRangeExecutor.java
@@ -17,7 +17,7 @@ package org.apache.geode.redis.internal.executor.string;
 
 import java.util.List;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.redis.internal.netty.Command;
@@ -33,7 +33,7 @@ public class SetRangeExecutor extends StringExecutor {
   @Override
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
     List<byte[]> commandElems = command.getProcessedCommand();
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
 
     int offset;
     try {
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/StrlenExecutor.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/StrlenExecutor.java
index d9f575f..eb3a3f4 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/StrlenExecutor.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/string/StrlenExecutor.java
@@ -14,7 +14,7 @@
  */
 package org.apache.geode.redis.internal.executor.string;
 
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 import org.apache.geode.redis.internal.netty.Command;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -25,7 +25,7 @@ public class StrlenExecutor extends StringExecutor {
   public RedisResponse executeCommand(Command command, ExecutionHandlerContext context) {
     RedisStringCommands stringCommands = getRedisStringCommands(context);
 
-    ByteArrayWrapper key = command.getKey();
+    RedisKey key = command.getKey();
     int length = stringCommands.strlen(key);
     return RedisResponse.integer(length);
   }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Command.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Command.java
index 6aa154b..e91d872 100755
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Command.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Command.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext;
 
 import org.apache.geode.redis.internal.RedisCommandType;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.RedisResponse;
 
 /**
@@ -103,6 +104,15 @@ public class Command {
   }
 
   /**
+   * Used to get the command element list when every argument is also a key
+   *
+   * @return List of command elements in form of {@link List}
+   */
+  public List<RedisKey> getProcessedCommandWrapperKeys() {
+    return this.commandElems.stream().map(RedisKey::new).collect(Collectors.toList());
+  }
+
+  /**
    * Getter method for the command type
    *
    * @return The command type
@@ -132,12 +142,12 @@ public class Command {
     }
   }
 
-  public ByteArrayWrapper getKey() {
+  public RedisKey getKey() {
     if (this.commandElems.size() > 1) {
       if (this.bytes == null) {
-        this.bytes = new ByteArrayWrapper(this.commandElems.get(1));
+        this.bytes = new RedisKey(this.commandElems.get(1));
       }
-      return this.bytes;
+      return (RedisKey) this.bytes;
     } else {
       return null;
     }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
index 86f1532..35b6459 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSub.java
@@ -19,8 +19,8 @@ package org.apache.geode.redis.internal.pubsub;
 import java.util.List;
 
 import org.apache.geode.cache.Region;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.GlobPattern;
 import org.apache.geode.redis.internal.netty.Client;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -39,7 +39,7 @@ public interface PubSub {
    * @return the number of messages published
    */
   long publish(
-      Region<ByteArrayWrapper, RedisData> dataRegion,
+      Region<RedisKey, RedisData> dataRegion,
       byte[] channel, byte[] message);
 
   /**
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
index 0ce538b..6150569 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/pubsub/PubSubImpl.java
@@ -34,8 +34,8 @@ import org.apache.geode.cache.partition.PartitionRegionInfo;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.internal.cache.execute.InternalFunction;
 import org.apache.geode.logging.internal.log4j.api.LogService;
-import org.apache.geode.redis.internal.data.ByteArrayWrapper;
 import org.apache.geode.redis.internal.data.RedisData;
+import org.apache.geode.redis.internal.data.RedisKey;
 import org.apache.geode.redis.internal.executor.GlobPattern;
 import org.apache.geode.redis.internal.netty.Client;
 import org.apache.geode.redis.internal.netty.ExecutionHandlerContext;
@@ -63,9 +63,7 @@ public class PubSubImpl implements PubSub {
   }
 
   @Override
-  public long publish(
-      Region<ByteArrayWrapper, RedisData> dataRegion,
-      byte[] channel, byte[] message) {
+  public long publish(Region<RedisKey, RedisData> dataRegion, byte[] channel, byte[] message) {
     PartitionRegionInfo info = PartitionRegionHelper.getPartitionRegionInfo(dataRegion);
     Set<DistributedMember> membersWithDataRegion = new HashSet<>();
     for (PartitionMemberInfo memberInfo : info.getPartitionMemberInfo()) {
diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/data/ByteArrayWrapperJUnitTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/data/ByteArrayWrapperJUnitTest.java
deleted file mode 100644
index 8937f70..0000000
--- a/geode-redis/src/test/java/org/apache/geode/redis/internal/data/ByteArrayWrapperJUnitTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.geode.redis.internal.data;
-
-import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS;
-import static org.apache.geode.redis.internal.RegionProvider.REDIS_SLOTS_PER_BUCKET;
-import static org.assertj.core.api.Assertions.assertThat;
-
-import org.junit.Test;
-
-import org.apache.geode.redis.internal.executor.cluster.CRC16;
-
-public class ByteArrayWrapperJUnitTest {
-
-  @Test
-  public void testRoutingId_withHashtags() {
-    ByteArrayWrapper baw = new ByteArrayWrapper("name{user1000}".getBytes());
-    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("user1000"));
-
-    baw = new ByteArrayWrapper("{user1000".getBytes());
-    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("{user1000"));
-
-    baw = new ByteArrayWrapper("}user1000{".getBytes());
-    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("}user1000{"));
-
-    baw = new ByteArrayWrapper("user{}1000".getBytes());
-    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("user{}1000"));
-
-    baw = new ByteArrayWrapper("user}{1000".getBytes());
-    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("user}{1000"));
-
-    baw = new ByteArrayWrapper("{user1000}}bar".getBytes());
-    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("user1000"));
-
-    baw = new ByteArrayWrapper("foo{user1000}{bar}".getBytes());
-    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("user1000"));
-
-    baw = new ByteArrayWrapper("foo{}{user1000}".getBytes());
-    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("foo{}{user1000}"));
-
-    baw = new ByteArrayWrapper("{}{user1000}".getBytes());
-    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("{}{user1000}"));
-
-    baw = new ByteArrayWrapper("foo{{user1000}}bar".getBytes());
-    assertThat(baw.getRoutingId()).isEqualTo(calculateRoutingId("{user1000"));
-  }
-
-  private int calculateRoutingId(String data) {
-    return (CRC16.calculate(data.getBytes(), 0, data.length()) % REDIS_SLOTS)
-        / REDIS_SLOTS_PER_BUCKET;
-  }
-}
diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
index a003361..75399d9 100644
--- a/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
+++ b/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
@@ -121,7 +121,7 @@ public class RedisHashTest {
   @SuppressWarnings("unchecked")
   @Test
   public void hset_stores_delta_that_is_stable() throws IOException {
-    Region<ByteArrayWrapper, RedisData> region = Mockito.mock(Region.class);
+    Region<RedisKey, RedisData> region = Mockito.mock(Region.class);
     RedisHash o1 = createRedisHash("k1", "v1", "k2", "v2");
     ByteArrayWrapper k3 = createByteArrayWrapper("k3");
     ByteArrayWrapper v3 = createByteArrayWrapper("v3");
@@ -143,7 +143,7 @@ public class RedisHashTest {
   @SuppressWarnings("unchecked")
   @Test
   public void hdel_stores_delta_that_is_stable() throws IOException {
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
+    Region<RedisKey, RedisData> region = mock(Region.class);
     RedisHash o1 = createRedisHash("k1", "v1", "k2", "v2");
     ByteArrayWrapper k1 = createByteArrayWrapper("k1");
     ArrayList<ByteArrayWrapper> removes = new ArrayList<>();
@@ -163,7 +163,7 @@ public class RedisHashTest {
   @SuppressWarnings("unchecked")
   @Test
   public void setExpirationTimestamp_stores_delta_that_is_stable() throws IOException {
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
+    Region<RedisKey, RedisData> region = mock(Region.class);
     RedisHash o1 = createRedisHash("k1", "v1", "k2", "v2");
     o1.setExpirationTimestamp(region, null, 999);
     assertThat(o1.hasDelta()).isTrue();
diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisKeyJUnitTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisKeyJUnitTest.java
new file mode 100644
index 0000000..535d01d
--- /dev/null
+++ b/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisKeyJUnitTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.data;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.internal.HeapDataOutputStream;
+import org.apache.geode.internal.InternalDataSerializer;
+import org.apache.geode.internal.serialization.ByteArrayDataInput;
+import org.apache.geode.internal.serialization.DataSerializableFixedID;
+import org.apache.geode.redis.internal.executor.cluster.CRC16;
+
+public class RedisKeyJUnitTest {
+
+  @BeforeClass
+  public static void classSetup() {
+    InternalDataSerializer.getDSFIDSerializer()
+        .registerDSFID(DataSerializableFixedID.REDIS_KEY, RedisKey.class);
+  }
+
+  @Test
+  public void testRoutingId_withHashtags() {
+    RedisKey key = new RedisKey("name{user1000}".getBytes());
+    assertThat(key.getCrc16()).isEqualTo(CRC16.calculate("user1000"));
+
+    key = new RedisKey("{user1000".getBytes());
+    assertThat(key.getCrc16()).isEqualTo(CRC16.calculate("{user1000"));
+
+    key = new RedisKey("}user1000{".getBytes());
+    assertThat(key.getCrc16()).isEqualTo(CRC16.calculate("}user1000{"));
+
+    key = new RedisKey("user{}1000".getBytes());
+    assertThat(key.getCrc16()).isEqualTo(CRC16.calculate("user{}1000"));
+
+    key = new RedisKey("user}{1000".getBytes());
+    assertThat(key.getCrc16()).isEqualTo(CRC16.calculate("user}{1000"));
+
+    key = new RedisKey("{user1000}}bar".getBytes());
+    assertThat(key.getCrc16()).isEqualTo(CRC16.calculate("user1000"));
+
+    key = new RedisKey("foo{user1000}{bar}".getBytes());
+    assertThat(key.getCrc16()).isEqualTo(CRC16.calculate("user1000"));
+
+    key = new RedisKey("foo{}{user1000}".getBytes());
+    assertThat(key.getCrc16()).isEqualTo(CRC16.calculate("foo{}{user1000}"));
+
+    key = new RedisKey("{}{user1000}".getBytes());
+    assertThat(key.getCrc16()).isEqualTo(CRC16.calculate("{}{user1000}"));
+
+    key = new RedisKey("foo{{user1000}}bar".getBytes());
+    assertThat(key.getCrc16()).isEqualTo(CRC16.calculate("{user1000"));
+
+    key = new RedisKey(new byte[] {});
+    assertThat(key.getCrc16()).isEqualTo(CRC16.calculate(""));
+  }
+
+  @Test
+  public void testSerialization_withPositiveSignedShortCRC16() throws Exception {
+    RedisKey keyOut = new RedisKey("012345".getBytes());
+    assertThat((short) keyOut.getCrc16()).isPositive();
+
+    HeapDataOutputStream out = new HeapDataOutputStream(100);
+    DataSerializer.writeObject(keyOut, out);
+    ByteArrayDataInput in = new ByteArrayDataInput(out.toByteArray());
+
+    RedisKey keyIn = DataSerializer.readObject(in);
+    assertThat(keyIn).isEqualTo(keyOut);
+  }
+
+  @Test
+  public void testSerialization_withNegativeSignedShortCRC16() throws Exception {
+    RedisKey keyOut = new RedisKey("k2".getBytes());
+    assertThat((short) keyOut.getCrc16()).isNegative();
+
+    HeapDataOutputStream out = new HeapDataOutputStream(100);
+    DataSerializer.writeObject(keyOut, out);
+    ByteArrayDataInput in = new ByteArrayDataInput(out.toByteArray());
+
+    RedisKey keyIn = DataSerializer.readObject(in);
+    assertThat(keyIn).isEqualTo(keyOut);
+  }
+
+}
diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSetTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSetTest.java
index 1a3c0d0..4a82413 100644
--- a/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSetTest.java
+++ b/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSetTest.java
@@ -112,7 +112,7 @@ public class RedisSetTest {
   @SuppressWarnings("unchecked")
   @Test
   public void sadd_stores_delta_that_is_stable() throws IOException {
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
+    Region<RedisKey, RedisData> region = mock(Region.class);
     RedisSet o1 = createRedisSet(1, 2);
     ByteArrayWrapper member3 = new ByteArrayWrapper(new byte[] {3});
     ArrayList<ByteArrayWrapper> adds = new ArrayList<>();
@@ -132,7 +132,7 @@ public class RedisSetTest {
   @SuppressWarnings("unchecked")
   @Test
   public void srem_stores_delta_that_is_stable() throws IOException {
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
+    Region<RedisKey, RedisData> region = mock(Region.class);
     RedisSet o1 = createRedisSet(1, 2);
     ByteArrayWrapper member1 = new ByteArrayWrapper(new byte[] {1});
     ArrayList<ByteArrayWrapper> removes = new ArrayList<>();
@@ -152,7 +152,7 @@ public class RedisSetTest {
   @SuppressWarnings("unchecked")
   @Test
   public void setExpirationTimestamp_stores_delta_that_is_stable() throws IOException {
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
+    Region<RedisKey, RedisData> region = mock(Region.class);
     RedisSet o1 = createRedisSet(1, 2);
     o1.setExpirationTimestamp(region, null, 999);
     assertThat(o1.hasDelta()).isTrue();
diff --git a/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisStringTest.java b/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisStringTest.java
index 1c2ffbc..9883ecd 100644
--- a/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisStringTest.java
+++ b/geode-redis/src/test/java/org/apache/geode/redis/internal/data/RedisStringTest.java
@@ -78,9 +78,9 @@ public class RedisStringTest {
 
   @Test
   public void appendResizesByteArray() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
+    Region<RedisKey, RedisData> region = mock(Region.class);
     RedisString redisString = new RedisString(new ByteArrayWrapper(new byte[] {0, 1}));
     ByteArrayWrapper part2 = new ByteArrayWrapper(new byte[] {2, 3, 4, 5});
     int redisStringSize = redisString.strlen();
@@ -91,9 +91,9 @@ public class RedisStringTest {
 
   @Test
   public void appendStoresStableDelta() throws IOException {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
+    Region<RedisKey, RedisData> region = mock(Region.class);
     RedisString o1 = new RedisString(new ByteArrayWrapper(new byte[] {0, 1}));
     ByteArrayWrapper part2 = new ByteArrayWrapper(new byte[] {2, 3});
     o1.append(part2, region, null);
@@ -131,10 +131,10 @@ public class RedisStringTest {
 
   @Test
   public void incrThrowsArithmeticErrorWhenNotALong() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
-    ByteArrayWrapper byteArrayWrapper = new ByteArrayWrapper(new byte[] {'1', '0', ' ', '1'});
+    Region<RedisKey, RedisData> region = mock(Region.class);
+    RedisKey byteArrayWrapper = new RedisKey(new byte[] {'1', '0', ' ', '1'});
     RedisString string = new RedisString(byteArrayWrapper);
     assertThatThrownBy(() -> string.incr(region, byteArrayWrapper))
         .isInstanceOf(NumberFormatException.class);
@@ -142,10 +142,10 @@ public class RedisStringTest {
 
   @Test
   public void incrErrorsWhenValueOverflows() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
-    ByteArrayWrapper byteArrayWrapper = new ByteArrayWrapper(
+    Region<RedisKey, RedisData> region = mock(Region.class);
+    RedisKey byteArrayWrapper = new RedisKey(
         // max value for signed long
         new byte[] {'9', '2', '2', '3', '3', '7', '2', '0', '3', '6', '8', '5', '4', '7', '7', '5',
             '8', '0', '7'});
@@ -156,10 +156,10 @@ public class RedisStringTest {
 
   @Test
   public void incrIncrementsValueAtGivenKey() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
-    ByteArrayWrapper byteArrayWrapper = new ByteArrayWrapper(new byte[] {'1', '0'});
+    Region<RedisKey, RedisData> region = mock(Region.class);
+    RedisKey byteArrayWrapper = new RedisKey(new byte[] {'1', '0'});
     RedisString string = new RedisString(byteArrayWrapper);
     string.incr(region, byteArrayWrapper);
     assertThat(string.get().toString()).isEqualTo("11");
@@ -167,10 +167,10 @@ public class RedisStringTest {
 
   @Test
   public void incrbyThrowsNumberFormatExceptionWhenNotALong() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
-    ByteArrayWrapper byteArrayWrapper = new ByteArrayWrapper(new byte[] {'1', '0', ' ', '1'});
+    Region<RedisKey, RedisData> region = mock(Region.class);
+    RedisKey byteArrayWrapper = new RedisKey(new byte[] {'1', '0', ' ', '1'});
     RedisString string = new RedisString(byteArrayWrapper);
     assertThatThrownBy(() -> string.incrby(region, byteArrayWrapper, 2L))
         .isInstanceOf(NumberFormatException.class);
@@ -178,10 +178,10 @@ public class RedisStringTest {
 
   @Test
   public void incrbyErrorsWhenValueOverflows() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
-    ByteArrayWrapper byteArrayWrapper = new ByteArrayWrapper(
+    Region<RedisKey, RedisData> region = mock(Region.class);
+    RedisKey byteArrayWrapper = new RedisKey(
         // max value for signed long
         new byte[] {'9', '2', '2', '3', '3', '7', '2', '0', '3', '6', '8', '5', '4', '7', '7', '5',
             '8', '0', '7'});
@@ -192,10 +192,10 @@ public class RedisStringTest {
 
   @Test
   public void incrbyIncrementsValueByGivenLong() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
-    ByteArrayWrapper byteArrayWrapper = new ByteArrayWrapper(new byte[] {'1', '0'});
+    Region<RedisKey, RedisData> region = mock(Region.class);
+    RedisKey byteArrayWrapper = new RedisKey(new byte[] {'1', '0'});
     RedisString string = new RedisString(byteArrayWrapper);
     string.incrby(region, byteArrayWrapper, 2L);
     assertThat(string.get().toString()).isEqualTo("12");
@@ -203,10 +203,10 @@ public class RedisStringTest {
 
   @Test
   public void incrbyfloatThrowsArithmeticErrorWhenNotADouble() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
-    ByteArrayWrapper byteArrayWrapper = new ByteArrayWrapper(new byte[] {'1', '0', ' ', '1'});
+    Region<RedisKey, RedisData> region = mock(Region.class);
+    RedisKey byteArrayWrapper = new RedisKey(new byte[] {'1', '0', ' ', '1'});
     RedisString string = new RedisString(byteArrayWrapper);
     assertThatThrownBy(() -> string.incrbyfloat(region, byteArrayWrapper, new BigDecimal("1.1")))
         .isInstanceOf(NumberFormatException.class);
@@ -214,10 +214,10 @@ public class RedisStringTest {
 
   @Test
   public void incrbyfloatIncrementsValueByGivenFloat() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
-    ByteArrayWrapper byteArrayWrapper = new ByteArrayWrapper(new byte[] {'1', '0'});
+    Region<RedisKey, RedisData> region = mock(Region.class);
+    RedisKey byteArrayWrapper = new RedisKey(new byte[] {'1', '0'});
     RedisString string = new RedisString(byteArrayWrapper);
     string.incrbyfloat(region, byteArrayWrapper, new BigDecimal("2.20"));
     assertThat(string.get().toString()).isEqualTo("12.20");
@@ -225,10 +225,10 @@ public class RedisStringTest {
 
   @Test
   public void decrThrowsNumberFormatExceptionWhenNotALong() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
-    ByteArrayWrapper byteArrayWrapper = new ByteArrayWrapper(new byte[] {0});
+    Region<RedisKey, RedisData> region = mock(Region.class);
+    RedisKey byteArrayWrapper = new RedisKey(new byte[] {0});
     RedisString string = new RedisString(byteArrayWrapper);
     assertThatThrownBy(() -> string.decr(region, byteArrayWrapper))
         .isInstanceOf(NumberFormatException.class);
@@ -236,10 +236,10 @@ public class RedisStringTest {
 
   @Test
   public void decrThrowsArithmeticExceptionWhenDecrementingMin() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
-    ByteArrayWrapper byteArrayWrapper = new ByteArrayWrapper(
+    Region<RedisKey, RedisData> region = mock(Region.class);
+    RedisKey byteArrayWrapper = new RedisKey(
         new byte[] {'-', '9', '2', '2', '3', '3', '7', '2', '0', '3', '6', '8', '5', '4', '7', '7',
             '5',
             '8', '0', '8'});
@@ -250,10 +250,10 @@ public class RedisStringTest {
 
   @Test
   public void decrDecrementsValue() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
-    ByteArrayWrapper byteArrayWrapper = new ByteArrayWrapper(new byte[] {'1', '0'});
+    Region<RedisKey, RedisData> region = mock(Region.class);
+    RedisKey byteArrayWrapper = new RedisKey(new byte[] {'1', '0'});
     RedisString string = new RedisString(byteArrayWrapper);
     string.decr(region, byteArrayWrapper);
     assertThat(string.get().toString()).isEqualTo("9");
@@ -261,10 +261,10 @@ public class RedisStringTest {
 
   @Test
   public void decrbyThrowsNumberFormatExceptionWhenNotALong() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
-    ByteArrayWrapper byteArrayWrapper = new ByteArrayWrapper(new byte[] {1});
+    Region<RedisKey, RedisData> region = mock(Region.class);
+    RedisKey byteArrayWrapper = new RedisKey(new byte[] {1});
     RedisString string = new RedisString(byteArrayWrapper);
     assertThatThrownBy(() -> string.decrby(region, byteArrayWrapper, 2))
         .isInstanceOf(NumberFormatException.class);
@@ -272,10 +272,10 @@ public class RedisStringTest {
 
   @Test
   public void decrbyThrowsArithmeticExceptionWhenDecrementingMin() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
-    ByteArrayWrapper byteArrayWrapper = new ByteArrayWrapper(
+    Region<RedisKey, RedisData> region = mock(Region.class);
+    RedisKey byteArrayWrapper = new RedisKey(
         new byte[] {'-', '9', '2', '2', '3', '3', '7', '2', '0', '3', '6', '8', '5', '4', '7', '7',
             '5',
             '8', '0', '7'});
@@ -286,10 +286,10 @@ public class RedisStringTest {
 
   @Test
   public void decrbyDecrementsValue() {
-    // allows unchecked cast of mock to Region<ByteArrayWrapper, RedisData>
+    // allows unchecked cast of mock to Region<RedisKey, RedisData>
     @SuppressWarnings("unchecked")
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
-    ByteArrayWrapper byteArrayWrapper = new ByteArrayWrapper(new byte[] {'1', '0'});
+    Region<RedisKey, RedisData> region = mock(Region.class);
+    RedisKey byteArrayWrapper = new RedisKey(new byte[] {'1', '0'});
     RedisString string = new RedisString(byteArrayWrapper);
     string.decrby(region, byteArrayWrapper, 2);
     assertThat(string.get().toString()).isEqualTo("8");
@@ -338,7 +338,7 @@ public class RedisStringTest {
   @SuppressWarnings("unchecked")
   @Test
   public void setExpirationTimestamp_stores_delta_that_is_stable() throws IOException {
-    Region<ByteArrayWrapper, RedisData> region = mock(Region.class);
+    Region<RedisKey, RedisData> region = mock(Region.class);
     RedisString o1 = new RedisString(new ByteArrayWrapper(new byte[] {0, 1}));
     o1.setExpirationTimestamp(region, null, 999);
     assertThat(o1.hasDelta()).isTrue();
diff --git a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
index 3d78610..9fcebb1 100644
--- a/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
+++ b/geode-serialization/src/main/java/org/apache/geode/internal/serialization/DataSerializableFixedID.java
@@ -461,7 +461,9 @@ public interface DataSerializableFixedID extends SerializationVersions, BasicSer
   short UPDATE_ENTRY_VERSION_MESSAGE = 158;
   short PR_UPDATE_ENTRY_VERSION_MESSAGE = 159;
 
-  // 160 through 164 unused
+  short REDIS_KEY = 160;
+
+  // 161 through 164 unused
 
   short PR_FETCH_BULK_ENTRIES_MESSAGE = 165;
   short PR_FETCH_BULK_ENTRIES_REPLY_MESSAGE = 166;