You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/11/07 15:33:36 UTC

[12/15] incubator-tinkerpop git commit: TINKERPOP3-885 Adjustments to work from @PommeVerte from his PR at #113

TINKERPOP3-885 Adjustments to work from @PommeVerte from his PR at #113

Removed two tests that didn't really test threaded transactions properly.  Removed the static modifier on close/readWrite member variables which would cause a problem if the same thread was working with multiple graph instances.  Removed setReadWrite and setClose from AbstractTransaction as they were redundnant to just onReadWrite and onClose (they offered no functionality in and of themselves). Altered the nature of AbstractThreadedTransaction a bit as threaded transactions are really "manual" by virtue of their creation.  The transaction is opened when you createThreadedTransaction() and really shouldn't be re-used after close.  An new one should be created.  We don't have those semantics enforced now, but that's typically how this feature has been used in the past.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/a806c7ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/a806c7ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/a806c7ec

Branch: refs/heads/TINKERPOP3-885
Commit: a806c7ec0a4fc5baaa1d684a647d0e5084f9c4cb
Parents: 1aa1b5a
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Sat Nov 7 09:08:12 2015 -0500
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Sat Nov 7 09:08:12 2015 -0500

----------------------------------------------------------------------
 .../util/AbstractThreadLocalTransaction.java    |  14 +--
 .../util/AbstractThreadedTransaction.java       |  47 ++++++---
 .../structure/util/AbstractTransaction.java     |  35 +------
 .../gremlin/structure/TransactionTest.java      | 102 ++-----------------
 4 files changed, 47 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a806c7ec/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java
