You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2018/09/12 18:48:07 UTC

[geode] branch develop updated: GEODE-5605: After removeAll/PutAll messages are processed on replicas, check for cache close is done. (#2450)

This is an automated email from the ASF dual-hosted git repository.

agingade pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new d9bb24d  GEODE-5605: After removeAll/PutAll messages are processed on replicas, check for cache close is done. (#2450)
d9bb24d is described below

commit d9bb24d95bd7d310ae0ce693fe1d84774c2f521c
Author: agingade <ag...@pivotal.io>
AuthorDate: Wed Sep 12 11:47:57 2018 -0700

    GEODE-5605: After removeAll/PutAll messages are processed on replicas, check for cache close is done. (#2450)
    
    GEODE-5605: After removeAll/PutAll messages are processed on replicas, check for cache close is done.
---
 .../geode/internal/cache/DistributedRegion.java    |  4 ++
 .../geode/internal/cache/PartitionedRegion.java    | 12 ++++++
 .../cache/partitioned/PutAllPRMessage.java         | 28 ++++++++-----
 .../cache/partitioned/RemoveAllPRMessage.java      | 28 ++++++++-----
 .../internal/cache/PartitionedRegionTest.java      | 45 ++++++++++++++++++++
 .../cache/partitioned/PutAllPRMessageTest.java     | 49 ++++++++++++++++++++++
 .../cache/partitioned/RemoveAllPRMessageTest.java  | 28 +++++++++++++
 7 files changed, 172 insertions(+), 22 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index d9f5903..42dcd98 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2477,6 +2477,10 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
             this.getFullPath()), ex);
       }
     }
+    waitForCurrentOperations();
+  }
+
+  protected void waitForCurrentOperations() {
     // Fix for #48066 - make sure that region operations are completely
     // distributed to peers before destroying the region.
     Boolean flushOnClose =
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 86b35dc..61829b8 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -5307,6 +5307,18 @@ public class PartitionedRegion extends LocalRegion
           retryTime = new RetryTimeKeeper(this.retryTimeout);
         }
         currentTarget = getOrCreateNodeForBucketWrite(bucketId, retryTime);
+      } catch (EntryNotFoundException entryNotFoundException) {
+        if (!event.isPossibleDuplicate()) {
+          throw entryNotFoundException;
+        }
+        // EntryNotFoundException during retry attempt. The operation
+        // may have been applied during the initial attempt.
+        if (logger.isDebugEnabled()) {
+          logger.debug(
+              "EntryNotFoundException is ignored during retry attempt for destroy operation. "
+                  + event);
+        }
+        return;
       }
 
       // If we get here, the attempt failed.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
index b6450b3..a56ea79 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
@@ -507,17 +507,7 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
             // encounter cacheWriter exception
             partialKeys.saveFailedKey(key, cwe);
           } finally {
-            try {
-              // Only PutAllPRMessage knows if the thread id is fake. Event has no idea.
-              // So we have to manually set useFakeEventId for this DPAO
-              dpao.setUseFakeEventId(true);
-              r.checkReadiness();
-              bucketRegion.getDataView().postPutAll(dpao, this.versions, bucketRegion);
-            } finally {
-              if (lockedForPrimary) {
-                bucketRegion.doUnlockForPrimary();
-              }
-            }
+            doPostPutAll(r, dpao, bucketRegion, lockedForPrimary);
           }
           if (partialKeys.hasFailure()) {
             partialKeys.addKeysAndVersions(this.versions);
@@ -559,6 +549,22 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
     return true;
   }
 
