You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by on...@apache.org on 2021/03/04 02:43:48 UTC

[geode] 02/04: GEODE-8894 allow individual deltas to trigger bucket size recalculation (#5978)

This is an automated email from the ASF dual-hosted git repository.

onichols pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git

commit d1b8319d7bc46de6b10480ebb0c9df41aeae894e
Author: Ray Ingles <ri...@pivotal.io>
AuthorDate: Tue Feb 23 12:20:45 2021 -0500

    GEODE-8894 allow individual deltas to trigger bucket size recalculation (#5978)
    
    * add interface for forcing size recalculation on buckets
    * Allow individual deltas to trigger bucket size recalculation
    * remove all deprecated methods and redundant tests
    * update Javadoc and remove Event instance variable
    *  add EntryEventImpl unit tests for size recalc
    * remove unused classes/code
    * remove unused TestKey
    * reorganize and clarify region creation
    
    Co-authored-by: Ray Ingles <ri...@vmware.com>
    
    (cherry-picked from 3a21c2852746f19755ac302f584ca5b8908eae2e)
---
 .../internal/cache/DeltaFaultInDUnitTest.java      |   6 +-
 .../cache/DeltaForceSizingFlagDUnitTest.java       | 315 +++++++++++++++++++++
 .../org/apache/geode/internal/cache/TestDelta.java |  28 +-
 .../src/main/java/org/apache/geode/Delta.java      |  35 ++-
 .../apache/geode/internal/cache/BucketRegion.java  |   1 +
 .../geode/internal/cache/EntryEventImpl.java       |  19 +-
 .../apache/geode/internal/cache/LocalRegion.java   |   3 -
 .../geode/internal/cache/EntryEventImplTest.java   |  56 ++++
 8 files changed, 437 insertions(+), 26 deletions(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaFaultInDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaFaultInDUnitTest.java
index 33af502..3f233d6 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaFaultInDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaFaultInDUnitTest.java
@@ -32,10 +32,6 @@ import org.apache.geode.test.dunit.SerializableRunnable;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
 
-/**
- * Test that the bucket size does not go negative when we fault out and in a delta object.
- *
- */
 
 public class DeltaFaultInDUnitTest extends JUnit4CacheTestCase {
 
@@ -45,7 +41,7 @@ public class DeltaFaultInDUnitTest extends JUnit4CacheTestCase {
   }
 
   @Test
-  public void test() throws Exception {
+  public void bucketSizeShould_notGoNegative_onFaultInDeltaObject() throws Exception {
     final Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);
     VM vm1 = host.getVM(1);
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
new file mode 100644
index 0000000..df95e6e
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+  private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+  public static final String SMALLER_DELTA_DATA = "12345";
+  public static final String LARGER_DELTA_DATA = "1234567890";
+  public static final String DELTA_KEY = "a_key";
+  public static final String RR_DISK_STORE_NAME = "_forceRecalculateSize_replicate_store";
+  private static final Logger logger = LogService.getLogger();
+
+  @Rule
+  public ClusterStartupRule cluster = new ClusterStartupRule();
+
+  protected MemberVM locator;
+  protected MemberVM server1;
+  protected MemberVM server2;
+
+  @Before
+  public void setup() {
+    int locatorPort;
+    locator = cluster.startLocatorVM(0);
+    locatorPort = locator.getPort();
+
+    server1 = cluster.startServerVM(1, locatorPort);
+    server2 = cluster.startServerVM(2, locatorPort);
+  }
+
+  @Test
+  public void testRRMemLRUDelta() {
+    doRRMemLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testRRMemLRUDeltaAndFlag() {
+    doRRMemLRUDeltaTest(true);
+  }
+
+  @Test
+  public void testPRNoLRUDelta() {
+    doPRNoLRUDeltaTest(false);
+  }
+
+  @Test
+  public void testPRNoLRUAndFlagDelta() {
+    doPRNoLRUDeltaTest(true);
+  }
+
+  private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createRR(server1);
+    createRR(server2);
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, shouldSizeChange);
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_SERIALIZED);
+    assertThat(getObjectSizerInvocations(vm1)).isEqualTo(1);
+    assertThat(getObjectSizerInvocations(vm2)).isEqualTo(0);
+
+    long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    // Update the delta
+    put(vm1, delta1);
+
+    assertValueType(vm1, ValueType.RAW_VALUE);
+    assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+    assertThat(getObjectSizerInvocations(vm1)).isEqualTo(2);
+
+    long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+    long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+    assertThat(finalEvictionSize0 - origEvictionSize0).isEqualTo(5);
+    if (shouldSizeChange) {
+      assertThat(getObjectSizerInvocations(vm2)).isEqualTo(1);
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertThat(finalEvictionSize1 - origEvictionSize1).isNotEqualTo(0);
+    } else {
+      // without the force flag, applying the delta on vm2 must not invoke the sizer or change size
+      assertThat(getObjectSizerInvocations(vm2)).isEqualTo(0);
+      assertThat(finalEvictionSize1 - origEvictionSize1).isEqualTo(0);
+    }
+  }
+
+  private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+    VM vm1 = server1.getVM();
+    VM vm2 = server2.getVM();
+
+    createPR(server1);
+    createPR(server2);
+
+    TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, shouldSizeChange);
+    put(vm1, delta1);
+    long origPRSize0 = getSizeFromPRStats(vm1);
+    long origPRSize1 = getSizeFromPRStats(vm2);
+
+    // Update the delta
+    delta1.info = LARGER_DELTA_DATA;
+    delta1.hasDelta = true;
+    put(vm1, delta1);
+    long finalPRSize0 = getSizeFromPRStats(vm1);
+    long finalPRSize1 = getSizeFromPRStats(vm2);
+
+    if (shouldSizeChange) {
+      // I'm not sure what the change in size should be, because we went
+      // from serialized to deserialized
+      assertThat(finalPRSize0 - origPRSize0).isNotEqualTo(0);
+      assertThat(finalPRSize1 - origPRSize1).isNotEqualTo(0);
+    } else {
+      assertThat(finalPRSize0 - origPRSize0).isEqualTo(0);
+      assertThat(finalPRSize1 - origPRSize1).isEqualTo(0);
+    }
+  }
+
+  private long getSizeFromPRStats(VM vm0) {
+    return vm0.invoke("getSizeFromPRStats", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertThat(cache).isNotNull();
+      InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME);
+      if (region instanceof PartitionedRegion) {
+        long total = 0;
+        PartitionedRegion pr = (PartitionedRegion) region;
+        int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+        for (int i = 0; i < totalNumBuckets; i++) {
+          total += pr.getDataStore().getBucketSize(i);
+        }
+        return total;
+      } else {
+        return 0L;
+      }
+    });
+  }
+
+  private long getSizeFromEvictionStats(VM vm0) {
+    return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+      Cache cache = ClusterStartupRule.getCache();
+      assertThat(cache).isNotNull();
+      InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME);
+      return region.getEvictionCounter();
+    });
+  }
+
+  private int getObjectSizerInvocations(VM vm0) {
+    return vm0.invoke("getObjectSizerInvocations", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertThat(cache).isNotNull();
+      InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME);
+      return getObjectSizerInvocations(region);
+    });
+  }
+
+  private void put(VM vm0, final Object value) {
+    vm0.invoke("Put data", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertThat(cache).isNotNull();
+      InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME);
+      region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+    });
+  }
+
+  protected static int getObjectSizerInvocations(InternalRegion region) {
+    TestObjectSizer sizer = (TestObjectSizer) region.getEvictionAttributes().getObjectSizer();
+    int result = sizer.invocations.get();
+    logger.info("objectSizerInvocations=" + result);
+    return result;
+  }
+
+  private void createRR(MemberVM memberVM) {
+    memberVM.invoke("Create replicateRegion", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertThat(cache).isNotNull();
+
+      DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+      diskStoreFactory.setDiskDirs(getMyDiskDirs());
+      diskStoreFactory.create(RR_DISK_STORE_NAME);
+
+      RegionFactory<Integer, TestDelta> regionFactory = cache.createRegionFactory();
+      regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+      regionFactory.setDiskStoreName(RR_DISK_STORE_NAME);
+      regionFactory.setDiskSynchronous(true);
+      regionFactory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(1,
+          new TestObjectSizer(), EvictionAction.OVERFLOW_TO_DISK));
+      regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private void assertValueType(VM vm, final ValueType expectedType) {
+    vm.invoke("assertValueType", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertThat(cache).isNotNull();
+      InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME);
+      Object value = region.getValueInVM(DeltaForceSizingFlagDUnitTest.DELTA_KEY);
+      switch (expectedType) {
+        case RAW_VALUE:
+          assertThat(value).isNotInstanceOf(CachedDeserializable.class);
+          break;
+        case CD_SERIALIZED:
+          assertThat(value).isInstanceOf(CachedDeserializable.class);
+
+          Object serializedValue = ((CachedDeserializable) value).getValue();
+          assertThat(serializedValue).isInstanceOf(byte[].class);
+          break;
+        case CD_DESERIALIZED:
+          assertThat(value).isInstanceOf(CachedDeserializable.class);
+
+          Object deserializedValue = ((CachedDeserializable) value).getValue();
+          assertThat(deserializedValue).isNotInstanceOf(byte[].class);
+          break;
+        case EVICTED:
+          assertThat(value).isNull();
+          break;
+      }
+    });
+  }
+
+  private static File[] getMyDiskDirs() {
+    long random = new Random().nextLong();
+    File file = new File(Long.toString(random));
+    assertThat(file.mkdirs()).isTrue();
+    return new File[] {file};
+  }
+
+  private void createPR(MemberVM memberVM) {
+    memberVM.invoke("Create partitioned region", () -> {
+      Cache cache = ClusterStartupRule.getCache();
+      assertThat(cache).isNotNull();
+
+      PartitionAttributesFactory<Integer, TestDelta> paf =
+          new PartitionAttributesFactory<>();
+      paf.setRedundantCopies(1);
+      PartitionAttributes<Integer, TestDelta> prAttr = paf.create();
+
+      RegionFactory<Integer, TestDelta> regionFactory = cache.createRegionFactory();
+      regionFactory.setDataPolicy(DataPolicy.PARTITION);
+      regionFactory.setDiskSynchronous(true);
+      regionFactory.setPartitionAttributes(prAttr);
+      regionFactory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
+      regionFactory.create(TEST_REGION_NAME);
+    });
+  }
+
+  private static class TestObjectSizer implements ObjectSizer {
+    private final AtomicInteger invocations = new AtomicInteger();
+
+    @Override
+    public int sizeof(Object o) {
+      logger.info("TestObjectSizer invoked");
+      if (o instanceof TestDelta) {
+        invocations.incrementAndGet();
+        return ((TestDelta) o).info.length();
+      }
+      if (o instanceof Integer) {
+        return 0;
+      }
+      throw new RuntimeException("Unexpected type to be sized " + o.getClass() + ", object=" + o);
+    }
+  }
+
+  enum ValueType {
+    RAW_VALUE, CD_SERIALIZED, CD_DESERIALIZED, EVICTED
+  }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDelta.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDelta.java
index cc6ab47..792330e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDelta.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDelta.java
@@ -33,12 +33,28 @@ public class TestDelta implements Delta, DataSerializable, Cloneable {
   public int deserializations;
   public int deltas;
   public int clones;
+  public boolean forceRecalculateSize;
 
   public TestDelta() {}
 
   public TestDelta(boolean hasDelta, String info) {
     this.hasDelta = hasDelta;
     this.info = info;
+    this.forceRecalculateSize = false;
+  }
+
+  public TestDelta(boolean hasDelta, String info, boolean forceRecalculateSize) {
+    this.hasDelta = hasDelta;
+    this.info = info;
+    this.forceRecalculateSize = forceRecalculateSize;
+  }
+
+  /** Renders {@code info} and {@code forceRecalculateSize}, comma-separated, for diagnostics. */
+  @Override
+  public String toString() {
+    return "TestDelta{info='" + info + "'" +
+        ", forceRecalculateSize='" + forceRecalculateSize + "'" +
+        '}';
+  }
 
   public synchronized void checkFields(final int serializations, final int deserializations,
@@ -51,9 +67,9 @@ public class TestDelta implements Delta, DataSerializable, Cloneable {
 
   @Override
   public synchronized void fromDelta(DataInput in) throws IOException, InvalidDeltaException {
-    // new Exception("DAN - From Delta Called").printStackTrace();
     this.hasDelta = true;
     info = DataSerializer.readString(in);
+    forceRecalculateSize = DataSerializer.readBoolean(in);
     deltas++;
   }
 
@@ -63,14 +79,18 @@ public class TestDelta implements Delta, DataSerializable, Cloneable {
   }
 
   @Override
+  public boolean getForceRecalculateSize() {
+    return forceRecalculateSize;
+  }
+
+  @Override
   public synchronized void toDelta(DataOutput out) throws IOException {
-    // new Exception("DAN - To Delta Called").printStackTrace();
     DataSerializer.writeString(info, out);
+    DataSerializer.writeBoolean(forceRecalculateSize, out);
   }
 
   @Override
   public synchronized void fromData(DataInput in) throws IOException, ClassNotFoundException {
-    // new Exception("DAN - From Data Called").printStackTrace();
     info = DataSerializer.readString(in);
     serializations = in.readInt();
     deserializations = in.readInt();
@@ -81,7 +101,6 @@ public class TestDelta implements Delta, DataSerializable, Cloneable {
 
   @Override
   public synchronized void toData(DataOutput out) throws IOException {
-    // new Exception("DAN - To Data Called").printStackTrace();
     serializations++;
     DataSerializer.writeString(info, out);
     out.writeInt(serializations);
@@ -92,7 +111,6 @@ public class TestDelta implements Delta, DataSerializable, Cloneable {
 
   @Override
   public synchronized Object clone() throws CloneNotSupportedException {
-    // new Exception("DAN - Clone Called").printStackTrace();
     clones++;
     return super.clone();
   }
diff --git a/geode-core/src/main/java/org/apache/geode/Delta.java b/geode-core/src/main/java/org/apache/geode/Delta.java
index 6104a48..8249f74 100755
--- a/geode-core/src/main/java/org/apache/geode/Delta.java
+++ b/geode-core/src/main/java/org/apache/geode/Delta.java
@@ -23,17 +23,16 @@ import java.io.IOException;
  * This interface defines a contract between the application and GemFire that allows GemFire to
  * determine whether an application object contains a delta, allows GemFire to extract the delta
  * from an application object, and generate a new application object by applying a delta to an
- * existing application object. The difference in object state is contained in the
- * {@link DataOutput} and {@link DataInput} parameters.
+ * existing application object. The difference in object state is contained in the {@link
+ * DataOutput} and {@link DataInput} parameters.
  *
  * @since GemFire 6.1
- *
  */
 public interface Delta {
 
   /**
-   * Returns true if this object has pending changes it can write out as a delta.
-   * Returns false if this object must be transmitted in its entirety.
+   * Returns true if this object has pending changes it can write out as a delta. Returns false if
+   * this object must be transmitted in its entirety.
    */
   boolean hasDelta();
 
@@ -42,8 +41,8 @@ public interface Delta {
    * presence of a delta by calling {@link Delta#hasDelta()} on the object. The delta is written to
    * the {@link DataOutput} object provided by GemFire.
    *
+   * <p>
    * Any delta state should be reset in this method.
-   *
    */
   void toDelta(DataOutput out) throws IOException;
 
@@ -53,7 +52,29 @@ public interface Delta {
    * This method throws an {@link InvalidDeltaException} when the delta in the {@link DataInput}
    * cannot be applied to the object. GemFire automatically handles an {@link InvalidDeltaException}
    * by reattempting the update by sending the full application object.
-   *
    */
   void fromDelta(DataInput in) throws IOException, InvalidDeltaException;
+
+  /**
+   * By default, entry sizes are not recalculated when deltas are applied. This optimizes for the
+   * case where the size of an entry does not change. However, if an entry size does increase or
+   * decrease, this default behavior can result in the memory usage statistics becoming inaccurate.
+   * These are used to monitor the health of Geode instances, and for balancing memory usage across
+   * partitioned regions.
+   *
+   * <p>
+   * There is a system property, gemfire.DELTAS_RECALCULATE_SIZE, which can be used to cause all
+   * deltas to trigger entry size recalculation when deltas are applied. By default, this is set
+   * to 'false' because of potential performance impacts when every delta triggers a recalculation.
+   *
+   * <p>
+   * To allow entry size recalculation on a per-delta basis, classes that implement the Delta
+   * interface should override this method to return 'true'. This may impact performance of specific
+   * delta types, but will not globally affect the performance of other Geode delta operations.
+   *
+   * @since Geode 1.14
+   */
+  default boolean getForceRecalculateSize() {
+    return false;
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index d565407..af5ebd0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -1864,6 +1864,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
         return;
       }
       Object instance = cd.getValue();
+
       if (instance instanceof org.apache.geode.Delta
           && ((org.apache.geode.Delta) instance).hasDelta()) {
         try (HeapDataOutputStream hdos = new HeapDataOutputStream(KnownVersion.CURRENT)) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 5e2b9c9..ac0a5e2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -26,11 +26,13 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.CopyHelper;
 import org.apache.geode.DataSerializer;
+import org.apache.geode.Delta;
 import org.apache.geode.DeltaSerializationException;
 import org.apache.geode.GemFireIOException;
 import org.apache.geode.InvalidDeltaException;
 import org.apache.geode.SerializationException;
 import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.EntryOperation;
 import org.apache.geode.cache.Operation;
@@ -262,7 +264,6 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
   protected EntryEventImpl(final InternalRegion region, Operation op, Object key,
       @Retained(ENTRY_EVENT_NEW_VALUE) Object newVal, Object callbackArgument, boolean originRemote,
       DistributedMember distributedMember, boolean generateCallbacks, boolean initializeId) {
-
     this.region = region;
     InternalDistributedSystem ds =
         (InternalDistributedSystem) region.getCache().getDistributedSystem();
@@ -1521,7 +1522,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
     if (obj instanceof byte[] || obj == null || obj instanceof CachedDeserializable
         || obj == Token.NOT_AVAILABLE || Token.isInvalidOrRemoved(obj)
         // don't serialize delta object already serialized
-        || obj instanceof org.apache.geode.Delta) { // internal delta
+        || obj instanceof Delta) { // internal delta
       return obj;
     }
     final CachedDeserializable cd;
@@ -1713,10 +1714,10 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
     // This is a horrible hack, but we need to get the size of the object
     // When we store an entry. This code is only used when we do a put
     // in the primary.
-    if (v instanceof org.apache.geode.Delta && getRegion().isUsedForPartitionedRegionBucket()) {
+    if (v instanceof Delta && getRegion().isUsedForPartitionedRegionBucket()) {
       int vSize;
       Object ov = basicGetOldValue();
-      if (ov instanceof CachedDeserializable && !GemFireCacheImpl.DELTAS_RECALCULATE_SIZE) {
+      if (ov instanceof CachedDeserializable && !(shouldRecalculateSize((Delta) v))) {
         vSize = ((CachedDeserializable) ov).getValueSizeInBytes();
       } else {
         vSize = CachedDeserializableFactory.calcMemSize(v, getRegion().getObjectSizer(), false);
@@ -1835,7 +1836,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
       boolean deltaBytesApplied = false;
       try (ByteArrayDataInput in = new ByteArrayDataInput(getDeltaBytes())) {
         long start = getRegion().getCachePerfStats().getTime();
-        ((org.apache.geode.Delta) value).fromDelta(in);
+        ((Delta) value).fromDelta(in);
         getRegion().getCachePerfStats().endDeltaUpdate(start);
         deltaBytesApplied = true;
       } catch (RuntimeException rte) {
@@ -1858,7 +1859,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
       if (wasCD) {
         CachedDeserializable old = (CachedDeserializable) oldValueInVM;
         int valueSize;
-        if (GemFireCacheImpl.DELTAS_RECALCULATE_SIZE) {
+        if (shouldRecalculateSize((Delta) value)) {
           valueSize =
               CachedDeserializableFactory.calcMemSize(value, getRegion().getObjectSizer(), false);
         } else {
@@ -1878,6 +1879,12 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
     }
   }
 
+  @VisibleForTesting
+  protected static boolean shouldRecalculateSize(Delta value) {
+    return GemFireCacheImpl.DELTAS_RECALCULATE_SIZE
+        || value.getForceRecalculateSize();
+  }
+
   void setTXEntryOldValue(Object oldVal, boolean mustBeAvailable) {
     if (Token.isInvalidOrRemoved(oldVal)) {
       oldVal = null;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 1833039..dbe3ced 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -151,7 +151,6 @@ import org.apache.geode.cache.wan.GatewaySender;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.DistributionAdvisor;
 import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.DistributionStats;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ResourceEvent;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -1728,7 +1727,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
         }
         if (extractDelta && ((Delta) value).hasDelta()) {
           try (HeapDataOutputStream hdos = new HeapDataOutputStream(KnownVersion.CURRENT)) {
-            long start = DistributionStats.getStatTime();
             try {
               ((Delta) value).toDelta(hdos);
             } catch (RuntimeException re) {
@@ -1737,7 +1735,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
               throw new DeltaSerializationException("Caught exception while sending delta", e);
             }
             event.setDeltaBytes(hdos.toByteArray());
-            getCachePerfStats().endDeltaPrepared(start);
           }
         }
       }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventImplTest.java
index 44227ea..6768ca2 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventImplTest.java
@@ -29,9 +29,11 @@ import static org.mockito.Mockito.when;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.geode.Delta;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.SerializedCacheValue;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -913,6 +915,60 @@ public class EntryEventImplTest {
     assertThat(event.isTransactional()).isFalse();
   }
 
+  @Test
+  public void shouldRecalculateSize_returnsTrue_ifGetForceRecalculateSizeIsTrue_andDELTAS_RECALCULATE_SIZEisTrue() {
+    GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = true;
+    Delta deltaValue = mock(Delta.class);
+    when(deltaValue.getForceRecalculateSize())
+        .thenReturn(true);
+
+    boolean value = EntryEventImpl.shouldRecalculateSize(deltaValue);
+
+    assertThat(value).isTrue();
+  }
+
+  @Test
+  public void shouldRecalculateSize_returnsTrue_ifDELTAS_RECALCULATE_SIZEisTrue_andGetForceRecalculateSizeIsFalse() {
+    GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = true;
+    Delta deltaValue = mock(Delta.class);
+    when(deltaValue.getForceRecalculateSize())
+        .thenReturn(false);
+
+    boolean value = EntryEventImpl.shouldRecalculateSize(deltaValue);
+
+    assertThat(value).isTrue();
+  }
+
+  @Test
+  public void shouldRecalculateSize_returnsTrue_ifGetForceRecalculateSizeIsTrue_andDELTAS_RECALCULATE_SIZEIsFalse() {
+    GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = false;
+    Delta deltaValue = mock(Delta.class);
+    when(deltaValue.getForceRecalculateSize())
+        .thenReturn(true);
+
+    boolean value = EntryEventImpl.shouldRecalculateSize(deltaValue);
+
+    assertThat(value).isTrue();
+  }
+
+
+  @Test
+  public void shouldRecalculateSize_returnsFalse_ifBothDELTAS_RECALCULATE_SIZEIsFalse_andGetForceRecalculateSizeIsFalse() {
+    GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = false;
+    Delta deltaValue = mock(Delta.class);
+    when(deltaValue.getForceRecalculateSize())
+        .thenReturn(false);
+
+    boolean value = EntryEventImpl.shouldRecalculateSize(deltaValue);
+
+    assertThat(value).isFalse();
+  }
+
+  @After
+  public void tearDown() {
+    GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = false;
+  }
+
   private static class EntryEventImplWithOldValuesDisabled extends EntryEventImpl {
     @Override
     protected boolean areOldValuesEnabled() {