You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/24 23:00:12 UTC
tinkerpop git commit: Being a bit smarter about DedupGlobalStep. If
you are at a worker, simply filter. This may (though not always) limit data flow.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 b87022bf5 -> b81dbc3b7
Being a bit smarter about DedupGlobalStep. If you are at a worker, simply filter. This may (though not always) limit data flow.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b81dbc3b
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b81dbc3b
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b81dbc3b
Branch: refs/heads/TINKERPOP-1564
Commit: b81dbc3b7ba0bada9203aa03e307cd093e758de4
Parents: b87022b
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 24 16:00:09 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 24 16:00:09 2017 -0700
----------------------------------------------------------------------
.../gremlin/akka/process/actors/AkkaGraphActors.java | 6 +++---
.../gremlin/akka/process/actors/AkkaPlayTest.java | 10 ++++++----
.../process/traversal/step/filter/DedupGlobalStep.java | 6 +++---
.../process/actors/TestSetupTerminateActorProgram.java | 9 ++++-----
4 files changed, 16 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b81dbc3b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
index 26f27c1..125cae4 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
@@ -95,9 +95,9 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
withDeploy(new Deploy(new RemoteScope(AkkaConfigFactory.getMasterActorDeployment(finalConfiguration)))), "master");
return FutureConverters.<ActorsResult<R>>toJava((scala.concurrent.Future) Patterns.ask(master, new DefaultActorsResult<>(), 10000000)).
- thenApply(x -> {
- ((ActorsResult) x).setRuntime(System.currentTimeMillis() - startTime);
- return x;
+ thenApply(actorsResult -> {
+ ((ActorsResult) actorsResult).setRuntime(System.currentTimeMillis() - startTime);
+ return actorsResult;
}).toCompletableFuture();
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b81dbc3b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
index 32b543c..ffa9f5b 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
@@ -31,6 +31,8 @@ import org.junit.Test;
import java.util.Map;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.dedup;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
@@ -45,19 +47,19 @@ public class AkkaPlayTest {
configuration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
final Graph graph = TinkerGraph.open(configuration);
//graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo");
- GraphTraversalSource g = graph.traversal().withProcessor(GraphActors.open(AkkaGraphActors.class).workers(15));
+ GraphTraversalSource g = graph.traversal().withProcessor(GraphActors.open(AkkaGraphActors.class).workers(3));
// System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList());
// [{v[1]=6, v[2]=2, v[3]=6, v[4]=6, v[5]=2, v[6]=2}]
System.out.println(g.V().both().groupCount("a").out().cap("a").select(Column.keys).unfold().both().groupCount("a").cap("a").toList());
System.out.println(g.V().both().groupCount("a").out().cap("a").select(Column.keys).unfold().both().groupCount("a").cap("a").toList());
for (int i = 0; i < 1000; i++) {
- Map<Object, Long> map = g.V().both().groupCount("a").out().cap("a").select(Column.keys).unfold().both().groupCount("a").<Map>cap("a").next();
+ /*Map<Object, Long> map = g.V().both().groupCount("a").out().cap("a").select(Column.keys).unfold().both().groupCount("a").<Map>cap("a").next();
if (24L != map.values().stream().reduce((a, b) -> a + b).get()) {
System.out.println(i + " -- " + map);
assert false;
- }
- //assert 0L == g.V().repeat(dedup()).times(2).count().next();
+ }*/
+ assert 0L == g.V().repeat(dedup()).times(2).count().next();
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b81dbc3b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
index 8ccccfd..7724dae 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
@@ -59,7 +59,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
private Set<String> keepLabels;
private Map<Object, Traverser.Admin<S>> barrier;
private Iterator<Map.Entry<Object, Traverser.Admin<S>>> barrierIterator;
- private boolean atWorker = true;
+ private boolean atWorker = false;
private boolean pushBased = false;
public DedupGlobalStep(final Traversal.Admin traversal, final String... dedupLabels) {
@@ -69,7 +69,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
@Override
protected boolean filter(final Traverser.Admin<S> traverser) {
- if (this.pushBased && this.atWorker) return true;
+ if (this.pushBased && this.atWorker) return false;
traverser.setBulk(1);
if (null == this.dedupLabels) {
return this.duplicateSet.add(TraversalUtil.applyNullable(traverser, this.dedupTraversal));
@@ -186,7 +186,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
} else {
object = TraversalUtil.applyNullable(traverser, this.dedupTraversal);
}
- if (this.duplicateSet.add(object)) {
+ if (this.duplicateSet.add(object) && !map.containsKey(object)) {
traverser.setBulk(1L);
// traverser.detach();
traverser.set(DetachedFactory.detach(traverser.get(), true)); // TODO: detect required detachment accordingly
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b81dbc3b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
index 58d2d50..cffc61a 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
@@ -35,7 +35,6 @@ class TestSetupTerminateActorProgram implements ActorProgram<List<Integer>> {
private static final String WORKER_SETUP = "workerSetup";
private static final String WORKER_TERMINATE = "workerTerminate";
private static final String MASTER_SETUP = "masterSetup";
- private static final String MASTER_TERMINATE = "masterTerminate";
@Override
public Worker createWorkerProgram(final Actor.Worker worker) {
@@ -53,7 +52,7 @@ class TestSetupTerminateActorProgram implements ActorProgram<List<Integer>> {
@Override
public void execute(final String message) {
- fail("The worker should not have received any messages");
+ fail("The worker should not have received any messages: " + message);
}
@Override
@@ -65,7 +64,7 @@ class TestSetupTerminateActorProgram implements ActorProgram<List<Integer>> {
}
@Override
- public Master createMasterProgram(Actor.Master master) {
+ public Master createMasterProgram(final Actor.Master master) {
return new Master<String>() {
private int masterSetup = 0;
private int masterTerminate = 0;
@@ -108,12 +107,12 @@ class TestSetupTerminateActorProgram implements ActorProgram<List<Integer>> {
assertEquals(this.workerSetup, this.workerTerminate);
assertEquals(this.workerSetup, master.workers().size());
assertEquals(1, this.masterSetup);
- assertEquals(0, this.masterTerminate);
+ assertEquals(0, this.masterTerminate++);
master.setResult(Arrays.asList(
this.workerSetup,
this.workerTerminate,
this.masterSetup,
- ++this.masterTerminate));
+ this.masterTerminate));
}
};
}