+  void doPostPutAll(PartitionedRegion r, DistributedPutAllOperation dpao,
+      BucketRegion bucketRegion, boolean lockedForPrimary) {
+    try {
+      // Only PutAllPRMessage knows if the thread id is fake. Event has no idea.
+      // So we have to manually set useFakeEventId for this DPAO
+      dpao.setUseFakeEventId(true);
+      r.checkReadiness();
+      bucketRegion.getDataView().postPutAll(dpao, this.versions, bucketRegion);
+      r.checkReadiness();
+    } finally {
+      if (lockedForPrimary) {
+        bucketRegion.doUnlockForPrimary();
+      }
+    }
+  }
+
   public VersionedObjectList getVersions() {
     return this.versions;
   }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index d9f1f47..bfc009d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -516,17 +516,7 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
             // encounter cacheWriter exception
             partialKeys.saveFailedKey(key, cwe);
           } finally {
-            try {
-              // Only RemoveAllPRMessage knows if the thread id is fake. Event has no idea.
-              // So we have to manually set useFakeEventId for this op
-              op.setUseFakeEventId(true);
-              r.checkReadiness();
-              bucketRegion.getDataView().postRemoveAll(op, this.versions, bucketRegion);
-            } finally {
-              if (lockedForPrimary) {
-                bucketRegion.doUnlockForPrimary();
-              }
-            }
+            doPostRemoveAll(r, op, bucketRegion, lockedForPrimary);
           }
           if (partialKeys.hasFailure()) {
             partialKeys.addKeysAndVersions(this.versions);
@@ -567,6 +557,22 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
     return true;
   }
 
+  void doPostRemoveAll(PartitionedRegion r, DistributedRemoveAllOperation op,
+      BucketRegion bucketRegion, boolean lockedForPrimary) {
+    try {
+      // Only RemoveAllPRMessage knows if the thread id is fake. Event has no idea.
+      // So we have to manually set useFakeEventId for this op
+      op.setUseFakeEventId(true);
+      r.checkReadiness();
+      bucketRegion.getDataView().postRemoveAll(op, this.versions, bucketRegion);
+      r.checkReadiness();
+    } finally {
+      if (lockedForPrimary) {
+        bucketRegion.doUnlockForPrimary();
+      }
+    }
+  }
+
   public VersionedObjectList getVersions() {
     return this.versions;
   }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
index 2e1db35..3b6dc2a 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
@@ -15,12 +15,14 @@
 package org.apache.geode.internal.cache;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.ThrowableAssert.catchThrowable;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isNull;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -37,6 +39,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.EntryNotFoundException;
 import org.apache.geode.cache.Operation;
 import org.apache.geode.cache.PartitionAttributesFactory;
 import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -175,4 +178,46 @@ public class PartitionedRegionTest {
     verify(spyPR, times(1)).getNodeForBucketWrite(anyInt(), isNull());
   }
 
