You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/08/08 18:26:46 UTC

[geode] branch featrue/GEODE-5541 updated: GEODE-5541: A create followed by a destroy of the same key will not invoke cache listener on remote nodes.

This is an automated email from the ASF dual-hosted git repository.

eshu11 pushed a commit to branch featrue/GEODE-5541
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/featrue/GEODE-5541 by this push:
     new cd887d7  GEODE-5541: A create followed by a destroy of the same key will not invoke cache listener on remote nodes.
cd887d7 is described below

commit cd887d7f6f2646a2ba2427fb020067172a333d10
Author: eshu <es...@pivotal.io>
AuthorDate: Wed Aug 8 11:23:37 2018 -0700

    GEODE-5541: A create followed by a destroy of the same key will not invoke cache listener on remote nodes.
---
 ...tPartitionedRegionWithTransactionDUnitTest.java | 61 ++++++++++++++++++++++
 .../geode/internal/cache/TXCommitMessage.java      | 14 ++++-
 .../geode/internal/cache/RegionCommitTest.java     | 59 +++++++++++++++++++++
 3 files changed, 133 insertions(+), 1 deletion(-)

diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionWithTransactionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionWithTransactionDUnitTest.java
index 9460e63..b30474f 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionWithTransactionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/partitioned/PersistentPartitionedRegionWithTransactionDUnitTest.java
@@ -15,13 +15,19 @@
 package org.apache.geode.internal.cache.partitioned;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
 
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheListener;
 import org.apache.geode.cache.CacheTransactionManager;
 import org.apache.geode.cache.Region;
+import org.apache.geode.cache.util.CacheListenerAdapter;
 import org.apache.geode.internal.cache.DiskRegion;
 import org.apache.geode.internal.cache.DiskStoreImpl;
 import org.apache.geode.internal.cache.PartitionedRegion;
@@ -207,4 +213,59 @@ public class PersistentPartitionedRegionWithTransactionDUnitTest
       }
     });
   }
+
+  @Test
+  public void NoDestroyInvocationIfCreateEntryAndDestroyItInTransaction() throws Throwable {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+
+    int redundancy = 2;
+
+    vm0.invoke(() -> getCacheSetAlwaysFireLocalListeners());
+    vm1.invoke(() -> getCacheSetAlwaysFireLocalListeners());
+    vm2.invoke(() -> getCacheSetAlwaysFireLocalListeners());
+
+    createPR(vm0, redundancy);
+    createPR(vm1, redundancy);
+    createPR(vm2, redundancy);
+
+    vm0.invoke(() -> addListener());
+    vm1.invoke(() -> addListener());
+    vm2.invoke(() -> addListener());
+
+    vm0.invoke(() -> {
+      Cache cache = getCache();
+      TXManagerImpl txManager = (TXManagerImpl) cache.getCacheTransactionManager();
+      Region region = cache.getRegion(getPartitionedRegionName());
+      txManager.begin();
+      region.create(1, "toBeDestroyed");
+      region.destroy(1);
+      txManager.commit();
+    });
+
+    vm0.invoke(() -> verifyNoDestroyInvocation());
+    vm1.invoke(() -> verifyNoDestroyInvocation());
+    vm2.invoke(() -> verifyNoDestroyInvocation());
+  }
+
+  private void getCacheSetAlwaysFireLocalListeners() {
+    System.setProperty("gemfire.BucketRegion.alwaysFireLocalListeners", "true");
+    getCache();
+  }
+
+  private void addListener() {
+    Cache cache = getCache();
+    Region region = cache.getRegion(getPartitionedRegionName());
+    CacheListener listener = spy(new CacheListenerAdapter() {});
+    region.getAttributesMutator().addCacheListener(listener);
+  }
+
+  private void verifyNoDestroyInvocation() throws Exception {
+    Cache cache = getCache();
+    Region region = cache.getRegion(getPartitionedRegionName());
+    CacheListener listener = region.getAttributes().getCacheListeners()[0];
+    verify(listener, never()).afterDestroy(any());
+  }
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
index 13f15ea..550d5bf 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/TXCommitMessage.java
@@ -1297,9 +1297,13 @@ public class TXCommitMessage extends PooledDistributionMessage
         entryOp.versionTag.replaceNullIDs(this.msg.getSender());
       }
       if (entryOp.op.isDestroy()) {
+        boolean invokeCallbacks =
+            isOpDestroyEvent(internalRegion, entryOp.op.isDestroy(), entryOp.key);
+        List<EntryEventImpl> whichPendingCallbacks =
+            invokeCallbacks ? pendingCallbacks : new ArrayList<EntryEventImpl>();
         this.internalRegion.txApplyDestroy(entryOp.key, this.msg.txIdent, this.txEvent,
             this.needsUnlock,
-            entryOp.op, getEventId(entryOp), entryOp.callbackArg, pendingCallbacks,
+            entryOp.op, getEventId(entryOp), entryOp.callbackArg, whichPendingCallbacks,
             entryOp.filterRoutingInfo, this.msg.bridgeContext, false /* origin remote */,
             null/* txEntryState */, entryOp.versionTag, entryOp.tailKey);
       } else if (entryOp.op.isInvalidate()) {
@@ -1316,6 +1320,14 @@ public class TXCommitMessage extends PooledDistributionMessage
       }
     }
 
