Posted to commits@geode.apache.org by do...@apache.org on 2021/05/11 18:07:34 UTC

[geode] branch develop updated: GEODE-9136: make RedisData implement Sizeable (#6296)

This is an automated email from the ASF dual-hosted git repository.

donalevans pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 6a0eba2  GEODE-9136: make RedisData implement Sizeable (#6296)
6a0eba2 is described below

commit 6a0eba25d5ed5cc7146ce6374d39dd12b22745f3
Author: Hale Bales <hb...@vmware.com>
AuthorDate: Tue May 11 11:06:14 2021 -0700

    GEODE-9136: make RedisData implement Sizeable (#6296)
    
     - use Sizeable in RedisString, RedisSet, and RedisHash
     - add unit tests of bytes in use for all three classes
     - update memory overhead tests to reflect decrease in overhead
     - calculate estimated size in unit tests and use constants in the classes
    
    RedisData should implement Sizeable to increase the accuracy of the bytes-in-use estimate. It is important that we are close to the right size for rebalancing to work properly; the exact value, however, is not important. These changes bring the sizes for sets and hashes to within 5% of the size measured by reflection. The accuracy changes as you add entries, and as entries get larger, because of the way that hashes, sets, and ByteArrayWrappers get resized. As we fill u [...]
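
    As a rough illustration, here is a minimal sketch of the bookkeeping pattern (not the committed code; it mirrors what the diff below does in RedisSet, borrowing RedisSet's BASE_REDIS_SET_OVERHEAD and PER_MEMBER_OVERHEAD values, using String members as a stand-in for ByteArrayWrapper, and omitting the first-member storage term for brevity). The idea is to keep a running sizeInBytes and adjust it on every mutation instead of re-measuring the whole structure:

        class SizedSet {
          // assumed values, taken from the RedisSet constants in the diff below
          private static final int BASE_OVERHEAD = 112;
          private static final int PER_MEMBER_OVERHEAD = 77;

          private final java.util.HashSet<String> members = new java.util.HashSet<>();
          private int sizeInBytes = BASE_OVERHEAD;

          boolean add(String member) {
            boolean added = members.add(member);
            if (added) { // only newly stored members change the estimate
              sizeInBytes += PER_MEMBER_OVERHEAD + member.length();
            }
            return added;
          }

          boolean remove(String member) {
            boolean removed = members.remove(member);
            if (removed) {
              sizeInBytes -= PER_MEMBER_OVERHEAD + member.length();
            }
            return removed;
          }

          int getSizeInBytes() {
            return sizeInBytes; // O(1) read; no reflection on the hot path
          }
        }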
    
    RedisHash and RedisSet both use constants to account for the overheads of:
    
     - an empty (RedisHash/RedisSet)
     - an empty (HashMap/HashSet) within the (RedisHash/RedisSet)
     - each member in the (hash/set)
     - RedisHash also has an overhead for the first value in the hash.
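
    Putting those together, the running estimate for a RedisHash works out to the following sketch, written in terms of the constant names from the diff below (the first-pair term is added only once the hash becomes non-empty):

        sizeInBytes = BASE_REDIS_HASH_OVERHEAD
                    + SIZE_OF_OVERHEAD_OF_FIRST_PAIR
                    + sum over all pairs of (HASH_MAP_VALUE_PAIR_OVERHEAD
                                             + field.length() + value.length())
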
    We decided to use constants in the RedisData classes in order to reduce the number of calculations we have to do every time we add a new key. The math used to derive those constants lives in the RedisSetTest and RedisHashTest classes. The tests of the constants will fail if the internal implementation of the classes changes. If the overheads decrease, then the constants need to be updated to reflect that. If the overheads increase, think through the changes before adding to the overhea [...]
    There is a magical +5 in RedisSetTest.perMemberOverheadConstant_shouldMatchCalculatedValue. We think it comes from resizing, and that 5 happens to work OK, even though it should ideally be calculated dynamically. As noted above, though, the benefits of a dynamic calculation do not outweigh the cost.
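
    For reference, the shape of that derivation looks roughly like the following. This is a condensed, hypothetical rendering of what RedisSetTest.perMemberOverheadConstant_shouldMatchCalculatedValue does (the exact arithmetic lives in that test; imports are as in the test classes in the diff below):

        ReflectionObjectSizer sizer = ReflectionObjectSizer.getInstance();

        HashSet<ByteArrayWrapper> set = new HashSet<>();
        set.add(new ByteArrayWrapper("a".getBytes()));
        int oneMemberSize = sizer.sizeof(set);

        set.add(new ByteArrayWrapper("b".getBytes()));
        int twoMemberSize = sizer.sizeof(set);

        // one data byte was added between measurements; the empirical +5
        // described above absorbs HashSet resizing effects
        int calculatedOverhead = twoMemberSize - oneMemberSize - 1 + 5;
        assertThat(calculatedOverhead).isEqualTo(RedisSet.PER_MEMBER_OVERHEAD);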
---
 .../test/dunit/rules/RedisClusterStartupRule.java  |   7 +
 .../data/PartitionedRegionStatsUpdateTest.java     | 419 +++++++++++++++++++++
 .../hash/MemoryOverheadIntegrationTest.java        |   2 +-
 .../string/AbstractAppendIntegrationTest.java      |  41 ++
 .../codeAnalysis/sanctionedDataSerializables.txt   |   8 +-
 .../geode/redis/internal/GeodeRedisServer.java     |   8 +
 .../geode/redis/internal/GeodeRedisService.java    |   6 +
 .../redis/internal/data/AbstractRedisData.java     |  10 +-
 .../geode/redis/internal/data/NullRedisData.java   |   5 +
 .../geode/redis/internal/data/RedisData.java       |   7 +-
 .../geode/redis/internal/data/RedisHash.java       | 146 ++++---
 .../apache/geode/redis/internal/data/RedisSet.java |  74 +++-
 .../geode/redis/internal/data/RedisString.java     |  11 +-
 .../geode/redis/internal/data/RedisHashTest.java   | 376 +++++++++++++++++-
 .../geode/redis/internal/data/RedisSetTest.java    | 246 ++++++++++++
 .../geode/redis/internal/data/RedisStringTest.java |  94 +++++
 16 files changed, 1367 insertions(+), 93 deletions(-)

diff --git a/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/test/dunit/rules/RedisClusterStartupRule.java b/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/test/dunit/rules/RedisClusterStartupRule.java
index 103c175..e88502b 100644
--- a/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/test/dunit/rules/RedisClusterStartupRule.java
+++ b/geode-apis-compatible-with-redis/src/commonTest/java/org/apache/geode/test/dunit/rules/RedisClusterStartupRule.java
@@ -92,4 +92,11 @@ public class RedisClusterStartupRule extends ClusterStartupRule {
       service.setEnableUnsupported(enableUnsupported);
     });
   }
+
+  public Long getDataStoreBytesInUseForDataRegion(MemberVM vm) {
+    return vm.invoke(() -> {
+      GeodeRedisService service = ClusterStartupRule.getCache().getService(GeodeRedisService.class);
+      return service.getDataStoreBytesInUseForDataRegion();
+    });
+  }
 }
diff --git a/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/data/PartitionedRegionStatsUpdateTest.java b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/data/PartitionedRegionStatsUpdateTest.java
new file mode 100644
index 0000000..4421048
--- /dev/null
+++ b/geode-apis-compatible-with-redis/src/distributedTest/java/org/apache/geode/redis/internal/data/PartitionedRegionStatsUpdateTest.java
@@ -0,0 +1,419 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+
+package org.apache.geode.redis.internal.data;
+
+import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Properties;
+
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import redis.clients.jedis.Jedis;
+
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.dunit.rules.RedisClusterStartupRule;
+
+public class PartitionedRegionStatsUpdateTest {
+
+  @ClassRule
+  public static RedisClusterStartupRule clusterStartUpRule = new RedisClusterStartupRule(3);
+
+  private static MemberVM server1;
+  private static MemberVM server2;
+
+  private static Jedis jedis1;
+  private static Jedis jedis2;
+
+  private static final int JEDIS_TIMEOUT = Math.toIntExact(GeodeAwaitility.getTimeout().toMillis());
+  private static final String LOCAL_HOST = "127.0.0.1";
+  public static final String STRING_KEY = "string key";
+  public static final String SET_KEY = "set key";
+  public static final String HASH_KEY = "hash key";
+  public static final String LONG_APPEND_VALUE = String.valueOf(Integer.MAX_VALUE);
+  public static final String FIELD = "field";
+
+  @BeforeClass
+  public static void classSetup() {
+    Properties locatorProperties = new Properties();
+    locatorProperties.setProperty(MAX_WAIT_TIME_RECONNECT, "15000");
+
+    MemberVM locator = clusterStartUpRule.startLocatorVM(0, locatorProperties);
+    int locatorPort = locator.getPort();
+
+    server1 = clusterStartUpRule.startRedisVM(1, locatorPort);
+    int redisServerPort1 = clusterStartUpRule.getRedisPort(1);
+    jedis1 = new Jedis(LOCAL_HOST, redisServerPort1, JEDIS_TIMEOUT);
+
+    server2 = clusterStartUpRule.startRedisVM(2, locatorPort);
+    int redisServerPort2 = clusterStartUpRule.getRedisPort(2);
+    jedis2 = new Jedis(LOCAL_HOST, redisServerPort2, JEDIS_TIMEOUT);
+  }
+
+  @Before
+  public void setup() {
+    jedis1.flushAll();
+  }
+
+  @Test
+  public void should_showIncreaseInDatastoreBytesInUse_givenStringValueSizeIncreases() {
+    jedis1.set(STRING_KEY, "value");
+
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    for (int i = 0; i < 1000; i++) {
+      jedis1.append(STRING_KEY, LONG_APPEND_VALUE);
+    }
+
+    long finalDataStoreBytesInUse = clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(finalDataStoreBytesInUse).isGreaterThan(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_showDecreaseInDatastoreBytesInUse_givenStringValueDeleted() {
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    jedis1.set(STRING_KEY, "value");
+
+    long intermediateDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+    assertThat(intermediateDataStoreBytesInUse).isGreaterThan(initialDataStoreBytesInUse);
+
+    jedis1.del(STRING_KEY);
+
+    long finalDataStoreBytesInUse = clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(finalDataStoreBytesInUse).isEqualTo(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_showDecreaseInDatastoreBytesInUse_givenStringValueShortened() {
+    jedis1.set(STRING_KEY, "longer value");
+
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    jedis1.set(STRING_KEY, "value");
+
+    long finalDataStoreBytesInUse = clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(finalDataStoreBytesInUse).isLessThan(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_resetMemoryUsage_givenFlushAllCommand() {
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(initialDataStoreBytesInUse).isEqualTo(0L);
+
+    jedis1.set(STRING_KEY, "value");
+
+    jedis1.flushAll();
+
+    long finalDataStoreBytesInUse = clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(finalDataStoreBytesInUse).isEqualTo(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_showNoIncreaseInDatastoreBytesInUse_givenStringValueSizeDoesNotIncrease() {
+    jedis1.set(STRING_KEY, "value");
+
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    for (int i = 0; i < 1000; i++) {
+      jedis1.set(STRING_KEY, "value");
+    }
+
+    long finalDataStoreBytesInUse = clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(finalDataStoreBytesInUse).isEqualTo(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_showIncreaseInDatastoreBytesInUse_givenSetValueSizeIncreases() {
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    for (int i = 0; i < 1000; i++) {
+      jedis1.sadd(SET_KEY, "value" + i);
+    }
+
+    long finalDataStoreBytesInUse = clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(finalDataStoreBytesInUse).isGreaterThan(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_showNoIncreaseInDatastoreBytesInUse_givenSetValueSizeDoesNotIncrease() {
+    jedis1.sadd(SET_KEY, "value");
+
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    for (int i = 0; i < 1000; i++) {
+      jedis1.sadd(SET_KEY, "value");
+    }
+
+    assertThat(jedis1.scard(SET_KEY)).isEqualTo(1);
+
+    long finalDataStoreBytesInUse = clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(finalDataStoreBytesInUse).isEqualTo(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_showDecreaseInDatastoreBytesInUse_givenSetValueDeleted() {
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    jedis1.sadd(SET_KEY, "value");
+
+    long intermediateDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+    assertThat(intermediateDataStoreBytesInUse).isGreaterThan(initialDataStoreBytesInUse);
+
+    jedis1.del(SET_KEY);
+
+    long finalDataStoreBytesInUse = clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(finalDataStoreBytesInUse).isEqualTo(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_showDecreaseInDatastoreBytesInUse_givenSetValueSizeDecreases() {
+    for (int i = 0; i < 10; i++) {
+      jedis1.sadd(SET_KEY, "value" + i);
+    }
+
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    for (int i = 0; i < 10; i++) {
+      jedis1.srem(SET_KEY, "value" + i);
+    }
+
+    long finalDataStoreBytesInUse = clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(finalDataStoreBytesInUse).isLessThan(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_showIncreaseInDatastoreBytesInUse_givenHashValueSizeIncreases() {
+    jedis1.hset(HASH_KEY, FIELD, "value");
+
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    for (int i = 0; i < 1000; i++) {
+      jedis1.hset(HASH_KEY, FIELD + i, LONG_APPEND_VALUE);
+    }
+
+    long finalDataStoreBytesInUse = clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(finalDataStoreBytesInUse).isGreaterThan(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_showDecreaseInDatastoreBytesInUse_givenHashValueDeleted() {
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    jedis1.hset(HASH_KEY, FIELD, "value");
+
+    long intermediateDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+    assertThat(intermediateDataStoreBytesInUse).isGreaterThan(initialDataStoreBytesInUse);
+
+    jedis1.del(HASH_KEY);
+
+    long finalDataStoreBytesInUse = clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(finalDataStoreBytesInUse).isEqualTo(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_showNoIncreaseInDatastoreBytesInUse_givenHSetDoesNotIncreaseHashSize() {
+    jedis2.hset(HASH_KEY, FIELD, "initialvalue"); // two hsets are required to force
+    jedis2.hset(HASH_KEY, FIELD, "value"); // deserialization on both servers
+    // otherwise primary/secondary can disagree on size, and which server is primary varies
+
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server2);
+
+    for (int i = 0; i < 10; i++) {
+      jedis2.hset(HASH_KEY, FIELD, "value");
+    }
+
+    assertThat(jedis2.hgetAll(HASH_KEY).size()).isEqualTo(1);
+
+    long finalDataStoreBytesInUse = clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server2);
+
+    assertThat(finalDataStoreBytesInUse).isEqualTo(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_showIncreaseInDatastoreBytesInUse_givenHSetNXIncreasesHashSize() {
+    jedis1.hset(HASH_KEY, FIELD, "value");
+
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    for (int i = 0; i < 1000; i++) {
+      jedis1.hsetnx(HASH_KEY, FIELD + i, "value");
+    }
+
+    long finalDataStoreBytesInUse = clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(finalDataStoreBytesInUse).isGreaterThan(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_showNoIncreaseInDatastoreBytesInUse_givenHSetNXDoesNotIncreaseHashSize() {
+    jedis1.hset(HASH_KEY, FIELD, "value");
+
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    for (int i = 0; i < 1000; i++) {
+      jedis1.hsetnx(HASH_KEY, FIELD, "value");
+    }
+
+    long finalDataStoreBytesInUse = clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(finalDataStoreBytesInUse).isEqualTo(initialDataStoreBytesInUse);
+  }
+
+  /******* confirm that the other member agrees upon size *******/
+
+  @Test
+  public void should_showMembersAgreeUponUsedHashMemory_afterDeltaPropagation() {
+    jedis1.hset(HASH_KEY, FIELD, "initialvalue"); // two hsets are required to force
+    jedis1.hset(HASH_KEY, FIELD, "finalvalue"); // deserialization on both servers
+    // otherwise primary/secondary can disagree on size, and which server is primary varies
+
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server2);
+
+    for (int i = 0; i < 10; i++) {
+      jedis1.hset(HASH_KEY, FIELD, "finalvalue");
+    }
+
+    assertThat(jedis1.hgetAll(HASH_KEY).size()).isEqualTo(1);
+
+    long server2FinalDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server2);
+    long server1FinalDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    assertThat(server1FinalDataStoreBytesInUse)
+        .isEqualTo(server2FinalDataStoreBytesInUse)
+        .isEqualTo(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_showMembersAgreeUponUsedSetMemory_afterDeltaPropagationWhenAddingMembers() {
+    jedis1.sadd(SET_KEY, "other"); // two sadds are required to force
+    jedis1.sadd(SET_KEY, "value"); // deserialization on both servers
+    // otherwise primary/secondary can disagree on size, and which server is primary varies
+
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server2);
+
+    for (int i = 0; i < 10; i++) {
+      jedis1.sadd(SET_KEY, "value");
+    }
+
+    assertThat(jedis1.scard(SET_KEY)).isEqualTo(2);
+
+    long server1FinalDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+    long server2FinalDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server2);
+
+    assertThat(server1FinalDataStoreBytesInUse)
+        .isEqualTo(server2FinalDataStoreBytesInUse)
+        .isEqualTo(initialDataStoreBytesInUse);
+  }
+
+  @Test
+  public void should_showMembersAgreeUponUsedSetMemory_afterDeltaPropagationWhenRemovingMembers() {
+    String value1 = "value1";
+    String value2 = "value2";
+    jedis1.sadd(SET_KEY, value1); // two sadds are required to force
+    jedis1.sadd(SET_KEY, value2); // deserialization on both servers
+    // otherwise primary/secondary can disagree on size, and which server is primary varies
+
+    jedis1.sadd(SET_KEY, "value3");
+
+    jedis2.srem(SET_KEY, value1, value2);
+
+    assertThat(jedis1.scard(SET_KEY)).isEqualTo(1);
+
+    long server1DataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+    long server2DataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server2);
+
+    assertThat(server1DataStoreBytesInUse).isEqualTo(server2DataStoreBytesInUse);
+  }
+
+  @Test
+  @Ignore("find a way to force deserialization on both members before enabling")
+  public void should_showMembersAgreeUponUsedStringMemory_afterDeltaPropagation() {
+    String value = "value";
+
+    jedis1.set(STRING_KEY, "12345"); // two sets are required to force
+    jedis1.set(STRING_KEY, value); // deserialization on both servers
+    // otherwise primary/secondary can disagree on size, and which server is primary varies
+
+    long initialDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+
+    for (int i = 0; i < 10; i++) {
+      jedis1.set(STRING_KEY, value);
+    }
+
+    assertThat(jedis1.exists(STRING_KEY)).isTrue();
+    assertThat(jedis2.exists(STRING_KEY)).isTrue();
+
+    assertThat(jedis1.get(STRING_KEY)).isEqualTo(value);
+    assertThat(jedis2.get(STRING_KEY)).isEqualTo(value);
+
+    long server1FinalDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server1);
+    long server2FinalDataStoreBytesInUse =
+        clusterStartUpRule.getDataStoreBytesInUseForDataRegion(server2);
+
+    assertThat(server1FinalDataStoreBytesInUse)
+        .isEqualTo(initialDataStoreBytesInUse);
+
+    assertThat(server2FinalDataStoreBytesInUse)
+        .isEqualTo(initialDataStoreBytesInUse);
+  }
+}
diff --git a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/MemoryOverheadIntegrationTest.java b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/MemoryOverheadIntegrationTest.java
index 506365f..b856b79 100755
--- a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/MemoryOverheadIntegrationTest.java
+++ b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/hash/MemoryOverheadIntegrationTest.java
@@ -56,7 +56,7 @@ public class MemoryOverheadIntegrationTest extends AbstractMemoryOverheadIntegra
     result.put(Measurement.STRING, 201);
     result.put(Measurement.SET, 386);
     result.put(Measurement.SET_ENTRY, 72);
-    result.put(Measurement.HASH, 554);
+    result.put(Measurement.HASH, 543);
     result.put(Measurement.HASH_ENTRY, 106);
 
     return result;
diff --git a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/string/AbstractAppendIntegrationTest.java b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/string/AbstractAppendIntegrationTest.java
index 4e84570..bfc811a 100755
--- a/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/string/AbstractAppendIntegrationTest.java
+++ b/geode-apis-compatible-with-redis/src/integrationTest/java/org/apache/geode/redis/internal/executor/string/AbstractAppendIntegrationTest.java
@@ -20,7 +20,9 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.junit.After;
 import org.junit.Before;
@@ -105,6 +107,26 @@ public abstract class AbstractAppendIntegrationTest implements RedisIntegrationT
   }
 
   @Test
+  public void testAppend_actuallyIncreasesBucketSize() {
+    int listSize = 1000;
+    String key = "key";
+
+    Map<String, String> info = getInfo(jedis);
+    Long previousMemValue = Long.valueOf(info.get("used_memory"));
+
+    jedis.set(key, "initial");
+    for (int i = 0; i < listSize; i++) {
+      jedis.append(key, "morestuff");
+    }
+
+    info = getInfo(jedis);
+    Long finalMemValue = Long.valueOf(info.get("used_memory"));
+
+    assertThat(finalMemValue).isGreaterThan(previousMemValue);
+  }
+
+  @Test
   public void testAppend_withUTF16KeyAndValue() throws IOException {
     String test_utf16_string = "最𐐷𤭢";
     byte[] testBytes = test_utf16_string.getBytes(StandardCharsets.UTF_16);
@@ -131,4 +153,23 @@ public abstract class AbstractAppendIntegrationTest implements RedisIntegrationT
     }
     return strings;
   }
+
+  /**
+   * Convert the values returned by the INFO command into a basic param:value map.
+   */
+  static Map<String, String> getInfo(Jedis jedis) {
+    Map<String, String> results = new HashMap<>();
+    String rawInfo = jedis.info();
+
+    for (String line : rawInfo.split("\r\n")) {
+      int colonIndex = line.indexOf(":");
+      if (colonIndex > 0) {
+        String key = line.substring(0, colonIndex);
+        String value = line.substring(colonIndex + 1);
+        results.put(key, value);
+      }
+    }
+
+    return results;
+  }
 }
diff --git a/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt b/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 0512760..1bfd0a9 100644
--- a/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ b/geode-apis-compatible-with-redis/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -11,16 +11,16 @@ fromData,8
 toData,8
 
 org/apache/geode/redis/internal/data/RedisHash,2
-toData,15
-fromData,15
+toData,26
+fromData,26
 
 org/apache/geode/redis/internal/data/RedisKey,2
 fromData,20
 toData,17
 
 org/apache/geode/redis/internal/data/RedisSet,2
-toData,15
-fromData,15
+toData,26
+fromData,26
 
 org/apache/geode/redis/internal/data/RedisString,2
 toData,26
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
index d0e20fd..dcf1aff 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisServer.java
@@ -23,6 +23,7 @@ import org.apache.geode.cache.Cache;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.statistics.StatisticsClock;
 import org.apache.geode.internal.statistics.StatisticsClockFactory;
 import org.apache.geode.logging.internal.executors.LoggingExecutors;
@@ -150,4 +151,11 @@ public class GeodeRedisServer {
       shutdown = true;
     }
   }
+
+  @VisibleForTesting
+  protected Long getDataStoreBytesInUseForDataRegion() {
+    PartitionedRegion dataRegion = (PartitionedRegion) this.getRegionProvider().getDataRegion();
+    long dataStoreBytesInUse = dataRegion.getPrStats().getDataStoreBytesInUse();
+    return dataStoreBytesInUse;
+  }
 }
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisService.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisService.java
index 0885556..de3fa3a 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisService.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/GeodeRedisService.java
@@ -17,6 +17,7 @@ package org.apache.geode.redis.internal;
 
 import org.apache.logging.log4j.Logger;
 
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.Cache;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ResourceEvent;
@@ -128,4 +129,9 @@ public class GeodeRedisService implements CacheService, ResourceEventsListener {
   public void setEnableUnsupported(boolean unsupported) {
     redisServer.setAllowUnsupportedCommands(unsupported);
   }
+
+  @VisibleForTesting
+  public Long getDataStoreBytesInUseForDataRegion() {
+    return redisServer.getDataStoreBytesInUseForDataRegion();
+  }
 }
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
index ee8acac..7eda753 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/AbstractRedisData.java
@@ -138,10 +138,7 @@ public abstract class AbstractRedisData implements RedisData {
       return false;
     }
     long now = System.currentTimeMillis();
-    if (now < expireTimestamp) {
-      return false;
-    }
-    return true;
+    return now >= expireTimestamp;
   }
 
   @Override
@@ -150,10 +147,7 @@ public abstract class AbstractRedisData implements RedisData {
     if (expireTimestamp == NO_EXPIRATION) {
       return false;
     }
-    if (now < expireTimestamp) {
-      return false;
-    }
-    return true;
+    return now >= expireTimestamp;
   }
 
   @Override
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java
index b31e912..1f97f59 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/NullRedisData.java
@@ -122,4 +122,9 @@ public class NullRedisData implements RedisData {
   public void fromDelta(DataInput in) throws InvalidDeltaException {
     throw new UnsupportedOperationException();
   }
+
+  @Override
+  public int getSizeInBytes() {
+    return 0;
+  }
 }
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
index fddb58b..d4bce1d 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisData.java
@@ -20,8 +20,9 @@ package org.apache.geode.redis.internal.data;
 import org.apache.geode.Delta;
 import org.apache.geode.cache.Region;
 import org.apache.geode.internal.serialization.DataSerializableFixedID;
+import org.apache.geode.internal.size.Sizeable;
 
-public interface RedisData extends Delta, DataSerializableFixedID {
+public interface RedisData extends Delta, DataSerializableFixedID, Sizeable {
 
 
   /**
@@ -57,4 +58,8 @@ public interface RedisData extends Delta, DataSerializableFixedID {
 
   boolean rename(Region<RedisKey, RedisData> region, RedisKey oldKey, RedisKey newKey);
 
+  default boolean getForceRecalculateSize() {
+    return true;
+  }
+
 }
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
index a114973..41c834c 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisHash.java
@@ -54,27 +54,37 @@ import org.apache.geode.redis.internal.netty.Coder;
 
 public class RedisHash extends AbstractRedisData {
   private HashMap<ByteArrayWrapper, ByteArrayWrapper> hash;
-  private ConcurrentHashMap<UUID, List<ByteArrayWrapper>> hScanSnapShots;
-  private ConcurrentHashMap<UUID, Long> hScanSnapShotCreationTimes;
+  private final ConcurrentHashMap<UUID, List<ByteArrayWrapper>> hScanSnapShots;
+  private final ConcurrentHashMap<UUID, Long> hScanSnapShotCreationTimes;
   private ScheduledExecutorService HSCANSnapshotExpirationExecutor = null;
 
-  private static int defaultHscanSnapshotsExpireCheckFrequency =
+  private int sizeInBytes;
+
+  // the following constants were calculated using reflection and math. you can find the tests for
+  // these values in RedisHashTest, which show the way these numbers were calculated. the constants
+  // have the advantage of saving us a lot of computation that would happen every time a new key was
+  // added. if our internal implementation changes, these values may be incorrect. the tests will
+  // catch this change. an increase in overhead should be carefully considered.
+  protected static final int BASE_REDIS_HASH_OVERHEAD = 232;
+  protected static final int HASH_MAP_VALUE_PAIR_OVERHEAD = 96;
+  protected static final int SIZE_OF_OVERHEAD_OF_FIRST_PAIR = 96;
+
+  private static final int defaultHscanSnapshotsExpireCheckFrequency =
       Integer.getInteger("redis.hscan-snapshot-cleanup-interval", 30000);
 
-  private static int defaultHscanSnapshotsMillisecondsToLive =
+  private static final int defaultHscanSnapshotsMillisecondsToLive =
       Integer.getInteger("redis.hscan-snapshot-expiry", 30000);
 
   private int HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS;
   private int MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE;
 
+
   @VisibleForTesting
   public RedisHash(List<ByteArrayWrapper> fieldsToSet, int hscanSnapShotExpirationCheckFrequency,
       int minimumLifeForHscanSnaphot) {
     this();
-
-    this.HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS =
-        hscanSnapShotExpirationCheckFrequency;
-    this.MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE = minimumLifeForHscanSnaphot;
+    HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS = hscanSnapShotExpirationCheckFrequency;
+    MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE = minimumLifeForHscanSnaphot;
 
     Iterator<ByteArrayWrapper> iterator = fieldsToSet.iterator();
     while (iterator.hasNext()) {
@@ -90,26 +100,22 @@ public class RedisHash extends AbstractRedisData {
 
   // for serialization
   public RedisHash() {
-    this.hash = new HashMap<>();
-    this.hScanSnapShots = new ConcurrentHashMap<>();
-    this.hScanSnapShotCreationTimes = new ConcurrentHashMap<>();
+    hash = new HashMap<>();
+    hScanSnapShots = new ConcurrentHashMap<>();
+    hScanSnapShotCreationTimes = new ConcurrentHashMap<>();
 
-    this.HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS =
-        this.defaultHscanSnapshotsExpireCheckFrequency;
+    HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS = defaultHscanSnapshotsExpireCheckFrequency;
 
-    this.MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE =
-        this.defaultHscanSnapshotsMillisecondsToLive;
-  }
+    MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE = defaultHscanSnapshotsMillisecondsToLive;
 
+    sizeInBytes = BASE_REDIS_HASH_OVERHEAD;
+  }
 
   private void expireHScanSnapshots() {
-
-    this.hScanSnapShotCreationTimes.entrySet().forEach(entry -> {
-      Long creationTime = entry.getValue();
+    hScanSnapShotCreationTimes.forEach((client, creationTime) -> {
       long millisecondsSinceCreation = currentTimeMillis() - creationTime;
 
       if (millisecondsSinceCreation >= MINIMUM_MILLISECONDS_FOR_HSCAN_SNAPSHOTS_TO_LIVE) {
-        UUID client = entry.getKey();
         removeHSCANSnapshot(client);
       }
     });
@@ -117,22 +123,22 @@ public class RedisHash extends AbstractRedisData {
 
   @VisibleForTesting
   public ConcurrentHashMap<UUID, List<ByteArrayWrapper>> getHscanSnapShots() {
-    return this.hScanSnapShots;
+    return hScanSnapShots;
   }
 
   private void startHscanSnapshotScheduledRemoval() {
-    final int DELAY = this.HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS;
+    final int DELAY = HSCAN_SNAPSHOTS_EXPIRE_CHECK_FREQUENCY_MILLISECONDS;
 
-    this.HSCANSnapshotExpirationExecutor =
+    HSCANSnapshotExpirationExecutor =
         newSingleThreadScheduledExecutor("GemFireRedis-HSCANSnapshotRemoval-");
 
-    this.HSCANSnapshotExpirationExecutor.scheduleWithFixedDelay(
+    HSCANSnapshotExpirationExecutor.scheduleWithFixedDelay(
         this::expireHScanSnapshots, DELAY, DELAY, MILLISECONDS);
   }
 
   private void shutDownHscanSnapshotScheduledRemoval() {
-    this.HSCANSnapshotExpirationExecutor.shutdown();
-    this.HSCANSnapshotExpirationExecutor = null;
+    HSCANSnapshotExpirationExecutor.shutdown();
+    HSCANSnapshotExpirationExecutor = null;
   }
 
   /**
@@ -144,6 +150,7 @@ public class RedisHash extends AbstractRedisData {
   public synchronized void toData(DataOutput out, SerializationContext context) throws IOException {
     super.toData(out, context);
     DataSerializer.writeHashMap(hash, out);
+    DataSerializer.writeInteger(sizeInBytes, out);
   }
 
   @Override
@@ -151,6 +158,7 @@ public class RedisHash extends AbstractRedisData {
       throws IOException, ClassNotFoundException {
     super.fromData(in, context);
     hash = DataSerializer.readHashMap(in);
+    sizeInBytes = DataSerializer.readInteger(in);
   }
 
   @Override
@@ -160,16 +168,50 @@ public class RedisHash extends AbstractRedisData {
 
 
   private synchronized ByteArrayWrapper hashPut(ByteArrayWrapper field, ByteArrayWrapper value) {
-    return hash.put(field, value);
+    if (hash.isEmpty()) {
+      sizeInBytes += SIZE_OF_OVERHEAD_OF_FIRST_PAIR;
+    }
+
+    ByteArrayWrapper oldvalue = hash.put(field, value);
+
+    if (oldvalue == null) {
+      sizeInBytes += calculateSizeOfNewFieldValuePair(field, value);
+    } else {
+      sizeInBytes += value.length() - oldvalue.length();
+    }
+
+    return oldvalue;
   }
 
   private synchronized ByteArrayWrapper hashPutIfAbsent(ByteArrayWrapper field,
       ByteArrayWrapper value) {
-    return hash.putIfAbsent(field, value);
+    if (hash.isEmpty()) {
+      sizeInBytes += SIZE_OF_OVERHEAD_OF_FIRST_PAIR;
+    }
+
+    ByteArrayWrapper oldvalue = hash.putIfAbsent(field, value);
+
+    if (oldvalue == null) {
+      sizeInBytes += calculateSizeOfNewFieldValuePair(field, value);
+    }
+    return oldvalue;
+  }
+
+  private int calculateSizeOfNewFieldValuePair(ByteArrayWrapper field, ByteArrayWrapper value) {
+    return HASH_MAP_VALUE_PAIR_OVERHEAD + field.length() + value.length();
   }
 
   private synchronized ByteArrayWrapper hashRemove(ByteArrayWrapper field) {
-    return hash.remove(field);
+    ByteArrayWrapper oldValue = hash.remove(field);
+    if (oldValue != null) {
+      sizeInBytes -= calculateSizeOfNewFieldValuePair(field, oldValue);
+    }
+
+    if (hash.isEmpty()) {
+      sizeInBytes = BASE_REDIS_HASH_OVERHEAD;
+    }
+
+    return oldValue;
   }
 
   @Override
@@ -220,6 +262,7 @@ public class RedisHash extends AbstractRedisData {
       }
     }
     storeChanges(region, key, deltaInfo);
+
     return fieldsAdded;
   }
 
@@ -287,8 +330,7 @@ public class RedisHash extends AbstractRedisData {
   }
 
   public ImmutablePair<Integer, List<Object>> hscan(UUID clientID, Pattern matchPattern,
-      int count,
-      int startCursor) {
+      int count, int startCursor) {
 
     List<ByteArrayWrapper> keysToScan = getSnapShotOfKeySet(clientID);
 
@@ -310,33 +352,28 @@ public class RedisHash extends AbstractRedisData {
   }
 
   private void removeHSCANSnapshot(UUID clientID) {
-    this.hScanSnapShots.remove(clientID);
-    this.hScanSnapShotCreationTimes.remove(clientID);
+    hScanSnapShots.remove(clientID);
+    hScanSnapShotCreationTimes.remove(clientID);
 
-    if (this.hScanSnapShots.isEmpty()) {
+    if (hScanSnapShots.isEmpty()) {
       shutDownHscanSnapshotScheduledRemoval();
     }
   }
 
-  @SuppressWarnings("unchecked")
   private Pair<Integer, List<Object>> getResultsPair(List<ByteArrayWrapper> keysSnapShot,
-      int startCursor,
-      int count,
-      Pattern matchPattern) {
+      int startCursor, int count, Pattern matchPattern) {
 
     int indexOfKeys = startCursor;
 
-    List<ByteArrayWrapper> resultList = new ArrayList<>();
+    List<Object> resultList = new ArrayList<>();
 
     for (int index = startCursor; index < keysSnapShot.size(); index++) {
-
       if ((index - startCursor) == count) {
         break;
       }
 
       ByteArrayWrapper key = keysSnapShot.get(index);
       indexOfKeys++;
-
       ByteArrayWrapper value = hash.get(key);
       if (value == null) {
         continue;
@@ -355,13 +392,11 @@ public class RedisHash extends AbstractRedisData {
 
     Integer numberOfIterationsCompleted = indexOfKeys - startCursor;
 
-    return new ImmutablePair(numberOfIterationsCompleted, resultList);
+    return new ImmutablePair<>(numberOfIterationsCompleted, resultList);
   }
 
   private int getCursorValueToReturn(int startCursor,
-      int numberOfIterationsCompleted,
-      List<ByteArrayWrapper> keySnapshot) {
-
+      int numberOfIterationsCompleted, List<ByteArrayWrapper> keySnapshot) {
     if (startCursor + numberOfIterationsCompleted >= keySnapshot.size()) {
       return 0;
     }
@@ -369,12 +404,11 @@ public class RedisHash extends AbstractRedisData {
     return (startCursor + numberOfIterationsCompleted);
   }
 
-  @SuppressWarnings("unchecked")
   private List<ByteArrayWrapper> getSnapShotOfKeySet(UUID clientID) {
-    List<ByteArrayWrapper> keySnapShot = this.hScanSnapShots.get(clientID);
+    List<ByteArrayWrapper> keySnapShot = hScanSnapShots.get(clientID);
 
     if (keySnapShot == null) {
-      if (this.hScanSnapShots.isEmpty()) {
+      if (hScanSnapShots.isEmpty()) {
         startHscanSnapshotScheduledRemoval();
       }
       keySnapShot = createKeySnapShot(clientID);
@@ -382,20 +416,18 @@ public class RedisHash extends AbstractRedisData {
     return keySnapShot;
   }
 
-  @SuppressWarnings("unchecked")
   private List<ByteArrayWrapper> createKeySnapShot(UUID clientID) {
 
     List<ByteArrayWrapper> keySnapShot = new ArrayList<>(hash.keySet());
 
-    this.hScanSnapShots.put(clientID, keySnapShot);
-    this.hScanSnapShotCreationTimes.put(clientID, currentTimeMillis());
+    hScanSnapShots.put(clientID, keySnapShot);
+    hScanSnapShotCreationTimes.put(clientID, currentTimeMillis());
 
     return keySnapShot;
   }
 
   public long hincrby(Region<RedisKey, RedisData> region, RedisKey key,
-      ByteArrayWrapper field, long increment)
-      throws NumberFormatException, ArithmeticException {
+      ByteArrayWrapper field, long increment) throws NumberFormatException, ArithmeticException {
     ByteArrayWrapper oldValue = hash.get(field);
     if (oldValue == null) {
       ByteArrayWrapper newValue = new ByteArrayWrapper(Coder.longToBytes(increment));
@@ -430,8 +462,7 @@ public class RedisHash extends AbstractRedisData {
   }
 
   public BigDecimal hincrbyfloat(Region<RedisKey, RedisData> region, RedisKey key,
-      ByteArrayWrapper field, BigDecimal increment)
-      throws NumberFormatException {
+      ByteArrayWrapper field, BigDecimal increment) throws NumberFormatException {
     ByteArrayWrapper oldValue = hash.get(field);
     if (oldValue == null) {
       ByteArrayWrapper newValue = new ByteArrayWrapper(Coder.bigDecimalToBytes(increment));
@@ -505,4 +536,9 @@ public class RedisHash extends AbstractRedisData {
   public KnownVersion[] getSerializationVersions() {
     return null;
   }
+
+  @Override
+  public int getSizeInBytes() {
+    return sizeInBytes;
+  }
 }
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
index a7844d5..0eaa737 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisSet.java
@@ -35,9 +35,9 @@ import java.util.regex.Pattern;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 
+import org.apache.geode.DataSerializer;
 import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.Region;
-import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.serialization.DeserializationContext;
 import org.apache.geode.internal.serialization.KnownVersion;
 import org.apache.geode.internal.serialization.SerializationContext;
@@ -46,20 +46,43 @@ import org.apache.geode.redis.internal.delta.DeltaInfo;
 import org.apache.geode.redis.internal.delta.RemsDeltaInfo;
 
 public class RedisSet extends AbstractRedisData {
-
   private HashSet<ByteArrayWrapper> members;
 
-  @SuppressWarnings("unchecked")
+  // the following constants were calculated using reflection and math. you can find the tests for
+  // these values in RedisSetTest, which show the way these numbers were calculated. the constants
+  // have the advantage of saving us a lot of computation that would happen every time a new key was
+  // added. if our internal implementation changes, these values may be incorrect. the tests will
+  // catch this change. an increase in overhead should be carefully considered.
+  // Note: the per member overhead is known to not be constant. it changes as more members are
+  // added, and/or as the members get longer
+  protected static final int BASE_REDIS_SET_OVERHEAD = 112;
+  protected static final int PER_MEMBER_OVERHEAD = 77;
+  protected static final int INTERNAL_HASH_SET_STORAGE_OVERHEAD = 86;
+
+  private int sizeInBytes;
+
   RedisSet(Collection<ByteArrayWrapper> members) {
+    this();
+
     if (members instanceof HashSet) {
       this.members = (HashSet<ByteArrayWrapper>) members;
     } else {
       this.members = new HashSet<>(members);
     }
+
+    if (members.size() > 0) {
+      sizeInBytes += INTERNAL_HASH_SET_STORAGE_OVERHEAD;
+    }
+
+    for (ByteArrayWrapper value : this.members) {
+      sizeInBytes += PER_MEMBER_OVERHEAD + value.length();
+    }
   }
 
   // for serialization
-  public RedisSet() {}
+  public RedisSet() {
+    sizeInBytes += BASE_REDIS_SET_OVERHEAD;
+  }
 
   Pair<BigInteger, List<Object>> sscan(Pattern matchPattern, int count, BigInteger cursor) {
 
@@ -198,14 +221,16 @@ public class RedisSet extends AbstractRedisData {
   @Override
   public synchronized void toData(DataOutput out, SerializationContext context) throws IOException {
     super.toData(out, context);
-    InternalDataSerializer.writeHashSet(members, out);
+    DataSerializer.writeHashSet(members, out);
+    DataSerializer.writeInteger(sizeInBytes, out);
   }
 
   @Override
   public void fromData(DataInput in, DeserializationContext context)
       throws IOException, ClassNotFoundException {
     super.fromData(in, context);
-    members = InternalDataSerializer.readHashSet(in);
+    members = DataSerializer.readHashSet(in);
+    sizeInBytes = DataSerializer.readInteger(in);
   }
 
   @Override
@@ -214,23 +239,39 @@ public class RedisSet extends AbstractRedisData {
   }
 
   private synchronized boolean membersAdd(ByteArrayWrapper memberToAdd) {
-    return members.add(memberToAdd);
+    boolean isAdded = members.add(memberToAdd);
+    if (isAdded) {
+      sizeInBytes += PER_MEMBER_OVERHEAD + memberToAdd.length();
+      if (members.size() == 1) {
+        sizeInBytes += INTERNAL_HASH_SET_STORAGE_OVERHEAD;
+      }
+    }
+    return isAdded;
   }
 
   private boolean membersRemove(ByteArrayWrapper memberToRemove) {
-    return members.remove(memberToRemove);
+    boolean isRemoved = members.remove(memberToRemove);
+    if (isRemoved) {
+      sizeInBytes -= PER_MEMBER_OVERHEAD + memberToRemove.length();
+      if (members.isEmpty()) {
+        sizeInBytes = BASE_REDIS_SET_OVERHEAD;
+      }
+    }
+    return isRemoved;
   }
 
-  private synchronized boolean membersAddAll(AddsDeltaInfo addsDeltaInfo) {
-    return members.addAll(addsDeltaInfo.getAdds());
+  private synchronized void membersAddAll(AddsDeltaInfo addsDeltaInfo) {
+    ArrayList<ByteArrayWrapper> adds = addsDeltaInfo.getAdds();
+    sizeInBytes += adds.stream().mapToInt(a -> a.length() + PER_MEMBER_OVERHEAD).sum();
+    members.addAll(adds);
   }
 
-  private synchronized boolean membersRemoveAll(RemsDeltaInfo remsDeltaInfo) {
-    return members.removeAll(remsDeltaInfo.getRemoves());
+  private synchronized void membersRemoveAll(RemsDeltaInfo remsDeltaInfo) {
+    ArrayList<ByteArrayWrapper> removes = remsDeltaInfo.getRemoves();
+    sizeInBytes -= removes.stream().mapToInt(a -> a.length() + PER_MEMBER_OVERHEAD).sum();
+    members.removeAll(removes);
   }
 
-
-
   /**
    * @param membersToAdd members to add to this set; NOTE this list may by
    *        modified by this call
@@ -317,4 +358,9 @@ public class RedisSet extends AbstractRedisData {
   public KnownVersion[] getSerializationVersions() {
     return null;
   }
+
+  @Override
+  public int getSizeInBytes() {
+    return sizeInBytes;
+  }
 }
diff --git a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisString.java b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisString.java
index f27d105..df04cdb 100644
--- a/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisString.java
+++ b/geode-apis-compatible-with-redis/src/main/java/org/apache/geode/redis/internal/data/RedisString.java
@@ -35,11 +35,15 @@ import org.apache.geode.redis.internal.executor.string.SetOptions;
 import org.apache.geode.redis.internal.netty.Coder;
 
 public class RedisString extends AbstractRedisData {
-
   private int appendSequence;
 
   private ByteArrayWrapper value;
 
+  // this value is empirically derived using ReflectionObjectSizer, which provides an exact size
+  // of the object. It can't be used directly because of its performance impact. This value causes
+  // the size we keep track of to converge to the actual size as it increases.
+  protected static final int BASE_REDIS_STRING_OVERHEAD = 64;
+
   public RedisString(ByteArrayWrapper value) {
     this.value = value;
   }
@@ -729,4 +733,9 @@ public class RedisString extends AbstractRedisData {
   public KnownVersion[] getSerializationVersions() {
     return null;
   }
+
+  @Override
+  public int getSizeInBytes() {
+    return BASE_REDIS_STRING_OVERHEAD + value.length();
+  }
 }
diff --git a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
index 198b747..060baa0 100644
--- a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
+++ b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisHashTest.java
@@ -16,19 +16,27 @@
 
 package org.apache.geode.redis.internal.data;
 
+import static java.lang.Math.round;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.geode.redis.internal.data.RedisHash.BASE_REDIS_HASH_OVERHEAD;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.data.Offset.offset;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.assertj.core.data.Offset;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -40,10 +48,12 @@ import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.serialization.ByteArrayDataInput;
 import org.apache.geode.internal.serialization.DataSerializableFixedID;
 import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.internal.size.ReflectionObjectSizer;
 import org.apache.geode.redis.internal.netty.Coder;
 import org.apache.geode.test.awaitility.GeodeAwaitility;
 
 public class RedisHashTest {
+  private final ReflectionObjectSizer reflectionObjectSizer = ReflectionObjectSizer.getInstance();
 
   @BeforeClass
   public static void beforeClass() {
@@ -73,15 +83,7 @@ public class RedisHashTest {
     assertThat(o2).isEqualTo(o1);
   }
 
-  private RedisHash createRedisHash(String k1, String v1, String k2, String v2) {
-    ArrayList<ByteArrayWrapper> elements = new ArrayList<>();
-    elements.add(createByteArrayWrapper(k1));
-    elements.add(createByteArrayWrapper(v1));
-    elements.add(createByteArrayWrapper(k2));
-    elements.add(createByteArrayWrapper(v2));
-    return new RedisHash(elements);
-  }
-
+  /************* Equals *************/
   @Test
   public void equals_returnsFalse_givenDifferentExpirationTimes() {
     RedisHash o1 = createRedisHash("k1", "v1", "k2", "v2");
@@ -117,6 +119,7 @@ public class RedisHashTest {
     assertThat(o2).isEqualTo(o1);
   }
 
+  /************* HSET *************/
   @SuppressWarnings("unchecked")
   @Test
   public void hset_stores_delta_that_is_stable() throws IOException {
@@ -159,6 +162,7 @@ public class RedisHashTest {
     assertThat(o2).isEqualTo(o1);
   }
 
+  /************* Expiration *************/
   @SuppressWarnings("unchecked")
   @Test
   public void setExpirationTimestamp_stores_delta_that_is_stable() throws IOException {
@@ -176,6 +180,7 @@ public class RedisHashTest {
     assertThat(o2).isEqualTo(o1);
   }
 
+  /************* HSCAN *************/
   @Test
   public void hscanSnaphots_shouldBeEmpty_givenHscanHasNotBeenCalled() {
     RedisHash subject = createRedisHash(100);
@@ -258,11 +263,364 @@ public class RedisHashTest {
     });
   }
 
+  /************* Hash Size *************/
+  /******* constants *******/
+  // these tests contain the math that was used to derive the constants in RedisHash. If these tests
+  // start failing, it is because the overhead of RedisHash has changed. If it has decreased, good
+  // job! You can change the constants in RedisHash to reflect that. If it has increased, carefully
+  // consider that increase before changing the constants.
+  @Test
+  public void constantBaseRedisHashOverhead_shouldEqualCalculatedOverhead() {
+    RedisHash hash = new RedisHash();
+    int baseRedisHashOverhead = reflectionObjectSizer.sizeof(hash);
+
+    assertThat(baseRedisHashOverhead).isEqualTo(BASE_REDIS_HASH_OVERHEAD);
+    assertThat(hash.getSizeInBytes()).isEqualTo(baseRedisHashOverhead);
+  }
+
+  @Test
+  public void constantValuePairOverhead_shouldEqualCalculatedOverhead() {
+    int sizeOfDataForOneFieldValuePair = 16; // initial byte[]s are 8 bytes each
+
+    HashMap<ByteArrayWrapper, ByteArrayWrapper> tempHashmap = new HashMap<>();
+
+    ByteArrayWrapper field1 = new ByteArrayWrapper("a".getBytes());
+    ByteArrayWrapper value1 = new ByteArrayWrapper("b".getBytes());
+    ByteArrayWrapper field2 = new ByteArrayWrapper("c".getBytes());
+    ByteArrayWrapper value2 = new ByteArrayWrapper("d".getBytes());
+
+    tempHashmap.put(field1, value1);
+    int oneEntryHashMapSize = reflectionObjectSizer.sizeof(tempHashmap);
+
+    tempHashmap.put(field2, value2);
+    int twoEntriesHashMapSize = reflectionObjectSizer.sizeof(tempHashmap);
+
+    int expectedValuePairOverhead = twoEntriesHashMapSize - oneEntryHashMapSize
+        - sizeOfDataForOneFieldValuePair;
+
+    assertThat(RedisHash.HASH_MAP_VALUE_PAIR_OVERHEAD).isEqualTo(expectedValuePairOverhead);
+  }
+
+  @Test
+  public void constantFirstPairOverhead_shouldEqual_calculatedOverhead() {
+    HashMap<ByteArrayWrapper, ByteArrayWrapper> tempHashmap = new HashMap<>();
+    int emptyHashMapSize = reflectionObjectSizer.sizeof(tempHashmap);
+
+    ByteArrayWrapper field = new ByteArrayWrapper("a".getBytes());
+    ByteArrayWrapper value = new ByteArrayWrapper("b".getBytes());
+
+    tempHashmap.put(field, value);
+    int oneEntryHashMapSize = reflectionObjectSizer.sizeof(tempHashmap);
+
+    int expectedFirstPairOverhead = oneEntryHashMapSize - emptyHashMapSize
+        - RedisHash.HASH_MAP_VALUE_PAIR_OVERHEAD;
+
+    assertThat(RedisHash.SIZE_OF_OVERHEAD_OF_FIRST_PAIR).isEqualTo(expectedFirstPairOverhead);
+  }
+
+  /******* constructor *******/
+
+  @Test
+  public void should_calculateSize_closeToROSSize_ofIndividualInstanceWithSingleValue() {
+    ArrayList<ByteArrayWrapper> data = new ArrayList<>();
+    data.add(new ByteArrayWrapper("field".getBytes()));
+    data.add(new ByteArrayWrapper("valuethatisverylonggggggggg".getBytes()));
+
+    RedisHash redisHash = new RedisHash(data);
+
+    final int expected = reflectionObjectSizer.sizeof(redisHash);
+    final int actual = redisHash.getSizeInBytes();
+
+    final Offset<Integer> offset = Offset.offset((int) round(expected * 0.05));
+
+    assertThat(actual).isCloseTo(expected, offset);
+  }
+
+  @Test
+  public void should_calculateSize_closeToROSSize_ofIndividualInstanceWithMultipleValues() {
+    RedisHash redisHash =
+        createRedisHash("aSuperLongField", "value", "field", "aSuperLongValue");
+
+    final int expected = reflectionObjectSizer.sizeof(redisHash);
+    final int actual = redisHash.getSizeInBytes();
+
+    final Offset<Integer> offset = Offset.offset((int) round(expected * 0.05));
+
+    assertThat(actual).isCloseTo(expected, offset);
+  }
+
+  @Test
+  public void should_calculateSize_closeToROSSize_withManyEntries() {
+    final String baseField = "longerbase";
+    final String baseValue = "base";
+
+    ArrayList<ByteArrayWrapper> elements = new ArrayList<>();
+    for (int i = 0; i < 10_000; i++) {
+      elements.add(createByteArrayWrapper(baseField + i));
+      elements.add(createByteArrayWrapper(baseValue + i));
+    }
+    RedisHash hash = new RedisHash(elements);
+
+    int actual = hash.getSizeInBytes();
+    int expected = reflectionObjectSizer.sizeof(hash);
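+    // slightly wider 7% tolerance: accuracy drifts as the backing HashMap resizes at this entry count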
+    Offset<Integer> offset = offset((int) round(expected * 0.07));
+
+    assertThat(actual).isCloseTo(expected, offset);
+  }
+
+  /******* put *******/
+  @Test
+  public void hsetShould_calculateSize_equalToSizeCalculatedInConstructor_forMultipleEntries() {
+    final RedisKey key = new RedisKey("key".getBytes());
+    final String baseField = "field";
+    final String baseValue = "value";
+
+    final Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    final RedisData returnData = mock(RedisData.class);
+    when(region.put(any(RedisKey.class), any(RedisData.class))).thenReturn(returnData);
+
+    RedisHash hash = new RedisHash();
+    List<ByteArrayWrapper> data = new ArrayList<>();
+    for (int i = 0; i < 10_000; i++) {
+      data.add(new ByteArrayWrapper((baseField + i).getBytes()));
+      data.add(new ByteArrayWrapper((baseValue + i).getBytes()));
+    }
+
+    hash.hset(region, key, data, false);
+    RedisHash expectedRedisHash = new RedisHash(new ArrayList<>(data));
+
+    assertThat(hash.getSizeInBytes()).isEqualTo(expectedRedisHash.getSizeInBytes());
+  }
+
+  @Test
+  public void hsetShould_calculateSizeDifference_whenUpdatingExistingEntry_newIsShorterThanOld() {
+    final RedisKey key = new RedisKey("key".getBytes());
+    final String field = "field";
+    final String initialValue = "initialValue";
+    final String finalValue = "finalValue";
+
+    testThatSizeIsUpdatedWhenUpdatingValue(key, field, initialValue, finalValue);
+  }
+
+  @Test
+  public void hsetShould_calculateSizeDifference_whenUpdatingExistingEntry_oldIsShorterThanNew() {
+    final RedisKey key = new RedisKey("key".getBytes());
+    final String field = "field";
+    final String initialValue = "initialValue";
+    final String finalValue = "longerfinalValue";
+
+    testThatSizeIsUpdatedWhenUpdatingValue(key, field, initialValue, finalValue);
+  }
+
+  @Test
+  public void hsetShould_calculateSizeDifference_whenUpdatingExistingEntry_valuesAreSameLength() {
+    final RedisKey key = new RedisKey("key".getBytes());
+    final String field = "field";
+    final String initialValue = "initialValue";
+    final String finalValue = "finalValueee";
+
+    testThatSizeIsUpdatedWhenUpdatingValue(key, field, initialValue, finalValue);
+  }
+
+  private void testThatSizeIsUpdatedWhenUpdatingValue(final RedisKey key, final String field,
+      final String initialValue, final String finalValue) {
+    final Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    final RedisData returnData = mock(RedisData.class);
+    when(region.put(any(RedisKey.class), any(RedisData.class))).thenReturn(returnData);
+
+    RedisHash hash = new RedisHash();
+    List<ByteArrayWrapper> initialData = new ArrayList<>();
+    initialData.add(new ByteArrayWrapper(field.getBytes()));
+    initialData.add(new ByteArrayWrapper(initialValue.getBytes()));
+
+    hash.hset(region, key, initialData, false);
+    RedisHash expectedRedisHash = new RedisHash(new ArrayList<>(initialData));
+
+    List<ByteArrayWrapper> finalData = new ArrayList<>();
+    finalData.add(new ByteArrayWrapper(field.getBytes()));
+    finalData.add(new ByteArrayWrapper(finalValue.getBytes()));
+
+    hash.hset(region, key, finalData, false);
+
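+    // updating an existing field should change the size by exactly the difference in value lengths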
+    int expectedUpdatedRedisHashSize = expectedRedisHash.getSizeInBytes()
+        + (finalValue.getBytes().length - initialValue.getBytes().length);
+
+    assertThat(hash.getSizeInBytes()).isEqualTo(expectedUpdatedRedisHashSize);
+  }
+
+  /******* put if absent *******/
+  @Test
+  public void putIfAbsentShould_calculateSizeEqualToSizeCalculatedInConstructor_forMultipleUniqueEntries() {
+    final RedisKey key = new RedisKey("key".getBytes());
+    final String baseField = "field";
+    final String baseValue = "value";
+
+    final Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    final RedisData returnData = mock(RedisData.class);
+    when(region.put(any(RedisKey.class), any(RedisData.class))).thenReturn(returnData);
+
+    RedisHash hash = new RedisHash();
+    List<ByteArrayWrapper> data = new ArrayList<>();
+    for (int i = 0; i < 10_000; i++) {
+      data.add(new ByteArrayWrapper((baseField + i).getBytes()));
+      data.add(new ByteArrayWrapper((baseValue + i).getBytes()));
+    }
+
+    hash.hset(region, key, data, true);
+    RedisHash expectedRedisHash = new RedisHash(new ArrayList<>(data));
+    Offset<Integer> offset = offset((int) round(expectedRedisHash.getSizeInBytes() * 0.05));
+
+    assertThat(hash.getSizeInBytes()).isCloseTo(expectedRedisHash.getSizeInBytes(), offset);
+  }
+
+  @Test
+  public void putIfAbsentShould_notChangeSize_whenSameDataIsSetTwice() {
+    final RedisKey key = new RedisKey("key".getBytes());
+    final String baseField = "field";
+    final String baseValue = "value";
+
+    final Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    final RedisData returnData = mock(RedisData.class);
+    when(region.put(any(RedisKey.class), any(RedisData.class))).thenReturn(returnData);
+
+    RedisHash hash = new RedisHash();
+    List<ByteArrayWrapper> data = new ArrayList<>();
+    for (int i = 0; i < 10_000; i++) {
+      data.add(new ByteArrayWrapper((baseField + i).getBytes()));
+      data.add(new ByteArrayWrapper((baseValue + i).getBytes()));
+    }
+
+    hash.hset(region, key, data, true);
+
+    int expectedSize = hash.getSizeInBytes();
+
+    hash.hset(region, key, data, true);
+
+    assertThat(hash.getSizeInBytes()).isEqualTo(expectedSize);
+  }
+
+  @Test
+  public void putIfAbsent_shouldNotChangeSize_whenPuttingToExistingFields() {
+    final RedisKey key = new RedisKey("key".getBytes());
+    final String baseField = "field";
+    final String initialBaseValue = "value";
+    final String finalBaseValue = "longerValue";
+
+    final Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    final RedisData returnData = mock(RedisData.class);
+    when(region.put(any(RedisKey.class), any(RedisData.class))).thenReturn(returnData);
+
+    RedisHash hash = new RedisHash();
+    List<ByteArrayWrapper> initialData = new ArrayList<>();
+    for (int i = 0; i < 10_000; i++) {
+      initialData.add(new ByteArrayWrapper((baseField + i).getBytes()));
+      initialData.add(new ByteArrayWrapper((initialBaseValue + i).getBytes()));
+    }
+
+    hash.hset(region, key, initialData, true);
+
+    int expectedSize = hash.getSizeInBytes();
+
+    List<ByteArrayWrapper> finalData = new ArrayList<>();
+    for (int i = 0; i < 10_000; i++) {
+      finalData.add(new ByteArrayWrapper((baseField + i).getBytes()));
+      finalData.add(new ByteArrayWrapper((finalBaseValue + i).getBytes()));
+    }
+
+    hash.hset(region, key, finalData, true);
+
+    assertThat(hash.getSizeInBytes()).isEqualTo(expectedSize);
+  }
+
+  /******* remove *******/
+  @Test
+  public void sizeShouldDecrease_whenValueIsRemoved() {
+    final RedisKey key = new RedisKey("key".getBytes());
+    final String baseField = "field";
+    final String baseValue = "value";
+    final Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    final RedisData returnData = mock(RedisData.class);
+    when(region.put(any(RedisKey.class), any(RedisData.class))).thenReturn(returnData);
+
+    List<ByteArrayWrapper> data = new ArrayList<>();
+    List<ByteArrayWrapper> dataToRemove = new ArrayList<>();
+    ByteArrayWrapper field1 = new ByteArrayWrapper((baseField + 1).getBytes());
+    ByteArrayWrapper value1 = new ByteArrayWrapper((baseValue + 1).getBytes());
+    ByteArrayWrapper field2 = new ByteArrayWrapper((baseField + 2).getBytes());
+    ByteArrayWrapper value2 = new ByteArrayWrapper((baseValue + 2).getBytes());
+    data.add(field1);
+    data.add(value1);
+    data.add(field2);
+    data.add(value2);
+    dataToRemove.add(field1);
+
+    RedisHash redisHash = new RedisHash(data);
+    int initialSize = redisHash.getSizeInBytes();
+
+    redisHash.hdel(region, key, dataToRemove);
+
+    int expectedSize = initialSize - RedisHash.HASH_MAP_VALUE_PAIR_OVERHEAD - field1.length()
+        - value1.length();
+    Offset<Integer> offset = Offset.offset((int) round(expectedSize * 0.05));
+
+    assertThat(redisHash.getSizeInBytes()).isCloseTo(expectedSize, offset);
+  }
+
+  @Test
+  public void dataStoreBytesInUse_shouldReturnToHashOverhead_whenAllFieldsAreRemoved() {
+    final RedisKey key = new RedisKey("key".getBytes());
+    final String baseField = "field";
+    final String baseValue = "value";
+    final Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    final RedisData returnData = mock(RedisData.class);
+    when(region.put(any(RedisKey.class), any(RedisData.class))).thenReturn(returnData);
+
+    RedisHash hash = new RedisHash();
+    final int baseRedisHashOverhead = hash.getSizeInBytes();
+
+    List<ByteArrayWrapper> data = new ArrayList<>();
+    for (int i = 0; i < 100; i++) {
+      data.add(new ByteArrayWrapper((baseField + i).getBytes()));
+      data.add(new ByteArrayWrapper((baseValue + i).getBytes()));
+    }
+
+    hash.hset(region, key, data, false);
+
+    assertThat(hash.getSizeInBytes()).isGreaterThan(0);
+
+    for (int i = 0; i < 100; i++) {
+      List<ByteArrayWrapper> toRm = new ArrayList<>();
+      toRm.add(new ByteArrayWrapper((baseField + i).getBytes()));
+      hash.hdel(region, key, toRm);
+    }
+
+    assertThat(hash.getSizeInBytes()).isEqualTo(baseRedisHashOverhead);
+    assertThat(hash.hgetall()).isEmpty();
+  }
+
+  /************* Helper Methods *************/
   private RedisHash createRedisHash(int NumberOfFields) {
     ArrayList<ByteArrayWrapper> elements = createListOfDataElements(NumberOfFields);
     return new RedisHash(elements);
   }
 
+  private RedisHash createRedisHash(String k1, String v1, String k2, String v2) {
+    ArrayList<ByteArrayWrapper> elements = new ArrayList<>();
+
+    ByteArrayWrapper key1 = createByteArrayWrapper(k1);
+    ByteArrayWrapper value1 = createByteArrayWrapper(v1);
+    ByteArrayWrapper key2 = createByteArrayWrapper(k2);
+    ByteArrayWrapper value2 = createByteArrayWrapper(v2);
+
+    elements.add(key1);
+    elements.add(value1);
+
+    elements.add(key2);
+    elements.add(value2);
+
+    return new RedisHash(elements);
+  }
+
   private ArrayList<ByteArrayWrapper> createListOfDataElements(int NumberOfFields) {
     ArrayList<ByteArrayWrapper> elements = new ArrayList<>();
     for (int i = 0; i < NumberOfFields; i++) {
diff --git a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSetTest.java b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSetTest.java
index 4a82413..20e5cc8 100644
--- a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSetTest.java
+++ b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisSetTest.java
@@ -16,8 +16,14 @@
 
 package org.apache.geode.redis.internal.data;
 
+import static org.apache.geode.redis.internal.data.RedisSet.BASE_REDIS_SET_OVERHEAD;
+import static org.apache.geode.redis.internal.data.RedisSet.INTERNAL_HASH_SET_STORAGE_OVERHEAD;
+import static org.apache.geode.redis.internal.data.RedisSet.PER_MEMBER_OVERHEAD;
+import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.DataOutput;
 import java.io.IOException;
@@ -25,7 +31,10 @@ import java.lang.reflect.Modifier;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
+import java.util.Random;
 
+import org.assertj.core.data.Offset;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -36,8 +45,11 @@ import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.serialization.ByteArrayDataInput;
 import org.apache.geode.internal.serialization.DataSerializableFixedID;
 import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.internal.size.ReflectionObjectSizer;
 
 public class RedisSetTest {
+  private final ReflectionObjectSizer reflectionObjectSizer = ReflectionObjectSizer.getInstance();
+  private final double percentTolerance = 0.05;
 
   @BeforeClass
   public static void beforeClass() {
@@ -165,4 +177,238 @@ public class RedisSetTest {
     o2.fromDelta(in);
     assertThat(o2).isEqualTo(o1);
   }
+
+  /************* test size of bytes in use *************/
+
+  /******* constructor *******/
+  @Test
+  public void should_calculateSize_equalToROS_withNoMembers() {
+    HashSet<ByteArrayWrapper> members = new HashSet<>();
+    RedisSet set = new RedisSet(members);
+
+    int expected = reflectionObjectSizer.sizeof(set);
+    int actual = set.getSizeInBytes();
+
+    assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void should_calculateSize_equalToROS_withSingleMember() {
+    HashSet<ByteArrayWrapper> members = new HashSet<>();
+    members.add(new ByteArrayWrapper("value".getBytes()));
+    RedisSet set = new RedisSet(members);
+
+    int expected = reflectionObjectSizer.sizeof(set);
+    int actual = set.getSizeInBytes();
+
+    assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void should_calculateSize_closeToROS_withVaryingMemberCounts() {
+    for (int i = 0; i < 1024; i += 16) {
+      RedisSet set = createRedisSetOfSpecifiedSize(i);
+
+      int expected = reflectionObjectSizer.sizeof(set);
+      long actual = set.getSizeInBytes();
+      Offset<Long> offset = Offset.offset(Math.round(expected * percentTolerance));
+
+      assertThat(actual).isCloseTo(expected, offset);
+    }
+  }
+
+  @Test
+  public void should_calculateSize_closeToROS_withVaryingMemberSize() {
+    for (int i = 0; i < 1_600; i++) {
+      RedisSet set = createRedisSetWithMemberOfSpecifiedSize(i * 64);
+      int expected = reflectionObjectSizer.sizeof(set);
+      long actual = set.getSizeInBytes();
+      Offset<Long> offset = Offset.offset(Math.round(expected * percentTolerance));
+
+      assertThat(actual).isCloseTo(expected, offset);
+    }
+  }
+
+  /******* sadd *******/
+  @Test
+  public void bytesInUse_sadd_withOneMember() {
+    RedisSet set = new RedisSet(new ArrayList<>());
+    Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    final RedisData returnData = mock(RedisData.class);
+    when(region.put(any(RedisKey.class), any(RedisData.class))).thenReturn(returnData);
+    final RedisKey key = new RedisKey("key".getBytes());
+    String valueString = "value";
+
+    final ByteArrayWrapper value = new ByteArrayWrapper(valueString.getBytes());
+    ArrayList<ByteArrayWrapper> members = new ArrayList<>();
+    members.add(value);
+
+    set.sadd(members, region, key);
+
+    int actual = set.getSizeInBytes();
+    int expected = BASE_REDIS_SET_OVERHEAD + INTERNAL_HASH_SET_STORAGE_OVERHEAD
+        + PER_MEMBER_OVERHEAD + valueString.length();
+
+    assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void bytesInUse_sadd_withMultipleMembers() {
+    RedisSet set = new RedisSet(new ArrayList<>());
+    Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    final RedisData returnData = mock(RedisData.class);
+    when(region.put(any(RedisKey.class), any(RedisData.class))).thenReturn(returnData);
+    final RedisKey key = new RedisKey("key".getBytes());
+    String baseString = "value";
+    int currentDataSize = 0;
+
+    for (int i = 0; i < 1_000; i++) {
+      ArrayList<ByteArrayWrapper> members = new ArrayList<>();
+      String valueString = baseString + i;
+      currentDataSize += valueString.length();
+      final ByteArrayWrapper value = new ByteArrayWrapper(valueString.getBytes());
+      members.add(value);
+      set.sadd(members, region, key);
+
+      long actual = set.getSizeInBytes();
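+      // expected: the fixed set overheads plus per-member overhead and data bytes for every
+      // member added so far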
+      long expected = BASE_REDIS_SET_OVERHEAD + INTERNAL_HASH_SET_STORAGE_OVERHEAD
+          + (PER_MEMBER_OVERHEAD * (i + 1)) + currentDataSize;
+      Offset<Long> offset = Offset.offset(Math.round(expected * percentTolerance));
+
+      assertThat(actual).isCloseTo(expected, offset);
+    }
+  }
+
+  /******* remove *******/
+  @Test
+  public void size_shouldDecrease_whenValueIsRemoved() {
+    Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    final RedisData returnData = mock(RedisData.class);
+    when(region.put(any(RedisKey.class), any(RedisData.class))).thenReturn(returnData);
+    final RedisKey key = new RedisKey("key".getBytes());
+    final ByteArrayWrapper value1 = new ByteArrayWrapper("value1".getBytes());
+    final ByteArrayWrapper value2 = new ByteArrayWrapper("value2".getBytes());
+
+    ArrayList<ByteArrayWrapper> members = new ArrayList<>();
+    members.add(value1);
+    members.add(value2);
+    RedisSet set = new RedisSet(members);
+
+    int initialSize = set.getSizeInBytes();
+
+    ArrayList<ByteArrayWrapper> membersToRemove = new ArrayList<>();
+    membersToRemove.add(value1);
+    set.srem(membersToRemove, region, key);
+
+    long finalSize = set.getSizeInBytes();
+    long expectedSize = initialSize - value1.length() - PER_MEMBER_OVERHEAD;
+    Offset<Long> offset = Offset.offset(Math.round(expectedSize * percentTolerance));
+
+    assertThat(finalSize).isCloseTo(expectedSize, offset);
+  }
+
+  @Test
+  public void remove_sizeShouldReturnToBaseOverhead_whenLastMemberIsRemoved() {
+    Region<RedisKey, RedisData> region = uncheckedCast(mock(Region.class));
+    final RedisData returnData = mock(RedisData.class);
+    when(region.put(any(RedisKey.class), any(RedisData.class))).thenReturn(returnData);
+    final RedisKey key = new RedisKey("key".getBytes());
+    final ByteArrayWrapper value = new ByteArrayWrapper("value".getBytes());
+
+    ArrayList<ByteArrayWrapper> members = new ArrayList<>();
+    members.add(value);
+    RedisSet set = new RedisSet(members);
+
+    assertThat(set.getSizeInBytes()).isGreaterThan(BASE_REDIS_SET_OVERHEAD);
+
+    set.srem(members, region, key);
+
+    int finalSize = set.getSizeInBytes();
+
+    assertThat(finalSize).isEqualTo(BASE_REDIS_SET_OVERHEAD);
+  }
+
+  /******* constants *******/
+  // these tests contain the math that was used to derive the constants in RedisSet. If these tests
+  // start failing, it is because the overhead of RedisSet has changed. If it has decreased, good
+  // job! You can change the constants in RedisSet to reflect that. If it has increased, carefully
+  // consider that increase before changing the constants.
+  // Note: the per-member overhead is known not to be constant; it changes as more members are
+  // added and/or as the members get longer.
+  @Test
+  public void baseOverheadConstant_shouldMatchCalculatedValue() {
+    HashSet<ByteArrayWrapper> members = new HashSet<>();
+    int baseRedisSetOverhead = reflectionObjectSizer.sizeof(new RedisSet(members));
+
+    assertThat(baseRedisSetOverhead).isEqualTo(BASE_REDIS_SET_OVERHEAD);
+  }
+
+  @Test
+  public void perMemberOverheadConstant_shouldMatchCalculatedValue() {
+    HashSet<ByteArrayWrapper> tempHashSet = new HashSet<>();
+    ByteArrayWrapper member1 = new ByteArrayWrapper("ab".getBytes());
+    ByteArrayWrapper member2 = new ByteArrayWrapper("bc".getBytes());
+    tempHashSet.add(member1);
+    int oneEntryHashSetSize = reflectionObjectSizer.sizeof(tempHashSet);
+
+    tempHashSet.add(member2);
+    int twoEntriesHashSetSize = reflectionObjectSizer.sizeof(tempHashSet);
+
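+    // the "+ 5" below is an empirically derived adjustment; it appears to come from HashSet
+    // resizing, and happens to make the measurement line up with the constant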
+    int perMemberOverhead = twoEntriesHashSetSize - oneEntryHashSetSize + 5;
+
+    assertThat(perMemberOverhead).isEqualTo(PER_MEMBER_OVERHEAD);
+  }
+
+  @Test
+  public void internalHashsetStorageOverheadConstant_shouldMatchCalculatedValue() {
+    HashSet<ByteArrayWrapper> tempHashSet = new HashSet<>();
+    int baseHashSetSize = reflectionObjectSizer.sizeof(tempHashSet);
+
+    ByteArrayWrapper baw1 = new ByteArrayWrapper("a".getBytes());
+    ByteArrayWrapper baw2 = new ByteArrayWrapper("b".getBytes());
+
+    tempHashSet.add(baw1);
+    tempHashSet.add(baw2);
+
+    int twoEntryHashSetSize = reflectionObjectSizer.sizeof(tempHashSet);
+
+    int internalHashsetStorageOverhead =
+        twoEntryHashSetSize - (2 * PER_MEMBER_OVERHEAD) - baseHashSetSize;
+
+    assertThat(internalHashsetStorageOverhead).isEqualTo(INTERNAL_HASH_SET_STORAGE_OVERHEAD);
+  }
+
+  /******* helper methods *******/
+  private RedisSet createRedisSetOfSpecifiedSize(int setSize) {
+    ArrayList<ByteArrayWrapper> arrayList = new ArrayList<>();
+    for (int i = 0; i < setSize; i++) {
+      arrayList.add(new ByteArrayWrapper(("abcdefgh" + i).getBytes()));
+    }
+    return new RedisSet(arrayList);
+  }
+
+  private RedisSet createRedisSetWithMemberOfSpecifiedSize(int memberSize) {
+    ArrayList<ByteArrayWrapper> arrayList = new ArrayList<>();
+    ByteArrayWrapper member =
+        new ByteArrayWrapper(createMemberOfSpecifiedSize("a", memberSize).getBytes());
+    if (member.length() > 0) {
+      arrayList.add(member);
+    }
+    return new RedisSet(arrayList);
+  }
+
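+  // pads the base string with random digits until it reaches stringSize;
+  // returns "" when base is already longer than the requested size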
+  private String createMemberOfSpecifiedSize(final String base, final int stringSize) {
+    Random random = new Random();
+    if (base.length() > stringSize) {
+      return "";
+    }
+    StringBuilder sb = new StringBuilder(stringSize);
+    sb.append(base);
+    for (int i = base.length(); i < stringSize; i++) {
+      sb.append(random.nextInt(10));
+    }
+    return sb.toString();
+  }
+
 }
diff --git a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisStringTest.java b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisStringTest.java
index 9883ecd..50481bb 100644
--- a/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisStringTest.java
+++ b/geode-apis-compatible-with-redis/src/test/java/org/apache/geode/redis/internal/data/RedisStringTest.java
@@ -16,6 +16,7 @@
 
 package org.apache.geode.redis.internal.data;
 
+import static org.apache.geode.redis.internal.data.RedisString.BASE_REDIS_STRING_OVERHEAD;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.Mockito.mock;
@@ -35,8 +36,10 @@ import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.serialization.ByteArrayDataInput;
 import org.apache.geode.internal.serialization.DataSerializableFixedID;
 import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.internal.size.ReflectionObjectSizer;
 
 public class RedisStringTest {
+  private final ReflectionObjectSizer reflectionObjectSizer = ReflectionObjectSizer.getInstance();
 
   @BeforeClass
   public static void beforeClass() {
@@ -351,4 +354,95 @@ public class RedisStringTest {
     o2.fromDelta(in);
     assertThat(o2).isEqualTo(o1);
   }
+
+  /************* Size in Bytes Tests *************/
+  /******* constructors *******/
+  @Test
+  public void should_calculateSize_equalToROSSize_ofLargeStrings() {
+    String javaString = makeStringOfSpecifiedSize(10_000);
+    RedisString string = new RedisString(new ByteArrayWrapper(javaString.getBytes()));
+
+    int actual = string.getSizeInBytes();
+    int expected = reflectionObjectSizer.sizeof(string);
+
+    assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void should_calculateSize_equalToROSSize_ofStringOfVariousSizes() {
+    String javaString;
+    for (int i = 0; i < 512; i += 8) {
+      javaString = makeStringOfSpecifiedSize(i);
+      RedisString string = new RedisString(new ByteArrayWrapper(javaString.getBytes()));
+
+      int expected = reflectionObjectSizer.sizeof(string);
+      int actual = string.getSizeInBytes();
+
+      assertThat(actual).isEqualTo(expected);
+    }
+  }
+
+  /******* changing values *******/
+  @Test
+  public void changingStringValue_toShorterString_shouldDecreaseSizeInBytes() {
+    String baseString = "baseString";
+    RedisString string = new RedisString(new ByteArrayWrapper((baseString + "asdf").getBytes()));
+
+    int initialSize = string.getSizeInBytes();
+
+    string.set(new ByteArrayWrapper(baseString.getBytes()));
+
+    int finalSize = string.getSizeInBytes();
+
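+    // "asdf" is 4 ASCII bytes, so the size should shrink by exactly 4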
+    assertThat(finalSize).isEqualTo(initialSize - 4);
+  }
+
+  @Test
+  public void changingStringValue_toLongerString_shouldIncreaseSizeInBytes() {
+    String baseString = "baseString";
+    RedisString string = new RedisString(new ByteArrayWrapper(baseString.getBytes()));
+
+    int initialSize = string.getSizeInBytes();
+
+    string.set(new ByteArrayWrapper((baseString + "asdf").getBytes()));
+
+    int finalSize = string.getSizeInBytes();
+
+    assertThat(finalSize).isEqualTo(initialSize + 4);
+  }
+
+  @Test
+  public void changingStringValue_toEmptyString_shouldDecreaseSizeInBytes_toBaseSize() {
+    String baseString = "baseString";
+    RedisString string = new RedisString(new ByteArrayWrapper((baseString + "asdf").getBytes()));
+
+    string.set(new ByteArrayWrapper("".getBytes()));
+
+    int finalSize = string.getSizeInBytes();
+
+    assertThat(finalSize).isEqualTo(BASE_REDIS_STRING_OVERHEAD);
+  }
+
+  /******* constants *******/
+  // this test contains the math that was used to derive the constant in RedisString. If this test
+  // starts failing, it is because the overhead of RedisString has changed. If it has decreased,
+  // good job! You can change the constant in RedisString to reflect that. If it has increased,
+  // carefully consider that increase before changing the constant.
+  @Test
+  public void overheadConstant_shouldMatchCalculatedValue() {
+    RedisString redisString = new RedisString(new ByteArrayWrapper("".getBytes()));
+    int calculatedSize = reflectionObjectSizer.sizeof(redisString);
+
+    assertThat(BASE_REDIS_STRING_OVERHEAD).isEqualTo(calculatedSize);
+  }
+
+  /******* helper methods *******/
+
+  private String makeStringOfSpecifiedSize(final int stringSize) {
+    StringBuilder sb = new StringBuilder(stringSize);
+    for (int i = 0; i < stringSize; i++) {
+      sb.append('a');
+    }
+    return sb.toString();
+  }
 }