+  @Test
+  public void destroyInBucketRemoteOperationThrowsEntryNotFoundException() throws Exception {
+    int bucketId = 0;
+    Object expectedOldValue = "expectedOldValue";
+    KeyInfo keyInfo = mock(KeyInfo.class);
+    when(keyInfo.getBucketId()).thenReturn(bucketId);
+    EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+    when(entryEventImpl.getKeyInfo()).thenReturn(keyInfo);
+    InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
+    PartitionedRegion spyPR = spy(partitionedRegion);
+    doReturn(primaryMember).when(spyPR).getOrCreateNodeForBucketWrite(eq(bucketId), isNull());
+    doThrow(EntryNotFoundException.class).when(spyPR).destroyRemotely(eq(primaryMember),
+        eq(bucketId), eq(entryEventImpl), eq(expectedOldValue));
+
+    Throwable thrown = catchThrowable(() -> {
+      spyPR.destroyInBucket(entryEventImpl, expectedOldValue);
+    });
+
+    assertThat(thrown).isInstanceOf(EntryNotFoundException.class);
+    verify(spyPR, times(1)).getOrCreateNodeForBucketWrite(eq(bucketId), isNull());
+  }
+
+  @Test
+  public void destroyInBucketRemoteOperationDoesNotThrowEntryNotFoundExceptionForPossibleDuplicateEvent()
+      throws Exception {
+    int bucketId = 0;
+    Object expectedOldValue = "expectedOldValue";
+    EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+    when(entryEventImpl.isPossibleDuplicate()).thenReturn(true);
+    KeyInfo keyInfo = mock(KeyInfo.class);
+    when(keyInfo.getBucketId()).thenReturn(bucketId);
+    when(entryEventImpl.getKeyInfo()).thenReturn(keyInfo);
+    InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
+    PartitionedRegion spyPR = spy(partitionedRegion);
+    doReturn(primaryMember).when(spyPR).getOrCreateNodeForBucketWrite(eq(bucketId), isNull());
+    doThrow(EntryNotFoundException.class).when(spyPR).destroyRemotely(eq(primaryMember),
+        eq(bucketId), eq(entryEventImpl), eq(expectedOldValue));
+
+    spyPR.destroyInBucket(entryEventImpl, expectedOldValue);
+
+    verify(spyPR, times(1)).getOrCreateNodeForBucketWrite(eq(bucketId), isNull());
+  }
 }
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
new file mode 100644
index 0000000..75650ff
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.mockito.InOrder;
+
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DistributedPutAllOperation;
+import org.apache.geode.internal.cache.InternalDataView;
+import org.apache.geode.internal.cache.PartitionedRegion;
+
+public class PutAllPRMessageTest {
+
+  @Test
+  public void doPostPutAllCallsCheckReadinessBeforeAndAfter() throws Exception {
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    DistributedPutAllOperation distributedPutAllOperation = mock(DistributedPutAllOperation.class);
+    BucketRegion bucketRegion = mock(BucketRegion.class);
+    InternalDataView internalDataView = mock(InternalDataView.class);
+    when(bucketRegion.getDataView()).thenReturn(internalDataView);
+    PutAllPRMessage putAllPRMessage = new PutAllPRMessage();
+
+    putAllPRMessage.doPostPutAll(partitionedRegion, distributedPutAllOperation, bucketRegion, true);
+
+    InOrder inOrder = inOrder(partitionedRegion, internalDataView);
+    inOrder.verify(partitionedRegion).checkReadiness();
+    inOrder.verify(internalDataView).postPutAll(any(), any(), any());
+    inOrder.verify(partitionedRegion).checkReadiness();
+  }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
index e9e4fb0..f8f7102 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
@@ -14,12 +14,20 @@
  */
 package org.apache.geode.internal.cache.partitioned;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import org.junit.Test;
+import org.mockito.InOrder;
 
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DistributedRemoveAllOperation;
+import org.apache.geode.internal.cache.InternalDataView;
+import org.apache.geode.internal.cache.PartitionedRegion;
 
 public class RemoveAllPRMessageTest {
 
@@ -32,4 +40,24 @@ public class RemoveAllPRMessageTest {
 
     verify(mockRemoveAllPRMessage, times(1)).appendFields(stringBuilder);
   }
+
+
+  @Test
+  public void doPostRemoveAllCallsCheckReadinessBeforeAndAfter() throws Exception {
+    PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+    DistributedRemoveAllOperation distributedRemoveAllOperation =
+        mock(DistributedRemoveAllOperation.class);
+    BucketRegion bucketRegion = mock(BucketRegion.class);
+    InternalDataView internalDataView = mock(InternalDataView.class);
+    when(bucketRegion.getDataView()).thenReturn(internalDataView);
+    RemoveAllPRMessage removeAllPRMessage = new RemoveAllPRMessage();
+
+    removeAllPRMessage.doPostRemoveAll(partitionedRegion, distributedRemoveAllOperation,
+        bucketRegion, true);
+
+    InOrder inOrder = inOrder(partitionedRegion, internalDataView);
+    inOrder.verify(partitionedRegion).checkReadiness();
+    inOrder.verify(internalDataView).postRemoveAll(any(), any(), any());
+    inOrder.verify(partitionedRegion).checkReadiness();
+  }
 }