You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2016/04/04 17:49:57 UTC
incubator-geode git commit: GEODE-1160 TransactionWriter is triggered
if updating entries with using PDX
Repository: incubator-geode
Updated Branches:
refs/heads/develop 16d09f659 -> 2deb31d95
GEODE-1160 TransactionWriter is triggered if updating entries with using PDX
This inhibits invocation of transaction writers and listners for operations
on internal cache Regions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2deb31d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2deb31d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2deb31d9
Branch: refs/heads/develop
Commit: 2deb31d95f1f123bd1091f5f527ba0de1e175741
Parents: 16d09f6
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Mon Apr 4 08:47:39 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Mon Apr 4 08:49:23 2016 -0700
----------------------------------------------------------------------
.../gemfire/internal/cache/TXCommitMessage.java | 18 ++--
.../gemfire/internal/cache/TXEvent.java | 21 ++++
.../gemfire/internal/cache/TXRmtEvent.java | 28 +++++
.../gemfire/internal/cache/TXState.java | 12 ++-
.../gemfire/pdx/PdxSerializableDUnitTest.java | 106 +++++++++++++++++++
5 files changed, 174 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
index 1a4d377..d1644c7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
@@ -699,7 +699,9 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
/*
* We need to make sure that we should fire a TX afterCommit event.
*/
- if(!disableListeners && (forceListener || (txEvent!=null && txEvent.getEvents().size()>0))) {
+ boolean internalEvent = (txEvent != null && txEvent.hasOnlyInternalEvents());
+ if (!disableListeners && !internalEvent
+ && (forceListener || (txEvent!=null && !txEvent.isEmpty()))) {
for (int i=0; i < tls.length; i++) {
try {
tls[i].afterCommit(txEvent);
@@ -2457,13 +2459,13 @@ private static final long serialVersionUID = 589384721273797822L;
}
- /**
- * Disable firing of TX Listeners. Currently on used on clients.
- * @param b disable the listeners
- */
- public void setDisableListeners(boolean b) {
- disableListeners = true;
- }
+ /**
+ * Disable firing of TX Listeners. Currently on used on clients.
+ * @param b disable the listeners
+ */
+ public void setDisableListeners(boolean b) {
+ disableListeners = true;
+ }
public Version getClientVersion() {
return clientVersion;
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java
index e686cea..6bfd26f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java
@@ -128,6 +128,27 @@ public class TXEvent implements TransactionEvent, Releasable {
return this.events;
}
+ /**
+ * Do all operations touch internal regions?
+ * Returns false if the transaction is empty
+ * or if any events touch non-internal regions.
+ */
+ public boolean hasOnlyInternalEvents() {
+ List<CacheEvent<?,?>> txevents = getEvents();
+ if (txevents == null || txevents.isEmpty()) {
+ return false;
+ }
+ for (CacheEvent<?,?> txevent: txevents) {
+ LocalRegion region = (LocalRegion)txevent.getRegion();
+ if (region != null
+ && !region.isPdxTypesRegion()
+ && !region.isInternalRegion()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
public final Cache getCache() {
return this.cache;
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
index b378c8e..c0493ac 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
@@ -83,6 +83,30 @@ public class TXRmtEvent implements TransactionEvent
}
}
}
+
+ /**
+ * Do all operations touch internal regions?
+ * Returns false if the transaction is empty
+ * or if any events touch non-internal regions.
+ */
+ public boolean hasOnlyInternalEvents() {
+ if (events == null || events.isEmpty()) {
+ return false;
+ }
+ Iterator<CacheEvent<?,?>> it = this.events.iterator();
+ while (it.hasNext()) {
+ CacheEvent<?,?> event = it.next();
+ if (isEventUserVisible(event)) {
+ LocalRegion region = (LocalRegion)event.getRegion();
+ if (region != null
+ && !region.isPdxTypesRegion()
+ && !region.isInternalRegion()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
public List getCreateEvents()
{
@@ -175,6 +199,10 @@ public class TXRmtEvent implements TransactionEvent
}
}
}
+
+ public boolean isEmpty() {
+ return (events == null) || events.isEmpty();
+ }
private EntryEventImpl createEvent(LocalRegion r, Operation op,
RegionEntry re, Object key, Object newValue,Object aCallbackArgument)
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
index 9e8dd18..3bec397 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
@@ -388,7 +388,10 @@ public class TXState implements TXStateInterface {
if(!firedWriter && writer!=null) {
try {
firedWriter = true;
- writer.beforeCommit(getEvent());
+ TXEvent event = getEvent();
+ if (!event.hasOnlyInternalEvents()) {
+ writer.beforeCommit(event);
+ }
} catch(TransactionWriterException twe) {
cleanup();
throw new CommitConflictException(twe);
@@ -917,7 +920,10 @@ public class TXState implements TXStateInterface {
try {
// need to mark this so we don't fire again in commit
firedWriter = true;
- writer.beforeCommit(getEvent());
+ TXEvent event = getEvent();
+ if (!event.hasOnlyInternalEvents()) {
+ writer.beforeCommit(event);
+ }
} catch(TransactionWriterException twe) {
cleanup();
throw new CommitConflictException(twe);
@@ -1623,7 +1629,7 @@ public class TXState implements TXStateInterface {
public boolean isFireCallbacks() {
- return true;
+ return !getEvent().hasOnlyInternalEvents();
}
public boolean isOriginRemoteForEvents() {
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java
index 1e901bc..3621016 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java
@@ -16,13 +16,23 @@
*/
package com.gemstone.gemfire.pdx;
+import java.util.List;
+
import com.gemstone.gemfire.cache.AttributesFactory;
import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheEvent;
import com.gemstone.gemfire.cache.CacheFactory;
import com.gemstone.gemfire.cache.DataPolicy;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.TransactionEvent;
+import com.gemstone.gemfire.cache.TransactionListener;
+import com.gemstone.gemfire.cache.TransactionWriter;
+import com.gemstone.gemfire.cache.TransactionWriterException;
import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.cache30.TestTransactionListener;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.TXEvent;
import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration;
import com.gemstone.gemfire.test.dunit.AsyncInvocation;
import com.gemstone.gemfire.test.dunit.Host;
@@ -83,6 +93,48 @@ public class PdxSerializableDUnitTest extends CacheTestCase {
});
}
+ public void testTransactionCallbacksNotInvoked() {
+ Host host = Host.getHost(0);
+ VM vm1 = host.getVM(0);
+ VM vm2 = host.getVM(1);
+
+ SerializableCallable createRegionAndAddPoisonedListener = new SerializableCallable() {
+ public Object call() throws Exception {
+ AttributesFactory af = new AttributesFactory();
+ af.setScope(Scope.DISTRIBUTED_ACK);
+ af.setDataPolicy(DataPolicy.REPLICATE);
+ createRootRegion("testSimplePdx", af.create());
+ addPoisonedTransactionListeners();
+ return null;
+ }
+ };
+
+ vm1.invoke(createRegionAndAddPoisonedListener);
+ vm2.invoke(createRegionAndAddPoisonedListener);
+ vm1.invoke(new SerializableCallable() {
+ public Object call() throws Exception {
+ //Check to make sure the type region is not yet created
+ Region r = getRootRegion("testSimplePdx");
+ Cache mycache = getCache();
+ mycache.getCacheTransactionManager().begin();
+ r.put(1, new SimpleClass(57, (byte) 3));
+ mycache.getCacheTransactionManager().commit();
+ //Ok, now the type registry should exist
+ assertNotNull(getRootRegion(PeerTypeRegistration.REGION_NAME));
+ return null;
+ }
+ });
+ vm2.invoke(new SerializableCallable() {
+ public Object call() throws Exception {
+ //Ok, now the type registry should exist
+ assertNotNull(getRootRegion(PeerTypeRegistration.REGION_NAME));
+ Region r = getRootRegion("testSimplePdx");
+ assertEquals(new SimpleClass(57, (byte) 3), r.get(1));
+ return null;
+ }
+ });
+ }
+
public void testPersistenceDefaultDiskStore() throws Throwable {
SerializableCallable createRegion = new SerializableCallable() {
@@ -187,4 +239,58 @@ public class PdxSerializableDUnitTest extends CacheTestCase {
vm3.invoke(checkForObject);
}
+
+ /**
+ * add a listener and writer that will throw an exception when invoked
+ * if events are for internal regions
+ */
+ public final void addPoisonedTransactionListeners() {
+ MyTestTransactionListener listener = new MyTestTransactionListener();
+ getCache().getCacheTransactionManager().addListener(listener);
+ getCache().getCacheTransactionManager().setWriter(listener);
+ }
+
+
+ static private class MyTestTransactionListener
+ implements TransactionWriter, TransactionListener {
+ private MyTestTransactionListener() {
+
+ }
+
+ private void checkEvent(TransactionEvent event) {
+ List<CacheEvent<?,?>> events = event.getEvents();
+ System.out.println("MyTestTransactionListener.checkEvent: events are " + events);
+ for (CacheEvent<?,?> cacheEvent: events) {
+ if (((LocalRegion)cacheEvent.getRegion()).isPdxTypesRegion()) {
+ throw new UnsupportedOperationException("found internal event: " + cacheEvent
+ + " region=" + cacheEvent.getRegion().getName());
+ }
+ }
+ }
+
+ @Override
+ public void beforeCommit(TransactionEvent event)
+ throws TransactionWriterException {
+ checkEvent(event);
+ }
+
+ @Override
+ public void close() {
+ }
+
+ @Override
+ public void afterCommit(TransactionEvent event) {
+ checkEvent(event);
+ }
+
+ @Override
+ public void afterFailedCommit(TransactionEvent event) {
+ checkEvent(event);
+ }
+
+ @Override
+ public void afterRollback(TransactionEvent event) {
+ checkEvent(event);
+ }
+ }
}