index 3109215..485e3f2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java
@@ -37,14 +37,14 @@ import java.util.function.Consumer;
  * @author Stephen Mallette (http://stephen.genoprime.com)
  */
 public abstract class AbstractThreadLocalTransaction extends AbstractTransaction {
-protected static final ThreadLocal<Consumer<Transaction>> readWriteConsumerInternal = 
+    protected final ThreadLocal<Consumer<Transaction>> readWriteConsumerInternal =
         new ThreadLocal<Consumer<Transaction>>() {
             @Override protected Consumer<Transaction> initialValue() {
                 return READ_WRITE_BEHAVIOR.AUTO;
             }
         };
     
-    protected static final ThreadLocal<Consumer<Transaction>> closeConsumerInternal = 
+    protected final ThreadLocal<Consumer<Transaction>> closeConsumerInternal =
         new ThreadLocal<Consumer<Transaction>>() {
             @Override protected Consumer<Transaction> initialValue() {
                 return CLOSE_BEHAVIOR.ROLLBACK;
@@ -88,24 +88,26 @@ protected static final ThreadLocal<Consumer<Transaction>> readWriteConsumerInter
     }
     
     @Override
-    public void doReadWrite() {
+    protected void doReadWrite() {
         readWriteConsumerInternal.get().accept(this);
     }
     
     @Override
-    public void doClose() {
+    protected void doClose() {
         closeConsumerInternal.get().accept(this);
         closeConsumerInternal.remove();
         readWriteConsumerInternal.remove();
     }
     
     @Override
-    public void setReadWrite(final Consumer<Transaction> consumer) {
+    public Transaction onReadWrite(final Consumer<Transaction> consumer) {
         readWriteConsumerInternal.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull));
+        return this;
     }
     
     @Override
-    public void setClose(final Consumer<Transaction> consumer) {
+    public Transaction onClose(final Consumer<Transaction> consumer) {
         closeConsumerInternal.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull));
+        return this;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a806c7ec/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java
index 246734a..2fdc7df 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 /**
  * A base implementation of {@link Transaction} that provides core functionality for transaction listeners using a
@@ -40,10 +41,6 @@ public abstract class AbstractThreadedTransaction extends AbstractTransaction {
 
     protected final List<Consumer<Status>> transactionListeners = new CopyOnWriteArrayList<>();
     
-    protected Consumer<Transaction> readWriteConsumerInternal = READ_WRITE_BEHAVIOR.AUTO;
-    
-    protected Consumer<Transaction> closeConsumerInternal = CLOSE_BEHAVIOR.ROLLBACK;
-    
      public AbstractThreadedTransaction(final Graph g) {
         super(g);
     }
@@ -72,24 +69,42 @@ public abstract class AbstractThreadedTransaction extends AbstractTransaction {
     public void clearTransactionListeners() {
         transactionListeners.clear();
     }
-    
+
+    /**
+     * Most implementations should do nothing with this as the tx is already open on creation.
+     */
     @Override
-    public void doReadWrite() {
-        readWriteConsumerInternal.accept(this);
+    protected void doReadWrite() {
+        // do nothing
     }
-    
+
+    /**
+     * Clears transaction listeners
+     */
     @Override
-    public void doClose() {
-        closeConsumerInternal.accept(this);
+    protected void doClose() {
+        clearTransactionListeners();
     }
-    
+
+    /**
+     * The nature of threaded transactions are such that they are always open when created and manual in nature,
+     * therefore setting this value is not required.
+     *
+     * @throws UnsupportedOperationException
+     */
     @Override
-    public void setReadWrite(final Consumer<Transaction> consumer) {
-        readWriteConsumerInternal = Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull);
+    public synchronized Transaction onReadWrite(final Consumer<Transaction> consumer) {
+        throw new UnsupportedOperationException("Threaded transactions are open when created and in manual mode");
     }
-    
+
+    /**
+     * The nature of threaded transactions are such that they are always open when created and manual in nature,
+     * therefore setting this value is not required.
+     *
+     * @throws UnsupportedOperationException
+     */
     @Override
-    public void setClose(final Consumer<Transaction> consumer) {
-        closeConsumerInternal = Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull);
+    public synchronized Transaction onClose(final Consumer<Transaction> consumer) {
+        throw new UnsupportedOperationException("Threaded transactions are open when created and in manual mode");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a806c7ec/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
index 54e7d9c..156f754 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.structure.util;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Transaction;
 
-import java.util.Optional;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
@@ -48,13 +47,13 @@ public abstract class AbstractTransaction implements Transaction {
     protected abstract void doOpen();
 
     /**
-     * Called with {@link #commit} after the {@link #readWriteConsumer} has been notified.  Implementers should
+     * Called with {@link #commit} after the {@link #onReadWrite(Consumer)} has been notified.  Implementers should
      * include their commit logic here.
      */
     protected abstract void doCommit() throws TransactionException;
 
     /**
-     * Called with {@link #rollback} after the {@link #readWriteConsumer} has been notified.  Implementers should
+     * Called with {@link #rollback} after the {@link #onReadWrite(Consumer)} has been notified.  Implementers should
      * include their rollback logic here.
      */
     protected abstract void doRollback() throws TransactionException;
@@ -84,18 +83,6 @@ public abstract class AbstractTransaction implements Transaction {
      * Implementers should run their readWrite consumer here.
      */
     protected abstract void doClose();
-    
-    /**
-     * Called {@link #onReadWrite}.  
-     * Implementers should set their readWrite consumer here.
-     */
-    protected abstract void setReadWrite(final Consumer<Transaction> consumer);
-    
-    /**
-     * Called {@link #onClose}.  
-     * Implementers should set their close consumer here.
-     */
-    protected abstract void setClose(final Consumer<Transaction> consumer);
 
     /**
      * {@inheritDoc}
@@ -168,24 +155,6 @@ public abstract class AbstractTransaction implements Transaction {
     }
 
     /**
-     * {@inheritDoc}
-     */
-    @Override
-    public synchronized Transaction onReadWrite(final Consumer<Transaction> consumer) {
-        setReadWrite(consumer);
-        return this;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public synchronized Transaction onClose(final Consumer<Transaction> consumer) {
-        setClose(consumer);
-        return this;
-    }
-
-    /**
      * An "internal" exception thrown by vendors when calls to {@link AbstractTransaction#doCommit} or
      * {@link AbstractTransaction#doRollback} fail.
      */

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/a806c7ec/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
index eae224f..8d23fce 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
@@ -36,8 +36,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
-import java.util.logging.Level;
-import java.util.logging.Logger;
 
 import static org.apache.tinkerpop.gremlin.structure.Graph.Features.EdgePropertyFeatures;
 import static org.apache.tinkerpop.gremlin.structure.Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS;
@@ -1047,10 +1045,10 @@ public class TransactionTest extends AbstractGremlinTest {
     
     @Test
     @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS)
-    public void shouldNotShareTransactionReadWriteConsumersAccrossThreads() throws InterruptedException {
+    public void shouldNotShareTransactionReadWriteConsumersAcrossThreads() throws InterruptedException {
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicBoolean commitFailed = new AtomicBoolean(false);
-        final AtomicBoolean commitOccured = new AtomicBoolean(false);
+        final AtomicBoolean commitOccurred = new AtomicBoolean(false);
         
         final Thread manualThread = new Thread(() -> {
             graph.tx().onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL);
@@ -1074,9 +1072,9 @@ public class TransactionTest extends AbstractGremlinTest {
             latch.countDown();
             try {
                 graph.tx().commit();
-                commitOccured.set(true);
+                commitOccurred.set(true);
             } catch (Exception ex) {
-                commitOccured.set(false);
+                commitOccurred.set(false);
             }
         });
         
@@ -1091,13 +1089,13 @@ public class TransactionTest extends AbstractGremlinTest {
         );
         assertTrue(
                 "autoThread transaction readWrite should be AUTO and should commit the transaction",
-                commitOccured.get()
+                commitOccurred.get()
         );
     }
     
     @Test
     @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS)
-    public void shouldNotShareTransactionCloseConsumersAccrossThreads() throws InterruptedException {
+    public void shouldNotShareTransactionCloseConsumersAcrossThreads() throws InterruptedException {
         final CountDownLatch latch = new CountDownLatch(1);
         
         final Thread manualThread = new Thread(() -> {
@@ -1128,92 +1126,4 @@ public class TransactionTest extends AbstractGremlinTest {
                 IteratorUtils.count(graph.vertices())
         );
     }
-    
-    @Test
-    @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_THREADED_TRANSACTIONS)
-    public void shouldShareTransactionReadWriteConsumersAccrossThreads() throws InterruptedException {
-        final CountDownLatch latch = new CountDownLatch(1);
-        final AtomicBoolean commitFailed = new AtomicBoolean(false);
-        final AtomicBoolean commitFailedAgain = new AtomicBoolean(false);
-        
-        final Thread manualThread = new Thread(() -> {
-            Transaction tx = graph.tx().createThreadedTx();
-            tx.onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL);
-            try {
-                latch.await();
-            } catch (InterruptedException ie) {
-                throw new RuntimeException(ie);
-            }
-            
-            try{
-                tx.commit();
-                commitFailed.set(false);
-            } catch (Exception ex) {
-                commitFailed.set(true);
-            }
-        });
-        
-        manualThread.start();
-        
-        final Thread autoThread = new Thread(() -> {
-            latch.countDown();
-            Transaction tx = graph.tx().createThreadedTx();
-            try {
-                tx.commit();
-                commitFailedAgain.set(false);
-            } catch (Exception ex) {
-                commitFailedAgain.set(true);
-            }
-        });
-        
-        autoThread.start();
-        
-        manualThread.join();
-        autoThread.join();
-        
-        assertTrue(
-                "manualThread transaction readWrite should be MANUAL and should fail to commit the transaction",
-                commitFailed.get()
-        );
-        assertTrue(
-                "autoThread transaction readWrite should be AUTO and should commit the transaction",
-                commitFailedAgain.get()
-        );
-    }
-    
-    @Test
-    @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_THREADED_TRANSACTIONS)
-    public void shouldShareTransactionCloseConsumersAccrossThreads() throws InterruptedException {
-        final CountDownLatch latch = new CountDownLatch(1);
-        
-        final Thread manualThread = new Thread(() -> {
-            Transaction tx = graph.tx().createThreadedTx();
-            tx.onClose(Transaction.CLOSE_BEHAVIOR.COMMIT);
-            try {
-                latch.await();
-            } catch (InterruptedException ie) {
-                throw new RuntimeException(ie);
-            }
-        });
-        
-        manualThread.start();
-        
-        final Thread autoThread = new Thread(() -> {
-            latch.countDown();
-            Transaction tx = graph.tx().createThreadedTx();
-            graph.addVertex();
-            tx.close();
-        });
-        
-        autoThread.start();
-        
-        manualThread.join();
-        autoThread.join();
-        
-        assertEquals(
-                "Graph should contain elements. autoThread.onClose() should be COMMIT.",
-                1,
-                IteratorUtils.count(graph.vertices())
-        );
-    }
 }