You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/09/01 18:49:14 UTC

[36/37] incubator-geode git commit: Fix for GEODE-278

Fix for GEODE-278

While applying changes to the Region, pass in List for gathering pendingCallbacks
rather than a null on the remote members to get the same behavior as transaction host.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0c13b4d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0c13b4d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0c13b4d6

Branch: refs/heads/feature/GEODE-77
Commit: 0c13b4d6ffbc73a2eb29202c24b27cf2b1f71611
Parents: 49d99d4
Author: Swapnil Bawaskar <sb...@pivotal.io>
Authored: Tue Aug 25 10:23:21 2015 -0700
Committer: Swapnil Bawaskar <sb...@pivotal.io>
Committed: Mon Aug 31 14:28:01 2015 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/TXCommitMessage.java | 34 +++++++---
 .../cache/RemoteTransactionDUnitTest.java       | 68 ++++++++++++++++++++
 2 files changed, 94 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c13b4d6/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
index f012bab..94aaadc 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
@@ -737,21 +737,39 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
   
   public void basicProcessOps() {
     {
+      List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
       Collections.sort(this.farSideEntryOps);
       Iterator it = this.farSideEntryOps.iterator();
       while (it.hasNext()) {
         try {
           RegionCommit.FarSideEntryOp entryOp = (RegionCommit.FarSideEntryOp)it.next();
-          entryOp.process();
+          entryOp.process(pendingCallbacks);
         } catch (CacheRuntimeException problem) {
           processCacheRuntimeException(problem);
         } catch (Exception e ) {
           addProcessingException(e);
         }
       }
+      firePendingCallbacks(pendingCallbacks);
     }
   }
-  
+
+  private void firePendingCallbacks(List<EntryEventImpl> callbacks) {
+    Iterator<EntryEventImpl> ci = callbacks.iterator();
+    while(ci.hasNext()) {
+      EntryEventImpl ee = ci.next();
+      if(ee.getOperation().isDestroy()) {
+        ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true);
+      } else if(ee.getOperation().isInvalidate()) {
+        ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE, ee, true);
+      } else if(ee.getOperation().isCreate()) {
+        ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_CREATE, ee, true);
+      } else {
+        ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, ee, true);
+      }
+    }
+  }
+
   protected void processCacheRuntimeException(CacheRuntimeException problem) {
     if (problem instanceof RegionDestroyedException) { // catch RegionDestroyedException
       addProcessingException(problem);
@@ -1262,7 +1280,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
      * Apply a single tx entry op on the far side
      */
     @SuppressWarnings("synthetic-access")
-    protected void txApplyEntryOp(FarSideEntryOp entryOp)
+    protected void txApplyEntryOp(FarSideEntryOp entryOp, List<EntryEventImpl> pendingCallbacks)
     {
       if (this.r == null) {
         return;
@@ -1312,7 +1330,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
                               entryOp.op,
                               getEventId(entryOp),
                               entryOp.callbackArg,
-                              null /* fire inline, no pending callbacks */,
+                              pendingCallbacks,
                               entryOp.filterRoutingInfo,
                               this.msg.bridgeContext,
                               false /* origin remote */,
@@ -1328,7 +1346,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
                                  false /*localOp*/,
                                  getEventId(entryOp),
                                  entryOp.callbackArg,
-                                 null /* fire inline, no pending callbacks */,
+                                 pendingCallbacks,
                                  entryOp.filterRoutingInfo,
                                  this.msg.bridgeContext,
                                  null/*txEntryState*/,
@@ -1343,7 +1361,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
                           this.txEvent,
                           getEventId(entryOp),
                           entryOp.callbackArg,
-                          null /* fire inline, no pending callbacks */,
+                          pendingCallbacks,
                           entryOp.filterRoutingInfo,
                           this.msg.bridgeContext,
                           null/*txEntryState*/,
@@ -1658,8 +1676,8 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
       /**
        * Performs this entryOp on the farside of a tx commit.
        */
-      public void process() {
-        txApplyEntryOp(this);
+      public void process(List<EntryEventImpl> pendingCallbacks) {
+        txApplyEntryOp(this, pendingCallbacks);
       }
       
       public void processAdjunctOnly() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c13b4d6/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java
index 5834622..ccff0c6 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java
@@ -4244,4 +4244,72 @@ protected static class ClientListener extends CacheListenerAdapter {
     Object value = entry._getValue();
     return value;
   }
+
+  /**
+   * Install Listeners and verify that they are invoked after all tx events have been applied to the cache
+   * see GEODE-278
+   */
+  public void testNonInlineRemoteEvents() {
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    final String key1 = "nonInline-1";
+    final String key2 = "nonInline-2";
+
+    class NonInlineListener extends CacheListenerAdapter {
+      boolean assertException = false;
+
+      @Override
+      public void afterCreate(EntryEvent event) {
+        if (event.getKey().equals(key1)) {
+          if (getCache().getRegion(D_REFERENCE).get(key2) == null) {
+            assertException = true;
+          }
+        }
+      }
+    }
+
+    SerializableCallable createRegionWithListener = new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        createRegion(false, 0, null);
+        getCache().getRegion(D_REFERENCE).getAttributesMutator().addCacheListener(new NonInlineListener());
+        return null;
+      }
+    };
+
+    vm0.invoke(createRegionWithListener);
+    vm1.invoke(createRegionWithListener);
+
+    vm0.invoke(new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        Region region = getCache().getRegion(D_REFERENCE);
+        CacheTransactionManager mgr = getCache().getCacheTransactionManager();
+        mgr.begin();
+        region.put(key1, "nonInlineValue-1");
+        region.put(key2, "nonInlineValue-2");
+        mgr.commit();
+        return null;
+      }
+    });
+
+    SerializableCallable verifyAssert = new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        CacheListener[] listeners = getCache().getRegion(D_REFERENCE).getAttributes().getCacheListeners();
+        for (CacheListener listener : listeners) {
+          if (listener instanceof NonInlineListener) {
+            NonInlineListener l = (NonInlineListener) listener;
+            assertFalse(l.assertException);
+          }
+        }
+        return null;
+      }
+    };
+
+    vm0.invoke(verifyAssert);
+    vm1.invoke(verifyAssert);
+
+  }
 }