You are viewing a plain text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/01 16:15:52 UTC
incubator-tinkerpop git commit: Cleaned up the SparkGraphComputer algorithm with comments and some reorganization of code. A few minor optimizations here and there.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 1244a8fa8 -> 815a378f0
Cleaned up the SparkGraphComputer algorithm with comments and some reorganization of code. A few minor optimizations here and there.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/815a378f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/815a378f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/815a378f
Branch: refs/heads/master
Commit: 815a378f0b112e0d3034ab56a2ca8aeb846bdda1
Parents: 1244a8f
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Apr 1 08:15:47 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Apr 1 08:15:47 2015 -0600
----------------------------------------------------------------------
.../gremlin/process/computer/GraphComputer.java | 7 +-
.../process/computer/spark/SparkExecutor.java | 104 ++++++++++------
.../computer/spark/SparkGraphComputer.java | 65 ++++++----
.../computer/spark/SparkMessagePayload.java | 49 --------
.../process/computer/spark/SparkPayload.java | 48 --------
.../computer/spark/SparkVertexPayload.java | 122 -------------------
6 files changed, 111 insertions(+), 284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/815a378f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
index 1f2b3f3..e847772 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
@@ -129,9 +129,6 @@ public interface GraphComputer {
}
public interface Features {
- public default boolean supportsWorkerPersistenceBetweenIterations() {
- return true;
- }
public default boolean supportsGlobalMessageScopes() {
return true;
@@ -173,6 +170,10 @@ public interface GraphComputer {
return true;
}
+ public default boolean supportsResultGraphPersistCombination(final ResultGraph resultGraph, final Persist persist) {
+ return true;
+ }
+
public default boolean supportsIsolation(final Isolation isolation) {
return true;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/815a378f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 72aa4d2..5e4996a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -30,11 +30,14 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
@@ -46,7 +49,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
-import java.util.stream.Stream;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -56,58 +58,68 @@ public final class SparkExecutor {
private SparkExecutor() {
}
- public static <M> JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> executeVertexProgramIteration(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndMessageRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
+ ////////////////////
+ // VERTEX PROGRAM //
+ ////////////////////
- // execute vertex program iteration
- final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Tuple2<Object, M>>>> viewAndOutgoingMessagesRDD = null == viewAndMessageRDD ?
+ public static <M> JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> executeVertexProgramIteration(
+ final JavaPairRDD<Object, VertexWritable> graphRDD,
+ final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndMessagesRDD,
+ final SparkMemory memory,
+ final Configuration apacheConfiguration) {
+
+ final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Tuple2<Object, M>>>> viewAndOutgoingMessagesRDD = null == viewAndMessagesRDD ?
+ // if this is the first iteration, there are now views or incoming messages
graphRDD.mapPartitionsToPair(partitionIterator -> { // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
- final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
- final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
- workerVertexProgram.workerIterationStart(memory);
+ final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
+ final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
+ workerVertexProgram.workerIterationStart(memory); // start the worker
return () -> IteratorUtils.map(partitionIterator, vertexWritable -> {
final Vertex vertex = vertexWritable._2().get();
- final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, Collections.emptyList());
- workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory);
- final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages();
- final List<DetachedVertexProperty<Object>> newView = new ArrayList<>();
- if (null != elementComputeKeysArray)
+ final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, Collections.emptyList()); // create the messenger with no incoming messages
+ workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory); // execute the vertex program on this vertex
+ final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
+ final List<DetachedVertexProperty<Object>> newView = new ArrayList<>(); // get the computed view
+ if (null != elementComputeKeysArray) // not all vertex programs have compute keys
vertex.properties(elementComputeKeysArray).forEachRemaining(property -> newView.add(DetachedFactory.detach(property, true)));
if (!partitionIterator.hasNext())
- workerVertexProgram.workerIterationEnd(memory);
+ workerVertexProgram.workerIterationEnd(memory); // if no more vertices in the partition, end the worker's iteration
return new Tuple2<>(vertex.id(), new Tuple2<>(newView, outgoingMessages));
});
}) :
- graphRDD.leftOuterJoin(viewAndMessageRDD)
+ // join the view/messages to the graph
+ graphRDD.leftOuterJoin(viewAndMessagesRDD)
.mapPartitionsToPair(partitionIterator -> { // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
- final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
- final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]);
- workerVertexProgram.workerIterationStart(memory);
+ final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
+ final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
+ workerVertexProgram.workerIterationStart(memory); // start the worker
return () -> IteratorUtils.map(partitionIterator, vertexWritableAndIncomingMessages -> {
final Vertex vertex = vertexWritableAndIncomingMessages._2()._1().get();
final List<M> incomingMessages = vertexWritableAndIncomingMessages._2()._2().isPresent() ? vertexWritableAndIncomingMessages._2()._2().get()._2() : Collections.emptyList();
final List<DetachedVertexProperty<Object>> view = vertexWritableAndIncomingMessages._2()._2().isPresent() ? vertexWritableAndIncomingMessages._2()._2().get()._1() : Collections.emptyList();
- view.forEach(property -> DetachedVertexProperty.addTo(vertex, property));
- final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, incomingMessages);
- workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory);
- final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages();
- final List<DetachedVertexProperty<Object>> newView = new ArrayList<>();
- if (null != elementComputeKeysArray)
+ view.forEach(property -> DetachedVertexProperty.addTo(vertex, property)); // attach the view to the vertex
+ final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, incomingMessages); // create the messenger with the incoming messages
+ workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory); // execute the vertex program on this vertex
+ final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
+ final List<DetachedVertexProperty<Object>> newView = new ArrayList<>(); // get the computed view
+ if (null != elementComputeKeysArray) // not all vertex programs have compute keys
vertex.properties(elementComputeKeysArray).forEachRemaining(property -> newView.add(DetachedFactory.detach(property, true)));
if (!partitionIterator.hasNext())
- workerVertexProgram.workerIterationEnd(memory);
+ workerVertexProgram.workerIterationEnd(memory); // if no more vertices in the partition, end the worker's iteration
return new Tuple2<>(vertex.id(), new Tuple2<>(newView, outgoingMessages));
});
});
- viewAndOutgoingMessagesRDD.cache();
+
+ viewAndOutgoingMessagesRDD.cache(); // will use twice (once for message passing and once for view isolation)
// "message pass" by reducing on the vertex object id of the message payloads
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
- final JavaPairRDD<Object, List<M>> incomingMessages = viewAndOutgoingMessagesRDD
+ final JavaPairRDD<Object, List<M>> incomingMessagesRDD = viewAndOutgoingMessagesRDD
.mapValues(Tuple2::_2)
.flatMapToPair(tuple -> () -> IteratorUtils.map(tuple._2().iterator(), message -> {
- final List<M> list = new ArrayList<>();
+ final List<M> list = (null == messageCombiner) ? new ArrayList<>() : new ArrayList<>(1);
list.add(message._2());
return new Tuple2<>(message._1(), list);
})).reduceByKey((a, b) -> {
@@ -115,35 +127,49 @@ public final class SparkExecutor {
a.addAll(b);
return a;
} else {
- final M m = messageCombiner.combine(a.get(0), b.get(0));
- a.clear();
- b.clear();
- a.add(m);
+ a.set(0, messageCombiner.combine(a.get(0), b.get(0)));
return a;
}
});
- final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> newViewMessageRDD = viewAndOutgoingMessagesRDD
+ // isolate the views and then join the incoming messages
+ final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndIncomingMessagesRDD = viewAndOutgoingMessagesRDD
.mapValues(Tuple2::_1)
- .fullOuterJoin(incomingMessages)
+ .fullOuterJoin(incomingMessagesRDD)
.mapValues(tuple -> new Tuple2<>(tuple._1().or(Collections.emptyList()), tuple._2().or(Collections.emptyList())));
- newViewMessageRDD.foreachPartition(partitionIterator -> {
- }); // need to complete a task so its BSP.
- return newViewMessageRDD;
+ viewAndIncomingMessagesRDD.foreachPartition(partitionIterator -> {
+ }); // need to complete a task so its BSP and the memory for this iteration is updated
+ return viewAndIncomingMessagesRDD;
}
/////////////////
// MAP REDUCE //
////////////////
+ public static <M> JavaPairRDD<Object, VertexWritable> prepareGraphRDDForMapReduce(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<M>>> viewAndMessagesRDD) {
+ return (null == viewAndMessagesRDD) ?
+ graphRDD.mapValues(vertexWritable -> {
+ vertexWritable.get().edges(Direction.BOTH).forEachRemaining(Edge::remove);
+ return vertexWritable;
+ }) :
+ graphRDD.leftOuterJoin(viewAndMessagesRDD)
+ .mapValues(tuple -> {
+ final Vertex vertex = tuple._1().get();
+ vertex.edges(Direction.BOTH).forEachRemaining(Edge::remove);
+ final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get()._1() : Collections.emptyList();
+ view.forEach(property -> DetachedVertexProperty.addTo(vertex, property));
+ return tuple._1();
+ });
+ }
+
public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(apacheConfiguration);
workerMapReduce.workerStart(MapReduce.Stage.MAP);
final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
- return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
- workerMapReduce.map(keyValue._2().get(), mapEmitter);
+ return () -> IteratorUtils.flatMap(partitionIterator, vertexWritable -> {
+ workerMapReduce.map(vertexWritable._2().get(), mapEmitter);
if (!partitionIterator.hasNext())
workerMapReduce.workerEnd(MapReduce.Stage.MAP);
return mapEmitter.getEmissions();
@@ -222,7 +248,7 @@ public final class SparkExecutor {
if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
else
- HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
+ mapReduce.addResultToMemory(memory, mapReduceRDD.map(tuple -> new KeyValue<>(tuple._1(), tuple._2())).collect().iterator());
} catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/815a378f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index f09b649..8a1278a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -42,9 +42,6 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.slf4j.Logger;
@@ -52,7 +49,6 @@ import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.File;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@@ -167,10 +163,10 @@ public final class SparkGraphComputer implements GraphComputer {
(Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
NullWritable.class,
VertexWritable.class)
- .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
+ .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get()))) // TODO: use DetachedVertex?
.reduceByKey((a, b) -> a) // TODO: test without doing this reduce
.cache(); // partition the graph across the cluster
- JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Object>>> viewAndMessageRDD = null;
+ JavaPairRDD<Object, Tuple2<List<DetachedVertexProperty<Object>>, List<Object>>> viewAndMessagesRDD = null;
////////////////////////////////
// process the vertex program //
@@ -188,7 +184,7 @@ public final class SparkGraphComputer implements GraphComputer {
// execute the vertex program
while (true) {
memory.setInTask(true);
- viewAndMessageRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewAndMessageRDD, memory, vertexProgramConfiguration);
+ viewAndMessagesRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewAndMessagesRDD, memory, vertexProgramConfiguration);
memory.setInTask(false);
if (this.vertexProgram.terminate(memory))
break;
@@ -209,21 +205,8 @@ public final class SparkGraphComputer implements GraphComputer {
//////////////////////////////
if (!this.mapReducers.isEmpty()) {
// drop all edges and messages in the graphRDD as they are no longer needed for the map reduce jobs
- final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = null == viewAndMessageRDD ? // TODO: move to SparkExecutor
- graphRDD.mapValues(vertexWritable -> {
- vertexWritable.get().edges(Direction.BOTH).forEachRemaining(Edge::remove);
- return vertexWritable;
- })
- .cache() :
- graphRDD.leftOuterJoin(viewAndMessageRDD)
- .mapValues(tuple -> {
- final Vertex vertex = tuple._1().get();
- vertex.edges(Direction.BOTH).forEachRemaining(Edge::remove);
- final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get()._1() : Collections.emptyList();
- view.forEach(property -> DetachedVertexProperty.addTo(vertex, property));
- return tuple._1();
- })
- .cache();
+ final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewAndMessagesRDD).cache();
+ graphRDD.unpersist(); // the original graphRDD is no longer needed so free up its memory
for (final MapReduce mapReduce : this.mapReducers) {
// execute the map reduce job
@@ -274,7 +257,43 @@ public final class SparkGraphComputer implements GraphComputer {
@Override
public Features features() {
return new Features() {
- @Override
+
+ public boolean supportsVertexAddition() {
+ return false;
+ }
+
+ public boolean supportsVertexRemoval() {
+ return false;
+ }
+
+ public boolean supportsVertexPropertyRemoval() {
+ return false;
+ }
+
+ public boolean supportsEdgeAddition() {
+ return false;
+ }
+
+ public boolean supportsEdgeRemoval() {
+ return false;
+ }
+
+ public boolean supportsEdgePropertyAddition() {
+ return false;
+ }
+
+ public boolean supportsEdgePropertyRemoval() {
+ return false;
+ }
+
+ public boolean supportsIsolation(final Isolation isolation) {
+ return isolation.equals(Isolation.BSP);
+ }
+
+ public boolean supportsResultGraphPersistCombination(final ResultGraph resultGraph, final Persist persist) {
+ return persist.equals(Persist.NOTHING) || resultGraph.equals(ResultGraph.NEW);
+ }
+
public boolean supportsDirectObjects() {
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/815a378f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java
deleted file mode 100644
index 4e58cf1..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessagePayload.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkMessagePayload<M> implements Serializable, SparkPayload<M> {
-
- public final List<M> messages = new ArrayList<>();
-
- public SparkMessagePayload() {
-
- }
-
- public SparkMessagePayload(final M message) {
- this.messages.add(message);
- }
-
- @Override
- public List<M> getMessages() {
- return this.messages;
- }
-
- @Override
- public final boolean isVertex() {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/815a378f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
deleted file mode 100644
index 3ec1d5b..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
-
-import java.util.List;
-import java.util.stream.Stream;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface SparkPayload<M> {
-
- public default void addMessages(final List<M> otherMessages, final MessageCombiner<M> messageCombiner) {
- if (null != messageCombiner) {
- final M message = Stream.concat(this.getMessages().stream(),otherMessages.stream()).reduce(messageCombiner::combine).get();
- this.getMessages().clear();
- this.getMessages().add(message);
- } else {
- this.getMessages().addAll(otherMessages);
- }
- }
-
- public List<M> getMessages();
-
- public boolean isVertex();
-
- public default SparkVertexPayload<M> asVertexPayload() {
- return (SparkVertexPayload<M>) this;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/815a378f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
deleted file mode 100644
index 0610750..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
-import org.apache.tinkerpop.gremlin.process.computer.Messenger;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import scala.Tuple2;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkVertexPayload<M> implements SparkPayload<M>, Messenger<M>, Serializable {
-
- private final VertexWritable vertexWritable;
- private final List<M> incoming;
- private List<Tuple2<Object, M>> outgoing;
-
- public SparkVertexPayload(final Vertex vertex) {
- this.vertexWritable = new VertexWritable(vertex);
- this.incoming = new ArrayList<>();
- this.outgoing = new ArrayList<>();
- }
-
- @Override
- public final boolean isVertex() {
- return true;
- }
-
- @Override
- public SparkVertexPayload<M> asVertexPayload() {
- return this;
- }
-
- @Override
- public List<M> getMessages() {
- return this.incoming;
- }
-
- public Vertex getVertex() {
- return this.vertexWritable.get();
- }
-
- public VertexWritable getVertexWritable() {
- return this.vertexWritable;
- }
-
- public List<Tuple2<Object, M>> getOutgoingMessages() {
- return this.outgoing;
- }
-
- public Iterator<Tuple2<Object, M>> detachOutgoingMessages() {
- final Iterator<Tuple2<Object, M>> messages = this.outgoing.iterator();
- this.outgoing = new ArrayList<>();
- return messages;
- }
-
- /*public Iterator<M> detachIncomingMessages() {
- final Iterator<M> messages = this.incoming.iterator();
- this.incoming = new ArrayList<>();
- return messages;
- }*/
-
- ///////////
-
- @Override
- public Iterator<M> receiveMessages(final MessageScope messageScope) {
- return this.incoming.iterator();
- }
-
- @Override
- public void sendMessage(final MessageScope messageScope, final M message) {
- if (messageScope instanceof MessageScope.Local) {
- final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
- final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkVertexPayload.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertexWritable.get());
- final Direction direction = SparkVertexPayload.getOppositeDirection(incidentTraversal);
- incidentTraversal.forEachRemaining(edge -> this.outgoing.add(new Tuple2<>(edge.vertices(direction).next().id(), message)));
- } else {
- ((MessageScope.Global) messageScope).vertices().forEach(v -> this.outgoing.add(new Tuple2<>(v.id(), message)));
- }
- }
-
- ///////////
-
- private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
- incidentTraversal.asAdmin().addStep(0, new StartStep<>(incidentTraversal.asAdmin(), vertex));
- return (T) incidentTraversal;
- }
-
- private static Direction getOppositeDirection(final Traversal.Admin<Vertex, Edge> incidentTraversal) {
- final VertexStep step = TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get();
- return step.getDirection().opposite();
- }
-}