You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/02 19:29:41 UTC
[1/3] incubator-tinkerpop git commit: removed a join() and a cache()
from the Spark message passing algorithm. The speed went from 11 minutes to
7.1 minutes on my local machine test graph against SparkServer. Going to test
on Friendster and the Blades.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 6a8fa2879 -> 4ef878686
removed a join() and a cache() from the Spark message passing algorithm. The speed went from 11 minutes to 7.1 minutes on my local machine test graph against SparkServer. Going to test on Friendster and the Blades.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/41559076
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/41559076
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/41559076
Branch: refs/heads/master
Commit: 415590765319b7059332bad599dffb83afcfa940
Parents: 6c18af8
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Apr 2 11:01:53 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Apr 2 11:29:37 2015 -0600
----------------------------------------------------------------------
.../process/computer/spark/SparkExecutor.java | 101 ++++++++++++-------
.../computer/spark/SparkGraphComputer.java | 7 +-
.../computer/spark/payload/MessagePayload.java | 38 +++++++
.../process/computer/spark/payload/Payload.java | 28 +++++
.../spark/payload/ViewIncomingPayload.java | 59 +++++++++++
.../spark/payload/ViewOutgoingPayload.java | 49 +++++++++
.../computer/spark/payload/ViewPayload.java | 42 ++++++++
7 files changed, 284 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/41559076/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 6afaa96..4246f42 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -27,6 +27,11 @@ import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.MessagePayload;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.Payload;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewOutgoingPayload;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewPayload;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
@@ -45,7 +50,6 @@ import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import scala.Tuple2;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
@@ -62,26 +66,26 @@ public final class SparkExecutor {
// VERTEX PROGRAM //
////////////////////
- public static <M> JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> executeVertexProgramIteration(
+ public static <M> JavaPairRDD<Object, ViewIncomingPayload<M>> executeVertexProgramIteration(
final JavaPairRDD<Object, VertexWritable> graphRDD,
- final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndMessagesRDD,
+ final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
final SparkMemory memory,
final Configuration apacheConfiguration) {
- final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Tuple2<Object, M>>>> viewAndOutgoingMessagesRDD = ((null == viewAndMessagesRDD) ?
- graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<Tuple2<List<DetachedVertexProperty<Object>>, List<M>>>absent())) : // first iteration will not have any views or messages
- graphRDD.leftOuterJoin(viewAndMessagesRDD)) // every other iteration may have views and messages
+ final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = ((null == viewIncomingRDD) ?
+ graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
+ graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
// for each partition of vertices
.mapPartitionsToPair(partitionIterator -> {
final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
workerVertexProgram.workerIterationStart(memory); // start the worker
- return () -> IteratorUtils.map(partitionIterator, vertexViewAndMessages -> {
- final Vertex vertex = vertexViewAndMessages._2()._1().get();
- final boolean hasViewAndMessages = vertexViewAndMessages._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
- final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewAndMessages._2()._2().get()._1() : Collections.emptyList();
- final List<M> incomingMessages = hasViewAndMessages ? vertexViewAndMessages._2()._2().get()._2() : Collections.emptyList();
+ return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
+ final Vertex vertex = vertexViewIncoming._2()._1().get();
+ final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
+ final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : Collections.emptyList();
+ final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
previousView.forEach(property -> DetachedVertexProperty.addTo(vertex, property)); // attach the view to the vertex
///
final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, incomingMessages); // create the messenger with the incoming messages
@@ -93,56 +97,79 @@ public final class SparkExecutor {
final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
if (!partitionIterator.hasNext())
workerVertexProgram.workerIterationEnd(memory); // if no more vertices in the partition, end the worker's iteration
- return new Tuple2<>(vertex.id(), new Tuple2<>(nextView, outgoingMessages));
+ return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
});
});
- viewAndOutgoingMessagesRDD.cache(); // will use twice (once for message passing and once for view isolation)
-
// "message pass" by reducing on the vertex object id of the message payloads
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
- final JavaPairRDD<Object, List<M>> incomingMessagesRDD = viewAndOutgoingMessagesRDD
- .mapValues(Tuple2::_2)
- .flatMapToPair(tuple -> () -> IteratorUtils.map(tuple._2().iterator(), message -> {
- final List<M> list = (null == messageCombiner) ? new ArrayList<>() : new ArrayList<>(1);
- list.add(message._2());
- return new Tuple2<>(message._1(), list);
- })).reduceByKey((a, b) -> {
- if (null == messageCombiner) {
- a.addAll(b);
+ final JavaPairRDD<Object, Payload> newViewIncomingRDD = viewOutgoingRDD
+ .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
+ IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),
+ IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))
+ .reduceByKey((a, b) -> {
+ if (a instanceof ViewIncomingPayload) {
+ if (b instanceof MessagePayload)
+ ((ViewIncomingPayload<M>) a).addIncomingMessage(((MessagePayload<M>) b).getMessage(), messageCombiner);
+ else if (b instanceof ViewPayload)
+ ((ViewIncomingPayload<M>) a).setView(((ViewPayload) b).getView());
+ else if (b instanceof ViewIncomingPayload)
+ throw new IllegalStateException("It should never be the case that two views reduce to the same key");
return a;
+ } else if (b instanceof ViewIncomingPayload) {
+ if (a instanceof MessagePayload)
+ ((ViewIncomingPayload<M>) b).addIncomingMessage(((MessagePayload<M>) a).getMessage(), messageCombiner);
+ else if (a instanceof ViewPayload)
+ ((ViewIncomingPayload<M>) b).setView(((ViewPayload) a).getView());
+ else if (a instanceof ViewIncomingPayload)
+ throw new IllegalStateException("It should never be the case that two views reduce to the same key");
+ return b;
} else {
- a.set(0, messageCombiner.combine(a.get(0), b.get(0)));
- return a;
+ final ViewIncomingPayload<M> c = new ViewIncomingPayload<>();
+ if (a instanceof MessagePayload)
+ c.addIncomingMessage(((MessagePayload<M>) a).getMessage(), messageCombiner);
+ else if (a instanceof ViewPayload)
+ c.setView(((ViewPayload) a).getView());
+ if (b instanceof MessagePayload)
+ c.addIncomingMessage(((MessagePayload<M>) b).getMessage(), messageCombiner);
+ else if (b instanceof ViewPayload)
+ c.setView(((ViewPayload) b).getView());
+ return c;
+ }
+ })
+ .mapValues(payload -> {
+ if (payload instanceof ViewIncomingPayload)
+ return payload;
+ else {
+ final ViewIncomingPayload<M> viewIncomingPayload = new ViewIncomingPayload<>();
+ if (payload instanceof ViewPayload)
+ viewIncomingPayload.setView(((ViewPayload) payload).getView());
+ else
+ throw new IllegalStateException("It should never be the case that a view is not emitted");
+ return viewIncomingPayload;
}
});
- // isolate the views and then join the incoming messages
- final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndIncomingMessagesRDD = viewAndOutgoingMessagesRDD
- .mapValues(Tuple2::_1)
- .leftOuterJoin(incomingMessagesRDD) // there will always be views (even if empty), but there will not always be incoming messages
- .mapValues(tuple -> new Tuple2<>(tuple._1(), tuple._2().or(Collections.emptyList())));
-
- viewAndIncomingMessagesRDD.foreachPartition(partitionIterator -> {
+ newViewIncomingRDD.foreachPartition(partitionIterator -> {
}); // need to complete a task so its BSP and the memory for this iteration is updated
- return viewAndIncomingMessagesRDD;
+ return (JavaPairRDD) newViewIncomingRDD;
}
/////////////////
// MAP REDUCE //
////////////////
- public static <M> JavaPairRDD<Object, VertexWritable> prepareGraphRDDForMapReduce(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndMessagesRDD) {
- return (null == viewAndMessagesRDD) ?
+ public static <M> JavaPairRDD<Object, VertexWritable> prepareGraphRDDForMapReduce(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD) {
+ return (null == viewIncomingRDD) ?
graphRDD.mapValues(vertexWritable -> {
vertexWritable.get().edges(Direction.BOTH).forEachRemaining(Edge::remove);
return vertexWritable;
}) :
- graphRDD.leftOuterJoin(viewAndMessagesRDD)
+ graphRDD.leftOuterJoin(viewIncomingRDD)
.mapValues(tuple -> {
final Vertex vertex = tuple._1().get();
vertex.edges(Direction.BOTH).forEachRemaining(Edge::remove);
- final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get()._1() : Collections.emptyList();
+ final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
view.forEach(property -> DetachedVertexProperty.addTo(vertex, property));
return tuple._1();
});
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/41559076/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index e76459b..8c63f8a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -29,6 +29,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewIncomingPayload;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
@@ -166,7 +167,7 @@ public final class SparkGraphComputer implements GraphComputer {
.mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get()))) // TODO: use DetachedVertex?
.reduceByKey((a, b) -> a) // TODO: test without doing this reduce
.cache(); // partition the graph across the cluster
- JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Object>>> viewAndMessagesRDD = null;
+ JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
////////////////////////////////
// process the vertex program //
@@ -184,7 +185,7 @@ public final class SparkGraphComputer implements GraphComputer {
// execute the vertex program
while (true) {
memory.setInTask(true);
- viewAndMessagesRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewAndMessagesRDD, memory, vertexProgramConfiguration);
+ viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
memory.setInTask(false);
if (this.vertexProgram.terminate(memory))
break;
@@ -205,7 +206,7 @@ public final class SparkGraphComputer implements GraphComputer {
//////////////////////////////
if (!this.mapReducers.isEmpty()) {
// drop all edges and messages in the graphRDD as they are no longer needed for the map reduce jobs
- final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewAndMessagesRDD).cache();
+ final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD).cache();
// graphRDD.unpersist(); // the original graphRDD is no longer needed so free up its memory
for (final MapReduce mapReduce : this.mapReducers) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/41559076/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
new file mode 100644
index 0000000..1dde098
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class MessagePayload<M> implements Payload {
+
+ private final M message;
+
+ public MessagePayload(final M message) {
+ this.message = message;
+ }
+
+ public M getMessage() {
+ return this.message;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/41559076/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
new file mode 100644
index 0000000..0937991
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
@@ -0,0 +1,28 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Payload {
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/41559076/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
new file mode 100644
index 0000000..be91b2e
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
+
+import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class ViewIncomingPayload<M> implements Payload {
+
+ private List<DetachedVertexProperty<Object>> view = null;
+ private final List<M> incomingMessages = new ArrayList<>();
+
+ public ViewIncomingPayload() {
+ }
+
+ public List<DetachedVertexProperty<Object>> getView() {
+ return this.view;
+ }
+
+ public void setView(final List<DetachedVertexProperty<Object>> view) {
+ this.view = view;
+ }
+
+ public List<M> getIncomingMessages() {
+ return incomingMessages;
+ }
+
+ public void addIncomingMessage(final M message, final MessageCombiner<M> messageCombiner) {
+ if (this.incomingMessages.isEmpty() || null == messageCombiner)
+ this.incomingMessages.add(message);
+ else
+ this.incomingMessages.set(0, messageCombiner.combine(this.incomingMessages.get(0), message));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/41559076/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
new file mode 100644
index 0000000..993f029
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
@@ -0,0 +1,49 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
+
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
+import scala.Tuple2;
+
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class ViewOutgoingPayload<M> implements Payload {
+
+ private final List<DetachedVertexProperty<Object>> view;
+ private final List<Tuple2<Object,M>> outgoingMessages;
+
+ public ViewOutgoingPayload(final List<DetachedVertexProperty<Object>> view, final List<Tuple2<Object,M>> outgoingMessages) {
+ this.view = view;
+ this.outgoingMessages = outgoingMessages;
+ }
+
+ public ViewPayload getView() {
+ return new ViewPayload(this.view);
+ }
+
+ public List<Tuple2<Object,M>> getOutgoingMessages() {
+ return this.outgoingMessages;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/41559076/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
new file mode 100644
index 0000000..977d926
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
@@ -0,0 +1,42 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing,
+ * * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * * KIND, either express or implied. See the License for the
+ * * specific language governing permissions and limitations
+ * * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
+
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
+
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class ViewPayload implements Payload {
+
+ private final List<DetachedVertexProperty<Object>> view;
+
+ public ViewPayload(final List<DetachedVertexProperty<Object>> view) {
+ this.view = view;
+ }
+
+ public List<DetachedVertexProperty<Object>> getView() {
+ return this.view;
+ }
+}
[3/3] incubator-tinkerpop git commit: a few object creation
optimizations.
Posted by ok...@apache.org.
a few object creation optimizations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4ef87868
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4ef87868
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4ef87868
Branch: refs/heads/master
Commit: 4ef8786867daf0a0618387185d85d64302ef860d
Parents: 4155907
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Apr 2 11:22:14 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Apr 2 11:29:38 2015 -0600
----------------------------------------------------------------------
.../process/computer/spark/SparkExecutor.java | 17 +++++++----------
.../process/computer/spark/SparkGraphComputer.java | 11 +++++------
.../spark/payload/ViewIncomingPayload.java | 14 ++++++++++----
3 files changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ef87868/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 4246f42..512c0e0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -74,7 +74,7 @@ public final class SparkExecutor {
final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = ((null == viewIncomingRDD) ?
graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
- graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
+ graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
// for each partition of vertices
.mapPartitionsToPair(partitionIterator -> {
final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
@@ -103,7 +103,7 @@ public final class SparkExecutor {
// "message pass" by reducing on the vertex object id of the message payloads
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
- final JavaPairRDD<Object, Payload> newViewIncomingRDD = viewOutgoingRDD
+ final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
.flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),
IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))
@@ -125,7 +125,7 @@ public final class SparkExecutor {
throw new IllegalStateException("It should never be the case that two views reduce to the same key");
return b;
} else {
- final ViewIncomingPayload<M> c = new ViewIncomingPayload<>();
+ final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
if (a instanceof MessagePayload)
c.addIncomingMessage(((MessagePayload<M>) a).getMessage(), messageCombiner);
else if (a instanceof ViewPayload)
@@ -139,20 +139,17 @@ public final class SparkExecutor {
})
.mapValues(payload -> {
if (payload instanceof ViewIncomingPayload)
- return payload;
- else {
+ return (ViewIncomingPayload<M>) payload;
+ else { // this means the vertex has no incoming messages
final ViewIncomingPayload<M> viewIncomingPayload = new ViewIncomingPayload<>();
- if (payload instanceof ViewPayload)
- viewIncomingPayload.setView(((ViewPayload) payload).getView());
- else
- throw new IllegalStateException("It should never be the case that a view is not emitted");
+ viewIncomingPayload.setView(((ViewPayload) payload).getView());
return viewIncomingPayload;
}
});
newViewIncomingRDD.foreachPartition(partitionIterator -> {
}); // need to complete a task so its BSP and the memory for this iteration is updated
- return (JavaPairRDD) newViewIncomingRDD;
+ return newViewIncomingRDD;
}
/////////////////
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ef87868/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 8c63f8a..1df99d3 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -44,14 +44,12 @@ import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.File;
import java.util.HashSet;
-import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -164,8 +162,8 @@ public final class SparkGraphComputer implements GraphComputer {
(Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
NullWritable.class,
VertexWritable.class)
- .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get()))) // TODO: use DetachedVertex?
- .reduceByKey((a, b) -> a) // TODO: test without doing this reduce
+ .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
+ .reduceByKey((a, b) -> a) // TODO: why is this necessary?
.cache(); // partition the graph across the cluster
JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
@@ -207,9 +205,10 @@ public final class SparkGraphComputer implements GraphComputer {
if (!this.mapReducers.isEmpty()) {
// drop all edges and messages in the graphRDD as they are no longer needed for the map reduce jobs
final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD).cache();
- // graphRDD.unpersist(); // the original graphRDD is no longer needed so free up its memory
-
+ // TODO: boolean first = true;
for (final MapReduce mapReduce : this.mapReducers) {
+ // TODO: if (first) first = false;
+ // TODO: else graphRDD.unpersist(); // the original graphRDD is no longer needed so free up its memory
// execute the map reduce job
final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
mapReduce.storeState(newApacheConfiguration);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ef87868/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
index be91b2e..2de9736 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
@@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/**
@@ -33,13 +34,18 @@ import java.util.List;
public class ViewIncomingPayload<M> implements Payload {
private List<DetachedVertexProperty<Object>> view = null;
- private final List<M> incomingMessages = new ArrayList<>();
+ private final List<M> incomingMessages;
- public ViewIncomingPayload() {
+ public ViewIncomingPayload(final MessageCombiner<M> messageCombiner) {
+ this.incomingMessages = null == messageCombiner ? new ArrayList<>() : new ArrayList<>(1);
+ }
+
+ public ViewIncomingPayload() {
+ this.incomingMessages = null;
}
public List<DetachedVertexProperty<Object>> getView() {
- return this.view;
+ return null == this.view ? Collections.emptyList() : this.view;
}
public void setView(final List<DetachedVertexProperty<Object>> view) {
@@ -47,7 +53,7 @@ public class ViewIncomingPayload<M> implements Payload {
}
public List<M> getIncomingMessages() {
- return incomingMessages;
+ return null == this.incomingMessages ? Collections.emptyList() : this.incomingMessages;
}
public void addIncomingMessage(final M message, final MessageCombiner<M> messageCombiner) {
[2/3] incubator-tinkerpop git commit: excluded the Spark dependency
on the LGPL FindBugs library.
Posted by ok...@apache.org.
excluded the Spark dependency on the LGPL FindBugs library.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/6c18af82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/6c18af82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/6c18af82
Branch: refs/heads/master
Commit: 6c18af829675eae11eb273b3b73898bb3efc4bd9
Parents: 6a8fa28
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Apr 2 09:20:40 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Apr 2 11:29:37 2015 -0600
----------------------------------------------------------------------
hadoop-gremlin/pom.xml | 5 +++++
.../gremlin/hadoop/process/computer/spark/SparkExecutor.java | 6 +++---
2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6c18af82/hadoop-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml
index a7e039c..03f77ba 100644
--- a/hadoop-gremlin/pom.xml
+++ b/hadoop-gremlin/pom.xml
@@ -168,6 +168,11 @@ limitations under the License.
<groupId>io.netty</groupId>
<artifactId>netty</artifactId>
</exclusion>
+ <!-- lgpl conflicts -->
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>findbugs</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<!-- consistent dependencies -->
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/6c18af82/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 65a75c1..6afaa96 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -79,7 +79,7 @@ public final class SparkExecutor {
workerVertexProgram.workerIterationStart(memory); // start the worker
return () -> IteratorUtils.map(partitionIterator, vertexViewAndMessages -> {
final Vertex vertex = vertexViewAndMessages._2()._1().get();
- final boolean hasViewAndMessages = vertexViewAndMessages._2()._2().isPresent();
+ final boolean hasViewAndMessages = vertexViewAndMessages._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewAndMessages._2()._2().get()._1() : Collections.emptyList();
final List<M> incomingMessages = hasViewAndMessages ? vertexViewAndMessages._2()._2().get()._2() : Collections.emptyList();
previousView.forEach(property -> DetachedVertexProperty.addTo(vertex, property)); // attach the view to the vertex
@@ -120,8 +120,8 @@ public final class SparkExecutor {
// isolate the views and then join the incoming messages
final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndIncomingMessagesRDD = viewAndOutgoingMessagesRDD
.mapValues(Tuple2::_1)
- .fullOuterJoin(incomingMessagesRDD)
- .mapValues(tuple -> new Tuple2<>(tuple._1().or(Collections.emptyList()), tuple._2().or(Collections.emptyList())));
+ .leftOuterJoin(incomingMessagesRDD) // there will always be views (even if empty), but there will not always be incoming messages
+ .mapValues(tuple -> new Tuple2<>(tuple._1(), tuple._2().or(Collections.emptyList())));
viewAndIncomingMessagesRDD.foreachPartition(partitionIterator -> {
}); // need to complete a task so it's BSP and the memory for this iteration is updated