You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by vf...@apache.org on 2015/11/25 20:07:17 UTC
[04/50] [abbrv] incubator-geode git commit: GEODE-278 Enque events in
one more spot.
GEODE-278 Enque events in one more spot.
With the previous commit, I had missed the code block for Adjunct messages. Enque tx events in that block too.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/4708d4e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/4708d4e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/4708d4e1
Branch: refs/heads/feature/GEODE-78
Commit: 4708d4e182f89c6a391fcad8bac854f929717685
Parents: 7bc0112
Author: Swapnil Bawaskar <sb...@pivotal.io>
Authored: Thu Sep 17 15:43:52 2015 -0700
Committer: Swapnil Bawaskar <sb...@pivotal.io>
Committed: Thu Sep 17 15:43:52 2015 -0700
----------------------------------------------------------------------
.../gemfire/internal/cache/TXCommitMessage.java | 29 ++--
.../cache/ClientServerTransactionDUnitTest.java | 151 ++++++++++++++++---
2 files changed, 141 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4708d4e1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
index 94aaadc..2a597e9 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
@@ -737,7 +737,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
public void basicProcessOps() {
{
- List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
+ List<EntryEventImpl> pendingCallbacks = new ArrayList<>(this.farSideEntryOps.size());
Collections.sort(this.farSideEntryOps);
Iterator it = this.farSideEntryOps.iterator();
while (it.hasNext()) {
@@ -758,14 +758,18 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
Iterator<EntryEventImpl> ci = callbacks.iterator();
while(ci.hasNext()) {
EntryEventImpl ee = ci.next();
- if(ee.getOperation().isDestroy()) {
- ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true);
- } else if(ee.getOperation().isInvalidate()) {
- ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE, ee, true);
- } else if(ee.getOperation().isCreate()) {
- ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_CREATE, ee, true);
- } else {
- ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, ee, true);
+ try {
+ if (ee.getOperation().isDestroy()) {
+ ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true);
+ } else if (ee.getOperation().isInvalidate()) {
+ ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE, ee, true);
+ } else if (ee.getOperation().isCreate()) {
+ ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_CREATE, ee, true);
+ } else {
+ ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, ee, true);
+ }
+ } finally {
+ ee.release();
}
}
}
@@ -1294,7 +1298,6 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
* This happens when we don't have the bucket and are getting adjunct notification
*/
EntryEventImpl eei = AbstractRegionMap.createCBEvent(this.r, entryOp.op, entryOp.key, entryOp.value, this.msg.txIdent, txEvent, getEventId(entryOp), entryOp.callbackArg,entryOp.filterRoutingInfo,this.msg.bridgeContext, null, entryOp.versionTag, entryOp.tailKey);
- try {
if(entryOp.filterRoutingInfo!=null) {
eei.setLocalFilterInfo(entryOp.filterRoutingInfo.getFilterInfo(this.r.getCache().getMyId()));
}
@@ -1309,10 +1312,8 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
// the message was sent and already reflects the change caused by this event.
// In the latter case we need to invoke listeners
final boolean skipListeners = !isDuplicate;
- eei.invokeCallbacks(this.r, skipListeners, true);
- } finally {
- eei.release();
- }
+ eei.setInvokePRCallbacks(!skipListeners);
+ pendingCallbacks.add(eei);
return;
}
if (logger.isDebugEnabled()) {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/4708d4e1/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
index 51a8dea..4b65a95 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/ClientServerTransactionDUnitTest.java
@@ -21,34 +21,11 @@ import java.util.concurrent.TimeUnit;
import javax.naming.Context;
import javax.transaction.UserTransaction;
+import com.gemstone.gemfire.LogWriter;
+import com.gemstone.gemfire.cache.*;
import org.junit.Ignore;
-import com.gemstone.gemfire.cache.AttributesFactory;
-import com.gemstone.gemfire.cache.Cache;
-import com.gemstone.gemfire.cache.CacheFactory;
-import com.gemstone.gemfire.cache.CacheListener;
-import com.gemstone.gemfire.cache.CacheLoader;
-import com.gemstone.gemfire.cache.CacheLoaderException;
-import com.gemstone.gemfire.cache.CacheTransactionManager;
-import com.gemstone.gemfire.cache.CacheWriterException;
-import com.gemstone.gemfire.cache.CommitConflictException;
-import com.gemstone.gemfire.cache.DataPolicy;
-import com.gemstone.gemfire.cache.EntryEvent;
-import com.gemstone.gemfire.cache.LoaderHelper;
-import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.Region.Entry;
-import com.gemstone.gemfire.cache.RegionFactory;
-import com.gemstone.gemfire.cache.RegionShortcut;
-import com.gemstone.gemfire.cache.Scope;
-import com.gemstone.gemfire.cache.TransactionDataNodeHasDepartedException;
-import com.gemstone.gemfire.cache.TransactionDataNotColocatedException;
-import com.gemstone.gemfire.cache.TransactionEvent;
-import com.gemstone.gemfire.cache.TransactionException;
-import com.gemstone.gemfire.cache.TransactionId;
-import com.gemstone.gemfire.cache.TransactionInDoubtException;
-import com.gemstone.gemfire.cache.TransactionWriter;
-import com.gemstone.gemfire.cache.TransactionWriterException;
-import com.gemstone.gemfire.cache.UnsupportedOperationInTransactionException;
import com.gemstone.gemfire.cache.client.ClientCache;
import com.gemstone.gemfire.cache.client.ClientCacheFactory;
import com.gemstone.gemfire.cache.client.ClientRegionFactory;
@@ -3250,4 +3227,128 @@ public void testClientCommitAndDataStoreGetsEvent() throws Exception {
});
}
+
+ public void testAdjunctMessage() {
+ Host host = Host.getHost(0);
+ VM server1 = host.getVM(0);
+ VM server2 = host.getVM(1);
+ VM client1 = host.getVM(2);
+ VM client2 = host.getVM(3);
+ final String regionName = "testAdjunctMessage";
+
+ final int port1 = createRegionsAndStartServer(server1, false);
+ final int port2 = createRegionsAndStartServer(server2, false);
+
+ SerializableCallable createServerRegionWithInterest = new SerializableCallable() {
+ @Override
+ public Object call() throws Exception {
+ RegionFactory rf = getCache().createRegionFactory(RegionShortcut.PARTITION);
+ rf.setSubscriptionAttributes(new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT));
+ rf.create(regionName);
+ return null;
+ }
+ };
+ server1.invoke(createServerRegionWithInterest);
+ server2.invoke(createServerRegionWithInterest);
+
+ // get two colocated keys on server1
+ final List<String> keys = (List<String>) server1.invoke(new SerializableCallable() {
+ @Override
+ public Object call() throws Exception {
+ Region r = getCache().getRegion(regionName);
+ PartitionedRegion pr = (PartitionedRegion) r;
+ List<String> server1Keys = new ArrayList<String>();
+ for (int i=0; i<100; i++) {
+ String key = "k"+i;
+ //pr.put(key, "v" + i);
+ DistributedMember owner = pr.getOwnerForKey(pr.getKeyInfo(key));
+ if (owner.equals(pr.getMyId())) {
+ server1Keys.add(key);
+ if (server1Keys.size() == 2) {
+ break;
+ }
+ }
+ }
+ return server1Keys;
+ }
+ });
+
+ class ClientListener extends CacheListenerAdapter {
+ Set keys = new HashSet();
+ @Override
+ public void afterCreate(EntryEvent event) {
+ add(event);
+ }
+ @Override
+ public void afterUpdate(EntryEvent event) {
+ add(event);
+ }
+ private void add(EntryEvent event) {
+ keys.add(event.getKey());
+ }
+ }
+ client2.invoke(new SerializableCallable() {
+ @Override
+ public Object call() throws Exception {
+ System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
+ ClientCacheFactory ccf = new ClientCacheFactory();
+ ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port2);
+ ccf.setPoolMinConnections(0);
+ ccf.setPoolSubscriptionEnabled(true);
+ ccf.setPoolSubscriptionRedundancy(0);
+ ccf.set("log-level", getDUnitLogLevel());
+ ClientCache cCache = getClientCache(ccf);
+ Region r = cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).addCacheListener(new ClientListener()).create(regionName);
+ r.registerInterestRegex(".*");
+ //cCache.readyForEvents();
+ return null;
+ }
+ });
+ client1.invoke(new SerializableCallable() {
+ @Override
+ public Object call() throws Exception {
+ System.setProperty("gemfire.bridge.disableShufflingOfEndpoints", "true");
+ ClientCacheFactory ccf = new ClientCacheFactory();
+ ccf.addPoolServer("localhost"/*getServerHostName(Host.getHost(0))*/, port1);
+ ccf.setPoolMinConnections(0);
+ ccf.setPoolSubscriptionEnabled(true);
+ ccf.set("log-level", getDUnitLogLevel());
+ ClientCache cCache = getClientCache(ccf);
+ Region r = cCache.createClientRegionFactory(ClientRegionShortcut.CACHING_PROXY).create(regionName);
+ getCache().getCacheTransactionManager().begin();
+ for (String key : keys) {
+ r.put(key, "value");
+ }
+ getCache().getCacheTransactionManager().commit();
+ return null;
+ }
+ });
+ client2.invoke(new SerializableCallable() {
+ @Override
+ public Object call() throws Exception {
+ Region r = getCache().getRegion(regionName);
+ CacheListener[] listeners = r.getAttributes().getCacheListeners();
+ boolean foundListener = false;
+ for (CacheListener listener : listeners) {
+ if (listener instanceof ClientListener) {
+ foundListener = true;
+ final ClientListener clientListener = (ClientListener) listener;
+ WaitCriterion wc = new WaitCriterion() {
+ @Override
+ public boolean done() {
+ return clientListener.keys.containsAll(keys);
+ }
+ @Override
+ public String description() {
+ return "expected:"+keys+" found:"+clientListener.keys;
+ }
+ };
+ DistributedTestCase.waitForCriterion(wc, 30*1000, 500, true);
+ }
+ }
+ assertTrue(foundListener);
+ return null;
+ }
+ });
+ }
}