You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by sb...@apache.org on 2015/09/18 19:23:32 UTC

incubator-geode git commit: GEODE-278 Enqueue events in one more spot.

Repository: incubator-geode
Updated Branches:
  refs/heads/develop 7bc011293 -> 4708d4e18


GEODE-278 Enqueue events in one more spot.

With the previous commit, I had missed the code block for adjunct messages. Enqueue transaction events in that block too.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4708d4e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4708d4e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4708d4e1

Branch: refs/heads/develop
Commit: 4708d4e182f89c6a391fcad8bac854f929717685
Parents: 7bc0112
Author: Swapnil Bawaskar <sb...@pivotal.io>
Authored: Thu Sep 17 15:43:52 2015 -0700
Committer: Swapnil Bawaskar <sb...@pivotal.io>
Committed: Thu Sep 17 15:43:52 2015 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/TXCommitMessage.java |  29 ++--
 .../cache/ClientServerTransactionDUnitTest.java | 151 ++++++++++++++++---
 2 files changed, 141 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4708d4e1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
index 94aaadc..2a597e9 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
@@ -737,7 +737,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
   
   public void basicProcessOps() {
     {
-      List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
+      List<EntryEventImpl> pendingCallbacks = new ArrayList<>(this.farSideEntryOps.size());
       Collections.sort(this.farSideEntryOps);
       Iterator it = this.farSideEntryOps.iterator();
       while (it.hasNext()) {
@@ -758,14 +758,18 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
     Iterator<EntryEventImpl> ci = callbacks.iterator();
     while(ci.hasNext()) {
       EntryEventImpl ee = ci.next();
-      if(ee.getOperation().isDestroy()) {
-        ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true);
-      } else if(ee.getOperation().isInvalidate()) {
-        ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE, ee, true);
-      } else if(ee.getOperation().isCreate()) {
-        ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_CREATE, ee, true);
-      } else {
-        ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, ee, true);
+      try {
+        if (ee.getOperation().isDestroy()) {
+          ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true);
+        } else if (ee.getOperation().isInvalidate()) {
+          ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE, ee, true);
+        } else if (ee.getOperation().isCreate()) {
+          ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_CREATE, ee, true);
+        } else {
+          ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, ee, true);
+        }
+      } finally {
+        ee.release();
       }
     }
   }
@@ -1294,7 +1298,6 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
          * This happens when we don't have the bucket and are getting adjunct notification
          */
         EntryEventImpl eei = AbstractRegionMap.createCBEvent(this.r, entryOp.op, entryOp.key, entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,entryOp.filterRoutingInfo,this.msg.bridgeContext, null, entryOp.versionTag, entryOp.tailKey);
-        try {
         if(entryOp.filterRoutingInfo!=null) {
           eei.setLocalFilterInfo(entryOp.filterRoutingInfo.getFilterInfo(this.r.getCache().getMyId()));
         }
@@ -1309,10 +1312,8 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
         // the message was sent and already reflects the change caused by this event.
         // In the latter case we need to invoke listeners
         final boolean skipListeners = !isDuplicate;
-        eei.invokeCallbacks(this.r, skipListeners, true);
-        } finally {
-          eei.release();
-        }
+        eei.setInvokePRCallbacks(!skipListeners);
+        pendingCallbacks.add(eei);
         return;
       }
       if (logger.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4708d4e1/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
index 51a8dea..4b65a95 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
@@ -21,34 +21,11 @@ import java.util.concurrent.TimeUnit;
 import javax.naming.Context;
 import javax.transaction.UserTransaction;
 
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.cache.*;
 import org.junit.Ignore;
 
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.CacheListener;
-import com.gemstone.gemfire.cache.CacheLoader;
-import com.gemstone.gemfire.cache.CacheLoaderException;
-import com.gemstone.gemfire.cache.CacheTransactionManager;
-import com.gemstone.gemfire.cache.CacheWriterException;
-import com.gemstone.gemfire.cache.CommitConflictException;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.EntryEvent;
-import com.gemstone.gemfire.cache.LoaderHelper;
-import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.Region.Entry;
-import com.gemstone.gemfire.cache.RegionFactory;
-import com.gemstone.gemfire.cache.RegionShortcut;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException;
-import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
-import com.gemstone.gemfire.cache.TransactionEvent;
-import com.gemstone.gemfire.cache.TransactionException;
-import com.gemstone.gemfire.cache.TransactionId;
-import com.gemstone.gemfire.cache.TransactionInDoubtException;
-import com.gemstone.gemfire.cache.TransactionWriter;
-import com.gemstone.gemfire.cache.TransactionWriterException;
-import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
 import com.gemstone.gemfire.cache.client.ClientCache;
 import com.gemstone.gemfire.cache.client.ClientCacheFactory;
 import com.gemstone.gemfire.cache.client.ClientRegionFactory;
@@ -3250,4 +3227,128 @@ public void testClientCommitAndDataStoreGetsEvent() throws Exception {
     });
     
   }
