You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/06/03 18:26:57 UTC
[5/7] incubator-tinkerpop git commit: Refactor EventQueue into an interface so that it can be supplied to the EventStrategy.
Refactor EventQueue into an interface so that it can be supplied to the EventStrategy.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/8c5408ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/8c5408ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/8c5408ca
Branch: refs/heads/master
Commit: 8c5408ca2167eeda63e1983370b62b5e7a11d15d
Parents: d1a7b9a
Author: Stephen Mallette <sp...@genoprime.com>
Authored: Wed Jun 3 10:28:28 2015 -0400
Committer: Stephen Mallette <sp...@genoprime.com>
Committed: Wed Jun 3 12:25:49 2015 -0400
----------------------------------------------------------------------
.../strategy/decoration/EventStrategy.java | 108 ++++++----
.../decoration/EventStrategyProcessTest.java | 198 ++++++++++++++++---
2 files changed, 235 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8c5408ca/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/decoration/EventStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/decoration/EventStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/decoration/EventStrategy.java
index a881260..530e1aa 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/decoration/EventStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/decoration/EventStrategy.java
@@ -32,7 +32,7 @@ import org.apache.tinkerpop.gremlin.structure.Transaction;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.Deque;
import java.util.List;
@@ -46,20 +46,16 @@ import java.util.List;
* @author Stephen Mallette (http://stephen.genoprime.com)
*/
public final class EventStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy> implements TraversalStrategy.DecorationStrategy {
- private final List<MutationListener> listeners = new ArrayList<>();
+ private final EventQueue eventQueue;
- private EventStrategy(final MutationListener... listeners) {
- this.listeners.addAll(Arrays.asList(listeners));
+ private EventStrategy(final Builder builder) {
+ this.eventQueue = builder.eventQueue;
+ this.eventQueue.setListeners(builder.listeners);
}
@Override
public void apply(final Traversal.Admin<?, ?> traversal) {
- // EventStrategy requires access to both graph features and the current transaction object which are
- // both part of Graph - if that isn't present, this strategy doesn't work.
- if (!traversal.getGraph().isPresent())
- throw new IllegalStateException(String.format("%s requires a graph instance is present on the traversal", EventStrategy.class.getName()));
-
- final EventStrategyCallback callback = new EventStrategyCallback(new EventQueue(traversal.getGraph().get()));
+ final EventStrategyCallback callback = new EventStrategyCallback(eventQueue);
TraversalHelper.getStepsOfAssignableClass(Mutating.class, traversal).forEach(s -> s.getMutatingCallbackRegistry().addCallback(callback));
}
@@ -68,20 +64,21 @@ public final class EventStrategy extends AbstractTraversalStrategy<TraversalStra
}
public class EventStrategyCallback implements EventCallback<Event>, Serializable {
- private final EventQueue trigger;
+ private final EventQueue eventQueue;
- public EventStrategyCallback(final EventQueue trigger) {
- this.trigger = trigger;
+ public EventStrategyCallback(final EventQueue eventQueue) {
+ this.eventQueue = eventQueue;
}
@Override
public void accept(final Event event) {
- trigger.addEvent(event);
+ eventQueue.addEvent(event);
}
}
public final static class Builder {
private final List<MutationListener> listeners = new ArrayList<>();
+ private EventQueue eventQueue = new DefaultEventQueue();
Builder() {}
@@ -90,43 +87,70 @@ public final class EventStrategy extends AbstractTraversalStrategy<TraversalStra
return this;
}
+ public Builder eventQueue(final EventQueue eventQueue) {
+ this.eventQueue = eventQueue;
+ return this;
+ }
+
public EventStrategy create() {
- return new EventStrategy(this.listeners.toArray(new MutationListener[this.listeners.size()]));
+ return new EventStrategy(this);
}
}
- class EventQueue {
+ public interface EventQueue {
+ /**
+ * Provide listeners to the queue that were given to the {@link EventStrategy} on construction.
+ */
+ public void setListeners(final List<MutationListener> listeners);
/**
- * A queue of events that are triggered by change to the graph. The queue builds up until the trigger fires them
- * in the order they were received.
+ * Add an event to the event queue.
*/
+ public void addEvent(final Event evt);
+ }
+
+ public static class DefaultEventQueue implements EventQueue {
+ private List<MutationListener> listeners = Collections.emptyList();
+
+ @Override
+ public void setListeners(final List<MutationListener> listeners) {
+ this.listeners = listeners;
+ }
+
+ @Override
+ public void addEvent(final Event evt) {
+ evt.fireEvent(listeners.iterator());
+ }
+ }
+
+ /**
+ * Stores events in a queue that builds up until the transaction is committed, at which point the events are
+ * fired in the order they were received.
+ */
+ public static class TransactionalEventQueue implements EventQueue {
+
private final ThreadLocal<Deque<Event>> eventQueue = new ThreadLocal<Deque<Event>>() {
protected Deque<Event> initialValue() {
return new ArrayDeque<>();
}
};
- /**
- * When set to true, events in the event queue will only be fired when a transaction is committed.
- */
- private final boolean enqueueEvents;
-
- public EventQueue(final Graph graph) {
- enqueueEvents = graph.features().graph().supportsTransactions();
-
- if (enqueueEvents) {
- // since this is a transactional graph events are enqueued so the events should be fired/reset only after
- // transaction is committed/rolled back as tied to a graph transaction
- graph.tx().addTransactionListener(status -> {
- if (status == Transaction.Status.COMMIT)
- fireEventQueue();
- else if (status == Transaction.Status.ROLLBACK)
- resetEventQueue();
- else
- throw new RuntimeException(String.format("The %s is not aware of this status: %s", EventQueue.class.getName(), status));
- });
- }
+ private List<MutationListener> listeners = Collections.emptyList();
+
+ public TransactionalEventQueue(final Graph graph) {
+ if (!graph.features().graph().supportsTransactions())
+ throw new IllegalStateException(String.format("%s requires the graph to support transactions", EventStrategy.class.getName()));
+
+ // since this is a transactional graph events are enqueued so the events should be fired/reset only after
+ // transaction is committed/rolled back as tied to a graph transaction
+ graph.tx().addTransactionListener(status -> {
+ if (status == Transaction.Status.COMMIT)
+ fireEventQueue();
+ else if (status == Transaction.Status.ROLLBACK)
+ resetEventQueue();
+ else
+ throw new RuntimeException(String.format("The %s is not aware of this status: %s", EventQueue.class.getName(), status));
+ });
}
/**
@@ -135,11 +159,11 @@ public final class EventStrategy extends AbstractTraversalStrategy<TraversalStra
*/
public void addEvent(final Event evt) {
eventQueue.get().add(evt);
+ }
- if (!this.enqueueEvents) {
- fireEventQueue();
- resetEventQueue();
- }
+ @Override
+ public void setListeners(final List<MutationListener> listeners) {
+ this.listeners = listeners;
}
private void resetEventQueue() {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/8c5408ca/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/decoration/EventStrategyProcessTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/decoration/EventStrategyProcessTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/decoration/EventStrategyProcessTest.java
index e07b38d..c8adb78 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/decoration/EventStrategyProcessTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/decoration/EventStrategyProcessTest.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration;
import org.apache.tinkerpop.gremlin.FeatureRequirement;
import org.apache.tinkerpop.gremlin.FeatureRequirementSet;
import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.event.MutationListener;
import org.apache.tinkerpop.gremlin.structure.Edge;
@@ -37,6 +36,8 @@ import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
/**
@@ -49,9 +50,14 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
public void shouldTriggerAddVertex() {
final StubMutationListener listener1 = new StubMutationListener();
final StubMutationListener listener2 = new StubMutationListener();
- final EventStrategy eventStrategy = EventStrategy.build()
+ final EventStrategy.Builder builder = EventStrategy.build()
.addListener(listener1)
- .addListener(listener2).create();
+ .addListener(listener2);
+
+ if (graph.features().graph().supportsTransactions())
+ builder.eventQueue(new EventStrategy.TransactionalEventQueue(graph));
+
+ final EventStrategy eventStrategy = builder.create();
graph.addVertex("some", "thing");
final GraphTraversalSource gts = create(eventStrategy);
@@ -67,9 +73,14 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
public void shouldTriggerAddVertexFromStart() {
final StubMutationListener listener1 = new StubMutationListener();
final StubMutationListener listener2 = new StubMutationListener();
- final EventStrategy eventStrategy = EventStrategy.build()
+ final EventStrategy.Builder builder = EventStrategy.build()
.addListener(listener1)
- .addListener(listener2).create();
+ .addListener(listener2);
+
+ if (graph.features().graph().supportsTransactions())
+ builder.eventQueue(new EventStrategy.TransactionalEventQueue(graph));
+
+ final EventStrategy eventStrategy = builder.create();
graph.addVertex("some", "thing");
final GraphTraversalSource gts = create(eventStrategy);
@@ -85,9 +96,14 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
public void shouldTriggerAddEdge() {
final StubMutationListener listener1 = new StubMutationListener();
final StubMutationListener listener2 = new StubMutationListener();
- final EventStrategy eventStrategy = EventStrategy.build()
+ final EventStrategy.Builder builder = EventStrategy.build()
.addListener(listener1)
- .addListener(listener2).create();
+ .addListener(listener2);
+
+ if (graph.features().graph().supportsTransactions())
+ builder.eventQueue(new EventStrategy.TransactionalEventQueue(graph));
+
+ final EventStrategy eventStrategy = builder.create();
final Vertex v = graph.addVertex();
v.addEdge("self", v);
@@ -109,9 +125,14 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
public void shouldTriggerAddEdgeByPath() {
final StubMutationListener listener1 = new StubMutationListener();
final StubMutationListener listener2 = new StubMutationListener();
- final EventStrategy eventStrategy = EventStrategy.build()
+ final EventStrategy.Builder builder = EventStrategy.build()
.addListener(listener1)
- .addListener(listener2).create();
+ .addListener(listener2);
+
+ if (graph.features().graph().supportsTransactions())
+ builder.eventQueue(new EventStrategy.TransactionalEventQueue(graph));
+
+ final EventStrategy eventStrategy = builder.create();
final Vertex v = graph.addVertex();
v.addEdge("self", v);
@@ -133,9 +154,14 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
public void shouldTriggerAddVertexPropertyAdded() {
final StubMutationListener listener1 = new StubMutationListener();
final StubMutationListener listener2 = new StubMutationListener();
- final EventStrategy eventStrategy = EventStrategy.build()
+ final EventStrategy.Builder builder = EventStrategy.build()
.addListener(listener1)
- .addListener(listener2).create();
+ .addListener(listener2);
+
+ if (graph.features().graph().supportsTransactions())
+ builder.eventQueue(new EventStrategy.TransactionalEventQueue(graph));
+
+ final EventStrategy eventStrategy = builder.create();
final Vertex vSome = graph.addVertex("some", "thing");
vSome.property(VertexProperty.Cardinality.single, "that", "thing");
@@ -155,9 +181,14 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
public void shouldTriggerAddVertexPropertyChanged() {
final StubMutationListener listener1 = new StubMutationListener();
final StubMutationListener listener2 = new StubMutationListener();
- final EventStrategy eventStrategy = EventStrategy.build()
+ final EventStrategy.Builder builder = EventStrategy.build()
.addListener(listener1)
- .addListener(listener2).create();
+ .addListener(listener2);
+
+ if (graph.features().graph().supportsTransactions())
+ builder.eventQueue(new EventStrategy.TransactionalEventQueue(graph));
+
+ final EventStrategy eventStrategy = builder.create();
final Vertex vSome = graph.addVertex("some", "thing");
vSome.property(VertexProperty.Cardinality.single, "that", "thing");
@@ -179,9 +210,14 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
public void shouldTriggerAddVertexPropertyPropertyChanged() {
final StubMutationListener listener1 = new StubMutationListener();
final StubMutationListener listener2 = new StubMutationListener();
- final EventStrategy eventStrategy = EventStrategy.build()
+ final EventStrategy.Builder builder = EventStrategy.build()
.addListener(listener1)
- .addListener(listener2).create();
+ .addListener(listener2);
+
+ if (graph.features().graph().supportsTransactions())
+ builder.eventQueue(new EventStrategy.TransactionalEventQueue(graph));
+
+ final EventStrategy eventStrategy = builder.create();
final Vertex vSome = graph.addVertex("some", "thing");
vSome.property(VertexProperty.Cardinality.single, "that", "thing", "is", "good");
@@ -202,9 +238,14 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
public void shouldTriggerAddEdgePropertyAdded() {
final StubMutationListener listener1 = new StubMutationListener();
final StubMutationListener listener2 = new StubMutationListener();
- final EventStrategy eventStrategy = EventStrategy.build()
+ final EventStrategy.Builder builder = EventStrategy.build()
.addListener(listener1)
- .addListener(listener2).create();
+ .addListener(listener2);
+
+ if (graph.features().graph().supportsTransactions())
+ builder.eventQueue(new EventStrategy.TransactionalEventQueue(graph));
+
+ final EventStrategy eventStrategy = builder.create();
final Vertex v = graph.addVertex();
v.addEdge("self", v);
@@ -230,9 +271,14 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
public void shouldTriggerEdgePropertyChanged() {
final StubMutationListener listener1 = new StubMutationListener();
final StubMutationListener listener2 = new StubMutationListener();
- final EventStrategy eventStrategy = EventStrategy.build()
+ final EventStrategy.Builder builder = EventStrategy.build()
.addListener(listener1)
- .addListener(listener2).create();
+ .addListener(listener2);
+
+ if (graph.features().graph().supportsTransactions())
+ builder.eventQueue(new EventStrategy.TransactionalEventQueue(graph));
+
+ final EventStrategy eventStrategy = builder.create();
final Vertex v = graph.addVertex();
final Edge e = v.addEdge("self", v);
@@ -258,9 +304,14 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
public void shouldTriggerRemoveVertex() {
final StubMutationListener listener1 = new StubMutationListener();
final StubMutationListener listener2 = new StubMutationListener();
- final EventStrategy eventStrategy = EventStrategy.build()
+ final EventStrategy.Builder builder = EventStrategy.build()
.addListener(listener1)
- .addListener(listener2).create();
+ .addListener(listener2);
+
+ if (graph.features().graph().supportsTransactions())
+ builder.eventQueue(new EventStrategy.TransactionalEventQueue(graph));
+
+ final EventStrategy eventStrategy = builder.create();
graph.addVertex("some", "thing");
final GraphTraversalSource gts = create(eventStrategy);
@@ -277,9 +328,14 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
public void shouldTriggerRemoveEdge() {
final StubMutationListener listener1 = new StubMutationListener();
final StubMutationListener listener2 = new StubMutationListener();
- final EventStrategy eventStrategy = EventStrategy.build()
+ final EventStrategy.Builder builder = EventStrategy.build()
.addListener(listener1)
- .addListener(listener2).create();
+ .addListener(listener2);
+
+ if (graph.features().graph().supportsTransactions())
+ builder.eventQueue(new EventStrategy.TransactionalEventQueue(graph));
+
+ final EventStrategy eventStrategy = builder.create();
final Vertex v = graph.addVertex("some", "thing");
v.addEdge("self", v);
@@ -297,9 +353,14 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
public void shouldTriggerRemoveVertexProperty() {
final StubMutationListener listener1 = new StubMutationListener();
final StubMutationListener listener2 = new StubMutationListener();
- final EventStrategy eventStrategy = EventStrategy.build()
+ final EventStrategy.Builder builder = EventStrategy.build()
.addListener(listener1)
- .addListener(listener2).create();
+ .addListener(listener2);
+
+ if (graph.features().graph().supportsTransactions())
+ builder.eventQueue(new EventStrategy.TransactionalEventQueue(graph));
+
+ final EventStrategy eventStrategy = builder.create();
graph.addVertex("some", "thing");
final GraphTraversalSource gts = create(eventStrategy);
@@ -316,9 +377,14 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
public void shouldTriggerRemoveEdgeProperty() {
final StubMutationListener listener1 = new StubMutationListener();
final StubMutationListener listener2 = new StubMutationListener();
- final EventStrategy eventStrategy = EventStrategy.build()
+ final EventStrategy.Builder builder = EventStrategy.build()
.addListener(listener1)
- .addListener(listener2).create();
+ .addListener(listener2);
+
+ if (graph.features().graph().supportsTransactions())
+ builder.eventQueue(new EventStrategy.TransactionalEventQueue(graph));
+
+ final EventStrategy eventStrategy = builder.create();
final Vertex v = graph.addVertex();
v.addEdge("self", v, "some", "thing");
@@ -337,9 +403,14 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
public void shouldTriggerAddVertexPropertyPropertyRemoved() {
final StubMutationListener listener1 = new StubMutationListener();
final StubMutationListener listener2 = new StubMutationListener();
- final EventStrategy eventStrategy = EventStrategy.build()
+ final EventStrategy.Builder builder = EventStrategy.build()
.addListener(listener1)
- .addListener(listener2).create();
+ .addListener(listener2);
+
+ if (graph.features().graph().supportsTransactions())
+ builder.eventQueue(new EventStrategy.TransactionalEventQueue(graph));
+
+ final EventStrategy eventStrategy = builder.create();
final Vertex vSome = graph.addVertex("some", "thing");
vSome.property(VertexProperty.Cardinality.single, "that", "thing", "is", "good");
@@ -358,6 +429,75 @@ public class EventStrategyProcessTest extends AbstractGremlinProcessTest {
assertEquals(1, listener1.vertexPropertyPropertyRemovedEventRecorded());
}
+ @Test
+ @FeatureRequirementSet(FeatureRequirementSet.Package.VERTICES_ONLY)
+ @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS)
+ public void shouldTriggerAfterCommit() {
+ final StubMutationListener listener1 = new StubMutationListener();
+ final StubMutationListener listener2 = new StubMutationListener();
+ final EventStrategy.Builder builder = EventStrategy.build()
+ .eventQueue(new EventStrategy.TransactionalEventQueue(graph))
+ .addListener(listener1)
+ .addListener(listener2);
+
+ final EventStrategy eventStrategy = builder.create();
+
+ graph.addVertex("some", "thing");
+ final GraphTraversalSource gts = create(eventStrategy);
+ gts.V().addV("any", "thing").next();
+ gts.V().addV("any", "one").next();
+
+ assertEquals(0, listener1.addVertexEventRecorded());
+ assertEquals(0, listener2.addVertexEventRecorded());
+
+ gts.tx().commit();
+ assertEquals(2, IteratorUtils.count(gts.V().has("any")));
+
+ assertEquals(2, listener1.addVertexEventRecorded());
+ assertEquals(2, listener2.addVertexEventRecorded());
+ }
+
+ @Test
+ @FeatureRequirementSet(FeatureRequirementSet.Package.VERTICES_ONLY)
+ @FeatureRequirement(featureClass = Graph.Features.GraphFeatures.class, feature = Graph.Features.GraphFeatures.FEATURE_TRANSACTIONS)
+ public void shouldResetAfterRollback() {
+ final StubMutationListener listener1 = new StubMutationListener();
+ final StubMutationListener listener2 = new StubMutationListener();
+ final EventStrategy.Builder builder = EventStrategy.build()
+ .eventQueue(new EventStrategy.TransactionalEventQueue(graph))
+ .addListener(listener1)
+ .addListener(listener2);
+
+ final EventStrategy eventStrategy = builder.create();
+
+ graph.addVertex("some", "thing");
+ final GraphTraversalSource gts = create(eventStrategy);
+ gts.V().addV("any", "thing").next();
+ gts.V().addV("any", "one").next();
+
+ assertEquals(0, listener1.addVertexEventRecorded());
+ assertEquals(0, listener2.addVertexEventRecorded());
+
+ gts.tx().rollback();
+ assertThat(gts.V().has("any").hasNext(), is(false));
+
+ assertEquals(0, listener1.addVertexEventRecorded());
+ assertEquals(0, listener2.addVertexEventRecorded());
+
+ graph.addVertex("some", "thing");
+ gts.V().addV("any", "thing").next();
+ gts.V().addV("any", "one").next();
+
+ assertEquals(0, listener1.addVertexEventRecorded());
+ assertEquals(0, listener2.addVertexEventRecorded());
+
+ gts.tx().commit();
+ assertEquals(2, IteratorUtils.count(gts.V().has("any")));
+
+ assertEquals(2, listener1.addVertexEventRecorded());
+ assertEquals(2, listener2.addVertexEventRecorded());
+ }
+
private GraphTraversalSource create(final EventStrategy strategy) {
return graphProvider.traversal(graph, strategy);
}