You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by bs...@apache.org on 2016/04/04 17:49:57 UTC

incubator-geode git commit: GEODE-1160 TransactionWriter is triggered if updating entries with using PDX

Repository: incubator-geode
Updated Branches:
  refs/heads/develop 16d09f659 -> 2deb31d95


GEODE-1160 TransactionWriter is triggered if updating entries with using PDX

This inhibits invocation of transaction writers and listeners for operations
on internal cache Regions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/2deb31d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/2deb31d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/2deb31d9

Branch: refs/heads/develop
Commit: 2deb31d95f1f123bd1091f5f527ba0de1e175741
Parents: 16d09f6
Author: Bruce Schuchardt <bs...@pivotal.io>
Authored: Mon Apr 4 08:47:39 2016 -0700
Committer: Bruce Schuchardt <bs...@pivotal.io>
Committed: Mon Apr 4 08:49:23 2016 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/TXCommitMessage.java |  18 ++--
 .../gemfire/internal/cache/TXEvent.java         |  21 ++++
 .../gemfire/internal/cache/TXRmtEvent.java      |  28 +++++
 .../gemfire/internal/cache/TXState.java         |  12 ++-
 .../gemfire/pdx/PdxSerializableDUnitTest.java   | 106 +++++++++++++++++++
 5 files changed, 174 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
index 1a4d377..d1644c7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXCommitMessage.java
@@ -699,7 +699,9 @@ public class TXCommitMessage extends PooledDistributionMessage implements Member
       /*
        * We need to make sure that we should fire a TX afterCommit event.
        */
