You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/06 20:54:40 UTC
[09/30] incubator-tinkerpop git commit: added a MapReduce test. We
now verify that GraphFilter works for both VertexProgram+MapReduce and
MapReduce only. TinkerGraph and Spark integration tests pass.
added a MapReduce test. We now verify that GraphFilter works for both VertexProgram+MapReduce and MapReduce only. TinkerGraph and Spark integration tests pass.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/7ad48f20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/7ad48f20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/7ad48f20
Branch: refs/heads/master
Commit: 7ad48f20586ec58b1fea7018fa8f37ec8c95c9b9
Parents: eee16c9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Feb 2 12:44:57 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Feb 2 12:44:57 2016 -0700
----------------------------------------------------------------------
.../process/computer/GraphComputerTest.java | 165 ++++++++++++++++++-
.../process/computer/TinkerGraphComputer.java | 4 +
2 files changed, 161 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7ad48f20/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 16559c1..4e0a154 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -1486,14 +1486,25 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
@Test
@LoadGraphWith(MODERN)
public void shouldSupportGraphFilter() throws Exception {
- graph.compute(graphComputerClass.get()).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).submit().get();
- graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).submit().get();
- graph.compute(graphComputerClass.get()).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).submit().get();
- graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
- graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
- graph.compute(graphComputerClass.get()).edges(__.<Vertex>bothE().limit(0)).program(new VertexProgramM(VertexProgramM.VERTICES_ONLY)).submit().get();
- graph.compute(graphComputerClass.get()).edges(__.<Vertex>outE().limit(1)).program(new VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
- graph.compute(graphComputerClass.get()).edges(__.outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
+ /// VERTEX PROGRAM + MAP REDUCE: each filter runs with a VertexProgramM and a MapReduceJ built from the same scenario constant
+ graph.compute(graphComputerClass.get()).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).edges(__.<Vertex>bothE().limit(0)).program(new VertexProgramM(VertexProgramM.VERTICES_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).edges(__.<Vertex>outE().limit(1)).program(new VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).edges(__.outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
+
+ /// MAP REDUCE ONLY: the same filters and scenario constants, submitted without a vertex program
+ graph.compute(graphComputerClass.get()).vertices(__.hasLabel("software")).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).edges(__.bothE("knows")).mapReduce(new MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.bothE("knows")).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).edges(__.<Vertex>bothE().limit(0)).mapReduce(new MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).edges(__.<Vertex>outE().limit(1)).mapReduce(new MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
+ graph.compute(graphComputerClass.get()).edges(__.outE()).mapReduce(new MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
}
public static class VertexProgramM implements VertexProgram {
@@ -1653,5 +1664,143 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
}
+ private static class MapReduceJ implements MapReduce<MapReduce.NullObject, Integer, MapReduce.NullObject, Integer, Integer> { // test job: emits 1 per mapped vertex, sums the emissions, and asserts labels and the total for the scenario in 'state'
+
+ private String state; // one of the VertexProgramM.* scenario constants; selects which assertions run in map() and generateFinalResult()
+
+ public MapReduceJ() { // leaves state unset; loadState() assigns it from the configuration
+ }
+
+ public MapReduceJ(final String state) {
+ this.state = state;
+ }
+
+ @Override
+ public void loadState(final Graph graph, final Configuration configuration) {
+ this.state = configuration.getString("state"); // reads back the value stored under the "state" key by storeState()
+ }
+
+ @Override
+ public void storeState(final Configuration configuration) {
+ configuration.setProperty("state", this.state); // persist the scenario under the "state" key
+ MapReduce.super.storeState(configuration); // then run the interface's default storeState as well
+ }
+
+ @Override
+ @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
+ public MapReduceJ clone() {
+ return new MapReduceJ(this.state); // a fresh instance carrying the same scenario, built without super.clone()
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ return true; // every stage is enabled; map, combine and reduce are all implemented below
+ }
+
+ @Override
+ public void map(final Vertex vertex, final MapEmitter<NullObject, Integer> emitter) {
+ emitter.emit(1); // one count per vertex handed to map(); emitted before the label checks run
+ switch (this.state) { // the label a vertex may carry depends on the scenario under test
+ case VertexProgramM.SOFTWARE_ONLY: {
+ assertEquals("software", vertex.label());
+ break;
+ }
+ case VertexProgramM.PEOPLE_ONLY: {
+ assertEquals("person", vertex.label());
+ break;
+ }
+ case VertexProgramM.KNOWS_ONLY: {
+ assertTrue(vertex.label().equals("person") || vertex.label().equals("software")); // either label is accepted in this scenario
+ break;
+ }
+ case VertexProgramM.PEOPLE_KNOWS_ONLY: {
+ assertEquals("person", vertex.label());
+ break;
+ }
+ case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: {
+ assertEquals("person", vertex.label());
+ break;
+ }
+ case VertexProgramM.VERTICES_ONLY: {
+ assertTrue(vertex.label().equals("person") || vertex.label().equals("software")); // either label is accepted in this scenario
+ break;
+ }
+ case VertexProgramM.ONE_OUT_EDGE_ONLY: {
+ assertTrue(vertex.label().equals("person") || vertex.label().equals("software")); // either label is accepted in this scenario
+ break;
+ }
+ case VertexProgramM.OUT_EDGES_ONLY: {
+ assertTrue(vertex.label().equals("person") || vertex.label().equals("software")); // either label is accepted in this scenario
+ break;
+ }
+ default:
+ throw new IllegalStateException("This is an illegal state for this test case: " + this.state); // state matched none of the scenario constants above
+ }
+ }
+
+ @Override
+ public void combine(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
+ this.reduce(key, values, emitter); // combining is the same summation as reducing
+ }
+
+ @Override
+ public void reduce(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
+ int count = 0; // running sum of the incoming values
+ while (values.hasNext()) {
+ count = count + values.next();
+ }
+ emitter.emit(count); // a single total per reduce() call
+ }
+
+ @Override
+ public Integer generateFinalResult(final Iterator<KeyValue<NullObject, Integer>> keyValues) {
+ int counter = keyValues.next().getValue(); // next() is called without a hasNext() check, so at least one pair must exist
+ assertFalse(keyValues.hasNext()); // and there must be no second pair
+
+ switch (this.state) { // expected total per scenario: 2 for SOFTWARE_ONLY, 4 for the PEOPLE_* scenarios, 6 for the rest
+ case VertexProgramM.SOFTWARE_ONLY: {
+ assertEquals(2, counter);
+ break;
+ }
+ case VertexProgramM.PEOPLE_ONLY: {
+ assertEquals(4, counter);
+ break;
+ }
+ case VertexProgramM.KNOWS_ONLY: {
+ assertEquals(6, counter);
+ break;
+ }
+ case VertexProgramM.PEOPLE_KNOWS_ONLY: {
+ assertEquals(4, counter);
+ break;
+ }
+ case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: {
+ assertEquals(4, counter);
+ break;
+ }
+ case VertexProgramM.VERTICES_ONLY: {
+ assertEquals(6, counter);
+ break;
+ }
+ case VertexProgramM.ONE_OUT_EDGE_ONLY: {
+ assertEquals(6, counter);
+ break;
+ }
+ case VertexProgramM.OUT_EDGES_ONLY: {
+ assertEquals(6, counter);
+ break;
+ }
+ default:
+ throw new IllegalStateException("This is an illegal state for this test case: " + this.state); // state matched none of the scenario constants above
+ }
+ return counter; // the verified total is returned as the final result
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return "a"; // NOTE(review): presumably the key the final result is published under; confirm against MapReduce#getMemoryKey
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7ad48f20/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index c6dfdea..d38da14 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerHelper;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@@ -169,6 +170,9 @@ public final class TinkerGraphComputer implements GraphComputer {
this.memory.completeSubRound();
}
}
+ } else {
+ // MapReduce only
+ TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, Collections.emptySet());
}
// execute mapreduce jobs