You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/06 20:54:40 UTC

[09/30] incubator-tinkerpop git commit: added a MapReduce test. We now verify that GraphFilter works for both VertexProgram+MapReduce and MapReduce only. TinkerGraph and Spark integration tests pass.

added a MapReduce test. We now verify that GraphFilter works for both VertexProgram+MapReduce and MapReduce only. TinkerGraph and Spark integration tests pass.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/7ad48f20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/7ad48f20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/7ad48f20

Branch: refs/heads/master
Commit: 7ad48f20586ec58b1fea7018fa8f37ec8c95c9b9
Parents: eee16c9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Feb 2 12:44:57 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Feb 2 12:44:57 2016 -0700

----------------------------------------------------------------------
 .../process/computer/GraphComputerTest.java     | 165 ++++++++++++++++++-
 .../process/computer/TinkerGraphComputer.java   |   4 +
 2 files changed, 161 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7ad48f20/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 16559c1..4e0a154 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -1486,14 +1486,25 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
     @Test
     @LoadGraphWith(MODERN)
     public void shouldSupportGraphFilter() throws Exception {
-        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).submit().get();
-        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).submit().get();
-        graph.compute(graphComputerClass.get()).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).submit().get();
-        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
-        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
-        graph.compute(graphComputerClass.get()).edges(__.<Vertex>bothE().limit(0)).program(new VertexProgramM(VertexProgramM.VERTICES_ONLY)).submit().get();
-        graph.compute(graphComputerClass.get()).edges(__.<Vertex>outE().limit(1)).program(new VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
-        graph.compute(graphComputerClass.get()).edges(__.outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
+        /// VERTEX PROGRAM + MAP REDUCE
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("software")).program(new VertexProgramM(VertexProgramM.SOFTWARE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).program(new VertexProgramM(VertexProgramM.PEOPLE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.KNOWS_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.bothE("knows")).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).program(new VertexProgramM(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).edges(__.<Vertex>bothE().limit(0)).program(new VertexProgramM(VertexProgramM.VERTICES_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).edges(__.<Vertex>outE().limit(1)).program(new VertexProgramM(VertexProgramM.ONE_OUT_EDGE_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).edges(__.outE()).program(new VertexProgramM(VertexProgramM.OUT_EDGES_ONLY)).mapReduce(new MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
+
+        /// MAP REDUCE ONLY
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("software")).mapReduce(new MapReduceJ(VertexProgramM.SOFTWARE_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).edges(__.bothE("knows")).mapReduce(new MapReduceJ(VertexProgramM.KNOWS_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.bothE("knows")).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).vertices(__.hasLabel("person")).edges(__.<Vertex>bothE("knows").has("weight", P.gt(0.5f))).mapReduce(new MapReduceJ(VertexProgramM.PEOPLE_KNOWS_WELL_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).edges(__.<Vertex>bothE().limit(0)).mapReduce(new MapReduceJ(VertexProgramM.VERTICES_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).edges(__.<Vertex>outE().limit(1)).mapReduce(new MapReduceJ(VertexProgramM.ONE_OUT_EDGE_ONLY)).submit().get();
+        graph.compute(graphComputerClass.get()).edges(__.outE()).mapReduce(new MapReduceJ(VertexProgramM.OUT_EDGES_ONLY)).submit().get();
     }
 
     public static class VertexProgramM implements VertexProgram {
@@ -1653,5 +1664,143 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
 
     }
 
+    private static class MapReduceJ implements MapReduce<MapReduce.NullObject, Integer, MapReduce.NullObject, Integer, Integer> {
+
+        private String state;
+
+        public MapReduceJ() {
+        }
+
+        public MapReduceJ(final String state) {
+            this.state = state;
+        }
+
+        @Override
+        public void loadState(final Graph graph, final Configuration configuration) {
+            this.state = configuration.getString("state");
+        }
+
+        @Override
+        public void storeState(final Configuration configuration) {
+            configuration.setProperty("state", this.state);
+            MapReduce.super.storeState(configuration);
+        }
+
+        @Override
+        @SuppressWarnings("CloneDoesntCallSuperClone,CloneDoesntDeclareCloneNotSupportedException")
+        public MapReduceJ clone() {
+            return new MapReduceJ(this.state);
+        }
+
+        @Override
+        public boolean doStage(final Stage stage) {
+            return true;
+        }
+
+        @Override
+        public void map(final Vertex vertex, final MapEmitter<NullObject, Integer> emitter) {
+            emitter.emit(1);
+            switch (this.state) {
+                case VertexProgramM.SOFTWARE_ONLY: {
+                    assertEquals("software", vertex.label());
+                    break;
+                }
+                case VertexProgramM.PEOPLE_ONLY: {
+                    assertEquals("person", vertex.label());
+                    break;
+                }
+                case VertexProgramM.KNOWS_ONLY: {
+                    assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
+                    break;
+                }
+                case VertexProgramM.PEOPLE_KNOWS_ONLY: {
+                    assertEquals("person", vertex.label());
+                    break;
+                }
+                case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: {
+                    assertEquals("person", vertex.label());
+                    break;
+                }
+                case VertexProgramM.VERTICES_ONLY: {
+                    assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
+                    break;
+                }
+                case VertexProgramM.ONE_OUT_EDGE_ONLY: {
+                    assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
+                    break;
+                }
+                case VertexProgramM.OUT_EDGES_ONLY: {
+                    assertTrue(vertex.label().equals("person") || vertex.label().equals("software"));
+                    break;
+                }
+                default:
+                    throw new IllegalStateException("This is an illegal state for this test case: " + this.state);
+            }
+        }
+
+        @Override
+        public void combine(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
+            this.reduce(key, values, emitter);
+        }
+
+        @Override
+        public void reduce(final NullObject key, final Iterator<Integer> values, final ReduceEmitter<NullObject, Integer> emitter) {
+            int count = 0;
+            while (values.hasNext()) {
+                count = count + values.next();
+            }
+            emitter.emit(count);
+        }
+
+        @Override
+        public Integer generateFinalResult(final Iterator<KeyValue<NullObject, Integer>> keyValues) {
+            int counter = keyValues.next().getValue();
+            assertFalse(keyValues.hasNext());
+
+            switch (this.state) {
+                case VertexProgramM.SOFTWARE_ONLY: {
+                    assertEquals(2, counter);
+                    break;
+                }
+                case VertexProgramM.PEOPLE_ONLY: {
+                    assertEquals(4, counter);
+                    break;
+                }
+                case VertexProgramM.KNOWS_ONLY: {
+                    assertEquals(6, counter);
+                    break;
+                }
+                case VertexProgramM.PEOPLE_KNOWS_ONLY: {
+                    assertEquals(4, counter);
+                    break;
+                }
+                case VertexProgramM.PEOPLE_KNOWS_WELL_ONLY: {
+                    assertEquals(4, counter);
+                    break;
+                }
+                case VertexProgramM.VERTICES_ONLY: {
+                    assertEquals(6, counter);
+                    break;
+                }
+                case VertexProgramM.ONE_OUT_EDGE_ONLY: {
+                    assertEquals(6, counter);
+                    break;
+                }
+                case VertexProgramM.OUT_EDGES_ONLY: {
+                    assertEquals(6, counter);
+                    break;
+                }
+                default:
+                    throw new IllegalStateException("This is an illegal state for this test case: " + this.state);
+            }
+            return counter;
+        }
+
+        @Override
+        public String getMemoryKey() {
+            return "a";
+        }
+    }
+
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7ad48f20/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index c6dfdea..d38da14 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
 import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerHelper;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
@@ -169,6 +170,9 @@ public final class TinkerGraphComputer implements GraphComputer {
                             this.memory.completeSubRound();
                         }
                     }
+                } else {
+                    // MapReduce only
+                    TinkerHelper.createGraphComputerView(this.graph, this.graphFilter, Collections.emptySet());
                 }
 
                 // execute mapreduce jobs