You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/03/27 17:09:37 UTC

[08/22] incubator-tinkerpop git commit: partitioning only needed right after data load from HDFS.

Partitioning (via reduceByKey) is only needed once, right after the data is loaded from HDFS, rather than on every vertex program iteration.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/011f242e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/011f242e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/011f242e

Branch: refs/heads/variables
Commit: 011f242ef2b2dd144a56c05f6245292ecfb426d4
Parents: 1b37172
Author: okram <ok...@apache.org>
Authored: Fri Mar 27 07:21:42 2015 -0600
Committer: okram <ok...@apache.org>
Committed: Fri Mar 27 07:21:53 2015 -0600

----------------------------------------------------------------------
 .../gremlin/hadoop/process/computer/spark/SparkExecutor.java  | 7 +++----
 .../hadoop/process/computer/spark/SparkGraphComputer.java     | 6 +++---
 2 files changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/011f242e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index e42efc3..1b2e640 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -50,10 +50,9 @@ public final class SparkExecutor {
     }
 
     // TODO: use SparkVertexPayload typing to be super clean
-    public static <M> JavaPairRDD<Object, SparkPayload<M>> executeStep(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
-        JavaPairRDD<Object, SparkPayload<M>> current = graphRDD.reduceByKey((payloadA, payloadB) -> payloadA); // TODO: total hack cause of something weird with recursive RDDs
-        // execute vertex program
-        current = current.mapPartitionsToPair(partitionIterator -> {     // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
+    public static <M> JavaPairRDD<Object, SparkPayload<M>> executeVertexProgramIteration(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
+        // execute vertex program iteration
+        JavaPairRDD<Object, SparkPayload<M>> current = graphRDD.mapPartitionsToPair(partitionIterator -> {     // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
             final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
             final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
             workerVertexProgram.workerIterationStart(memory);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/011f242e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index fc83bd6..2d9c49d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -158,13 +158,13 @@ public final class SparkGraphComputer implements GraphComputer {
                     try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
                         // add the project jars to the cluster
                         SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
-
                         // create a message-passing friendly rdd from the hadoop input format
                         JavaPairRDD<Object, SparkPayload<Object>> graphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
                                 (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
                                 NullWritable.class,
                                 VertexWritable.class)
-                                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkVertexPayload<>(tuple._2().get())));
+                                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), (SparkPayload<Object>) new SparkVertexPayload<>(tuple._2().get())))
+                                .reduceByKey((a, b) -> a); // partition the graph across the cluster  // todo: cache?
 
                         ////////////////////////////////
                         // process the vertex program //
@@ -182,7 +182,7 @@ public final class SparkGraphComputer implements GraphComputer {
                             // execute the vertex program
                             while (true) {
                                 memory.setInTask(true);
-                                graphRDD = SparkExecutor.executeStep(graphRDD, memory, vertexProgramConfiguration);
+                                graphRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, memory, vertexProgramConfiguration);
                                 memory.setInTask(false);
                                 if (this.vertexProgram.terminate(memory))
                                     break;