+    boolean isOpDestroyEvent(InternalRegion internalRegion, boolean isOpDestroy, Object key) {
+      // Note that if the region is a proxy(empty) then we go ahead and add
+      // the destroy callback because we may need to forward the event to clients.
+      RegionEntry regionEntry = internalRegion.basicGetEntry(key);
+      return isOpDestroy && (internalRegion.isProxy()
+          || (regionEntry != null && !Token.isRemoved(regionEntry.getValue())));
+    }
+
     /**
      * Apply a single tx entry op on the far side
      */
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/RegionCommitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/RegionCommitTest.java
index 193faf2..4f91dfd 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/RegionCommitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/RegionCommitTest.java
@@ -52,4 +52,63 @@ public class RegionCommitTest {
     RegionCommit regionCommit = new RegionCommit(txCommitMessage);
     assertThat(regionCommit.getRegionByPath(dm, path)).isEqualTo(region);
   }
+
+  @Test
+  public void isOpDestroyedEventReturnsFalseIfNotDestroyOperation() {
+    RegionCommit regionCommit = new RegionCommit(txCommitMessage);
+
+    assertThat(regionCommit.isOpDestroyEvent(mock(InternalRegion.class), false, new Object()))
+        .isFalse();
+  }
+
+  @Test
+  public void isOpDestroyedEventReturnsFalseIfIsDestroyOperationAndRegionEntryToBeDestroyedIsNull() {
+    RegionCommit regionCommit = new RegionCommit(txCommitMessage);
+    Object key = new Object();
+    when(region.basicGetEntry(key)).thenReturn(null);
+
+    assertThat(regionCommit.isOpDestroyEvent(region, true, key)).isFalse();
+  }
+
+  @Test
+  public void isOpDestroyedEventReturnsFalseIfIsDestroyOperationAndRegionEntryToBeDestroyedIsRemovedToken() {
+    RegionCommit regionCommit = new RegionCommit(txCommitMessage);
+    Object key = new Object();
+    RegionEntry regionEntry = mock(RegionEntry.class);
+    when(region.basicGetEntry(key)).thenReturn(regionEntry);
+    when(regionEntry.getValue()).thenReturn(Token.DESTROYED);
+
+    assertThat(regionCommit.isOpDestroyEvent(region, true, key)).isFalse();
+  }
+
+  @Test
+  public void isOpDestroyedEventReturnsFalseIfIsDestroyOperationAndRegionEntryToBeDestroyedIsTombstone() {
+    RegionCommit regionCommit = new RegionCommit(txCommitMessage);
+    Object key = new Object();
+    RegionEntry regionEntry = mock(RegionEntry.class);
+    when(region.basicGetEntry(key)).thenReturn(regionEntry);
+    when(regionEntry.getValue()).thenReturn(Token.TOMBSTONE);
+
+    assertThat(regionCommit.isOpDestroyEvent(region, true, key)).isFalse();
+  }
+
+  @Test
+  public void isOpDestroyedEventReturnsTrueIfDestroyEntryOnEmptyRegion() {
+    RegionCommit regionCommit = new RegionCommit(txCommitMessage);
+    Object key = new Object();
+    when(region.isProxy()).thenReturn(true);
+
+    assertThat(regionCommit.isOpDestroyEvent(region, true, key)).isTrue();
+  }
+
+  @Test
+  public void isOpDestroyedEventReturnsTrueIfIsDestroyOperationAndRegionEntryIsNotAToken() {
+    RegionCommit regionCommit = new RegionCommit(txCommitMessage);
+    Object key = new Object();
+    RegionEntry regionEntry = mock(RegionEntry.class);
+    when(region.basicGetEntry(key)).thenReturn(regionEntry);
+    when(regionEntry.getValue()).thenReturn(new Token.NotAToken());
+
+    assertThat(regionCommit.isOpDestroyEvent(region, true, key)).isTrue();
+  }
 }