You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2018/08/27 16:37:50 UTC
[geode] branch develop updated: GEODE-5631:
failedBatchRemovalMessageKeys not used after GII (#2375)
This is an automated email from the ASF dual-hosted git repository.
nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 85953f0 GEODE-5631: failedBatchRemovalMessageKeys not used after GII (#2375)
85953f0 is described below
commit 85953f085b13815405cb9bf3c5b1bca77c3c9a5e
Author: Nabarun Nag <na...@users.noreply.github.com>
AuthorDate: Mon Aug 27 09:37:45 2018 -0700
GEODE-5631: failedBatchRemovalMessageKeys not used after GII (#2375)
* After GII a flag is set to indicate that failedBatchRemovalMessageKeys has been processed
* If this flag is set, no more entries will be put into failedBatchRemovalMessageKeys.
---
.../internal/cache/AbstractBucketRegionQueue.java | 12 +++++++++++
.../geode/internal/cache/BucketRegionQueue.java | 1 +
.../wan/parallel/ParallelQueueRemovalMessage.java | 10 ++++++---
.../ParallelQueueRemovalMessageJUnitTest.java | 24 ++++++++++++++++++++++
4 files changed, 44 insertions(+), 3 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index fe95659..fc06ceb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -501,6 +501,18 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
failedBatchRemovalMessageKeys.add(key);
}
+ public boolean isFailedBatchRemovalMessageKeysClearedFlag() {
+ return failedBatchRemovalMessageKeysClearedFlag;
+ }
+
+ public void setFailedBatchRemovalMessageKeysClearedFlag(
+ boolean failedBatchRemovalMessageKeysClearedFlag) {
+ this.failedBatchRemovalMessageKeysClearedFlag = failedBatchRemovalMessageKeysClearedFlag;
+ }
+
+ private boolean failedBatchRemovalMessageKeysClearedFlag = false;
+
+
public ConcurrentHashSet<Object> getFailedBatchRemovalMessageKeys() {
return this.failedBatchRemovalMessageKeys;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 314d0cc..712c0b8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -188,6 +188,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
}
}
}
+ setFailedBatchRemovalMessageKeysClearedFlag(true);
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index 6d47266..401094d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -187,7 +187,7 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
}
}
- private void destroyKeyFromBucketQueue(AbstractBucketRegionQueue brq, Object key,
+ void destroyKeyFromBucketQueue(AbstractBucketRegionQueue brq, Object key,
PartitionedRegion prQ) {
final boolean isDebugEnabled = logger.isDebugEnabled();
try {
@@ -207,8 +207,12 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
}
// add the key to failedBatchRemovalMessageQueue.
// This is to handle the last scenario in #49196
- brq.addToFailedBatchRemovalMessageKeys(key);
-
+ // But if GII is already completed and FailedBatchRemovalMessageKeys
+ // are already cleared then no keys should be added to it as they will
+ // never be cleared and increase the memory footprint.
+ if (!brq.isFailedBatchRemovalMessageKeysClearedFlag()) {
+ brq.addToFailedBatchRemovalMessageKeys(key);
+ }
} catch (ForceReattemptException fe) {
if (isDebugEnabled) {
logger.debug(
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
index 7633044..6a5b495 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -17,10 +17,14 @@ package org.apache.geode.internal.cache.wan.parallel;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
@@ -37,6 +41,7 @@ import org.mockito.stubbing.Answer;
import org.apache.geode.cache.AttributesFactory;
import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.EvictionAction;
import org.apache.geode.cache.EvictionAttributes;
import org.apache.geode.cache.Operation;
@@ -46,11 +51,13 @@ import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.Scope;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
import org.apache.geode.internal.cache.BucketAdvisor;
import org.apache.geode.internal.cache.BucketRegionQueue;
import org.apache.geode.internal.cache.BucketRegionQueueHelper;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EvictionAttributesImpl;
+import org.apache.geode.internal.cache.ForceReattemptException;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalRegionArguments;
import org.apache.geode.internal.cache.KeyInfo;
@@ -180,6 +187,23 @@ public class ParallelQueueRemovalMessageJUnitTest {
}
@Test
+ public void ifIsFailedBatchRemovalMessageKeysClearedFlagSetThenAddToFailedBatchRemovalMessageKeysNotCalled()
+ throws ForceReattemptException {
+ ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage();
+ Object object = new Object();
+ PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+ AbstractBucketRegionQueue brq = mock(AbstractBucketRegionQueue.class);
+ doThrow(new EntryNotFoundException("ENTRY NOT FOUND")).when(brq).destroyKey(object);
+ when(brq.isFailedBatchRemovalMessageKeysClearedFlag()).thenReturn(true);
+ doNothing().when(brq).addToFailedBatchRemovalMessageKeys(object);
+ pqrm.destroyKeyFromBucketQueue(brq, object, partitionedRegion);
+ verify(brq, times(1)).destroyKey(object);
+ verify(brq, times(1)).isFailedBatchRemovalMessageKeysClearedFlag();
+ verify(brq, times(0)).addToFailedBatchRemovalMessageKeys(object);
+
+ }
+
+ @Test
public void validateFailedBatchRemovalMessageKeysInUninitializedBucketRegionQueue()
throws Exception {
// Validate initial BucketRegionQueue state