+
+  public void testAdjunctMessage() {
+    Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    final String regionName = "testAdjunctMessage";
+
+    final int port1 = createRegionsAndStartServer(server1, false);
+    final int port2 = createRegionsAndStartServer(server2, false);
+
+    SerializableCallable createServerRegionWithInterest = new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        RegionFactory rf = getCache().createRegionFactory(RegionShortcut.PARTITION);
+        rf.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT));
+        rf.create(regionName);
+        return null;
+      }
+    };
+    server1.invoke(createServerRegionWithInterest);
+    server2.invoke(createServerRegionWithInterest);
+
+    // get two colocated keys on server1
+    final List<String> keys = (List<String>) server1.invoke(new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        Region r = getCache().getRegion(regionName);
+        PartitionedRegion pr = (PartitionedRegion) r;
+        List<String> server1Keys = new ArrayList<String>();
+        for (int i=0; i<100; i++) {
+          String key = "k"+i;
+          //pr.put(key, "v" + i);
+          DistributedMember owner = pr.getOwnerForKey(pr.getKeyInfo(key));
+          if (owner.equals(pr.getMyId())) {
+            server1Keys.add(key);
+            if (server1Keys.size() == 2) {
+              break;
+            }
+          }
+        }
+        return server1Keys;
+      }
+    });
+
+    class ClientListener extends CacheListenerAdapter {
+      Set keys = new HashSet();
+      @Override
+      public void afterCreate(EntryEvent event) {
+        add(event);
+      }
+      @Override
+      public void afterUpdate(EntryEvent event) {
+        add(event);
+      }
+      private void add(EntryEvent event) {
+        keys.add(event.getKey());
+      }
+    }
+    client2.invoke(new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
+        ClientCacheFactory ccf = new ClientCacheFactory();
+        ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port2);
+        ccf.setPoolMinConnections(0);
+        ccf.setPoolSubscriptionEnabled(true);
+        ccf.setPoolSubscriptionRedundancy(0);
+        ccf.set("log-level", getDUnitLogLevel());
+        ClientCache cCache = getClientCache(ccf);
+        Region r = cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).addCacheListener(new ClientListener()).create(regionName);
+        r.registerInterestRegex(".*");
+        //cCache.readyForEvents();
+        return null;
+      }
+    });
+    client1.invoke(new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
+        ClientCacheFactory ccf = new ClientCacheFactory();
+        ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port1);
+        ccf.setPoolMinConnections(0);
+        ccf.setPoolSubscriptionEnabled(true);
+        ccf.set("log-level", getDUnitLogLevel());
+        ClientCache cCache = getClientCache(ccf);
+        Region r = cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);
+        getCache().getCacheTransactionManager().begin();
+        for (String key : keys) {
+          r.put(key, "value");
+        }
+        getCache().getCacheTransactionManager().commit();
+        return null;
+      }
+    });
+    client2.invoke(new SerializableCallable() {
+      @Override
+      public Object call() throws Exception {
+        Region r = getCache().getRegion(regionName);
+        CacheListener[] listeners = r.getAttributes().getCacheListeners();
+        boolean foundListener = false;
+        for (CacheListener listener : listeners) {
+          if (listener instanceof ClientListener) {
+            foundListener = true;
+            final ClientListener clientListener = (ClientListener) listener;
+            WaitCriterion wc = new WaitCriterion() {
+              @Override
+              public boolean done() {
+                return clientListener.keys.containsAll(keys);
+              }
+              @Override
+              public String description() {
+                return "expected:"+keys+" found:"+clientListener.keys;
+              }
+            };
+            DistributedTestCase.waitForCriterion(wc, 30*1000, 500, true);
+          }
+        }
+        assertTrue(foundListener);
+        return null;
+      }
+    });
+  }
 }