You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/08/08 18:26:46 UTC
[geode] branch featrue/GEODE-5541 updated: GEODE-5541: A create
followed by a destroy of the same key will not invoke cache listener on
remote nodes.
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch featrue/GEODE-5541
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/featrue/GEODE-5541 by this push:
new cd887d7 GEODE-5541: A create followed by a destroy of the same key will not invoke cache listener on remote nodes.
cd887d7 is described below
commit cd887d7f6f2646a2ba2427fb020067172a333d10
Author: eshu <es...@pivotal.io>
AuthorDate: Wed Aug 8 11:23:37 2018 -0700
GEODE-5541: A create followed by a destroy of the same key will not invoke cache listener on remote nodes.
---
...tPartitionedRegionWithTransactionDUnitTest.java | 61 ++++++++++++++++++++++
.../geode/internal/cache/TXCommitMessage.java | 14 ++++-
.../geode/internal/cache/RegionCommitTest.java | 59 +++++++++++++++++++++
3 files changed, 133 insertions(+), 1 deletion(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionWithTransactionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionWithTransactionDUnitTest.java
index 9460e63..b30474f 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionWithTransactionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionWithTransactionDUnitTest.java
@@ -15,13 +15,19 @@
package org.apache.geode.internal.cache.partitioned;
import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.CacheTransactionManager;
import org.apache.geode.cache.Region;
+import org.apache.geode.cache.util.CacheListenerAdapter;
import org.apache.geode.internal.cache.DiskRegion;
import org.apache.geode.internal.cache.DiskStoreImpl;
import org.apache.geode.internal.cache.PartitionedRegion;
@@ -207,4 +213,59 @@ public class PersistentPartitionedRegionWithTransactionDUnitTest
}
});
}
+
+ @Test
+ public void NoDestroyInvocationIfCreateEntryAndDestroyItInTransaction() throws Throwable {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ VM vm2 = host.getVM(2);
+
+ int redundancy = 2;
+
+ vm0.invoke(() -> getCacheSetAlwaysFireLocalListeners());
+ vm1.invoke(() -> getCacheSetAlwaysFireLocalListeners());
+ vm2.invoke(() -> getCacheSetAlwaysFireLocalListeners());
+
+ createPR(vm0, redundancy);
+ createPR(vm1, redundancy);
+ createPR(vm2, redundancy);
+
+ vm0.invoke(() -> addListener());
+ vm1.invoke(() -> addListener());
+ vm2.invoke(() -> addListener());
+
+ vm0.invoke(() -> {
+ Cache cache = getCache();
+ TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+ Region region = cache.getRegion(getPartitionedRegionName());
+ txManager.begin();
+ region.create(1, "toBeDestroyed");
+ region.destroy(1);
+ txManager.commit();
+ });
+
+ vm0.invoke(() -> verifyNoDestroyInvocation());
+ vm1.invoke(() -> verifyNoDestroyInvocation());
+ vm2.invoke(() -> verifyNoDestroyInvocation());
+ }
+
+ private void getCacheSetAlwaysFireLocalListeners() {
+ System.setProperty("gemfire.BucketRegion.alwaysFireLocalListeners", "true");
+ getCache();
+ }
+
+ private void addListener() {
+ Cache cache = getCache();
+ Region region = cache.getRegion(getPartitionedRegionName());
+ CacheListener listener = spy(new CacheListenerAdapter() {});
+ region.getAttributesMutator().addCacheListener(listener);
+ }
+
+ private void verifyNoDestroyInvocation() throws Exception {
+ Cache cache = getCache();
+ Region region = cache.getRegion(getPartitionedRegionName());
+ CacheListener listener = region.getAttributes().getCacheListeners()[0];
+ verify(listener, never()).afterDestroy(any());
+ }
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index 13f15ea..550d5bf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -1297,9 +1297,13 @@ public class TXCommitMessage extends PooledDistributionMessage
entryOp.versionTag.replaceNullIDs(this.msg.getSender());
}
if (entryOp.op.isDestroy()) {
+ boolean invokeCallbacks =
+ isOpDestroyEvent(internalRegion, entryOp.op.isDestroy(), entryOp.key);
+ List<EntryEventImpl> whichPendingCallbacks =
+ invokeCallbacks ? pendingCallbacks : new ArrayList<EntryEventImpl>();
this.internalRegion.txApplyDestroy(entryOp.key, this.msg.txIdent, this.txEvent,
this.needsUnlock,
- entryOp.op, getEventId(entryOp), entryOp.callbackArg, pendingCallbacks,
+ entryOp.op, getEventId(entryOp), entryOp.callbackArg, whichPendingCallbacks,
entryOp.filterRoutingInfo, this.msg.bridgeContext, false /* origin remote */,
null/* txEntryState */, entryOp.versionTag, entryOp.tailKey);
} else if (entryOp.op.isInvalidate()) {
@@ -1316,6 +1320,14 @@ public class TXCommitMessage extends PooledDistributionMessage
}
}
+ boolean isOpDestroyEvent(InternalRegion internalRegion, boolean isOpDestroy, Object key) {
+ // Note that if the region is a proxy(empty) then we go ahead and add
+ // the destroy callback because we may need to forward the event to clients.
+ RegionEntry regionEntry = internalRegion.basicGetEntry(key);
+ return isOpDestroy && (internalRegion.isProxy()
+ || (regionEntry != null && !Token.isRemoved(regionEntry.getValue())));
+ }
+
/**
* Apply a single tx entry op on the far side
*/
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/RegionCommitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/RegionCommitTest.java
index 193faf2..4f91dfd 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/RegionCommitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/RegionCommitTest.java
@@ -52,4 +52,63 @@ public class RegionCommitTest {
RegionCommit regionCommit = new RegionCommit(txCommitMessage);
assertThat(regionCommit.getRegionByPath(dm, path)).isEqualTo(region);
}
+
+ @Test
+ public void isOpDestroyedEventReturnsFalseIfNotDestroyOperation() {
+ RegionCommit regionCommit = new RegionCommit(txCommitMessage);
+
+ assertThat(regionCommit.isOpDestroyEvent(mock(InternalRegion.class), false, new Object()))
+ .isFalse();
+ }
+
+ @Test
+ public void isOpDestroyedEventReturnsFalseIfIsDestroyOperationAndRegionEntryToBeDestroyedIsNull() {
+ RegionCommit regionCommit = new RegionCommit(txCommitMessage);
+ Object key = new Object();
+ when(region.basicGetEntry(key)).thenReturn(null);
+
+ assertThat(regionCommit.isOpDestroyEvent(region, true, key)).isFalse();
+ }
+
+ @Test
+ public void isOpDestroyedEventReturnsFalseIfIsDestroyOperationAndRegionEntryToBeDestroyedIsRemovedToken() {
+ RegionCommit regionCommit = new RegionCommit(txCommitMessage);
+ Object key = new Object();
+ RegionEntry regionEntry = mock(RegionEntry.class);
+ when(region.basicGetEntry(key)).thenReturn(regionEntry);
+ when(regionEntry.getValue()).thenReturn(Token.DESTROYED);
+
+ assertThat(regionCommit.isOpDestroyEvent(region, true, key)).isFalse();
+ }
+
+ @Test
+ public void isOpDestroyedEventReturnsFalseIfIsDestroyOperationAndRegionEntryToBeDestroyedIsTombstone() {
+ RegionCommit regionCommit = new RegionCommit(txCommitMessage);
+ Object key = new Object();
+ RegionEntry regionEntry = mock(RegionEntry.class);
+ when(region.basicGetEntry(key)).thenReturn(regionEntry);
+ when(regionEntry.getValue()).thenReturn(Token.TOMBSTONE);
+
+ assertThat(regionCommit.isOpDestroyEvent(region, true, key)).isFalse();
+ }
+
+ @Test
+ public void isOpDestroyedEventReturnsTrueIfDestroyEntryOnEmptyRegion() {
+ RegionCommit regionCommit = new RegionCommit(txCommitMessage);
+ Object key = new Object();
+ when(region.isProxy()).thenReturn(true);
+
+ assertThat(regionCommit.isOpDestroyEvent(region, true, key)).isTrue();
+ }
+
+ @Test
+ public void isOpDestroyedEventReturnsTrueIfIsDestroyOperationAndRegionEntryIsNotAToken() {
+ RegionCommit regionCommit = new RegionCommit(txCommitMessage);
+ Object key = new Object();
+ RegionEntry regionEntry = mock(RegionEntry.class);
+ when(region.basicGetEntry(key)).thenReturn(regionEntry);
+ when(regionEntry.getValue()).thenReturn(new Token.NotAToken());
+
+ assertThat(regionCommit.isOpDestroyEvent(region, true, key)).isTrue();
+ }
}