You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2015/09/01 18:49:14 UTC
[36/37] incubator-geode git commit: Fix for GEODE-278
Fix for GEODE-278
While applying changes to the Region, pass in List for gathering pendingCallbacks
rather than a null on the remote members to get the same behavior as transaction host.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0c13b4d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0c13b4d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0c13b4d6
Branch: refs/heads/feature/GEODE-77
Commit: 0c13b4d6ffbc73a2eb29202c24b27cf2b1f71611
Parents: 49d99d4
Author: Swapnil Bawaskar <sb...@pivotal.io>
Authored: Tue Aug 25 10:23:21 2015 -0700
Committer: Swapnil Bawaskar <sb...@pivotal.io>
Committed: Mon Aug 31 14:28:01 2015 -0700
----------------------------------------------------------------------
.../gemfire/internal/cache/TXCommitMessage.java | 34 +++++++---
.../cache/RemoteTransactionDUnitTest.java | 68 ++++++++++++++++++++
2 files changed, 94 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c13b4d6/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
index f012bab..94aaadc 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
@@ -737,21 +737,39 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
public void basicProcessOps() {
{
+ List<EntryEventImpl> pendingCallbacks = new ArrayList<>();
Collections.sort(this.farSideEntryOps);
Iterator it = this.farSideEntryOps.iterator();
while (it.hasNext()) {
try {
RegionCommit.FarSideEntryOp entryOp = (RegionCommit.FarSideEntryOp)it.next();
- entryOp.process();
+ entryOp.process(pendingCallbacks);
} catch (CacheRuntimeException problem) {
processCacheRuntimeException(problem);
} catch (Exception e ) {
addProcessingException(e);
}
}
+ firePendingCallbacks(pendingCallbacks);
}
}
-
+
+ private void firePendingCallbacks(List<EntryEventImpl> callbacks) {
+ Iterator<EntryEventImpl> ci = callbacks.iterator();
+ while(ci.hasNext()) {
+ EntryEventImpl ee = ci.next();
+ if(ee.getOperation().isDestroy()) {
+ ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_DESTROY, ee, true);
+ } else if(ee.getOperation().isInvalidate()) {
+ ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_INVALIDATE, ee, true);
+ } else if(ee.getOperation().isCreate()) {
+ ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_CREATE, ee, true);
+ } else {
+ ee.getRegion().invokeTXCallbacks(EnumListenerEvent.AFTER_UPDATE, ee, true);
+ }
+ }
+ }
+
protected void processCacheRuntimeException(CacheRuntimeException problem) {
if (problem instanceof RegionDestroyedException) { // catch RegionDestroyedException
addProcessingException(problem);
@@ -1262,7 +1280,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
* Apply a single tx entry op on the far side
*/
@SuppressWarnings("synthetic-access")
- protected void txApplyEntryOp(FarSideEntryOp entryOp)
+ protected void txApplyEntryOp(FarSideEntryOp entryOp, List<EntryEventImpl> pendingCallbacks)
{
if (this.r == null) {
return;
@@ -1312,7 +1330,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
entryOp.op,
getEventId(entryOp),
entryOp.callbackArg,
- null /* fire inline, no pending callbacks */,
+ pendingCallbacks,
entryOp.filterRoutingInfo,
this.msg.bridgeContext,
false /* origin remote */,
@@ -1328,7 +1346,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
false /*localOp*/,
getEventId(entryOp),
entryOp.callbackArg,
- null /* fire inline, no pending callbacks */,
+ pendingCallbacks,
entryOp.filterRoutingInfo,
this.msg.bridgeContext,
null/*txEntryState*/,
@@ -1343,7 +1361,7 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
this.txEvent,
getEventId(entryOp),
entryOp.callbackArg,
- null /* fire inline, no pending callbacks */,
+ pendingCallbacks,
entryOp.filterRoutingInfo,
this.msg.bridgeContext,
null/*txEntryState*/,
@@ -1658,8 +1676,8 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
/**
* Performs this entryOp on the farside of a tx commit.
*/
- public void process() {
- txApplyEntryOp(this);
+ public void process(List<EntryEventImpl> pendingCallbacks) {
+ txApplyEntryOp(this, pendingCallbacks);
}
public void processAdjunctOnly() {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0c13b4d6/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java
index 5834622..ccff0c6 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/RemoteTransactionDUnitTest.java
@@ -4244,4 +4244,72 @@ protected static class ClientListener extends CacheListenerAdapter {
Object value = entry._getValue();
return value;
}
+
+ /**
+ * Install Listeners and verify that they are invoked after all tx events have been applied to the cache
+ * see GEODE-278
+ */
+ public void testNonInlineRemoteEvents() {
+ Host host = Host.getHost(0);
+ VM vm0 = host.getVM(0);
+ VM vm1 = host.getVM(1);
+ final String key1 = "nonInline-1";
+ final String key2 = "nonInline-2";
+
+ class NonInlineListener extends CacheListenerAdapter {
+ boolean assertException = false;
+
+ @Override
+ public void afterCreate(EntryEvent event) {
+ if (event.getKey().equals(key1)) {
+ if (getCache().getRegion(D_REFERENCE).get(key2) == null) {
+ assertException = true;
+ }
+ }
+ }
+ }
+
+ SerializableCallable createRegionWithListener = new SerializableCallable() {
+ @Override
+ public Object call() throws Exception {
+ createRegion(false, 0, null);
+ getCache().getRegion(D_REFERENCE).getAttributesMutator().addCacheListener(new NonInlineListener());
+ return null;
+ }
+ };
+
+ vm0.invoke(createRegionWithListener);
+ vm1.invoke(createRegionWithListener);
+
+ vm0.invoke(new SerializableCallable() {
+ @Override
+ public Object call() throws Exception {
+ Region region = getCache().getRegion(D_REFERENCE);
+ CacheTransactionManager mgr = getCache().getCacheTransactionManager();
+ mgr.begin();
+ region.put(key1, "nonInlineValue-1");
+ region.put(key2, "nonInlineValue-2");
+ mgr.commit();
+ return null;
+ }
+ });
+
+ SerializableCallable verifyAssert = new SerializableCallable() {
+ @Override
+ public Object call() throws Exception {
+ CacheListener[] listeners = getCache().getRegion(D_REFERENCE).getAttributes().getCacheListeners();
+ for (CacheListener listener : listeners) {
+ if (listener instanceof NonInlineListener) {
+ NonInlineListener l = (NonInlineListener) listener;
+ assertFalse(l.assertException);
+ }
+ }
+ return null;
+ }
+ };
+
+ vm0.invoke(verifyAssert);
+ vm1.invoke(verifyAssert);
+
+ }
}