You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/02 19:29:41 UTC

[1/3] incubator-tinkerpop git commit: removed a join() and a cache() from the Spark message passing algorithm. The speed went from 11 minutes to 7.1 minutes on my local machine test graph against SparkServer. Going to test on Friendster and the Blades.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 6a8fa2879 -> 4ef878686


removed a join() and a cache() from the Spark message passing algorithm. The speed went from 11 minutes to 7.1 minutes on my local machine test graph against SparkServer. Going to test on Friendster and the Blades.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/41559076
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/41559076
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/41559076

Branch: refs/heads/master
Commit: 415590765319b7059332bad599dffb83afcfa940
Parents: 6c18af8
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Apr 2 11:01:53 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Apr 2 11:29:37 2015 -0600

----------------------------------------------------------------------
 .../process/computer/spark/SparkExecutor.java   | 101 ++++++++++++-------
 .../computer/spark/SparkGraphComputer.java      |   7 +-
 .../computer/spark/payload/MessagePayload.java  |  38 +++++++
 .../process/computer/spark/payload/Payload.java |  28 +++++
 .../spark/payload/ViewIncomingPayload.java      |  59 +++++++++++
 .../spark/payload/ViewOutgoingPayload.java      |  49 +++++++++
 .../computer/spark/payload/ViewPayload.java     |  42 ++++++++
 7 files changed, 284 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/41559076/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 6afaa96..4246f42 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -27,6 +27,11 @@ import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.MessagePayload;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.Payload;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewOutgoingPayload;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewPayload;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
