You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/25 21:14:15 UTC

incubator-tinkerpop git commit: Removed SparkVertex. No longer needed as we can now use the VertexWritable model used in GiraphGraphComputer. This fixed a KryoSerializer bug in SparkGraphComputer due to recursive serialization of a vertex.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 5e0358122 -> 7809fd945


Removed SparkVertex. No longer needed as we can now use the VertexWritable model used in GiraphGraphComputer. This fixed a KryoSerializer bug in SparkGraphComputer due to recursive serialization of a vertex.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/7809fd94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/7809fd94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/7809fd94

Branch: refs/heads/master
Commit: 7809fd9457a38bc1c60c76c7ad1ccfa8699ec68e
Parents: 5e03581
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 25 14:14:11 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 25 14:14:11 2015 -0600

----------------------------------------------------------------------
 .../computer/spark/SparkGraphComputer.java      |  45 ++++----
 .../process/computer/spark/SparkVertex.java     | 107 -------------------
 .../computer/spark/SparkVertexPayload.java      |  19 ++--
 .../gremlin/hadoop/HadoopGraphProvider.java     |   2 +-
 4 files changed, 36 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7809fd94/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index debacd8..06820c2 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -43,8 +43,9 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
@@ -164,7 +165,7 @@ public final class SparkGraphComputer implements GraphComputer {
                                 (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
                                 NullWritable.class,
                                 VertexWritable.class)
-                                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkVertexPayload<>(new SparkVertex((TinkerVertex) tuple._2().get()))));
+                                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkVertexPayload<>(tuple._2().get())));
 
                         ////////////////////////////////
                         // process the vertex program //
@@ -193,28 +194,36 @@ public final class SparkGraphComputer implements GraphComputer {
                                 }
                             }
                             // write the output graph back to disk
-                            SparkHelper.saveGraphRDD(graphRDD, hadoopConfiguration);
+                            if (!this.persist.get().equals(Persist.NOTHING))
+                                SparkHelper.saveGraphRDD(graphRDD, hadoopConfiguration);
                         }
 
-                        // reuse the graphRDD for all map reduce jobs
-                        graphRDD = graphRDD.cache();
+                        final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
+
                         //////////////////////////////
                         // process the map reducers //
                         //////////////////////////////
