You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by nn...@apache.org on 2018/08/27 16:37:50 UTC

[geode] branch develop updated: GEODE-5631: failedBatchRemovalMessageKeys not used after GII (#2375)

This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 85953f0  GEODE-5631: failedBatchRemovalMessageKeys not used after GII (#2375)
85953f0 is described below

commit 85953f085b13815405cb9bf3c5b1bca77c3c9a5e
Author: Nabarun Nag <na...@users.noreply.github.com>
AuthorDate: Mon Aug 27 09:37:45 2018 -0700

    GEODE-5631: failedBatchRemovalMessageKeys not used after GII (#2375)
    
            * After GII a flag is set to indicate that failedBatchRemovalMessageKeys has been processed
    	* If this flag is set, no more entries will be put into failedBatchRemovalMessageKeys.
---
 .../internal/cache/AbstractBucketRegionQueue.java  | 12 +++++++++++
 .../geode/internal/cache/BucketRegionQueue.java    |  1 +
 .../wan/parallel/ParallelQueueRemovalMessage.java  | 10 ++++++---
 .../ParallelQueueRemovalMessageJUnitTest.java      | 24 ++++++++++++++++++++++
 4 files changed, 44 insertions(+), 3 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
index fe95659..fc06ceb 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/AbstractBucketRegionQueue.java
@@ -501,6 +501,18 @@ public abstract class AbstractBucketRegionQueue extends BucketRegion {
     failedBatchRemovalMessageKeys.add(key);
   }
 
+  public boolean isFailedBatchRemovalMessageKeysClearedFlag() {
+    return failedBatchRemovalMessageKeysClearedFlag;
+  }
+
+  public void setFailedBatchRemovalMessageKeysClearedFlag(
+      boolean failedBatchRemovalMessageKeysClearedFlag) {
+    this.failedBatchRemovalMessageKeysClearedFlag = failedBatchRemovalMessageKeysClearedFlag;
+  }
+
+  private boolean failedBatchRemovalMessageKeysClearedFlag = false;
+
+
   public ConcurrentHashSet<Object> getFailedBatchRemovalMessageKeys() {
     return this.failedBatchRemovalMessageKeys;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
index 314d0cc..712c0b8 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/BucketRegionQueue.java
@@ -188,6 +188,7 @@ public class BucketRegionQueue extends AbstractBucketRegionQueue {
         }
       }
     }
+    setFailedBatchRemovalMessageKeysClearedFlag(true);
   }
 
   @Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index 6d47266..401094d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -187,7 +187,7 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
     }
   }
 
-  private void destroyKeyFromBucketQueue(AbstractBucketRegionQueue brq, Object key,
+  void destroyKeyFromBucketQueue(AbstractBucketRegionQueue brq, Object key,
       PartitionedRegion prQ) {
     final boolean isDebugEnabled = logger.isDebugEnabled();
     try {
@@ -207,8 +207,12 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
       }
       // add the key to failedBatchRemovalMessageQueue.
       // This is to handle the last scenario in #49196
-      brq.addToFailedBatchRemovalMessageKeys(key);
-
+      // But if GII is already completed and FailedBatchRemovalMessageKeys
+      // are already cleared then no keys should be added to it as they will
+      // never be cleared and increase the memory footprint.
+      if (!brq.isFailedBatchRemovalMessageKeysClearedFlag()) {
+        brq.addToFailedBatchRemovalMessageKeys(key);
+      }
     } catch (ForceReattemptException fe) {
       if (isDebugEnabled) {
         logger.debug(
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
index 7633044..6a5b495 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -17,10 +17,14 @@ package org.apache.geode.internal.cache.wan.parallel;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
@@ -37,6 +41,7 @@ import org.mockito.stubbing.Answer;
 
 import org.apache.geode.cache.AttributesFactory;
 import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.EvictionAction;
 import org.apache.geode.cache.EvictionAttributes;
 import org.apache.geode.cache.Operation;
@@ -46,11 +51,13 @@ import org.apache.geode.cache.RegionAttributes;
 import org.apache.geode.cache.Scope;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
 import org.apache.geode.internal.cache.BucketAdvisor;
 import org.apache.geode.internal.cache.BucketRegionQueue;
 import org.apache.geode.internal.cache.BucketRegionQueueHelper;
 import org.apache.geode.internal.cache.EntryEventImpl;
 import org.apache.geode.internal.cache.EvictionAttributesImpl;
+import org.apache.geode.internal.cache.ForceReattemptException;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalRegionArguments;
 import org.apache.geode.internal.cache.KeyInfo;
@@ -180,6 +187,23 @@ public class ParallelQueueRemovalMessageJUnitTest {
   }
 
   @Test
+  public void ifIsFailedBatchRemovalMessageKeysClearedFlagSetThenAddToFailedBatchRemovalMessageKeysNotCalled()
+      throws ForceReattemptException {
+    ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage();
+    Object object = new Object();
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    AbstractBucketRegionQueue brq = mock(AbstractBucketRegionQueue.class);
+    doThrow(new EntryNotFoundException("ENTRY NOT FOUND")).when(brq).destroyKey(object);
+    when(brq.isFailedBatchRemovalMessageKeysClearedFlag()).thenReturn(true);
+    doNothing().when(brq).addToFailedBatchRemovalMessageKeys(object);
+    pqrm.destroyKeyFromBucketQueue(brq, object, partitionedRegion);
+    verify(brq, times(1)).destroyKey(object);
+    verify(brq, times(1)).isFailedBatchRemovalMessageKeysClearedFlag();
+    verify(brq, times(0)).addToFailedBatchRemovalMessageKeys(object);
+
+  }
+
+  @Test
   public void validateFailedBatchRemovalMessageKeysInUninitializedBucketRegionQueue()
       throws Exception {
     // Validate initial BucketRegionQueue state