Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/06/13 19:37:51 UTC
[33/42] tinkerpop git commit: found a bug that was introduced during the KryoShim work earlier this week. I made things super explicit in SparkGraphComputer as to what is the GraphComputer configuration and what is the VertexProgram configuration so that these issues don't pop up again. Simple fix -- CTR.
found a bug that was introduced during the KryoShim work earlier this week. I made things super explicit in SparkGraphComputer as to what is the GraphComputer configuration and what is the VertexProgram configuration so that these issues don't pop up again. Simple fix -- CTR.
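For readers skimming the diff: the fix threads two configurations through SparkExecutor instead of one overloaded "apacheConfiguration". A condensed sketch of the worker-side split, assembled from the first SparkExecutor hunk below (generics and surrounding setup elided):

    .mapPartitionsToPair(partitionIterator -> {
        // the serializer/KryoShim settings live in the Graph/GraphComputer configuration
        KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
        // the program's own state is rebuilt from the VertexProgram configuration
        final VertexProgram<M> workerVertexProgram = VertexProgram.createVertexProgram(
                HadoopGraph.open(graphComputerConfiguration), vertexProgramConfiguration);
        // ... execute the iteration with this worker-local program ...
    })

Passing the wrong one of the two (the pre-fix state of affairs) meant the worker-side Kryo shim was initialized from a configuration that never carried the serializer properties set by SparkGraphComputer.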
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b36b42f6
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b36b42f6
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b36b42f6
Branch: refs/heads/TINKERPOP-1278
Commit: b36b42f6cd8cf74bc6bf267e732d79d55a0cd719
Parents: ff52eb6
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Jun 10 12:10:17 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jun 10 12:10:34 2016 -0600
----------------------------------------------------------------------
.../spark/process/computer/SparkExecutor.java | 31 ++++++-------
.../process/computer/SparkGraphComputer.java | 46 ++++++++++----------
2 files changed, 39 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b36b42f6/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 4db8086..8dd2381 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -78,7 +78,8 @@ public final class SparkExecutor {
final JavaPairRDD<Object, VertexWritable> graphRDD,
final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
final SparkMemory memory,
- final Configuration apacheConfiguration) {
+ final Configuration graphComputerConfiguration, // has the Graph/GraphComputer.configuration() information
+ final Configuration vertexProgramConfiguration) { // has the VertexProgram.loadState() information
boolean partitionedGraphRDD = graphRDD.partitioner().isPresent();
@@ -89,8 +90,8 @@ public final class SparkExecutor {
graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
// for each partition of vertices emit a view and their outgoing messages
.mapPartitionsToPair(partitionIterator -> {
- KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
- final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
+ KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+ final VertexProgram<M> workerVertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(graphComputerConfiguration), vertexProgramConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
final SparkMessenger<M> messenger = new SparkMessenger<>();
workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
@@ -132,10 +133,10 @@ public final class SparkExecutor {
/////////////////////////////////////////////////////////////
/////////////////////////////////////////////////////////////
final PairFlatMapFunction<Tuple2<Object, ViewOutgoingPayload<M>>, Object, Payload> messageFunction =
- tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
+ tuple -> () -> IteratorUtils.concat(
IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())), // emit the view payload
IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))));
- final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
+ final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(vertexProgramConfiguration), vertexProgramConfiguration).getMessageCombiner().orElse(null);
final Function2<Payload, Payload, Payload> reducerFunction = (a, b) -> { // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
if (a instanceof ViewIncomingPayload) {
((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
@@ -170,7 +171,7 @@ public final class SparkExecutor {
assert graphRDD.partitioner().get().equals(newViewIncomingRDD.partitioner().get());
newViewIncomingRDD
.foreachPartition(partitionIterator -> {
- KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
+ KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
}); // need to complete a task so its BSP and the memory for this iteration is updated
return newViewIncomingRDD;
}
@@ -203,10 +204,10 @@ public final class SparkExecutor {
public static <K, V> JavaPairRDD<K, V> executeMap(
final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce,
- final Configuration apacheConfiguration) {
+ final Configuration graphComputerConfiguration) {
JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
- KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
- return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
+ KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+ return () -> new MapIterator<>(MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
});
if (mapReduce.getMapKeySort().isPresent())
mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get(), true, 1);
@@ -214,19 +215,19 @@ public final class SparkExecutor {
}
public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeCombine(final JavaPairRDD<K, V> mapRDD,
- final Configuration apacheConfiguration) {
+ final Configuration graphComputerConfiguration) {
return mapRDD.mapPartitionsToPair(partitionIterator -> {
- KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
- return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
+ KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+ return () -> new CombineIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
});
}
public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(
final JavaPairRDD<K, V> mapOrCombineRDD, final MapReduce<K, V, OK, OV, ?> mapReduce,
- final Configuration apacheConfiguration) {
+ final Configuration graphComputerConfiguration) {
JavaPairRDD<OK, OV> reduceRDD = mapOrCombineRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
- KryoShimServiceLoader.applyConfiguration(apacheConfiguration);
- return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration), partitionIterator);
+ KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+ return () -> new ReduceIterator<>(MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration), partitionIterator);
});
if (mapReduce.getReduceKeySort().isPresent())
reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get(), true, 1);
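Note that executeMap, executeCombine, and executeReduce all follow the same per-partition shape with the renamed parameter; a condensed sketch of that shared pattern (taken from the executeMap hunk above, with the caller storing the MapReduce state into a copy of the GraphComputer configuration, per the SparkGraphComputer hunks below):

    JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
        // serialization first: the Kryo shim needs the GraphComputer settings on this worker
        KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
        // then rebuild the MapReduce job locally and wrap the partition in its stage iterator
        return () -> new MapIterator<>(
                MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(
                        HadoopGraph.open(graphComputerConfiguration), graphComputerConfiguration),
                partitionIterator);
    });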
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b36b42f6/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 9e05e53..5178225 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -135,19 +135,19 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
return computerService.submit(() -> {
final long startTime = System.currentTimeMillis();
// apache and hadoop configurations that are used throughout the graph computer computation
- final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.sparkConfiguration);
- if (!apacheConfiguration.containsKey(Constants.SPARK_SERIALIZER))
- apacheConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
- apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
- final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
+ final org.apache.commons.configuration.Configuration graphComputerConfiguration = new HadoopConfiguration(this.sparkConfiguration);
+ if (!graphComputerConfiguration.containsKey(Constants.SPARK_SERIALIZER))
+ graphComputerConfiguration.setProperty(Constants.SPARK_SERIALIZER, GryoSerializer.class.getCanonicalName());
+ graphComputerConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_WRITER_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
+ final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(graphComputerConfiguration);
final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
- final Storage sparkContextStorage = SparkContextStorage.open(apacheConfiguration);
+ final Storage sparkContextStorage = SparkContextStorage.open(graphComputerConfiguration);
final boolean inputFromHDFS = FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
final boolean inputFromSpark = PersistedInputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, Object.class));
final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_WRITER, Object.class));
- final boolean skipPartitioner = apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false);
- final boolean skipPersist = apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
+ final boolean skipPartitioner = graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_PARTITIONER, false);
+ final boolean skipPersist = graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE, false);
String inputLocation = null;
if (inputFromSpark)
inputLocation = Constants.getSearchGraphLocation(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION), sparkContextStorage).orElse(null);
@@ -158,7 +158,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
if (null != inputLocation && inputFromHDFS) {
try {
- apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
+ graphComputerConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, FileSystem.get(hadoopConfiguration).getFileStatus(new Path(inputLocation)).getPath().toString());
} catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -176,7 +176,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
OutputFormatRDD.class.newInstance();
// if the input class can filter on load, then set the filters
if (inputRDD instanceof InputFormatRDD && GraphFilterAware.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class, InputFormat.class))) {
- GraphFilterAware.storeGraphFilter(apacheConfiguration, hadoopConfiguration, this.graphFilter);
+ GraphFilterAware.storeGraphFilter(graphComputerConfiguration, hadoopConfiguration, this.graphFilter);
filtered = false;
} else if (inputRDD instanceof GraphFilterAware) {
((GraphFilterAware) inputRDD).setGraphFilter(this.graphFilter);
@@ -214,7 +214,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
updateLocalConfiguration(sparkContext, sparkConfiguration);
// create a message-passing friendly rdd from the input rdd
boolean partitioned = false;
- JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(apacheConfiguration, sparkContext);
+ JavaPairRDD<Object, VertexWritable> loadedGraphRDD = inputRDD.readGraphRDD(graphComputerConfiguration, sparkContext);
// if there are vertex or edge filters, filter the loaded graph rdd prior to partitioning and persisting
if (filtered) {
this.logger.debug("Filtering the loaded graphRDD: " + this.graphFilter);
@@ -255,10 +255,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
/////////////////
// if there is a registered VertexProgramInterceptor, use it to bypass the GraphComputer semantics
- if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
+ if (graphComputerConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
try {
final SparkVertexProgramInterceptor<VertexProgram> interceptor =
- (SparkVertexProgramInterceptor) Class.forName(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)).newInstance();
+ (SparkVertexProgramInterceptor) Class.forName(graphComputerConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)).newInstance();
computedGraphRDD = interceptor.apply(this.vertexProgram, loadedGraphRDD, memory);
} catch (final ClassNotFoundException | IllegalAccessException | InstantiationException e) {
throw new IllegalStateException(e.getMessage());
@@ -278,7 +278,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
throw new TraversalInterruptedException();
}
memory.setInExecute(true);
- viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
+ viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, graphComputerConfiguration, vertexProgramConfiguration);
memory.setInExecute(false);
if (this.vertexProgram.terminate(memory))
break;
@@ -301,7 +301,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
// write the computed graph to the respective output (rdd or output format)
if (null != outputRDD && !this.persist.equals(Persist.NOTHING)) {
assert null != computedGraphRDD; // the logic holds that a computeGraphRDD must be created at this point
- outputRDD.writeGraphRDD(apacheConfiguration, computedGraphRDD);
+ outputRDD.writeGraphRDD(graphComputerConfiguration, computedGraphRDD);
}
}
@@ -330,7 +330,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
for (final MapReduce mapReduce : this.mapReducers) {
// execute the map reduce job
- final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
+ final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(graphComputerConfiguration);
mapReduce.storeState(newApacheConfiguration);
// map
final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceRDD, mapReduce, newApacheConfiguration);
@@ -340,7 +340,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
final JavaPairRDD reduceRDD = mapReduce.doStage(MapReduce.Stage.REDUCE) ? SparkExecutor.executeReduce(combineRDD, mapReduce, newApacheConfiguration) : combineRDD;
// write the map reduce output back to disk and computer result memory
if (null != outputRDD)
- mapReduce.addResultToMemory(finalMemory, outputRDD.writeMemoryRDD(apacheConfiguration, mapReduce.getMemoryKey(), reduceRDD));
+ mapReduce.addResultToMemory(finalMemory, outputRDD.writeMemoryRDD(graphComputerConfiguration, mapReduce.getMemoryKey(), reduceRDD));
}
// if the mapReduceRDD is not simply the computed graph, unpersist the mapReduceRDD
if (computedGraphCreated && !outputToSpark) {
@@ -370,13 +370,13 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
// update runtime and return the newly computed graph
finalMemory.setRuntime(System.currentTimeMillis() - startTime);
// clear properties that should not be propagated in an OLAP chain
- apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
- apacheConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
- apacheConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE);
- apacheConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
- return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
+ graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_GRAPH_FILTER);
+ graphComputerConfiguration.clearProperty(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR);
+ graphComputerConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_GRAPH_CACHE);
+ graphComputerConfiguration.clearProperty(Constants.GREMLIN_SPARK_SKIP_PARTITIONER);
+ return new DefaultComputerResult(InputOutputHelper.getOutputGraph(graphComputerConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
} finally {
- if (!apacheConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
+ if (!graphComputerConfiguration.getBoolean(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false))
Spark.close();
}
});
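To see why the explicit naming closes the bug, compare the call site before and after (reconstructed from the @@ -278 hunk above):

    // before: the single "apacheConfiguration" parameter received the
    // VertexProgram configuration, so KryoShimServiceLoader.applyConfiguration()
    // inside SparkExecutor ran against the wrong properties
    viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(
            loadedGraphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);

    // after: each configuration goes to its proper consumer
    viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(
            loadedGraphRDD, viewIncomingRDD, memory,
            graphComputerConfiguration, vertexProgramConfiguration);

With both parameters named in the signature, a caller can no longer silently pass the VertexProgram configuration where the GraphComputer configuration is required.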