-                        final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
-                        for (final MapReduce mapReduce : this.mapReducers) {
-                            // execute the map reduce job
-                            final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
-                            mapReduce.storeState(newApacheConfiguration);
-                            // map
-                            final JavaPairRDD mapRDD = SparkHelper.executeMap(graphRDD, mapReduce, newApacheConfiguration);
-                            // combine TODO? is this really needed
-                            // reduce
-                            final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkHelper.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
-                            // write the map reduce output back to disk (memory)
-                            SparkHelper.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
+                        if (!this.mapReducers.isEmpty()) {
+                            // drop all edges in the graphRDD as edges are not needed in the map reduce jobs
+                            graphRDD = graphRDD.mapToPair(tuple -> {
+                                tuple._2().asVertexPayload().getVertex().edges(Direction.BOTH).forEachRemaining(Edge::remove);
+                                return tuple;
+                            });
+                            graphRDD = graphRDD.cache();
+                            for (final MapReduce mapReduce : this.mapReducers) {
+                                // execute the map reduce job
+                                final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
+                                mapReduce.storeState(newApacheConfiguration);
+                                // map
+                                final JavaPairRDD mapRDD = SparkHelper.executeMap(graphRDD, mapReduce, newApacheConfiguration);
+                                // combine TODO? is this really needed
+                                // reduce
+                                final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkHelper.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
+                                // write the map reduce output back to disk (memory)
+                                SparkHelper.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
+                            }
                         }
-                        // close the context or else bad things happen // todo: does this happen automatically cause of the try(resource) {} block?
+                        // close the context or else bad things happen // TODO: does this happen automatically cause of the try(resource) {} block?
                         sparkContext.close();
                         // update runtime and return the newly computed graph
                         finalMemory.setRuntime(System.currentTimeMillis() - startTime);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7809fd94/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
deleted file mode 100644
index 8bd1a0a..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkVertex implements Vertex, Serializable {
-
-    // TODO: Wrapped vertex -- need VertexProgram in partition (broadcast variable?)
-
-    private final VertexWritable vertexWritable;
-
-    public SparkVertex(final TinkerVertex vertex) {
-        this.vertexWritable = new VertexWritable(vertex);
-    }
-
-    @Override
-    public Edge addEdge(final String label, final Vertex inVertex, final Object... keyValues) {
-        return this.vertexWritable.get().addEdge(label, inVertex, keyValues);
-    }
-
-    @Override
-    public Object id() {
-        return this.vertexWritable.get().id();
-    }
-
-    @Override
-    public String label() {
-        return this.vertexWritable.get().label();
-    }
-
-    @Override
-    public Graph graph() {
-        return this.vertexWritable.get().graph();
-    }
-
-    @Override
-    public <V> VertexProperty<V> property(final VertexProperty.Cardinality cardinality, final String key, final V value, final Object... keyValues) {
-        return this.vertexWritable.get().property(cardinality, key, value, keyValues);
-    }
-
-    @Override
-    public void remove() {
-        this.vertexWritable.get().remove();
-    }
-
-
-    @Override
-    public Iterator<Edge> edges(final Direction direction, final String... edgeLabels) {
-        return this.vertexWritable.get().edges(direction, edgeLabels);
-    }
-
-    @Override
-    public Iterator<Vertex> vertices(final Direction direction, final String... edgeLabels) {
-        return this.vertexWritable.get().vertices(direction, edgeLabels);
-    }
-
-    @Override
-    public <V> Iterator<VertexProperty<V>> properties(final String... propertyKeys) {
-        return this.vertexWritable.get().properties(propertyKeys);
-    }
-
-    @Override
-    public String toString() {
-        return StringFactory.vertexString(this.vertexWritable.get());
-    }
-
-    @Override
-    public int hashCode() {
-        return this.vertexWritable.get().hashCode();
-    }
-
-    @Override
-    public boolean equals(final Object other) {
-        return ElementHelper.areEqual(this.vertexWritable.get(), other);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7809fd94/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
index 42997f1..30563d6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
@@ -18,9 +18,10 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
 
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
@@ -38,16 +39,12 @@ import java.util.List;
  */
 public final class SparkVertexPayload<M> implements SparkPayload<M>, Messenger<M>, Serializable {
 
-    private Vertex vertex;
-    private List<M> incoming;
-    private List<Tuple2<Object, M>> outgoing;
-
-    private SparkVertexPayload() {
-
-    }
+    private final VertexWritable vertexWritable;
+    private final List<M> incoming;
+    private final List<Tuple2<Object, M>> outgoing;
 
     public SparkVertexPayload(final Vertex vertex) {
-        this.vertex = vertex;
+        this.vertexWritable = new VertexWritable(vertex);
         this.incoming = new ArrayList<>();
         this.outgoing = new ArrayList<>();
     }
@@ -68,7 +65,7 @@ public final class SparkVertexPayload<M> implements SparkPayload<M>, Messenger<M
     }
 
     public Vertex getVertex() {
-        return this.vertex;
+        return this.vertexWritable.get();
     }
 
     public List<Tuple2<Object, M>> getOutgoingMessages() {
@@ -86,7 +83,7 @@ public final class SparkVertexPayload<M> implements SparkPayload<M>, Messenger<M
     public void sendMessage(final MessageScope messageScope, final M message) {
         if (messageScope instanceof MessageScope.Local) {
             final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
-            final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkVertexPayload.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertex);
+            final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkVertexPayload.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertexWritable.get());
             final Direction direction = SparkVertexPayload.getOppositeDirection(incidentTraversal);
             incidentTraversal.forEachRemaining(edge -> this.outgoing.add(new Tuple2<>(edge.vertices(direction).next().id(), message)));
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7809fd94/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index 50738b2..b84d09e 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -122,7 +122,7 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
 
             /// spark configuration
             put("spark.master", "local[4]");
-            put("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
+            put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
         }};
     }