You are viewing a plain-text version of this content; the link to its canonical version was a hyperlink that did not survive the conversion to plain text.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/17 22:02:34 UTC

incubator-tinkerpop git commit: SingleMessenger to reduce the number of TraversalVertexProgram iterations by doing the first message locally.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master b61be9f84 -> e64153c4b


SingleMessenger to reduce the number of TraversalVertexProgram iterations by doing the first message locally.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/e64153c4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/e64153c4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/e64153c4

Branch: refs/heads/master
Commit: e64153c4b36a0d7d76125d1b246b7bf9b5d98ada
Parents: b61be9f
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Mar 17 15:02:24 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Mar 17 15:02:32 2015 -0600

----------------------------------------------------------------------
 .../computer/traversal/SingleMessenger.java     | 48 ++++++++++++++++++++
 .../traversal/TraversalVertexProgram.java       | 36 ++++++++++-----
 hadoop-gremlin/conf/giraph-graphson.properties  |  4 +-
 hadoop-gremlin/conf/giraph-gryo.properties      |  8 +++-
 .../hadoop/structure/HadoopConfiguration.java   | 37 +++++++++------
 .../hadoop/structure/util/HadoopHelper.java     |  1 -
 6 files changed, 103 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e64153c4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java
new file mode 100644
index 0000000..c593e2b
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.computer.traversal;
+
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+
+import java.util.Arrays;
+
+/** A {@link Messenger} that serves one pre-supplied message on receive and forwards every send to a wrapped messenger.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SingleMessenger<M> implements Messenger<M> {
+
+    private final Messenger<M> baseMessenger; // target of every sendMessage() call
+    private final M message; // the sole message ever returned by receiveMessages()
+
+    public SingleMessenger(final Messenger<M> baseMessenger, final M message) {
+        this.baseMessenger = baseMessenger;
+        this.message = message;
+    }
+    // Ignores messageScope: always yields a fixed-size, one-element list holding the stored message.
+    @Override
+    public Iterable<M> receiveMessages(final MessageScope messageScope) {
+        return Arrays.asList(this.message); // NOTE(review): Collections.singletonList would avoid the generic varargs array
+    }
+    // Outgoing messages are not intercepted; they are passed straight to the wrapped messenger.
+    @Override
+    public void sendMessage(final MessageScope messageScope, final M message) {
+        this.baseMessenger.sendMessage(messageScope, message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e64153c4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index 8b10043..3732fab 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -20,8 +20,8 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.process.Traversal;
-import org.apache.tinkerpop.gremlin.process.TraversalSource;
 import org.apache.tinkerpop.gremlin.process.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.TraversalSource;
 import org.apache.tinkerpop.gremlin.process.Traverser;
 import org.apache.tinkerpop.gremlin.process.TraverserGenerator;
 import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
@@ -43,12 +43,12 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.process.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -151,23 +151,35 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
             final GraphStep<Element> startStep = (GraphStep<Element>) this.traversal.getStartStep();
             final TraverserGenerator traverserGenerator = this.traversal.getTraverserGenerator();
             final String future = startStep.getNextStep().getId();
-            boolean voteToHalt = true;
-            final Iterator<? extends Element> starts = startStep.returnsVertices() ? IteratorUtils.of(vertex) : vertex.edges(Direction.OUT);
-            while (starts.hasNext()) {
-                final Element start = starts.next();
-                if (ElementHelper.idExists(start.id(), startStep.getIds())) {
-                    final Traverser.Admin<Element> traverser = traverserGenerator.generate(start, startStep, 1l);
+            if (startStep.returnsVertices()) {  // VERTICES (process the first step locally)
+                if (ElementHelper.idExists(vertex.id(), startStep.getIds())) {
+                    final Traverser.Admin<Element> traverser = traverserGenerator.generate(vertex, startStep, 1l);
                     traverser.setStepId(future);
                     traverser.detach();
                     if (traverser.isHalted())
                         haltedTraversers.add((Traverser.Admin) traverser);
-                    else {
-                        voteToHalt = false;
-                        messenger.sendMessage(MessageScope.Global.of(vertex), new TraverserSet<>(traverser));
+                    else
+                        memory.and(VOTE_TO_HALT, TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, new TraverserSet<>(traverser)), this.traversalMatrix));
+                }
+            } else {  // EDGES (process the first step via a message pass)
+                boolean voteToHalt = true;
+                final Iterator<Edge> starts = vertex.edges(Direction.OUT);
+                while (starts.hasNext()) {
+                    final Edge start = starts.next();
+                    if (ElementHelper.idExists(start.id(), startStep.getIds())) {
+                        final Traverser.Admin<Element> traverser = traverserGenerator.generate(start, startStep, 1l);
+                        traverser.setStepId(future);
+                        traverser.detach();
+                        if (traverser.isHalted())
+                            haltedTraversers.add((Traverser.Admin) traverser);
+                        else {
+                            voteToHalt = false;
+                            messenger.sendMessage(MessageScope.Global.of(vertex), new TraverserSet<>(traverser));
+                        }
                     }
                 }
+                memory.and(VOTE_TO_HALT, voteToHalt);
             }
-            memory.and(VOTE_TO_HALT, voteToHalt);
         } else {
             memory.and(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix));
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e64153c4/hadoop-gremlin/conf/giraph-graphson.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/giraph-graphson.properties b/hadoop-gremlin/conf/giraph-graphson.properties
index 090b0ce..b6f28c6 100644
--- a/hadoop-gremlin/conf/giraph-graphson.properties
+++ b/hadoop-gremlin/conf/giraph-graphson.properties
@@ -43,8 +43,8 @@ giraph.maxWorkers=2
 # Some of these parameters may be over written by Hadoop-Gremlin as deemed necessary.
 ##############################################################################################################
 # mapred.linerecordreader.maxlength=5242880
-# mapred.map.child.java.opts=-Xmx1024m -Dtinkerpop.profiling=true
-# mapred.reduce.child.java.opts=-Xmx1024m -Dtinkerpop.profiling=true
+# mapred.map.child.java.opts=-Xmx1024m
+# mapred.reduce.child.java.opts=-Xmx1024m
 # mapred.map.tasks=6
 # mapred.reduce.tasks=3
 # mapred.job.reuse.jvm.num.tasks=-1

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e64153c4/hadoop-gremlin/conf/giraph-gryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/giraph-gryo.properties b/hadoop-gremlin/conf/giraph-gryo.properties
index 7813ad7..317d4a2 100644
--- a/hadoop-gremlin/conf/giraph-gryo.properties
+++ b/hadoop-gremlin/conf/giraph-gryo.properties
@@ -28,4 +28,10 @@ gremlin.hadoop.outputLocation=output
 #gremlin.traversalVertexProgram.traversalSupplier.object=org.apache.tinkerpop.gremlin.hadoop.process.computer.example.TraversalSupplier1
 
 giraph.minWorkers=2
-giraph.maxWorkers=2
\ No newline at end of file
+giraph.maxWorkers=2
+giraph.maxPartitionsInMemory=1
+giraph.userPartitionCount=2
+giraph.useOutOfCoreGraph=true
+giraph.isStaticGraph=true
+mapred.map.child.java.opts=-Xmx1024m
+mapred.reduce.child.java.opts=-Xmx1024m
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e64153c4/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
index 4f899a0..aa30af5 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.util.StreamFactory;
+import org.javatuples.Pair;
 
 import java.io.Serializable;
 import java.util.HashMap;
@@ -34,7 +36,7 @@ import java.util.Map;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public class HadoopConfiguration extends AbstractConfiguration implements Serializable {
+public class HadoopConfiguration extends AbstractConfiguration implements Serializable, Iterable {
 
     private final Map<String, Object> properties = new HashMap<>();
 
@@ -75,6 +77,22 @@ public class HadoopConfiguration extends AbstractConfiguration implements Serial
 
     ///////
 
+    public Class<InputFormat<NullWritable, VertexWritable>> getGraphInputFormat() {
+        try { // load the class whose name is stored under GREMLIN_HADOOP_GRAPH_INPUT_FORMAT
+            return (Class) Class.forName(this.getString(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT)); // raw cast: type arguments are not checked
+        } catch (final ClassNotFoundException e) { // NOTE(review): if the key is unset, getString likely returns null and Class.forName throws an unwrapped NPE — confirm
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
+    public Class<OutputFormat<NullWritable, VertexWritable>> getGraphOutputFormat() {
+        try { // load the class whose name is stored under GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT
+            return (Class) Class.forName(this.getString(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT)); // raw cast: type arguments are not checked
+        } catch (final ClassNotFoundException e) { // NOTE(review): if the key is unset, getString likely returns null and Class.forName throws an unwrapped NPE — confirm
+            throw new RuntimeException(e.getMessage(), e);
+        }
+    }
+
     public String getInputLocation() {
         return this.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
     }
@@ -91,19 +109,8 @@ public class HadoopConfiguration extends AbstractConfiguration implements Serial
         this.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, outputLocation);
     }
 
-    public Class<InputFormat<NullWritable, VertexWritable>> getGraphInputFormat() {
-        try {
-            return (Class) Class.forName(this.getString(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT));
-        } catch (final ClassNotFoundException e) {
-            throw new RuntimeException(e.getMessage(), e);
-        }
-    }
-
-    public Class<OutputFormat<NullWritable, VertexWritable>> getGraphOutputFormat() {
-        try {
-            return (Class) Class.forName(this.getString(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT));
-        } catch (final ClassNotFoundException e) {
-            throw new RuntimeException(e.getMessage(), e);
-        }
+    @Override
+    public Iterator iterator() { // NOTE(review): raw Iterator/Iterable; each element is a Pair of (configuration key, its property value)
+        return StreamFactory.stream(this.getKeys()).map(k -> new Pair<>(k, this.getProperty(k))).iterator();
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/e64153c4/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java
index 075d903..c6ad68a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/util/HadoopHelper.java
@@ -38,7 +38,6 @@ public final class HadoopHelper {
             newConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, hadoopGraph.configuration().getOutputLocation() + "/" + Constants.HIDDEN_G);
             newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputOutputHelper.getInputFormat(hadoopGraph.configuration().getGraphOutputFormat()).getCanonicalName());
             newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopGraph.configuration().getOutputLocation() + "_");
-
         } else {
             newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopGraph.configuration().getOutputLocation() + "_");
         }