You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/11/09 14:24:44 UTC

[06/16] incubator-tinkerpop git commit: moved logic to AbstractThreadLocalTransaction and added a couple of tests

moved logic to AbstractThreadLocalTransaction and added a couple of tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3938caed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3938caed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3938caed

Branch: refs/heads/master
Commit: 3938caed7fd0eb62453a2a2deb716e283c4b79a7
Parents: 35e97f0
Author: Dylan Millikin <dy...@brightzone.fr>
Authored: Sun Oct 25 13:45:35 2015 +0100
Committer: Dylan Millikin <dy...@brightzone.fr>
Committed: Fri Nov 6 11:18:11 2015 +0100

----------------------------------------------------------------------
 .../util/AbstractThreadLocalTransaction.java    | 38 +++++++-
 .../util/AbstractThreadedTransaction.java       | 29 +++++-
 .../structure/util/AbstractTransaction.java     | 50 +++++++----
 .../gremlin/structure/TransactionTest.java      | 94 ++++++++++++++++++++
 4 files changed, 189 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3938caed/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java
index b47eb79..3109215 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.structure.Transaction;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Consumer;
 
 /**
@@ -36,7 +37,20 @@ import java.util.function.Consumer;
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 public abstract class AbstractThreadLocalTransaction extends AbstractTransaction {
-
+    /**
+     * Read/write behavior for the calling thread. The field is static, so the value is shared by every
+     * instance of this class used on that thread. Each thread starts with {@code READ_WRITE_BEHAVIOR.AUTO}.
+     */
+    protected static final ThreadLocal<Consumer<Transaction>> readWriteConsumerInternal =
+            ThreadLocal.withInitial(() -> READ_WRITE_BEHAVIOR.AUTO);
+
+    /**
+     * Close behavior for the calling thread. The field is static, so the value is shared by every
+     * instance of this class used on that thread. Each thread starts with {@code CLOSE_BEHAVIOR.ROLLBACK}.
+     */
+    protected static final ThreadLocal<Consumer<Transaction>> closeConsumerInternal =
+            ThreadLocal.withInitial(() -> CLOSE_BEHAVIOR.ROLLBACK);
+    
     protected final ThreadLocal<List<Consumer<Transaction.Status>>> transactionListeners = new ThreadLocal<List<Consumer<Transaction.Status>>>() {
         @Override
         protected List<Consumer<Transaction.Status>> initialValue() {
@@ -72,4 +86,26 @@ public abstract class AbstractThreadLocalTransaction extends AbstractTransaction
     public void clearTransactionListeners() {
         transactionListeners.get().clear();
     }
+    
+    /**
+     * Runs the calling thread's read/write consumer against this transaction.
+     */
+    @Override
+    public void doReadWrite() {
+        final Consumer<Transaction> readWriteBehavior = readWriteConsumerInternal.get();
+        readWriteBehavior.accept(this);
+    }
+    
+    /**
+     * Runs the calling thread's close consumer against this transaction and then discards both of that
+     * thread's consumers, so the next read of either thread-local yields its initial value again.
+     */
+    @Override
+    public void doClose() {
+        final Consumer<Transaction> closeBehavior = closeConsumerInternal.get();
+        closeBehavior.accept(this);
+        // Only reached when the consumer returns normally; if it throws, the thread keeps its settings.
+        readWriteConsumerInternal.remove();
+        closeConsumerInternal.remove();
+    }
+    
+    /**
+     * Stores the read/write behavior for the calling thread.
+     *
+     * @param consumer the behavior to store; a {@code null} value is rejected with the exception built by
+     *                 {@code Transaction.Exceptions.onReadWriteBehaviorCannotBeNull()}
+     */
+    @Override
+    public void setReadWrite(final Consumer<Transaction> consumer) {
+        if (consumer == null) {
+            throw Transaction.Exceptions.onReadWriteBehaviorCannotBeNull();
+        }
+        readWriteConsumerInternal.set(consumer);
+    }
+    
+    /**
+     * Stores the close behavior for the calling thread.
+     *
+     * @param consumer the behavior to store; a {@code null} value is rejected with the exception built by
+     *                 {@code Transaction.Exceptions.onCloseBehaviorCannotBeNull()}
+     */
+    @Override
+    public void setClose(final Consumer<Transaction> consumer) {
+        // Use the close-specific exception factory: a null close consumer is not a read/write problem.
+        closeConsumerInternal.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onCloseBehaviorCannotBeNull));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3938caed/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java
index 57f8ec0..246734a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Transaction;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Consumer;
 
@@ -38,8 +39,12 @@ import java.util.function.Consumer;
 public abstract class AbstractThreadedTransaction extends AbstractTransaction {
 
     protected final List<Consumer<Status>> transactionListeners = new CopyOnWriteArrayList<>();
-
-    public AbstractThreadedTransaction(final Graph g) {
+    
+    /**
+     * Read/write behavior of this transaction instance; starts as {@code READ_WRITE_BEHAVIOR.AUTO}.
+     * NOTE(review): this is a plain, non-volatile field - confirm how visibility of updates across
+     * threads is guaranteed.
+     */
+    protected Consumer<Transaction> readWriteConsumerInternal = READ_WRITE_BEHAVIOR.AUTO;
+    
+    /**
+     * Close behavior of this transaction instance; starts as {@code CLOSE_BEHAVIOR.ROLLBACK}.
+     * NOTE(review): this is a plain, non-volatile field - confirm how visibility of updates across
+     * threads is guaranteed.
+     */
+    protected Consumer<Transaction> closeConsumerInternal = CLOSE_BEHAVIOR.ROLLBACK;
+    
+     public AbstractThreadedTransaction(final Graph g) {
         super(g);
     }
 
@@ -67,4 +72,24 @@ public abstract class AbstractThreadedTransaction extends AbstractTransaction {
     public void clearTransactionListeners() {
         transactionListeners.clear();
     }
+    
+    /**
+     * Runs this instance's read/write consumer against this transaction.
+     */
+    @Override
+    public void doReadWrite() {
+        final Consumer<Transaction> readWriteBehavior = readWriteConsumerInternal;
+        readWriteBehavior.accept(this);
+    }
+    
+    /**
+     * Runs this instance's close consumer against this transaction. The stored consumers are left
+     * unchanged afterwards.
+     */
+    @Override
+    public void doClose() {
+        final Consumer<Transaction> closeBehavior = closeConsumerInternal;
+        closeBehavior.accept(this);
+    }
+    
+    /**
+     * Stores the read/write behavior on this transaction instance.
+     *
+     * @param consumer the behavior to store; a {@code null} value is rejected with the exception built by
+     *                 {@code Transaction.Exceptions.onReadWriteBehaviorCannotBeNull()}
+     */
+    @Override
+    public void setReadWrite(final Consumer<Transaction> consumer) {
+        if (consumer == null) {
+            throw Transaction.Exceptions.onReadWriteBehaviorCannotBeNull();
+        }
+        readWriteConsumerInternal = consumer;
+    }
+    
+    /**
+     * Stores the close behavior on this transaction instance.
+     *
+     * @param consumer the behavior to store; a {@code null} value is rejected with the exception built by
+     *                 {@code Transaction.Exceptions.onCloseBehaviorCannotBeNull()}
+     */
+    @Override
+    public void setClose(final Consumer<Transaction> consumer) {
+        // Use the close-specific exception factory: a null close consumer is not a read/write problem.
+        closeConsumerInternal = Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onCloseBehaviorCannotBeNull);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3938caed/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
index 0ad24a6..de7b6ee 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
@@ -35,19 +35,6 @@ import java.util.function.Function;
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 public abstract class AbstractTransaction implements Transaction {
-    protected static final ThreadLocal<Consumer<Transaction>> readWriteConsumer = 
-        new ThreadLocal<Consumer<Transaction>>() {
-            @Override protected Consumer<Transaction> initialValue() {
-                return READ_WRITE_BEHAVIOR.AUTO;
-            }
-        };
-    
-    protected static final ThreadLocal<Consumer<Transaction>> closeConsumer = 
-        new ThreadLocal<Consumer<Transaction>>() {
-            @Override protected Consumer<Transaction> initialValue() {
-                return CLOSE_BEHAVIOR.ROLLBACK;
-            }
-        };
     
     private Graph g;
 
@@ -86,6 +73,31 @@ public abstract class AbstractTransaction implements Transaction {
      * {@link #addTransactionListener(Consumer)}.
      */
     protected abstract void fireOnRollback();
+    
+    
+    /**
+     * Called by {@link #readWrite}.
+     * Implementers should run their readWrite consumer here.
+     */
+    protected abstract void doReadWrite();
+    
+    /**
+     * Called by {@link #close}.
+     * Implementers should run their close consumer here.
+     */
+    protected abstract void doClose();
+    
+    /**
+     * Called by {@link #onReadWrite}.
+     * Implementers should set their readWrite consumer here.
+     *
+     * @param consumer the readWrite consumer passed to {@link #onReadWrite}
+     */
+    protected abstract void setReadWrite(final Consumer<Transaction> consumer);
+    
+    /**
+     * Called by {@link #onClose}.
+     * Implementers should set their close consumer here.
+     *
+     * @param consumer the close consumer passed to {@link #onClose}
+     */
+    protected abstract void setClose(final Consumer<Transaction> consumer);
 
     /**
      * {@inheritDoc}
@@ -103,7 +115,7 @@ public abstract class AbstractTransaction implements Transaction {
      */
     @Override
     public void commit() {
-        readWriteConsumer.get().accept(this);
+        readWrite();
         try {
             doCommit();
             fireOnCommit();
@@ -117,7 +129,7 @@ public abstract class AbstractTransaction implements Transaction {
      */
     @Override
     public void rollback() {
-        readWriteConsumer.get().accept(this);
+        readWrite();
         try {
             doRollback();
             fireOnRollback();
@@ -146,7 +158,7 @@ public abstract class AbstractTransaction implements Transaction {
      */
     @Override
     public void readWrite() {
-        readWriteConsumer.get().accept(this);
+        doReadWrite();
     }
 
     /**
@@ -154,7 +166,7 @@ public abstract class AbstractTransaction implements Transaction {
      */
     @Override
     public void close() {
-        closeConsumer.get().accept(this);
+        doClose();
     }
 
     /**
@@ -162,7 +174,7 @@ public abstract class AbstractTransaction implements Transaction {
      */
     @Override
     public synchronized Transaction onReadWrite(final Consumer<Transaction> consumer) {
-        readWriteConsumer.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull));
+        setReadWrite(consumer);
         return this;
     }
 
@@ -171,7 +183,7 @@ public abstract class AbstractTransaction implements Transaction {
      */
     @Override
     public synchronized Transaction onClose(final Consumer<Transaction> consumer) {
-        closeConsumer.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onCloseBehaviorCannotBeNull));
+        setClose(consumer);
         return this;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3938caed/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
index 2395d9a..0157be2 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
@@ -36,6 +36,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import static org.apache.tinkerpop.gremlin.structure.Graph.Features.EdgePropertyFeatures;
 import static org.apache.tinkerpop.gremlin.structure.Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS;
@@ -1042,4 +1044,96 @@ public class TransactionTest extends AbstractGremlinTest {
 
         g.tx().rollback();
     }
+    
+    /**
+     * Starts two threads against the same graph. The first sets the read/write behavior to MANUAL before
+     * both threads meet at a latch; each thread then calls {@code g.tx().open()} and records whether the
+     * call completed without an exception. The test expects both opens to succeed.
+     */
+    @Test
+    @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS)
+    public void shouldShareTransactionConsumersAccrossThreads() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(2);
+        final AtomicBoolean openOccured1 = new AtomicBoolean(false);
+        final AtomicBoolean openOccured2 = new AtomicBoolean(false);
+
+        final Thread t1 = new Thread(() -> {
+            try {
+                g.tx().onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL);
+
+                // Both threads wait here so that neither opens before t1 has configured MANUAL.
+                latch.countDown();
+                latch.await();
+                g.tx().open();
+                openOccured1.set(true);
+            } catch (Exception ex) {
+                // Deliberately not rethrown: a failed open is reported through the flag asserted below.
+                openOccured1.set(false);
+            }
+
+        });
+
+        final Thread t2 = new Thread(() -> {
+            try {
+
+                latch.countDown();
+                latch.await();
+                g.tx().open();
+                openOccured2.set(true);
+            } catch (Exception ex) {
+                // Deliberately not rethrown: a failed open is reported through the flag asserted below.
+                openOccured2.set(false);
+            }
+
+        });
+
+        t1.start();
+        t2.start();
+        t1.join();
+        t2.join();
+
+        assertTrue(
+                "Thread t1 transaction should have been set to MANUAL and capable of opening a transaction",
+                openOccured1.get()
+        );
+        assertTrue(
+                "Thread t2 transaction should have been set to MANUAL and capable of opening a transaction",
+                openOccured2.get()
+        );
+    }
+    
+    /**
+     * Starts two threads against the same graph. The first sets the read/write behavior to MANUAL before
+     * both threads meet at a latch; each thread then calls {@code g.tx().open()} and records whether the
+     * call completed without an exception. The test expects the open to succeed on t1 and to fail on t2.
+     */
+    @Test
+    @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_THREADED_TRANSACTIONS)
+    public void shouldNotShareTransactionConsumersAccrossThreads() throws InterruptedException {
+        final CountDownLatch latch = new CountDownLatch(2);
+        final AtomicBoolean openOccured1 = new AtomicBoolean(false);
+        final AtomicBoolean openOccured2 = new AtomicBoolean(false);
+
+        final Thread t1 = new Thread(() -> {
+            try {
+                g.tx().onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL);
+
+                // Both threads wait here so that neither opens before t1 has configured MANUAL.
+                latch.countDown();
+                latch.await();
+                g.tx().open();
+                openOccured1.set(true);
+            } catch (Exception ex) {
+                // Deliberately not rethrown: a failed open is reported through the flag asserted below.
+                openOccured1.set(false);
+            }
+
+        });
+
+        final Thread t2 = new Thread(() -> {
+            try {
+
+                latch.countDown();
+                latch.await();
+                g.tx().open();
+                openOccured2.set(true);
+            } catch (Exception ex) {
+                // Deliberately not rethrown: a failed open is reported through the flag asserted below.
+                openOccured2.set(false);
+            }
+
+        });
+
+        t1.start();
+        t2.start();
+        t1.join();
+        t2.join();
+
+        assertTrue("Thread t1 transaction should have been set to MANUAL and capable of opening a transaction", openOccured1.get());
+        // The message now matches the assertion, which expects the open on t2 to have failed.
+        assertTrue("Thread t2 transaction should not have been set to MANUAL and should not have opened a transaction", !openOccured2.get());
+    }
 }