You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/25 21:14:15 UTC
incubator-tinkerpop git commit: Removed SparkVertex. No longer needed
as we can now use the VertexWritable model used in GiraphGraphComputer. This
fixed a KryoSerializer bug in SparkGraphComputer due to recursive
serialization of a vertex.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 5e0358122 -> 7809fd945
Removed SparkVertex. No longer needed as we can now use the VertexWritable model used in GiraphGraphComputer. This fixed a KryoSerializer bug in SparkGraphComputer due to recursive serialization of a vertex.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/7809fd94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/7809fd94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/7809fd94
Branch: refs/heads/master
Commit: 7809fd9457a38bc1c60c76c7ad1ccfa8699ec68e
Parents: 5e03581
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 25 14:14:11 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 25 14:14:11 2015 -0600
----------------------------------------------------------------------
.../computer/spark/SparkGraphComputer.java | 45 ++++----
.../process/computer/spark/SparkVertex.java | 107 -------------------
.../computer/spark/SparkVertexPayload.java | 19 ++--
.../gremlin/hadoop/HadoopGraphProvider.java | 2 +-
4 files changed, 36 insertions(+), 137 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7809fd94/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index debacd8..06820c2 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -43,8 +43,9 @@ import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
@@ -164,7 +165,7 @@ public final class SparkGraphComputer implements GraphComputer {
(Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
NullWritable.class,
VertexWritable.class)
- .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkVertexPayload<>(new SparkVertex((TinkerVertex) tuple._2().get()))));
+ .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkVertexPayload<>(tuple._2().get())));
////////////////////////////////
// process the vertex program //
@@ -193,28 +194,36 @@ public final class SparkGraphComputer implements GraphComputer {
}
}
// write the output graph back to disk
- SparkHelper.saveGraphRDD(graphRDD, hadoopConfiguration);
+ if (!this.persist.get().equals(Persist.NOTHING))
+ SparkHelper.saveGraphRDD(graphRDD, hadoopConfiguration);
}
- // reuse the graphRDD for all map reduce jobs
- graphRDD = graphRDD.cache();
+ final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
+
//////////////////////////////
// process the map reducers //
//////////////////////////////
- final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
- for (final MapReduce mapReduce : this.mapReducers) {
- // execute the map reduce job
- final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
- mapReduce.storeState(newApacheConfiguration);
- // map
- final JavaPairRDD mapRDD = SparkHelper.executeMap(graphRDD, mapReduce, newApacheConfiguration);
- // combine TODO? is this really needed
- // reduce
- final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkHelper.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
- // write the map reduce output back to disk (memory)
- SparkHelper.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
+ if (!this.mapReducers.isEmpty()) {
+ // drop all edges in the graphRDD as edges are not needed in the map reduce jobs
+ graphRDD = graphRDD.mapToPair(tuple -> {
+ tuple._2().asVertexPayload().getVertex().edges(Direction.BOTH).forEachRemaining(Edge::remove);
+ return tuple;
+ });
+ graphRDD = graphRDD.cache();
+ for (final MapReduce mapReduce : this.mapReducers) {
+ // execute the map reduce job
+ final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
+ mapReduce.storeState(newApacheConfiguration);
+ // map
+ final JavaPairRDD mapRDD = SparkHelper.executeMap(graphRDD, mapReduce, newApacheConfiguration);
+ // combine TODO? is this really needed
+ // reduce
+ final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkHelper.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
+ // write the map reduce output back to disk (memory)
+ SparkHelper.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
+ }
}
- // close the context or else bad things happen // todo: does this happen automatically cause of the try(resource) {} block?
+ // close the context or else bad things happen // TODO: does this happen automatically cause of the try(resource) {} block?
sparkContext.close();
// update runtime and return the newly computed graph
finalMemory.setRuntime(System.currentTimeMillis() - startTime);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7809fd94/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
deleted file mode 100644
index 8bd1a0a..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertex.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkVertex implements Vertex, Serializable {
-
- // TODO: Wrapped vertex -- need VertexProgram in partition (broadcast variable?)
-
- private final VertexWritable vertexWritable;
-
- public SparkVertex(final TinkerVertex vertex) {
- this.vertexWritable = new VertexWritable(vertex);
- }
-
- @Override
- public Edge addEdge(final String label, final Vertex inVertex, final Object... keyValues) {
- return this.vertexWritable.get().addEdge(label, inVertex, keyValues);
- }
-
- @Override
- public Object id() {
- return this.vertexWritable.get().id();
- }
-
- @Override
- public String label() {
- return this.vertexWritable.get().label();
- }
-
- @Override
- public Graph graph() {
- return this.vertexWritable.get().graph();
- }
-
- @Override
- public <V> VertexProperty<V> property(final VertexProperty.Cardinality cardinality, final String key, final V value, final Object... keyValues) {
- return this.vertexWritable.get().property(cardinality, key, value, keyValues);
- }
-
- @Override
- public void remove() {
- this.vertexWritable.get().remove();
- }
-
-
- @Override
- public Iterator<Edge> edges(final Direction direction, final String... edgeLabels) {
- return this.vertexWritable.get().edges(direction, edgeLabels);
- }
-
- @Override
- public Iterator<Vertex> vertices(final Direction direction, final String... edgeLabels) {
- return this.vertexWritable.get().vertices(direction, edgeLabels);
- }
-
- @Override
- public <V> Iterator<VertexProperty<V>> properties(final String... propertyKeys) {
- return this.vertexWritable.get().properties(propertyKeys);
- }
-
- @Override
- public String toString() {
- return StringFactory.vertexString(this.vertexWritable.get());
- }
-
- @Override
- public int hashCode() {
- return this.vertexWritable.get().hashCode();
- }
-
- @Override
- public boolean equals(final Object other) {
- return ElementHelper.areEqual(this.vertexWritable.get(), other);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7809fd94/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
index 42997f1..30563d6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
@@ -18,9 +18,10 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
@@ -38,16 +39,12 @@ import java.util.List;
*/
public final class SparkVertexPayload<M> implements SparkPayload<M>, Messenger<M>, Serializable {
- private Vertex vertex;
- private List<M> incoming;
- private List<Tuple2<Object, M>> outgoing;
-
- private SparkVertexPayload() {
-
- }
+ private final VertexWritable vertexWritable;
+ private final List<M> incoming;
+ private final List<Tuple2<Object, M>> outgoing;
public SparkVertexPayload(final Vertex vertex) {
- this.vertex = vertex;
+ this.vertexWritable = new VertexWritable(vertex);
this.incoming = new ArrayList<>();
this.outgoing = new ArrayList<>();
}
@@ -68,7 +65,7 @@ public final class SparkVertexPayload<M> implements SparkPayload<M>, Messenger<M
}
public Vertex getVertex() {
- return this.vertex;
+ return this.vertexWritable.get();
}
public List<Tuple2<Object, M>> getOutgoingMessages() {
@@ -86,7 +83,7 @@ public final class SparkVertexPayload<M> implements SparkPayload<M>, Messenger<M
public void sendMessage(final MessageScope messageScope, final M message) {
if (messageScope instanceof MessageScope.Local) {
final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
- final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkVertexPayload.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertex);
+ final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkVertexPayload.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertexWritable.get());
final Direction direction = SparkVertexPayload.getOppositeDirection(incidentTraversal);
incidentTraversal.forEachRemaining(edge -> this.outgoing.add(new Tuple2<>(edge.vertices(direction).next().id(), message)));
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/7809fd94/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index 50738b2..b84d09e 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -122,7 +122,7 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
/// spark configuration
put("spark.master", "local[4]");
- put("spark.serializer", "org.apache.spark.serializer.JavaSerializer");
+ put("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
}};
}