-      if(!disableListeners && (forceListener || (txEvent!=null && txEvent.getEvents().size()>0))) {
+      boolean internalEvent = (txEvent != null && txEvent.hasOnlyInternalEvents());
+      if (!disableListeners && !internalEvent
+          && (forceListener || (txEvent!=null && !txEvent.isEmpty()))) {
         for (int i=0; i < tls.length; i++) {
           try {
             tls[i].afterCommit(txEvent);
@@ -2457,13 +2459,13 @@ private static final long serialVersionUID = 589384721273797822L;
   }
   
   
-	/**
-	 * Disable firing of TX Listeners. Currently on used on clients.
-	 * @param b disable the listeners
-	 */
-	public void setDisableListeners(boolean b) {
-		disableListeners = true;
-	}
+  /**
+   * Disable firing of TX Listeners. Currently only used on clients.
+   * @param b disable the listeners
+   */
+  public void setDisableListeners(boolean b) {
+    disableListeners = true;
+  }
 
   public Version getClientVersion() {
     return clientVersion;

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java
index e686cea..6bfd26f 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXEvent.java
@@ -128,6 +128,27 @@ public class TXEvent implements TransactionEvent, Releasable {
     return this.events;
   }
 
+  /**
+   * Do all operations touch internal regions?
+   * Returns false if the transaction is empty
+   * or if any events touch non-internal regions.
+   */
+  public boolean hasOnlyInternalEvents() {
+    List<CacheEvent<?,?>> txevents = getEvents();
+    if (txevents == null || txevents.isEmpty()) {
+      return false;
+    }
+    for (CacheEvent<?,?> txevent: txevents) {
+      LocalRegion region = (LocalRegion)txevent.getRegion();
+      if (region != null
+          && !region.isPdxTypesRegion()
+          && !region.isInternalRegion()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   public final Cache getCache() {
     return this.cache;
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
index b378c8e..c0493ac 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXRmtEvent.java
@@ -83,6 +83,30 @@ public class TXRmtEvent implements TransactionEvent
       }
     }
   }
+  
+  /**
+   * Do all operations touch internal regions?
+   * Returns false if the transaction is empty
+   * or if any events touch non-internal regions.
+   */
+  public boolean hasOnlyInternalEvents() {
+    if (events == null || events.isEmpty()) {
+      return false;
+    }
+    Iterator<CacheEvent<?,?>> it = this.events.iterator();
+    while (it.hasNext()) {
+      CacheEvent<?,?> event = it.next();
+      if (isEventUserVisible(event)) {
+        LocalRegion region = (LocalRegion)event.getRegion();
+        if (region != null
+            && !region.isPdxTypesRegion()
+            && !region.isInternalRegion()) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
 
   public List getCreateEvents()
   {
@@ -175,6 +199,10 @@ public class TXRmtEvent implements TransactionEvent
       }
     }
   }
+  
+  public boolean isEmpty() {
+    return (events == null) || events.isEmpty();
+  }
 
   private EntryEventImpl createEvent(LocalRegion r, Operation op,
       RegionEntry re, Object key, Object newValue,Object aCallbackArgument)

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
index 9e8dd18..3bec397 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TXState.java
@@ -388,7 +388,10 @@ public class TXState implements TXStateInterface {
       if(!firedWriter && writer!=null) {
         try {
           firedWriter = true;
-          writer.beforeCommit(getEvent());
+          TXEvent event = getEvent();
+          if (!event.hasOnlyInternalEvents()) {
+            writer.beforeCommit(event);
+          }
         } catch(TransactionWriterException twe) {
           cleanup();
           throw new CommitConflictException(twe);
@@ -917,7 +920,10 @@ public class TXState implements TXStateInterface {
         try {
           // need to mark this so we don't fire again in commit
           firedWriter  = true;
-          writer.beforeCommit(getEvent());
+          TXEvent event = getEvent();
+          if (!event.hasOnlyInternalEvents()) {
+            writer.beforeCommit(event);
+          }
         } catch(TransactionWriterException twe) {
           cleanup();
           throw new CommitConflictException(twe);
@@ -1623,7 +1629,7 @@ public class TXState implements TXStateInterface {
 
 
   public boolean isFireCallbacks() {
-    return true;
+    return !getEvent().hasOnlyInternalEvents();
   }
 
   public boolean isOriginRemoteForEvents() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/2deb31d9/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java
index 1e901bc..3621016 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/pdx/PdxSerializableDUnitTest.java
@@ -16,13 +16,23 @@
  */
 package com.gemstone.gemfire.pdx;
 
+import java.util.List;
+
 import com.gemstone.gemfire.cache.AttributesFactory;
 import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheEvent;
 import com.gemstone.gemfire.cache.CacheFactory;
 import com.gemstone.gemfire.cache.DataPolicy;
 import com.gemstone.gemfire.cache.Region;
 import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.TransactionEvent;
+import com.gemstone.gemfire.cache.TransactionListener;
+import com.gemstone.gemfire.cache.TransactionWriter;
+import com.gemstone.gemfire.cache.TransactionWriterException;
 import com.gemstone.gemfire.cache30.CacheTestCase;
+import com.gemstone.gemfire.cache30.TestTransactionListener;
+import com.gemstone.gemfire.internal.cache.LocalRegion;
+import com.gemstone.gemfire.internal.cache.TXEvent;
 import com.gemstone.gemfire.pdx.internal.PeerTypeRegistration;
 import com.gemstone.gemfire.test.dunit.AsyncInvocation;
 import com.gemstone.gemfire.test.dunit.Host;
@@ -83,6 +93,48 @@ public class PdxSerializableDUnitTest extends CacheTestCase {
     });
   }
   
+  public void testTransactionCallbacksNotInvoked() {
+    Host host = Host.getHost(0);
+    VM vm1 = host.getVM(0);
+    VM vm2 = host.getVM(1);
+    
+    SerializableCallable createRegionAndAddPoisonedListener = new SerializableCallable() {
+      public Object call() throws Exception {
+        AttributesFactory af = new AttributesFactory();
+        af.setScope(Scope.DISTRIBUTED_ACK);
+        af.setDataPolicy(DataPolicy.REPLICATE);
+        createRootRegion("testSimplePdx", af.create());
+        addPoisonedTransactionListeners();
+        return null;
+      }
+    };
+
+    vm1.invoke(createRegionAndAddPoisonedListener);
+    vm2.invoke(createRegionAndAddPoisonedListener);
+    vm1.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+        //Check to make sure the type region is not yet created
+        Region r = getRootRegion("testSimplePdx");
+        Cache mycache = getCache();
+        mycache.getCacheTransactionManager().begin();
+        r.put(1, new SimpleClass(57, (byte) 3));
+        mycache.getCacheTransactionManager().commit();
+        //Ok, now the type registry should exist
+        assertNotNull(getRootRegion(PeerTypeRegistration.REGION_NAME));
+        return null;
+      }
+    });
+    vm2.invoke(new SerializableCallable() {
+      public Object call() throws Exception {
+      //Ok, now the type registry should exist
+        assertNotNull(getRootRegion(PeerTypeRegistration.REGION_NAME));
+        Region r = getRootRegion("testSimplePdx");
+        assertEquals(new SimpleClass(57, (byte) 3), r.get(1));
+        return null;
+      }
+    });
+  }
+  
   public void testPersistenceDefaultDiskStore() throws Throwable {
     
     SerializableCallable createRegion = new SerializableCallable() {
@@ -187,4 +239,58 @@ public class PdxSerializableDUnitTest extends CacheTestCase {
     
     vm3.invoke(checkForObject);
   }
+
+  /**
+   * add a listener and writer that will throw an exception when invoked
+   * if events are for internal regions
+   */
+  public final void addPoisonedTransactionListeners() {
+    MyTestTransactionListener listener = new MyTestTransactionListener();
+    getCache().getCacheTransactionManager().addListener(listener);
+    getCache().getCacheTransactionManager().setWriter(listener);
+  }
+
+
+  static private class MyTestTransactionListener
+  implements TransactionWriter, TransactionListener {
+    private MyTestTransactionListener() {
+      
+    }
+    
+    private void checkEvent(TransactionEvent event) {
+      List<CacheEvent<?,?>> events = event.getEvents();
+      System.out.println("MyTestTransactionListener.checkEvent: events are " + events);
+      for (CacheEvent<?,?> cacheEvent: events) {
+        if (((LocalRegion)cacheEvent.getRegion()).isPdxTypesRegion()) {
+          throw new UnsupportedOperationException("found internal event: " + cacheEvent
+              + " region=" + cacheEvent.getRegion().getName());
+        }
+      }
+    }
+
+    @Override
+    public void beforeCommit(TransactionEvent event)
+        throws TransactionWriterException {
+      checkEvent(event);
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void afterCommit(TransactionEvent event) {
+      checkEvent(event);
+    }
+
+    @Override
+    public void afterFailedCommit(TransactionEvent event) {
+      checkEvent(event);
+    }
+
+    @Override
+    public void afterRollback(TransactionEvent event) {
+      checkEvent(event);
+    }
+  }
 }