@@ -45,7 +50,6 @@ import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import scala.Tuple2;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
@@ -62,26 +66,26 @@ public final class SparkExecutor {
     // VERTEX PROGRAM //
     ////////////////////
 
-    public static <M> JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> executeVertexProgramIteration(
+    public static <M> JavaPairRDD<Object, ViewIncomingPayload<M>> executeVertexProgramIteration(
             final JavaPairRDD<Object, VertexWritable> graphRDD,
-            final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndMessagesRDD,
+            final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
             final SparkMemory memory,
             final Configuration apacheConfiguration) {
 
-        final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Tuple2<Object, M>>>> viewAndOutgoingMessagesRDD = ((null == viewAndMessagesRDD) ?
-                graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<Tuple2<List<DetachedVertexProperty<Object>>, List<M>>>absent())) : // first iteration will not have any views or messages
-                graphRDD.leftOuterJoin(viewAndMessagesRDD))                                                                                                    // every other iteration may have views and messages
+        final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = ((null == viewIncomingRDD) ?
+                graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
+                graphRDD.leftOuterJoin(viewIncomingRDD))                                                                                                    // every other iteration may have views and messages
                 // for each partition of vertices
                 .mapPartitionsToPair(partitionIterator -> {
                     final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
                     final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
                     final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
                     workerVertexProgram.workerIterationStart(memory); // start the worker
-                    return () -> IteratorUtils.map(partitionIterator, vertexViewAndMessages -> {
-                        final Vertex vertex = vertexViewAndMessages._2()._1().get();
-                        final boolean hasViewAndMessages = vertexViewAndMessages._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
-                        final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewAndMessages._2()._2().get()._1() : Collections.emptyList();
-                        final List<M> incomingMessages = hasViewAndMessages ? vertexViewAndMessages._2()._2().get()._2() : Collections.emptyList();
+                    return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
+                        final Vertex vertex = vertexViewIncoming._2()._1().get();
+                        final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
+                        final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : Collections.emptyList();
+                        final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
                         previousView.forEach(property -> DetachedVertexProperty.addTo(vertex, property));  // attach the view to the vertex
                         ///
                         final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, incomingMessages); // create the messenger with the incoming messages
@@ -93,56 +97,79 @@ public final class SparkExecutor {
                         final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
                         if (!partitionIterator.hasNext())
                             workerVertexProgram.workerIterationEnd(memory); // if no more vertices in the partition, end the worker's iteration
-                        return new Tuple2<>(vertex.id(), new Tuple2<>(nextView, outgoingMessages));
+                        return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
                     });
                 });
 
-        viewAndOutgoingMessagesRDD.cache();  // will use twice (once for message passing and once for view isolation)
-
         // "message pass" by reducing on the vertex object id of the message payloads
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
-        final JavaPairRDD<Object, List<M>> incomingMessagesRDD = viewAndOutgoingMessagesRDD
-                .mapValues(Tuple2::_2)
-                .flatMapToPair(tuple -> () -> IteratorUtils.map(tuple._2().iterator(), message -> {
-                    final List<M> list = (null == messageCombiner) ? new ArrayList<>() : new ArrayList<>(1);
-                    list.add(message._2());
-                    return new Tuple2<>(message._1(), list);
-                })).reduceByKey((a, b) -> {
-                    if (null == messageCombiner) {
-                        a.addAll(b);
+        final JavaPairRDD<Object, Payload> newViewIncomingRDD = viewOutgoingRDD
+                .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
+                        IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),
+                        IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))
+                .reduceByKey((a, b) -> {
+                    if (a instanceof ViewIncomingPayload) {
+                        if (b instanceof MessagePayload)
+                            ((ViewIncomingPayload<M>) a).addIncomingMessage(((MessagePayload<M>) b).getMessage(), messageCombiner);
+                        else if (b instanceof ViewPayload)
+                            ((ViewIncomingPayload<M>) a).setView(((ViewPayload) b).getView());
+                        else if (b instanceof ViewIncomingPayload)
+                            throw new IllegalStateException("It should never be the case that two views reduce to the same key");
                         return a;
+                    } else if (b instanceof ViewIncomingPayload) {
+                        if (a instanceof MessagePayload)
+                            ((ViewIncomingPayload<M>) b).addIncomingMessage(((MessagePayload<M>) a).getMessage(), messageCombiner);
+                        else if (a instanceof ViewPayload)
+                            ((ViewIncomingPayload<M>) b).setView(((ViewPayload) a).getView());
+                        else if (a instanceof ViewIncomingPayload)
+                            throw new IllegalStateException("It should never be the case that two views reduce to the same key");
+                        return b;
                     } else {
-                        a.set(0, messageCombiner.combine(a.get(0), b.get(0)));
-                        return a;
+                        final ViewIncomingPayload<M> c = new ViewIncomingPayload<>();
+                        if (a instanceof MessagePayload)
+                            c.addIncomingMessage(((MessagePayload<M>) a).getMessage(), messageCombiner);
+                        else if (a instanceof ViewPayload)
+                            c.setView(((ViewPayload) a).getView());
+                        if (b instanceof MessagePayload)
+                            c.addIncomingMessage(((MessagePayload<M>) b).getMessage(), messageCombiner);
+                        else if (b instanceof ViewPayload)
+                            c.setView(((ViewPayload) b).getView());
+                        return c;
+                    }
+                })
+                .mapValues(payload -> {
+                    if (payload instanceof ViewIncomingPayload)
+                        return payload;
+                    else {
+                        final ViewIncomingPayload<M> viewIncomingPayload = new ViewIncomingPayload<>();
+                        if (payload instanceof ViewPayload)
+                            viewIncomingPayload.setView(((ViewPayload) payload).getView());
+                        else
+                            throw new IllegalStateException("It should never be the case that a view is not emitted");
+                        return viewIncomingPayload;
                     }
                 });
 
-        // isolate the views and then join the incoming messages
-        final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndIncomingMessagesRDD = viewAndOutgoingMessagesRDD
-                .mapValues(Tuple2::_1)
-                .leftOuterJoin(incomingMessagesRDD) // there will always be views (even if empty), but there will not always be incoming messages
-                .mapValues(tuple -> new Tuple2<>(tuple._1(), tuple._2().or(Collections.emptyList())));
-
-        viewAndIncomingMessagesRDD.foreachPartition(partitionIterator -> {
+        newViewIncomingRDD.foreachPartition(partitionIterator -> {
         }); // need to complete a task so its BSP and the memory for this iteration is updated
-        return viewAndIncomingMessagesRDD;
+        return (JavaPairRDD) newViewIncomingRDD;
     }
 
     /////////////////
     // MAP REDUCE //
     ////////////////
 
-    public static <M> JavaPairRDD<Object, VertexWritable> prepareGraphRDDForMapReduce(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndMessagesRDD) {
-        return (null == viewAndMessagesRDD) ?
+    public static <M> JavaPairRDD<Object, VertexWritable> prepareGraphRDDForMapReduce(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD) {
+        return (null == viewIncomingRDD) ?
                 graphRDD.mapValues(vertexWritable -> {
                     vertexWritable.get().edges(Direction.BOTH).forEachRemaining(Edge::remove);
                     return vertexWritable;
                 }) :
-                graphRDD.leftOuterJoin(viewAndMessagesRDD)
+                graphRDD.leftOuterJoin(viewIncomingRDD)
                         .mapValues(tuple -> {
                             final Vertex vertex = tuple._1().get();
                             vertex.edges(Direction.BOTH).forEachRemaining(Edge::remove);
-                            final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get()._1() : Collections.emptyList();
+                            final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
                             view.forEach(property -> DetachedVertexProperty.addTo(vertex, property));
                             return tuple._1();
                         });

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/41559076/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index e76459b..8c63f8a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -29,6 +29,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewIncomingPayload;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
@@ -166,7 +167,7 @@ public final class SparkGraphComputer implements GraphComputer {
                                 .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get()))) // TODO: use DetachedVertex?
                                 .reduceByKey((a, b) -> a) // TODO: test without doing this reduce
                                 .cache(); // partition the graph across the cluster
-                        JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Object>>> viewAndMessagesRDD = null;
+                        JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
 
                         ////////////////////////////////
                         // process the vertex program //
@@ -184,7 +185,7 @@ public final class SparkGraphComputer implements GraphComputer {
                             // execute the vertex program
                             while (true) {
                                 memory.setInTask(true);
-                                viewAndMessagesRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewAndMessagesRDD, memory, vertexProgramConfiguration);
+                                viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
                                 memory.setInTask(false);
                                 if (this.vertexProgram.terminate(memory))
                                     break;
@@ -205,7 +206,7 @@ public final class SparkGraphComputer implements GraphComputer {
                         //////////////////////////////
                         if (!this.mapReducers.isEmpty()) {
                             // drop all edges and messages in the graphRDD as they are no longer needed for the map reduce jobs
-                            final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewAndMessagesRDD).cache();
+                            final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD).cache();
                             // graphRDD.unpersist(); // the original graphRDD is no longer needed so free up its memory
 
                             for (final MapReduce mapReduce : this.mapReducers) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/41559076/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
new file mode 100644
index 0000000..1dde098
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
@@ -0,0 +1,38 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class MessagePayload<M> implements Payload {
+
+    private final M message;
+
+    public MessagePayload(final M message) {
+        this.message = message;
+    }
+
+    public M getMessage() {
+        return this.message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/41559076/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
new file mode 100644
index 0000000..0937991
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
@@ -0,0 +1,28 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Payload {
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/41559076/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
new file mode 100644
index 0000000..be91b2e
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
@@ -0,0 +1,59 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
+
+import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class ViewIncomingPayload<M> implements Payload {
+
+    private List<DetachedVertexProperty<Object>> view = null;
+    private final List<M> incomingMessages = new ArrayList<>();
+
+    public ViewIncomingPayload() {
+    }
+
+    public List<DetachedVertexProperty<Object>> getView() {
+        return this.view;
+    }
+
+    public void setView(final List<DetachedVertexProperty<Object>> view) {
+        this.view = view;
+    }
+
+    public List<M> getIncomingMessages() {
+        return incomingMessages;
+    }
+
+    public void addIncomingMessage(final M message, final MessageCombiner<M> messageCombiner) {
+        if (this.incomingMessages.isEmpty() || null == messageCombiner)
+            this.incomingMessages.add(message);
+        else
+            this.incomingMessages.set(0, messageCombiner.combine(this.incomingMessages.get(0), message));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/41559076/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
new file mode 100644
index 0000000..993f029
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
@@ -0,0 +1,49 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
+
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
+import scala.Tuple2;
+
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class ViewOutgoingPayload<M> implements Payload {
+
+    private final List<DetachedVertexProperty<Object>> view;
+    private final List<Tuple2<Object,M>> outgoingMessages;
+
+    public ViewOutgoingPayload(final List<DetachedVertexProperty<Object>> view, final List<Tuple2<Object,M>> outgoingMessages) {
+        this.view = view;
+        this.outgoingMessages = outgoingMessages;
+    }
+
+    public ViewPayload getView() {
+        return new ViewPayload(this.view);
+    }
+
+    public List<Tuple2<Object,M>> getOutgoingMessages() {
+        return this.outgoingMessages;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/41559076/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
new file mode 100644
index 0000000..977d926
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
@@ -0,0 +1,42 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
+
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
+
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class ViewPayload implements Payload {
+
+    private final List<DetachedVertexProperty<Object>> view;
+
+    public ViewPayload(final List<DetachedVertexProperty<Object>> view) {
+        this.view = view;
+    }
+
+    public List<DetachedVertexProperty<Object>> getView() {
+        return this.view;
+    }
+}


[3/3] incubator-tinkerpop git commit: a few object creation optimizations.

Posted by ok...@apache.org.
a few object creation optimizations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4ef87868
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4ef87868
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4ef87868

Branch: refs/heads/master
Commit: 4ef8786867daf0a0618387185d85d64302ef860d
Parents: 4155907
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Apr 2 11:22:14 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Apr 2 11:29:38 2015 -0600

----------------------------------------------------------------------
 .../process/computer/spark/SparkExecutor.java      | 17 +++++++----------
 .../process/computer/spark/SparkGraphComputer.java | 11 +++++------
 .../spark/payload/ViewIncomingPayload.java         | 14 ++++++++++----
 3 files changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ef87868/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 4246f42..512c0e0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -74,7 +74,7 @@ public final class SparkExecutor {
 
         final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = ((null == viewIncomingRDD) ?
                 graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
-                graphRDD.leftOuterJoin(viewIncomingRDD))                                                                                                    // every other iteration may have views and messages
+                graphRDD.leftOuterJoin(viewIncomingRDD))                                                                        // every other iteration may have views and messages
                 // for each partition of vertices
                 .mapPartitionsToPair(partitionIterator -> {
                     final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
@@ -103,7 +103,7 @@ public final class SparkExecutor {
 
         // "message pass" by reducing on the vertex object id of the message payloads
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
-        final JavaPairRDD<Object, Payload> newViewIncomingRDD = viewOutgoingRDD
+        final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
                 .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
                         IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),
                         IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))
@@ -125,7 +125,7 @@ public final class SparkExecutor {
                             throw new IllegalStateException("It should never be the case that two views reduce to the same key");
                         return b;
                     } else {
-                        final ViewIncomingPayload<M> c = new ViewIncomingPayload<>();
+                        final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
                         if (a instanceof MessagePayload)
                             c.addIncomingMessage(((MessagePayload<M>) a).getMessage(), messageCombiner);
                         else if (a instanceof ViewPayload)
@@ -139,20 +139,17 @@ public final class SparkExecutor {
                 })
                 .mapValues(payload -> {
                     if (payload instanceof ViewIncomingPayload)
-                        return payload;
-                    else {
+                        return (ViewIncomingPayload<M>) payload;
+                    else {  // this means the vertex has no incoming messages
                         final ViewIncomingPayload<M> viewIncomingPayload = new ViewIncomingPayload<>();
-                        if (payload instanceof ViewPayload)
-                            viewIncomingPayload.setView(((ViewPayload) payload).getView());
-                        else
-                            throw new IllegalStateException("It should never be the case that a view is not emitted");
+                        viewIncomingPayload.setView(((ViewPayload) payload).getView());
                         return viewIncomingPayload;
                     }
                 });
 
         newViewIncomingRDD.foreachPartition(partitionIterator -> {
         }); // need to complete a task so it's BSP and the memory for this iteration is updated
-        return (JavaPairRDD) newViewIncomingRDD;
+        return newViewIncomingRDD;
     }
 
     /////////////////

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ef87868/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 8c63f8a..1df99d3 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -44,14 +44,12 @@ import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 import java.io.File;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -164,8 +162,8 @@ public final class SparkGraphComputer implements GraphComputer {
                                 (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
                                 NullWritable.class,
                                 VertexWritable.class)
-                                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get()))) // TODO: use DetachedVertex?
-                                .reduceByKey((a, b) -> a) // TODO: test without doing this reduce
+                                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
+                                .reduceByKey((a, b) -> a) // TODO: why is this necessary?
                                 .cache(); // partition the graph across the cluster
                         JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
 
@@ -207,9 +205,10 @@ public final class SparkGraphComputer implements GraphComputer {
                         if (!this.mapReducers.isEmpty()) {
                             // drop all edges and messages in the graphRDD as they are no longer needed for the map reduce jobs
                             final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD).cache();
-                            // graphRDD.unpersist(); // the original graphRDD is no longer needed so free up its memory
-
+                            // TODO: boolean first = true;
                             for (final MapReduce mapReduce : this.mapReducers) {
+                                // TODO: if (first) first = false;
+                                // TODO: else graphRDD.unpersist();  // the original graphRDD is no longer needed so free up its memory
                                 // execute the map reduce job
                                 final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
                                 mapReduce.storeState(newApacheConfiguration);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ef87868/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
index be91b2e..2de9736 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
@@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -33,13 +34,18 @@ import java.util.List;
 public class ViewIncomingPayload<M> implements Payload {
 
     private List<DetachedVertexProperty<Object>> view = null;
-    private final List<M> incomingMessages = new ArrayList<>();
+    private final List<M> incomingMessages;
 
-    public ViewIncomingPayload() {
+    public ViewIncomingPayload(final MessageCombiner<M> messageCombiner) {
+        this.incomingMessages = null == messageCombiner ? new ArrayList<>() : new ArrayList<>(1);
+    }
+
+    public ViewIncomingPayload()  {
+        this.incomingMessages = null;
     }
 
     public List<DetachedVertexProperty<Object>> getView() {
-        return this.view;
+        return null == this.view ? Collections.emptyList() : this.view;
     }
 
     public void setView(final List<DetachedVertexProperty<Object>> view) {
@@ -47,7 +53,7 @@ public class ViewIncomingPayload<M> implements Payload {
     }
 
     public List<M> getIncomingMessages() {
-        return incomingMessages;
+        return null == this.incomingMessages ? Collections.emptyList() : this.incomingMessages;
     }
 
     public void addIncomingMessage(final M message, final MessageCombiner<M> messageCombiner) {


[2/3] incubator-tinkerpop git commit: excluded the Spark dependency on the LGPL FindBugs library.

Posted by ok...@apache.org.
excluded the Spark dependency on the LGPL FindBugs library.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/6c18af82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/6c18af82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/6c18af82

Branch: refs/heads/master
Commit: 6c18af829675eae11eb273b3b73898bb3efc4bd9
Parents: 6a8fa28
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Apr 2 09:20:40 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Apr 2 11:29:37 2015 -0600

----------------------------------------------------------------------
 hadoop-gremlin/pom.xml                                         | 5 +++++
 .../gremlin/hadoop/process/computer/spark/SparkExecutor.java   | 6 +++---
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6c18af82/hadoop-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml
index a7e039c..03f77ba 100644
--- a/hadoop-gremlin/pom.xml
+++ b/hadoop-gremlin/pom.xml
@@ -168,6 +168,11 @@ limitations under the License.
                     <groupId>io.netty</groupId>
                     <artifactId>netty</artifactId>
                 </exclusion>
+                <!-- lgpl conflicts -->
+                <exclusion>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>findbugs</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <!-- consistent dependencies -->

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6c18af82/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 65a75c1..6afaa96 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -79,7 +79,7 @@ public final class SparkExecutor {
                     workerVertexProgram.workerIterationStart(memory); // start the worker
                     return () -> IteratorUtils.map(partitionIterator, vertexViewAndMessages -> {
                         final Vertex vertex = vertexViewAndMessages._2()._1().get();
-                        final boolean hasViewAndMessages = vertexViewAndMessages._2()._2().isPresent();
+                        final boolean hasViewAndMessages = vertexViewAndMessages._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
                         final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewAndMessages._2()._2().get()._1() : Collections.emptyList();
                         final List<M> incomingMessages = hasViewAndMessages ? vertexViewAndMessages._2()._2().get()._2() : Collections.emptyList();
                         previousView.forEach(property -> DetachedVertexProperty.addTo(vertex, property));  // attach the view to the vertex
@@ -120,8 +120,8 @@ public final class SparkExecutor {
         // isolate the views and then join the incoming messages
         final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndIncomingMessagesRDD = viewAndOutgoingMessagesRDD
                 .mapValues(Tuple2::_1)
-                .fullOuterJoin(incomingMessagesRDD)
-                .mapValues(tuple -> new Tuple2<>(tuple._1().or(Collections.emptyList()), tuple._2().or(Collections.emptyList())));
+                .leftOuterJoin(incomingMessagesRDD) // there will always be views (even if empty), but there will not always be incoming messages
+                .mapValues(tuple -> new Tuple2<>(tuple._1(), tuple._2().or(Collections.emptyList())));
 
         viewAndIncomingMessagesRDD.foreachPartition(partitionIterator -> {
        }); // need to complete a task so it's BSP and the memory for this iteration is updated