You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2018/09/12 18:48:07 UTC
[geode] branch develop updated: GEODE-5605: After removeAll/PutAll
messages are processed on replicas, check for cache close is done. (#2450)
This is an automated email from the ASF dual-hosted git repository.
agingade pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new d9bb24d GEODE-5605: After removeAll/PutAll messages are processed on replicas, check for cache close is done. (#2450)
d9bb24d is described below
commit d9bb24d95bd7d310ae0ce693fe1d84774c2f521c
Author: agingade <ag...@pivotal.io>
AuthorDate: Wed Sep 12 11:47:57 2018 -0700
GEODE-5605: After removeAll/PutAll messages are processed on replicas, check for cache close is done. (#2450)
GEODE-5605: After removeAll/PutAll messages are processed on replicas, check for cache close is done.
---
.../geode/internal/cache/DistributedRegion.java | 4 ++
.../geode/internal/cache/PartitionedRegion.java | 12 ++++++
.../cache/partitioned/PutAllPRMessage.java | 28 ++++++++-----
.../cache/partitioned/RemoveAllPRMessage.java | 28 ++++++++-----
.../internal/cache/PartitionedRegionTest.java | 45 ++++++++++++++++++++
.../cache/partitioned/PutAllPRMessageTest.java | 49 ++++++++++++++++++++++
.../cache/partitioned/RemoveAllPRMessageTest.java | 28 +++++++++++++
7 files changed, 172 insertions(+), 22 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
index d9f5903..42dcd98 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DistributedRegion.java
@@ -2477,6 +2477,10 @@ public class DistributedRegion extends LocalRegion implements InternalDistribute
this.getFullPath()), ex);
}
}
+ waitForCurrentOperations();
+ }
+
+ protected void waitForCurrentOperations() {
// Fix for #48066 - make sure that region operations are completely
// distributed to peers before destroying the region.
Boolean flushOnClose =
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index 86b35dc..61829b8 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -5307,6 +5307,18 @@ public class PartitionedRegion extends LocalRegion
retryTime = new RetryTimeKeeper(this.retryTimeout);
}
currentTarget = getOrCreateNodeForBucketWrite(bucketId, retryTime);
+ } catch (EntryNotFoundException entryNotFoundException) {
+ if (!event.isPossibleDuplicate()) {
+ throw entryNotFoundException;
+ }
+ // EntryNotFoundException during retry attempt. The operation
+ // may have been applied during the initial attempt.
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "EntryNotFoundException is ignored during retry attempt for destroy operation. "
+ + event);
+ }
+ return;
}
// If we get here, the attempt failed.
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
index b6450b3..a56ea79 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessage.java
@@ -507,17 +507,7 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
// encounter cacheWriter exception
partialKeys.saveFailedKey(key, cwe);
} finally {
- try {
- // Only PutAllPRMessage knows if the thread id is fake. Event has no idea.
- // So we have to manually set useFakeEventId for this DPAO
- dpao.setUseFakeEventId(true);
- r.checkReadiness();
- bucketRegion.getDataView().postPutAll(dpao, this.versions, bucketRegion);
- } finally {
- if (lockedForPrimary) {
- bucketRegion.doUnlockForPrimary();
- }
- }
+ doPostPutAll(r, dpao, bucketRegion, lockedForPrimary);
}
if (partialKeys.hasFailure()) {
partialKeys.addKeysAndVersions(this.versions);
@@ -559,6 +549,22 @@ public class PutAllPRMessage extends PartitionMessageWithDirectReply {
return true;
}
+ void doPostPutAll(PartitionedRegion r, DistributedPutAllOperation dpao,
+ BucketRegion bucketRegion, boolean lockedForPrimary) {
+ try {
+ // Only PutAllPRMessage knows if the thread id is fake. Event has no idea.
+ // So we have to manually set useFakeEventId for this DPAO
+ dpao.setUseFakeEventId(true);
+ r.checkReadiness();
+ bucketRegion.getDataView().postPutAll(dpao, this.versions, bucketRegion);
+ r.checkReadiness();
+ } finally {
+ if (lockedForPrimary) {
+ bucketRegion.doUnlockForPrimary();
+ }
+ }
+ }
+
public VersionedObjectList getVersions() {
return this.versions;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
index d9f1f47..bfc009d 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessage.java
@@ -516,17 +516,7 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
// encounter cacheWriter exception
partialKeys.saveFailedKey(key, cwe);
} finally {
- try {
- // Only RemoveAllPRMessage knows if the thread id is fake. Event has no idea.
- // So we have to manually set useFakeEventId for this op
- op.setUseFakeEventId(true);
- r.checkReadiness();
- bucketRegion.getDataView().postRemoveAll(op, this.versions, bucketRegion);
- } finally {
- if (lockedForPrimary) {
- bucketRegion.doUnlockForPrimary();
- }
- }
+ doPostRemoveAll(r, op, bucketRegion, lockedForPrimary);
}
if (partialKeys.hasFailure()) {
partialKeys.addKeysAndVersions(this.versions);
@@ -567,6 +557,22 @@ public class RemoveAllPRMessage extends PartitionMessageWithDirectReply {
return true;
}
+ void doPostRemoveAll(PartitionedRegion r, DistributedRemoveAllOperation op,
+ BucketRegion bucketRegion, boolean lockedForPrimary) {
+ try {
+ // Only RemoveAllPRMessage knows if the thread id is fake. Event has no idea.
+ // So we have to manually set useFakeEventId for this op
+ op.setUseFakeEventId(true);
+ r.checkReadiness();
+ bucketRegion.getDataView().postRemoveAll(op, this.versions, bucketRegion);
+ r.checkReadiness();
+ } finally {
+ if (lockedForPrimary) {
+ bucketRegion.doUnlockForPrimary();
+ }
+ }
+ }
+
public VersionedObjectList getVersions() {
return this.versions;
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
index 2e1db35..3b6dc2a 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
@@ -15,12 +15,14 @@
package org.apache.geode.internal.cache;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.ThrowableAssert.catchThrowable;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -37,6 +39,7 @@ import org.junit.Before;
import org.junit.Test;
import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.EntryNotFoundException;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
@@ -175,4 +178,46 @@ public class PartitionedRegionTest {
verify(spyPR, times(1)).getNodeForBucketWrite(anyInt(), isNull());
}
+ @Test
+ public void destroyInBucketRemoteOperationThrowsEntryNotFoundException() throws Exception {
+ int bucketId = 0;
+ Object expectedOldValue = "expectedOldValue";
+ KeyInfo keyInfo = mock(KeyInfo.class);
+ when(keyInfo.getBucketId()).thenReturn(bucketId);
+ EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+ when(entryEventImpl.getKeyInfo()).thenReturn(keyInfo);
+ InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
+ PartitionedRegion spyPR = spy(partitionedRegion);
+ doReturn(primaryMember).when(spyPR).getOrCreateNodeForBucketWrite(eq(bucketId), isNull());
+ doThrow(EntryNotFoundException.class).when(spyPR).destroyRemotely(eq(primaryMember),
+ eq(bucketId), eq(entryEventImpl), eq(expectedOldValue));
+
+ Throwable thrown = catchThrowable(() -> {
+ spyPR.destroyInBucket(entryEventImpl, expectedOldValue);
+ });
+
+ assertThat(thrown).isInstanceOf(EntryNotFoundException.class);
+ verify(spyPR, times(1)).getOrCreateNodeForBucketWrite(eq(bucketId), isNull());
+ }
+
+ @Test
+ public void destroyInBucketRemoteOperationDoesNotThrowEntryNotFoundExceptionForPossibleDuplicateEvent()
+ throws Exception {
+ int bucketId = 0;
+ Object expectedOldValue = "expectedOldValue";
+ EntryEventImpl entryEventImpl = mock(EntryEventImpl.class);
+ when(entryEventImpl.isPossibleDuplicate()).thenReturn(true);
+ KeyInfo keyInfo = mock(KeyInfo.class);
+ when(keyInfo.getBucketId()).thenReturn(bucketId);
+ when(entryEventImpl.getKeyInfo()).thenReturn(keyInfo);
+ InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
+ PartitionedRegion spyPR = spy(partitionedRegion);
+ doReturn(primaryMember).when(spyPR).getOrCreateNodeForBucketWrite(eq(bucketId), isNull());
+ doThrow(EntryNotFoundException.class).when(spyPR).destroyRemotely(eq(primaryMember),
+ eq(bucketId), eq(entryEventImpl), eq(expectedOldValue));
+
+ spyPR.destroyInBucket(entryEventImpl, expectedOldValue);
+
+ verify(spyPR, times(1)).getOrCreateNodeForBucketWrite(eq(bucketId), isNull());
+ }
}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
new file mode 100644
index 0000000..75650ff
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/PutAllPRMessageTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.partitioned;
+
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.Test;
+import org.mockito.InOrder;
+
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DistributedPutAllOperation;
+import org.apache.geode.internal.cache.InternalDataView;
+import org.apache.geode.internal.cache.PartitionedRegion;
+
+public class PutAllPRMessageTest {
+
+ @Test
+ public void doPostPutAllCallsCheckReadinessBeforeAndAfter() throws Exception {
+ PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+ DistributedPutAllOperation distributedPutAllOperation = mock(DistributedPutAllOperation.class);
+ BucketRegion bucketRegion = mock(BucketRegion.class);
+ InternalDataView internalDataView = mock(InternalDataView.class);
+ when(bucketRegion.getDataView()).thenReturn(internalDataView);
+ PutAllPRMessage putAllPRMessage = new PutAllPRMessage();
+
+ putAllPRMessage.doPostPutAll(partitionedRegion, distributedPutAllOperation, bucketRegion, true);
+
+ InOrder inOrder = inOrder(partitionedRegion, internalDataView);
+ inOrder.verify(partitionedRegion).checkReadiness();
+ inOrder.verify(internalDataView).postPutAll(any(), any(), any());
+ inOrder.verify(partitionedRegion).checkReadiness();
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
index e9e4fb0..f8f7102 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/partitioned/RemoveAllPRMessageTest.java
@@ -14,12 +14,20 @@
*/
package org.apache.geode.internal.cache.partitioned;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import org.junit.Test;
+import org.mockito.InOrder;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DistributedRemoveAllOperation;
+import org.apache.geode.internal.cache.InternalDataView;
+import org.apache.geode.internal.cache.PartitionedRegion;
public class RemoveAllPRMessageTest {
@@ -32,4 +40,24 @@ public class RemoveAllPRMessageTest {
verify(mockRemoveAllPRMessage, times(1)).appendFields(stringBuilder);
}
+
+
+ @Test
+ public void doPostRemoveAllCallsCheckReadinessBeforeAndAfter() throws Exception {
+ PartitionedRegion partitionedRegion = mock(PartitionedRegion.class);
+ DistributedRemoveAllOperation distributedRemoveAllOperation =
+ mock(DistributedRemoveAllOperation.class);
+ BucketRegion bucketRegion = mock(BucketRegion.class);
+ InternalDataView internalDataView = mock(InternalDataView.class);
+ when(bucketRegion.getDataView()).thenReturn(internalDataView);
+ RemoveAllPRMessage removeAllPRMessage = new RemoveAllPRMessage();
+
+ removeAllPRMessage.doPostRemoveAll(partitionedRegion, distributedRemoveAllOperation,
+ bucketRegion, true);
+
+ InOrder inOrder = inOrder(partitionedRegion, internalDataView);
+ inOrder.verify(partitionedRegion).checkReadiness();
+ inOrder.verify(internalDataView).postRemoveAll(any(), any(), any());
+ inOrder.verify(partitionedRegion).checkReadiness();
+ }
}