You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by on...@apache.org on 2021/03/04 02:43:48 UTC
[geode] 02/04: GEODE-8894 allow individual deltas to trigger bucket
size recalculation (#5978)
This is an automated email from the ASF dual-hosted git repository.
onichols pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git
commit d1b8319d7bc46de6b10480ebb0c9df41aeae894e
Author: Ray Ingles <ri...@pivotal.io>
AuthorDate: Tue Feb 23 12:20:45 2021 -0500
GEODE-8894 allow individual deltas to trigger bucket size recalculation (#5978)
* add interface for forcing size recalculation on buckets
* Allow individual deltas to trigger bucket size recalculation
* remove all deprecated methods and redundant tests
* update Javadoc and remove Event instance variable
* add EntryEventImpl unit tests for size recalc
* remove unused classes/code
* remove unused TestKey
* reorganize and clarify region creation
Co-authored-by: Ray Ingles <ri...@vmware.com>
(cherry-picked from 3a21c2852746f19755ac302f584ca5b8908eae2e)
---
.../internal/cache/DeltaFaultInDUnitTest.java | 6 +-
.../cache/DeltaForceSizingFlagDUnitTest.java | 315 +++++++++++++++++++++
.../org/apache/geode/internal/cache/TestDelta.java | 28 +-
.../src/main/java/org/apache/geode/Delta.java | 35 ++-
.../apache/geode/internal/cache/BucketRegion.java | 1 +
.../geode/internal/cache/EntryEventImpl.java | 19 +-
.../apache/geode/internal/cache/LocalRegion.java | 3 -
.../geode/internal/cache/EntryEventImplTest.java | 56 ++++
8 files changed, 437 insertions(+), 26 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaFaultInDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaFaultInDUnitTest.java
index 33af502..3f233d6 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaFaultInDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaFaultInDUnitTest.java
@@ -32,10 +32,6 @@ import org.apache.geode.test.dunit.SerializableRunnable;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
-/**
- * Test that the bucket size does not go negative when we fault out and in a delta object.
- *
- */
public class DeltaFaultInDUnitTest extends JUnit4CacheTestCase {
@@ -45,7 +41,7 @@ public class DeltaFaultInDUnitTest extends JUnit4CacheTestCase {
}
@Test
- public void test() throws Exception {
+ public void bucketSizeShould_notGoNegative_onFaultInDeltaObject() throws Exception {
final Host host = Host.getHost(0);
VM vm0 = host.getVM(0);
VM vm1 = host.getVM(1);
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
new file mode 100644
index 0000000..df95e6e
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/DeltaForceSizingFlagDUnitTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.File;
+import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.logging.log4j.Logger;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.DiskStoreFactory;
+import org.apache.geode.cache.EvictionAction;
+import org.apache.geode.cache.EvictionAttributes;
+import org.apache.geode.cache.InterestPolicy;
+import org.apache.geode.cache.PartitionAttributes;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.Scope;
+import org.apache.geode.cache.SubscriptionAttributes;
+import org.apache.geode.cache.util.ObjectSizer;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+/**
+ * Tests the use of the per-delta "forceRecalculateSize" flag.
+ */
+
+public class DeltaForceSizingFlagDUnitTest {
+ private static final String TEST_REGION_NAME = "forceResizeTestRegionName";
+ public static final String SMALLER_DELTA_DATA = "12345";
+ public static final String LARGER_DELTA_DATA = "1234567890";
+ public static final String DELTA_KEY = "a_key";
+ public static final String RR_DISK_STORE_NAME = "_forceRecalculateSize_replicate_store";
+ private static final Logger logger = LogService.getLogger();
+
+ @Rule
+ public ClusterStartupRule cluster = new ClusterStartupRule();
+
+ protected MemberVM locator;
+ protected MemberVM server1;
+ protected MemberVM server2;
+
+ @Before
+ public void setup() {
+ int locatorPort;
+ locator = cluster.startLocatorVM(0);
+ locatorPort = locator.getPort();
+
+ server1 = cluster.startServerVM(1, locatorPort);
+ server2 = cluster.startServerVM(2, locatorPort);
+ }
+
+ @Test
+ public void testRRMemLRUDelta() {
+ doRRMemLRUDeltaTest(false);
+ }
+
+ @Test
+ public void testRRMemLRUDeltaAndFlag() {
+ doRRMemLRUDeltaTest(true);
+ }
+
+ @Test
+ public void testPRNoLRUDelta() {
+ doPRNoLRUDeltaTest(false);
+ }
+
+ @Test
+ public void testPRNoLRUAndFlagDelta() {
+ doPRNoLRUDeltaTest(true);
+ }
+
+ private void doRRMemLRUDeltaTest(boolean shouldSizeChange) {
+ VM vm1 = server1.getVM();
+ VM vm2 = server2.getVM();
+
+ createRR(server1);
+ createRR(server2);
+ TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, shouldSizeChange);
+ put(vm1, delta1);
+
+ assertValueType(vm1, ValueType.RAW_VALUE);
+ assertValueType(vm2, ValueType.CD_SERIALIZED);
+ assertThat(getObjectSizerInvocations(vm1)).isEqualTo(1);
+ assertThat(getObjectSizerInvocations(vm2)).isEqualTo(0);
+
+ long origEvictionSize0 = getSizeFromEvictionStats(vm1);
+ long origEvictionSize1 = getSizeFromEvictionStats(vm2);
+ delta1.info = LARGER_DELTA_DATA;
+ delta1.hasDelta = true;
+ // Update the delta
+ put(vm1, delta1);
+
+ assertValueType(vm1, ValueType.RAW_VALUE);
+ assertValueType(vm2, ValueType.CD_DESERIALIZED);
+
+ assertThat(getObjectSizerInvocations(vm1)).isEqualTo(2);
+
+ long finalEvictionSize0 = getSizeFromEvictionStats(vm1);
+ long finalEvictionSize1 = getSizeFromEvictionStats(vm2);
+ assertThat(finalEvictionSize0 - origEvictionSize0).isEqualTo(5);
+ if (shouldSizeChange) {
+ assertThat(getObjectSizerInvocations(vm2)).isEqualTo(1);
+ // I'm not sure what the change in size should be, because we went
+ // from serialized to deserialized
+ assertThat(finalEvictionSize1 - origEvictionSize1).isNotEqualTo(0);
+ } else {
+ // we invoke the sizer once when we deserialize the original to apply the delta to it
+ assertThat(getObjectSizerInvocations(vm2)).isEqualTo(0);
+ assertThat(finalEvictionSize1 - origEvictionSize1).isEqualTo(0);
+ }
+ }
+
+ private void doPRNoLRUDeltaTest(boolean shouldSizeChange) {
+ VM vm1 = server1.getVM();
+ VM vm2 = server2.getVM();
+
+ createPR(server1);
+ createPR(server2);
+
+ TestDelta delta1 = new TestDelta(false, SMALLER_DELTA_DATA, shouldSizeChange);
+ put(vm1, delta1);
+ long origPRSize0 = getSizeFromPRStats(vm1);
+ long origPRSize1 = getSizeFromPRStats(vm2);
+
+ // Update the delta
+ delta1.info = LARGER_DELTA_DATA;
+ delta1.hasDelta = true;
+ put(vm1, delta1);
+ long finalPRSize0 = getSizeFromPRStats(vm1);
+ long finalPRSize1 = getSizeFromPRStats(vm2);
+
+ if (shouldSizeChange) {
+ // I'm not sure what the change in size should be, because we went
+ // from serialized to deserialized
+ assertThat(finalPRSize0 - origPRSize0).isNotEqualTo(0);
+ assertThat(finalPRSize1 - origPRSize1).isNotEqualTo(0);
+ } else {
+ assertThat(finalPRSize0 - origPRSize0).isEqualTo(0);
+ assertThat(finalPRSize1 - origPRSize1).isEqualTo(0);
+ }
+ }
+
+ private long getSizeFromPRStats(VM vm0) {
+ return vm0.invoke("getSizeFromPRStats", () -> {
+ Cache cache = ClusterStartupRule.getCache();
+ assertThat(cache).isNotNull();
+ InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME);
+ if (region instanceof PartitionedRegion) {
+ long total = 0;
+ PartitionedRegion pr = (PartitionedRegion) region;
+ int totalNumBuckets = pr.getPartitionAttributes().getTotalNumBuckets();
+ for (int i = 0; i < totalNumBuckets; i++) {
+ total += pr.getDataStore().getBucketSize(i);
+ }
+ return total;
+ } else {
+ return 0L;
+ }
+ });
+ }
+
+ private long getSizeFromEvictionStats(VM vm0) {
+ return vm0.invoke("getSizeFromEvictionStats", () -> {
+
+ Cache cache = ClusterStartupRule.getCache();
+ assertThat(cache).isNotNull();
+ InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME);
+ return region.getEvictionCounter();
+ });
+ }
+
+ private int getObjectSizerInvocations(VM vm0) {
+ return vm0.invoke("getObjectSizerInvocations", () -> {
+ Cache cache = ClusterStartupRule.getCache();
+ assertThat(cache).isNotNull();
+ InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME);
+ return getObjectSizerInvocations(region);
+ });
+ }
+
+ private void put(VM vm0, final Object value) {
+ vm0.invoke("Put data", () -> {
+ Cache cache = ClusterStartupRule.getCache();
+ assertThat(cache).isNotNull();
+ InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME);
+ region.put(DeltaForceSizingFlagDUnitTest.DELTA_KEY, value);
+ });
+ }
+
+ protected static int getObjectSizerInvocations(InternalRegion region) {
+ TestObjectSizer sizer = (TestObjectSizer) region.getEvictionAttributes().getObjectSizer();
+ int result = sizer.invocations.get();
+ logger.info("objectSizerInvocations=" + result);
+ return result;
+ }
+
+ private void createRR(MemberVM memberVM) {
+ memberVM.invoke("Create replicateRegion", () -> {
+ Cache cache = ClusterStartupRule.getCache();
+ assertThat(cache).isNotNull();
+
+ DiskStoreFactory diskStoreFactory = cache.createDiskStoreFactory();
+ diskStoreFactory.setDiskDirs(getMyDiskDirs());
+ diskStoreFactory.create(RR_DISK_STORE_NAME);
+
+ RegionFactory<Integer, TestDelta> regionFactory = cache.createRegionFactory();
+ regionFactory.setDataPolicy(DataPolicy.REPLICATE);
+ regionFactory.setDiskStoreName(RR_DISK_STORE_NAME);
+ regionFactory.setDiskSynchronous(true);
+ regionFactory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(1,
+ new TestObjectSizer(), EvictionAction.OVERFLOW_TO_DISK));
+ regionFactory.setScope(Scope.DISTRIBUTED_ACK);
+
+ regionFactory.create(TEST_REGION_NAME);
+ });
+ }
+
+ private void assertValueType(VM vm, final ValueType expectedType) {
+ vm.invoke("assertValueType", () -> {
+ Cache cache = ClusterStartupRule.getCache();
+ assertThat(cache).isNotNull();
+ InternalRegion region = (InternalRegion) cache.getRegion(TEST_REGION_NAME);
+ Object value = region.getValueInVM(DeltaForceSizingFlagDUnitTest.DELTA_KEY);
+ switch (expectedType) {
+ case RAW_VALUE:
+ assertThat(value).isNotInstanceOf(CachedDeserializable.class);
+ break;
+ case CD_SERIALIZED:
+ assertThat(value).isInstanceOf(CachedDeserializable.class);
+
+ Object serializedValue = ((CachedDeserializable) value).getValue();
+ assertThat(serializedValue).isInstanceOf(byte[].class);
+ break;
+ case CD_DESERIALIZED:
+ assertThat(value).isInstanceOf(CachedDeserializable.class);
+
+ Object deserializedValue = ((CachedDeserializable) value).getValue();
+ assertThat(deserializedValue).isNotInstanceOf(byte[].class);
+ break;
+ case EVICTED:
+ assertThat(value).isNull();
+ break;
+ }
+ });
+ }
+
+ private static File[] getMyDiskDirs() {
+ long random = new Random().nextLong();
+ File file = new File(Long.toString(random));
+ assertThat(file.mkdirs()).isTrue();
+ return new File[] {file};
+ }
+
+ private void createPR(MemberVM memberVM) {
+ memberVM.invoke("Create partitioned region", () -> {
+ Cache cache = ClusterStartupRule.getCache();
+ assertThat(cache).isNotNull();
+
+ PartitionAttributesFactory<Integer, TestDelta> paf =
+ new PartitionAttributesFactory<>();
+ paf.setRedundantCopies(1);
+ PartitionAttributes<Integer, TestDelta> prAttr = paf.create();
+
+ RegionFactory<Integer, TestDelta> regionFactory = cache.createRegionFactory();
+ regionFactory.setDataPolicy(DataPolicy.PARTITION);
+ regionFactory.setDiskSynchronous(true);
+ regionFactory.setPartitionAttributes(prAttr);
+ regionFactory.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.ALL));
+ regionFactory.create(TEST_REGION_NAME);
+ });
+ }
+
+ private static class TestObjectSizer implements ObjectSizer {
+ private final AtomicInteger invocations = new AtomicInteger();
+
+ @Override
+ public int sizeof(Object o) {
+ logger.info("TestObjectSizer invoked");
+ if (o instanceof TestDelta) {
+ invocations.incrementAndGet();
+ return ((TestDelta) o).info.length();
+ }
+ if (o instanceof Integer) {
+ return 0;
+ }
+ throw new RuntimeException("Unexpected type to be sized " + o.getClass() + ", object=" + o);
+ }
+ }
+
+ enum ValueType {
+ RAW_VALUE, CD_SERIALIZED, CD_DESERIALIZED, EVICTED
+ }
+}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDelta.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDelta.java
index cc6ab47..792330e 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDelta.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/TestDelta.java
@@ -33,12 +33,28 @@ public class TestDelta implements Delta, DataSerializable, Cloneable {
public int deserializations;
public int deltas;
public int clones;
+ public boolean forceRecalculateSize;
public TestDelta() {}
public TestDelta(boolean hasDelta, String info) {
this.hasDelta = hasDelta;
this.info = info;
+ this.forceRecalculateSize = false;
+ }
+
+ public TestDelta(boolean hasDelta, String info, boolean forceRecalculateSize) {
+ this.hasDelta = hasDelta;
+ this.info = info;
+ this.forceRecalculateSize = forceRecalculateSize;
+ }
+
+ @Override
+ public String toString() {
+ return "TestDelta{" +
+ "info='" + info + "'" +
+ "forceRecalculateSize='" + forceRecalculateSize + "'" +
+ '}';
}
public synchronized void checkFields(final int serializations, final int deserializations,
@@ -51,9 +67,9 @@ public class TestDelta implements Delta, DataSerializable, Cloneable {
@Override
public synchronized void fromDelta(DataInput in) throws IOException, InvalidDeltaException {
- // new Exception("DAN - From Delta Called").printStackTrace();
this.hasDelta = true;
info = DataSerializer.readString(in);
+ forceRecalculateSize = DataSerializer.readBoolean(in);
deltas++;
}
@@ -63,14 +79,18 @@ public class TestDelta implements Delta, DataSerializable, Cloneable {
}
@Override
+ public boolean getForceRecalculateSize() {
+ return forceRecalculateSize;
+ }
+
+ @Override
public synchronized void toDelta(DataOutput out) throws IOException {
- // new Exception("DAN - To Delta Called").printStackTrace();
DataSerializer.writeString(info, out);
+ DataSerializer.writeBoolean(forceRecalculateSize, out);
}
@Override
public synchronized void fromData(DataInput in) throws IOException, ClassNotFoundException {
- // new Exception("DAN - From Data Called").printStackTrace();
info = DataSerializer.readString(in);
serializations = in.readInt();
deserializations = in.readInt();
@@ -81,7 +101,6 @@ public class TestDelta implements Delta, DataSerializable, Cloneable {
@Override
public synchronized void toData(DataOutput out) throws IOException {
- // new Exception("DAN - To Data Called").printStackTrace();
serializations++;
DataSerializer.writeString(info, out);
out.writeInt(serializations);
@@ -92,7 +111,6 @@ public class TestDelta implements Delta, DataSerializable, Cloneable {
@Override
public synchronized Object clone() throws CloneNotSupportedException {
- // new Exception("DAN - Clone Called").printStackTrace();
clones++;
return super.clone();
}
diff --git a/geode-core/src/main/java/org/apache/geode/Delta.java b/geode-core/src/main/java/org/apache/geode/Delta.java
index 6104a48..8249f74 100755
--- a/geode-core/src/main/java/org/apache/geode/Delta.java
+++ b/geode-core/src/main/java/org/apache/geode/Delta.java
@@ -23,17 +23,16 @@ import java.io.IOException;
* This interface defines a contract between the application and GemFire that allows GemFire to
* determine whether an application object contains a delta, allows GemFire to extract the delta
* from an application object, and generate a new application object by applying a delta to an
- * existing application object. The difference in object state is contained in the
- * {@link DataOutput} and {@link DataInput} parameters.
+ * existing application object. The difference in object state is contained in the {@link
+ * DataOutput} and {@link DataInput} parameters.
*
* @since GemFire 6.1
- *
*/
public interface Delta {
/**
- * Returns true if this object has pending changes it can write out as a delta.
- * Returns false if this object must be transmitted in its entirety.
+ * Returns true if this object has pending changes it can write out as a delta. Returns false if
+ * this object must be transmitted in its entirety.
*/
boolean hasDelta();
@@ -42,8 +41,8 @@ public interface Delta {
* presence of a delta by calling {@link Delta#hasDelta()} on the object. The delta is written to
* the {@link DataOutput} object provided by GemFire.
*
+ * <p>
* Any delta state should be reset in this method.
- *
*/
void toDelta(DataOutput out) throws IOException;
@@ -53,7 +52,29 @@ public interface Delta {
* This method throws an {@link InvalidDeltaException} when the delta in the {@link DataInput}
* cannot be applied to the object. GemFire automatically handles an {@link InvalidDeltaException}
* by reattempting the update by sending the full application object.
- *
*/
void fromDelta(DataInput in) throws IOException, InvalidDeltaException;
+
+ /**
+ * By default, entry sizes are not recalculated when deltas are applied. This optimizes for the
+ * case where the size of an entry does not change. However, if an entry size does increase or
+ * decrease, this default behavior can result in the memory usage statistics becoming inaccurate.
+ * These are used to monitor the health of Geode instances, and for balancing memory usage across
+ * partitioned regions.
+ *
+ * <p>
+ * There is a system property, gemfire.DELTAS_RECALCULATE_SIZE, which can be used to cause all
+ * deltas to trigger entry size recalculation when deltas are applied. By default, this is set
+ * to 'false' because of potential performance impacts when every delta triggers a recalculation.
+ *
+ * <p>
+ * To allow entry size recalculation on a per-delta basis, classes that extend the Delta interface
+ * should override this method to return 'true'. This may impact performance of specific delta
+ * types, but will not globally affect the performance of other Geode delta operations.
+ *
+ * @since 1.14
+ */
+ default boolean getForceRecalculateSize() {
+ return false;
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
index d565407..af5ebd0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegion.java
@@ -1864,6 +1864,7 @@ public class BucketRegion extends DistributedRegion implements Bucket {
return;
}
Object instance = cd.getValue();
+
if (instance instanceof org.apache.geode.Delta
&& ((org.apache.geode.Delta) instance).hasDelta()) {
try (HeapDataOutputStream hdos = new HeapDataOutputStream(KnownVersion.CURRENT)) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
index 5e2b9c9..ac0a5e2 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EntryEventImpl.java
@@ -26,11 +26,13 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.CopyHelper;
import org.apache.geode.DataSerializer;
+import org.apache.geode.Delta;
import org.apache.geode.DeltaSerializationException;
import org.apache.geode.GemFireIOException;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.SerializationException;
import org.apache.geode.SystemFailure;
+import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.EntryOperation;
import org.apache.geode.cache.Operation;
@@ -262,7 +264,6 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
protected EntryEventImpl(final InternalRegion region, Operation op, Object key,
@Retained(ENTRY_EVENT_NEW_VALUE) Object newVal, Object callbackArgument, boolean originRemote,
DistributedMember distributedMember, boolean generateCallbacks, boolean initializeId) {
-
this.region = region;
InternalDistributedSystem ds =
(InternalDistributedSystem) region.getCache().getDistributedSystem();
@@ -1521,7 +1522,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
if (obj instanceof byte[] || obj == null || obj instanceof CachedDeserializable
|| obj == Token.NOT_AVAILABLE || Token.isInvalidOrRemoved(obj)
// don't serialize delta object already serialized
- || obj instanceof org.apache.geode.Delta) { // internal delta
+ || obj instanceof Delta) { // internal delta
return obj;
}
final CachedDeserializable cd;
@@ -1713,10 +1714,10 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
// This is a horrible hack, but we need to get the size of the object
// When we store an entry. This code is only used when we do a put
// in the primary.
- if (v instanceof org.apache.geode.Delta && getRegion().isUsedForPartitionedRegionBucket()) {
+ if (v instanceof Delta && getRegion().isUsedForPartitionedRegionBucket()) {
int vSize;
Object ov = basicGetOldValue();
- if (ov instanceof CachedDeserializable && !GemFireCacheImpl.DELTAS_RECALCULATE_SIZE) {
+ if (ov instanceof CachedDeserializable && !(shouldRecalculateSize((Delta) v))) {
vSize = ((CachedDeserializable) ov).getValueSizeInBytes();
} else {
vSize = CachedDeserializableFactory.calcMemSize(v, getRegion().getObjectSizer(), false);
@@ -1835,7 +1836,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
boolean deltaBytesApplied = false;
try (ByteArrayDataInput in = new ByteArrayDataInput(getDeltaBytes())) {
long start = getRegion().getCachePerfStats().getTime();
- ((org.apache.geode.Delta) value).fromDelta(in);
+ ((Delta) value).fromDelta(in);
getRegion().getCachePerfStats().endDeltaUpdate(start);
deltaBytesApplied = true;
} catch (RuntimeException rte) {
@@ -1858,7 +1859,7 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
if (wasCD) {
CachedDeserializable old = (CachedDeserializable) oldValueInVM;
int valueSize;
- if (GemFireCacheImpl.DELTAS_RECALCULATE_SIZE) {
+ if (shouldRecalculateSize((Delta) value)) {
valueSize =
CachedDeserializableFactory.calcMemSize(value, getRegion().getObjectSizer(), false);
} else {
@@ -1878,6 +1879,12 @@ public class EntryEventImpl implements InternalEntryEvent, InternalCacheEvent,
}
}
+ @VisibleForTesting
+ protected static boolean shouldRecalculateSize(Delta value) {
+ return GemFireCacheImpl.DELTAS_RECALCULATE_SIZE
+ || value.getForceRecalculateSize();
+ }
+
void setTXEntryOldValue(Object oldVal, boolean mustBeAvailable) {
if (Token.isInvalidOrRemoved(oldVal)) {
oldVal = null;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
index 1833039..dbe3ced 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/LocalRegion.java
@@ -151,7 +151,6 @@ import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionAdvisor;
import org.apache.geode.distributed.internal.DistributionManager;
-import org.apache.geode.distributed.internal.DistributionStats;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.ResourceEvent;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -1728,7 +1727,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
}
if (extractDelta && ((Delta) value).hasDelta()) {
try (HeapDataOutputStream hdos = new HeapDataOutputStream(KnownVersion.CURRENT)) {
- long start = DistributionStats.getStatTime();
try {
((Delta) value).toDelta(hdos);
} catch (RuntimeException re) {
@@ -1737,7 +1735,6 @@ public class LocalRegion extends AbstractRegion implements LoaderHelperFactory,
throw new DeltaSerializationException("Caught exception while sending delta", e);
}
event.setDeltaBytes(hdos.toByteArray());
- getCachePerfStats().endDeltaPrepared(start);
}
}
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventImplTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventImplTest.java
index 44227ea..6768ca2 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventImplTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/EntryEventImplTest.java
@@ -29,9 +29,11 @@ import static org.mockito.Mockito.when;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.apache.geode.Delta;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.SerializedCacheValue;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
@@ -913,6 +915,60 @@ public class EntryEventImplTest {
assertThat(event.isTransactional()).isFalse();
}
+ @Test
+ public void shouldRecalculateSize_returnsTrue_ifGetForceRecalculateSizeIsTrue_andDELTAS_RECALCULATE_SIZEisTrue() {
+ GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = true;
+ Delta deltaValue = mock(Delta.class);
+ when(deltaValue.getForceRecalculateSize())
+ .thenReturn(true);
+
+ boolean value = EntryEventImpl.shouldRecalculateSize(deltaValue);
+
+ assertThat(value).isTrue();
+ }
+
+ @Test
+ public void shouldRecalculateSize_returnsTrue_ifDELTAS_RECALCULATE_SIZEisTrue_andGetForceRecalculateSizeIsFalse() {
+ GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = true;
+ Delta deltaValue = mock(Delta.class);
+ when(deltaValue.getForceRecalculateSize())
+ .thenReturn(false);
+
+ boolean value = EntryEventImpl.shouldRecalculateSize(deltaValue);
+
+ assertThat(value).isTrue();
+ }
+
+ @Test
+ public void shouldRecalculateSize_returnsTrue_ifGetForceRecalculateSizeIsTrue_andDELTAS_RECALCULATE_SIZEIsFalse() {
+ GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = false;
+ Delta deltaValue = mock(Delta.class);
+ when(deltaValue.getForceRecalculateSize())
+ .thenReturn(true);
+
+ boolean value = EntryEventImpl.shouldRecalculateSize(deltaValue);
+
+ assertThat(value).isTrue();
+ }
+
+
+ @Test
+ public void shouldRecalculateSize_returnsFalse_ifBothDELTAS_RECALCULATE_SIZEIsFalse_andGetForceRecalculateSizeIsFalse() {
+ GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = false;
+ Delta deltaValue = mock(Delta.class);
+ when(deltaValue.getForceRecalculateSize())
+ .thenReturn(false);
+
+ boolean value = EntryEventImpl.shouldRecalculateSize(deltaValue);
+
+ assertThat(value).isFalse();
+ }
+
+ @After
+ public void tearDown() {
+ GemFireCacheImpl.DELTAS_RECALCULATE_SIZE = false;
+ }
+
private static class EntryEventImplWithOldValuesDisabled extends EntryEventImpl {
@Override
protected boolean areOldValuesEnabled() {