You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/11/07 15:33:31 UTC
[07/15] incubator-tinkerpop git commit: moved logic to
AbstractThreadLocalTransaction and added a couple of tests
moved logic to AbstractThreadLocalTransaction and added a couple of tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/46f821a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/46f821a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/46f821a8
Branch: refs/heads/TINKERPOP3-885
Commit: 46f821a846abc4fceb776f11c3b25888f01d60ef
Parents: 35e97f0
Author: Dylan Millikin <dy...@brightzone.fr>
Authored: Sun Oct 25 13:45:35 2015 +0100
Committer: Dylan Millikin <dy...@brightzone.fr>
Committed: Fri Nov 6 20:16:41 2015 +0100
----------------------------------------------------------------------
.../util/AbstractThreadLocalTransaction.java | 38 +++-
.../util/AbstractThreadedTransaction.java | 29 +++-
.../structure/util/AbstractTransaction.java | 50 ++++--
.../gremlin/structure/TransactionTest.java | 174 +++++++++++++++++++
4 files changed, 269 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/46f821a8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java
index b47eb79..3109215 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadLocalTransaction.java
@@ -23,6 +23,7 @@ import org.apache.tinkerpop.gremlin.structure.Transaction;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import java.util.function.Consumer;
/**
@@ -36,7 +37,20 @@ import java.util.function.Consumer;
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public abstract class AbstractThreadLocalTransaction extends AbstractTransaction {
-
+protected static final ThreadLocal<Consumer<Transaction>> readWriteConsumerInternal =
+ new ThreadLocal<Consumer<Transaction>>() {
+ @Override protected Consumer<Transaction> initialValue() {
+ return READ_WRITE_BEHAVIOR.AUTO;
+ }
+ };
+
+ protected static final ThreadLocal<Consumer<Transaction>> closeConsumerInternal =
+ new ThreadLocal<Consumer<Transaction>>() {
+ @Override protected Consumer<Transaction> initialValue() {
+ return CLOSE_BEHAVIOR.ROLLBACK;
+ }
+ };
+
protected final ThreadLocal<List<Consumer<Transaction.Status>>> transactionListeners = new ThreadLocal<List<Consumer<Transaction.Status>>>() {
@Override
protected List<Consumer<Transaction.Status>> initialValue() {
@@ -72,4 +86,26 @@ public abstract class AbstractThreadLocalTransaction extends AbstractTransaction
public void clearTransactionListeners() {
transactionListeners.get().clear();
}
+
+ @Override
+ public void doReadWrite() {
+ readWriteConsumerInternal.get().accept(this);
+ }
+
+ @Override
+ public void doClose() {
+ closeConsumerInternal.get().accept(this);
+ closeConsumerInternal.remove();
+ readWriteConsumerInternal.remove();
+ }
+
+ @Override
+ public void setReadWrite(final Consumer<Transaction> consumer) {
+ readWriteConsumerInternal.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull));
+ }
+
+ @Override
+ public void setClose(final Consumer<Transaction> consumer) { // stores the per-thread close behavior; null is rejected with the onClose-specific exception
+ closeConsumerInternal.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onCloseBehaviorCannotBeNull));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/46f821a8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java
index 57f8ec0..246734a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractThreadedTransaction.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Transaction;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
@@ -38,8 +39,12 @@ import java.util.function.Consumer;
public abstract class AbstractThreadedTransaction extends AbstractTransaction {
protected final List<Consumer<Status>> transactionListeners = new CopyOnWriteArrayList<>();
-
- public AbstractThreadedTransaction(final Graph g) {
+
+ protected Consumer<Transaction> readWriteConsumerInternal = READ_WRITE_BEHAVIOR.AUTO;
+
+ protected Consumer<Transaction> closeConsumerInternal = CLOSE_BEHAVIOR.ROLLBACK;
+
+ public AbstractThreadedTransaction(final Graph g) {
super(g);
}
@@ -67,4 +72,24 @@ public abstract class AbstractThreadedTransaction extends AbstractTransaction {
public void clearTransactionListeners() {
transactionListeners.clear();
}
+
+ @Override
+ public void doReadWrite() {
+ readWriteConsumerInternal.accept(this);
+ }
+
+ @Override
+ public void doClose() {
+ closeConsumerInternal.accept(this);
+ }
+
+ @Override
+ public void setReadWrite(final Consumer<Transaction> consumer) {
+ readWriteConsumerInternal = Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull);
+ }
+
+ @Override
+ public void setClose(final Consumer<Transaction> consumer) { // stores this transaction's close behavior; null is rejected with the onClose-specific exception
+ closeConsumerInternal = Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onCloseBehaviorCannotBeNull);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/46f821a8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
index 0ad24a6..de7b6ee 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/AbstractTransaction.java
@@ -35,19 +35,6 @@ import java.util.function.Function;
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public abstract class AbstractTransaction implements Transaction {
- protected static final ThreadLocal<Consumer<Transaction>> readWriteConsumer =
- new ThreadLocal<Consumer<Transaction>>() {
- @Override protected Consumer<Transaction> initialValue() {
- return READ_WRITE_BEHAVIOR.AUTO;
- }
- };
-
- protected static final ThreadLocal<Consumer<Transaction>> closeConsumer =
- new ThreadLocal<Consumer<Transaction>>() {
- @Override protected Consumer<Transaction> initialValue() {
- return CLOSE_BEHAVIOR.ROLLBACK;
- }
- };
private Graph g;
@@ -86,6 +73,31 @@ public abstract class AbstractTransaction implements Transaction {
* {@link #addTransactionListener(Consumer)}.
*/
protected abstract void fireOnRollback();
+
+
+ /**
+ * Called by {@link #readWrite}.
+ * Implementers should run their readWrite consumer here.
+ */
+ protected abstract void doReadWrite();
+
+ /**
+ * Called by {@link #close}.
+ * Implementers should run their close consumer here.
+ */
+ protected abstract void doClose();
+
+ /**
+ * Called by {@link #onReadWrite}.
+ * Implementers should set their readWrite consumer here.
+ */
+ protected abstract void setReadWrite(final Consumer<Transaction> consumer);
+
+ /**
+ * Called by {@link #onClose}.
+ * Implementers should set their close consumer here.
+ */
+ protected abstract void setClose(final Consumer<Transaction> consumer);
/**
* {@inheritDoc}
@@ -103,7 +115,7 @@ public abstract class AbstractTransaction implements Transaction {
*/
@Override
public void commit() {
- readWriteConsumer.get().accept(this);
+ readWrite();
try {
doCommit();
fireOnCommit();
@@ -117,7 +129,7 @@ public abstract class AbstractTransaction implements Transaction {
*/
@Override
public void rollback() {
- readWriteConsumer.get().accept(this);
+ readWrite();
try {
doRollback();
fireOnRollback();
@@ -146,7 +158,7 @@ public abstract class AbstractTransaction implements Transaction {
*/
@Override
public void readWrite() {
- readWriteConsumer.get().accept(this);
+ doReadWrite();
}
/**
@@ -154,7 +166,7 @@ public abstract class AbstractTransaction implements Transaction {
*/
@Override
public void close() {
- closeConsumer.get().accept(this);
+ doClose();
}
/**
@@ -162,7 +174,7 @@ public abstract class AbstractTransaction implements Transaction {
*/
@Override
public synchronized Transaction onReadWrite(final Consumer<Transaction> consumer) {
- readWriteConsumer.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onReadWriteBehaviorCannotBeNull));
+ setReadWrite(consumer);
return this;
}
@@ -171,7 +183,7 @@ public abstract class AbstractTransaction implements Transaction {
*/
@Override
public synchronized Transaction onClose(final Consumer<Transaction> consumer) {
- closeConsumer.set(Optional.ofNullable(consumer).orElseThrow(Transaction.Exceptions::onCloseBehaviorCannotBeNull));
+ setClose(consumer);
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/46f821a8/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
index 2395d9a..eae224f 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/structure/TransactionTest.java
@@ -36,6 +36,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import static org.apache.tinkerpop.gremlin.structure.Graph.Features.EdgePropertyFeatures;
import static org.apache.tinkerpop.gremlin.structure.Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS;
@@ -1042,4 +1044,176 @@ public class TransactionTest extends AbstractGremlinTest {
g.tx().rollback();
}
+
+ @Test
+ @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS)
+ public void shouldNotShareTransactionReadWriteConsumersAccrossThreads() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicBoolean commitFailed = new AtomicBoolean(false);
+ final AtomicBoolean commitOccured = new AtomicBoolean(false);
+
+ final Thread manualThread = new Thread(() -> {
+ graph.tx().onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL);
+ try {
+ latch.await();
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ }
+
+ try{
+ graph.tx().commit();
+ commitFailed.set(false);
+ } catch (Exception ex) {
+ commitFailed.set(true);
+ }
+ });
+
+ manualThread.start();
+
+ final Thread autoThread = new Thread(() -> {
+ latch.countDown();
+ try {
+ graph.tx().commit();
+ commitOccured.set(true);
+ } catch (Exception ex) {
+ commitOccured.set(false);
+ }
+ });
+
+ autoThread.start();
+
+ manualThread.join();
+ autoThread.join();
+
+ assertTrue(
+ "manualThread transaction readWrite should be MANUAL and should fail to commit the transaction",
+ commitFailed.get()
+ );
+ assertTrue(
+ "autoThread transaction readWrite should be AUTO and should commit the transaction",
+ commitOccured.get()
+ );
+ }
+
+ @Test
+ @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS)
+ public void shouldNotShareTransactionCloseConsumersAccrossThreads() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final Thread manualThread = new Thread(() -> {
+ graph.tx().onClose(Transaction.CLOSE_BEHAVIOR.COMMIT);
+ try {
+ latch.await();
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ }
+ });
+
+ manualThread.start();
+
+ final Thread autoThread = new Thread(() -> {
+ latch.countDown();
+ graph.addVertex();
+ graph.tx().close();
+ });
+
+ autoThread.start();
+
+ manualThread.join();
+ autoThread.join();
+
+ assertEquals(
+ "Graph should be empty. autoThread transaction.onClose() should be ROLLBACK (default)",
+ 0,
+ IteratorUtils.count(graph.vertices())
+ );
+ }
+
+ @Test
+ @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_THREADED_TRANSACTIONS)
+ public void shouldShareTransactionReadWriteConsumersAccrossThreads() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicBoolean commitFailed = new AtomicBoolean(false);
+ final AtomicBoolean commitFailedAgain = new AtomicBoolean(false);
+
+ final Thread manualThread = new Thread(() -> {
+ Transaction tx = graph.tx().createThreadedTx();
+ tx.onReadWrite(Transaction.READ_WRITE_BEHAVIOR.MANUAL);
+ try {
+ latch.await();
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ }
+
+ try{
+ tx.commit();
+ commitFailed.set(false);
+ } catch (Exception ex) {
+ commitFailed.set(true);
+ }
+ });
+
+ manualThread.start();
+
+ final Thread autoThread = new Thread(() -> {
+ latch.countDown();
+ Transaction tx = graph.tx().createThreadedTx();
+ try {
+ tx.commit();
+ commitFailedAgain.set(false);
+ } catch (Exception ex) {
+ commitFailedAgain.set(true);
+ }
+ });
+
+ autoThread.start();
+
+ manualThread.join();
+ autoThread.join();
+
+ assertTrue(
+ "manualThread transaction readWrite should be MANUAL and should fail to commit the transaction",
+ commitFailed.get()
+ );
+ assertTrue(
+ "autoThread transaction readWrite should be AUTO and should commit the transaction",
+ commitFailedAgain.get() // NOTE(review): the message above says the commit should succeed, but this asserts that it failed - confirm which is intended
+ );
+ }
+
+ @Test
+ @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_THREADED_TRANSACTIONS)
+ public void shouldShareTransactionCloseConsumersAccrossThreads() throws InterruptedException {
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final Thread manualThread = new Thread(() -> {
+ Transaction tx = graph.tx().createThreadedTx();
+ tx.onClose(Transaction.CLOSE_BEHAVIOR.COMMIT);
+ try {
+ latch.await();
+ } catch (InterruptedException ie) {
+ throw new RuntimeException(ie);
+ }
+ });
+
+ manualThread.start();
+
+ final Thread autoThread = new Thread(() -> {
+ latch.countDown();
+ Transaction tx = graph.tx().createThreadedTx();
+ graph.addVertex();
+ tx.close();
+ });
+
+ autoThread.start();
+
+ manualThread.join();
+ autoThread.join();
+
+ assertEquals(
+ "Graph should contain elements. autoThread.onClose() should be COMMIT.",
+ 1,
+ IteratorUtils.count(graph.vertices())
+ );
+ }
}