You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/26 21:55:25 UTC
incubator-tinkerpop git commit: so deep in a Spark pit its unreal.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master aded974a7 -> ff160d8a5
so deep in a Spark pit its unreal.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ff160d8a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ff160d8a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ff160d8a
Branch: refs/heads/master
Commit: ff160d8a5d550140b4bbd42a156dbaf1514b2a91
Parents: aded974
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Mar 26 14:47:10 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Mar 26 14:47:23 2015 -0600
----------------------------------------------------------------------
.../gremlin/util/iterator/IteratorUtils.java | 33 ++++
.../process/computer/spark/SparkExecutor.java | 196 +++++++++++++++++++
.../computer/spark/SparkGraphComputer.java | 15 +-
.../process/computer/spark/SparkPayload.java | 14 +-
.../computer/spark/SparkVertexPayload.java | 19 +-
.../computer/spark/util/SparkHelper.java | 196 -------------------
6 files changed, 262 insertions(+), 211 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ff160d8a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
index ef9bd72..0ef038f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
@@ -22,6 +22,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.FastNoSuchElementException
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -234,6 +235,38 @@ public final class IteratorUtils {
///////////////////
+ public static final <S, E> Iterator<E> flatMap(final Iterator<S> iterator, final Function<S, Iterator<E>> function) {
+ return new Iterator<E>() {
+
+ private Iterator<E> currentIterator = Collections.emptyIterator();
+
+ @Override
+ public boolean hasNext() {
+ if (this.currentIterator.hasNext())
+ return true;
+ else {
+ while (iterator.hasNext()) {
+ this.currentIterator = function.apply(iterator.next());
+ if (this.currentIterator.hasNext())
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public E next() {
+ if (this.hasNext())
+ return this.currentIterator.next();
+ else
+ throw FastNoSuchElementException.instance();
+ }
+ };
+ }
+
+
+ ///////////////////
+
public static final <S> Iterator<S> concat(final Iterator<S>... iterators) {
final MultiIterator<S> iterator = new MultiIterator<>();
for (final Iterator<S> itty : iterators) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ff160d8a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
new file mode 100644
index 0000000..d9a2537
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMapEmitter;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMemory;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMessagePayload;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkPayload;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkReduceEmitter;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkVertexPayload;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkExecutor {
+
+ private SparkExecutor() {
+ }
+
+ public static <M> JavaPairRDD<Object, SparkPayload<M>> executeStep(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
+ JavaPairRDD<Object, SparkPayload<M>> current = graphRDD;
+ // execute vertex program
+ current = current.mapPartitionsToPair(partitionIterator -> { // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
+ final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
+ final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
+ workerVertexProgram.workerIterationStart(memory);
+ final List<Tuple2<Object, SparkPayload<M>>> emission = new ArrayList<>();
+ partitionIterator.forEachRemaining(keyValue -> {
+ keyValue._2().asVertexPayload().getOutgoingMessages().clear();
+ workerVertexProgram.execute(ComputerGraph.of(keyValue._2().asVertexPayload().getVertex(), elementComputeKeys), keyValue._2().asVertexPayload(), memory);
+ emission.add(keyValue);
+ });
+ workerVertexProgram.workerIterationEnd(memory);
+ return emission;
+ });
+
+ // emit messages by appending them to the graph as message payloads
+ current = current.<Object, SparkPayload<M>>flatMapToPair(keyValue -> {
+ keyValue._2().asVertexPayload().getMessages().clear(); // there should be no incoming messages at this point
+ final List<Tuple2<Object, SparkPayload<M>>> list = new ArrayList<>();
+ list.add(keyValue); // this is a vertex
+ keyValue._2().asVertexPayload().getOutgoingMessages().forEach(message -> list.add(new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2())))); // this is a message
+ return list;
+ });
+
+ // "message pass" by merging the message payloads with the vertex payloads
+ final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
+ current = current.reduceByKey((payloadA, payloadB) -> {
+ if (payloadA.isVertex()) {
+ final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadA.asVertexPayload().getVertex());
+ vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
+ vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
+ return vertexPayload;
+ } else if (payloadB.isVertex()) {
+ final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadB.asVertexPayload().getVertex());
+ vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
+ vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
+ return vertexPayload;
+ } else {
+ final SparkMessagePayload<M> messagePayload = new SparkMessagePayload<>();
+ messagePayload.addMessages(payloadA.getMessages(), messageCombiner);
+ messagePayload.addMessages(payloadB.getMessages(), messageCombiner);
+ return messagePayload;
+ }
+ });
+
+ // clear all previous outgoing messages (why can't we do this prior to the shuffle? -- this is probably because of concurrent modification issues prior to reduceByKey)
+ current = current.mapValues(vertexPayload -> {
+ vertexPayload.asVertexPayload().getOutgoingMessages().clear();
+ return vertexPayload;
+ });
+
+ return current;
+ }
+
+ public static <K, V, M> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, SparkVertexPayload<M>> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
+ JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
+ final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(apacheConfiguration);
+ workerMapReduce.workerStart(MapReduce.Stage.MAP);
+ final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
+ partitionIterator.forEachRemaining(keyValue -> workerMapReduce.map(keyValue._2().getVertex(), mapEmitter));
+ workerMapReduce.workerEnd(MapReduce.Stage.MAP);
+ return mapEmitter.getEmissions();
+ });
+ if (mapReduce.getMapKeySort().isPresent())
+ mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
+ return mapRDD;
+ }
+
+ // TODO: public static executeCombine() is this necessary?
+
+ public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
+ JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
+ final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(apacheConfiguration);
+ workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
+ final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
+ partitionIterator.forEachRemaining(keyValue -> workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter));
+ workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
+ return reduceEmitter.getEmissions();
+ });
+ if (mapReduce.getReduceKeySort().isPresent())
+ reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
+ return reduceRDD;
+ }
+
+ public static void deleteOutputLocation(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+ final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+ if (null != outputLocation) {
+ try {
+ FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+ }
+
+ public static String getInputLocation(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+ try {
+ return FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ public static <M> void saveGraphRDD(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+ final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+ if (null != outputLocation) {
+ // map back to a <nullwritable,vertexwritable> stream for output
+ graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), new VertexWritable(tuple._2().asVertexPayload().getVertex())))
+ .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
+ NullWritable.class,
+ VertexWritable.class,
+ (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
+ }
+ }
+
+ public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+ final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
+ if (null != outputLocation) {
+ // map back to a Hadoop stream for output
+ mapReduceRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
+ ObjectWritable.class,
+ ObjectWritable.class,
+ (Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
+ // if it's not a SequenceFile there is no certain way to convert to the necessary Java objects.
+ // to get results you have to look through the HDFS directory structure. Oh the horror.
+ try {
+ if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
+ mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
+ else
+ HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ff160d8a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 514a811..f43727f 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -29,7 +29,6 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util.SparkHelper;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
@@ -147,14 +146,14 @@ public final class SparkGraphComputer implements GraphComputer {
return CompletableFuture.<ComputerResult>supplyAsync(() -> {
final long startTime = System.currentTimeMillis();
SparkMemory memory = null;
- SparkHelper.deleteOutputLocation(hadoopConfiguration);
+ SparkExecutor.deleteOutputLocation(hadoopConfiguration);
// wire up a spark context
final SparkConf sparkConfiguration = new SparkConf();
sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
- hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, SparkHelper.getInputLocation(hadoopConfiguration)); // necessary for Spark and newAPIHadoopRDD
+ hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, SparkExecutor.getInputLocation(hadoopConfiguration)); // necessary for Spark and newAPIHadoopRDD
// execute the vertex program and map reducers and if there is a failure, auto-close the spark context
try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
// add the project jars to the cluster
@@ -183,7 +182,7 @@ public final class SparkGraphComputer implements GraphComputer {
// execute the vertex program
while (true) {
memory.setInTask(true);
- graphRDD = SparkHelper.executeStep(graphRDD, memory, vertexProgramConfiguration);
+ graphRDD = SparkExecutor.executeStep(graphRDD, memory, vertexProgramConfiguration);
graphRDD.foreachPartition(iterator -> doNothing()); // TODO: i think this is a fast way to execute the rdd (wish there was a "execute()" method).
memory.setInTask(false);
if (this.vertexProgram.terminate(memory))
@@ -195,7 +194,7 @@ public final class SparkGraphComputer implements GraphComputer {
}
// write the output graph back to disk
if (!this.persist.get().equals(Persist.NOTHING))
- SparkHelper.saveGraphRDD(graphRDD, hadoopConfiguration);
+ SparkExecutor.saveGraphRDD(graphRDD, hadoopConfiguration);
}
final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
@@ -215,12 +214,12 @@ public final class SparkGraphComputer implements GraphComputer {
final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
mapReduce.storeState(newApacheConfiguration);
// map
- final JavaPairRDD mapRDD = SparkHelper.executeMap((JavaPairRDD) graphRDD, mapReduce, newApacheConfiguration);
+ final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) graphRDD, mapReduce, newApacheConfiguration);
// combine TODO? is this really needed
// reduce
- final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkHelper.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
+ final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
// write the map reduce output back to disk (memory)
- SparkHelper.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
+ SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
}
}
// close the context or else bad things happen // TODO: does this happen automatically cause of the try(resource) {} block?
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ff160d8a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
index 11f98bb..ebb4b18 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkPayload.java
@@ -21,8 +21,6 @@ package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import java.util.List;
-import java.util.Optional;
-import java.util.stream.Stream;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -31,11 +29,15 @@ public interface SparkPayload<M> {
public default void addMessages(final List<M> otherMessages, final MessageCombiner<M> messageCombiner) {
if (null != messageCombiner) {
- final Optional<M> combinedMessage = Stream.concat(this.getMessages().stream(), otherMessages.stream()).reduce(messageCombiner::combine);
- if (combinedMessage.isPresent()) {
- this.getMessages().clear();
- this.getMessages().add(combinedMessage.get());
+ M message = null;
+ for (final M m : this.getMessages()) {
+ message = null == message ? m : messageCombiner.combine(message, m);
}
+ for (final M m : otherMessages) {
+ message = null == message ? m : messageCombiner.combine(message, m);
+ }
+ this.getMessages().clear();
+ if (null != message) this.getMessages().add(message);
} else {
this.getMessages().addAll(otherMessages);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ff160d8a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
index 30563d6..2ac67e6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkVertexPayload.java
@@ -32,6 +32,7 @@ import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
/**
@@ -41,7 +42,7 @@ public final class SparkVertexPayload<M> implements SparkPayload<M>, Messenger<M
private final VertexWritable vertexWritable;
private final List<M> incoming;
- private final List<Tuple2<Object, M>> outgoing;
+ private List<Tuple2<Object, M>> outgoing;
public SparkVertexPayload(final Vertex vertex) {
this.vertexWritable = new VertexWritable(vertex);
@@ -68,10 +69,26 @@ public final class SparkVertexPayload<M> implements SparkPayload<M>, Messenger<M
return this.vertexWritable.get();
}
+ public VertexWritable getVertexWritable() {
+ return this.vertexWritable;
+ }
+
public List<Tuple2<Object, M>> getOutgoingMessages() {
return this.outgoing;
}
+ public Iterator<Tuple2<Object, M>> detachOutgoingMessages() {
+ final Iterator<Tuple2<Object, M>> messages = this.outgoing.iterator();
+ this.outgoing = new ArrayList<>();
+ return messages;
+ }
+
+ /*public Iterator<M> detachIncomingMessages() {
+ final Iterator<M> messages = this.incoming.iterator();
+ this.incoming = new ArrayList<>();
+ return messages;
+ }*/
+
///////////
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ff160d8a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
deleted file mode 100644
index 7696f3a..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.util;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMapEmitter;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMemory;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkMessagePayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkPayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkReduceEmitter;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkVertexPayload;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
-import scala.Tuple2;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkHelper {
-
- private SparkHelper() {
- }
-
- public static <M> JavaPairRDD<Object, SparkPayload<M>> executeStep(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final SparkMemory memory, final Configuration apacheConfiguration) {
- JavaPairRDD<Object, SparkPayload<M>> current = graphRDD;
- // execute vertex program
- current = current.mapPartitionsToPair(partitionIterator -> { // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
- final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
- final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
- workerVertexProgram.workerIterationStart(memory);
- final List<Tuple2<Object, SparkPayload<M>>> emission = new ArrayList<>();
- partitionIterator.forEachRemaining(keyValue -> {
- keyValue._2().asVertexPayload().getOutgoingMessages().clear();
- workerVertexProgram.execute(ComputerGraph.of(keyValue._2().asVertexPayload().getVertex(), elementComputeKeys), keyValue._2().asVertexPayload(), memory);
- emission.add(keyValue);
- });
- workerVertexProgram.workerIterationEnd(memory);
- return emission;
- });
-
- // emit messages by appending them to the graph as message payloads
- current = current.<Object, SparkPayload<M>>flatMapToPair(keyValue -> {
- keyValue._2().asVertexPayload().getMessages().clear(); // there should be no incoming messages at this point
- final List<Tuple2<Object, SparkPayload<M>>> list = new ArrayList<>();
- list.add(keyValue); // this is a vertex
- keyValue._2().asVertexPayload().getOutgoingMessages().forEach(message -> list.add(new Tuple2<>(message._1(), new SparkMessagePayload<>(message._2())))); // this is a message
- return list;
- });
-
- // "message pass" by merging the message payloads with the vertex payloads
- final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
- current = current.reduceByKey((payloadA, payloadB) -> {
- if (payloadA.isVertex()) {
- final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadA.asVertexPayload().getVertex());
- vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
- vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
- return vertexPayload;
- } else if (payloadB.isVertex()) {
- final SparkVertexPayload<M> vertexPayload = new SparkVertexPayload<>(payloadB.asVertexPayload().getVertex());
- vertexPayload.addMessages(payloadA.getMessages(), messageCombiner);
- vertexPayload.addMessages(payloadB.getMessages(), messageCombiner);
- return vertexPayload;
- } else {
- final SparkMessagePayload<M> messagePayload = new SparkMessagePayload<>();
- messagePayload.addMessages(payloadA.getMessages(), messageCombiner);
- messagePayload.addMessages(payloadB.getMessages(), messageCombiner);
- return messagePayload;
- }
- });
-
- // clear all previous outgoing messages (why can't we do this prior to the shuffle? -- this is probably cause of concurrent modification issues prior to reduceByKey)
- current = current.mapValues(vertexPayload -> {
- vertexPayload.asVertexPayload().getOutgoingMessages().clear();
- return vertexPayload;
- });
-
- return current;
- }
-
- public static <K, V, M> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, SparkVertexPayload<M>> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
- JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
- final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(apacheConfiguration);
- workerMapReduce.workerStart(MapReduce.Stage.MAP);
- final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
- partitionIterator.forEachRemaining(keyValue -> workerMapReduce.map(keyValue._2().getVertex(), mapEmitter));
- workerMapReduce.workerEnd(MapReduce.Stage.MAP);
- return mapEmitter.getEmissions();
- });
- if (mapReduce.getMapKeySort().isPresent())
- mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
- return mapRDD;
- }
-
- // TODO: public static executeCombine() is this necessary?
-
- public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
- JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
- final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(apacheConfiguration);
- workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
- final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
- partitionIterator.forEachRemaining(keyValue -> workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter));
- workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
- return reduceEmitter.getEmissions();
- });
- if (mapReduce.getReduceKeySort().isPresent())
- reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
- return reduceRDD;
- }
-
- public static void deleteOutputLocation(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
- final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
- if (null != outputLocation) {
- try {
- FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
- }
-
- public static String getInputLocation(final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
- try {
- return FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- public static <M> void saveGraphRDD(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
- final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
- if (null != outputLocation) {
- // map back to a <nullwritable,vertexwritable> stream for output
- graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), new VertexWritable(tuple._2().asVertexPayload().getVertex())))
- .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
- NullWritable.class,
- VertexWritable.class,
- (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
- }
- }
-
- public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
- final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
- if (null != outputLocation) {
- // map back to a Hadoop stream for output
- mapReduceRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
- ObjectWritable.class,
- ObjectWritable.class,
- (Class<OutputFormat<ObjectWritable, ObjectWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
- // if its not a SequenceFile there is no certain way to convert to necessary Java objects.
- // to get results you have to look through HDFS directory structure. Oh the horror.
- try {
- if (hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_MEMORY_OUTPUT_FORMAT, SequenceFileOutputFormat.class, OutputFormat.class).equals(SequenceFileOutputFormat.class))
- mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
- else
- HadoopGraph.LOGGER.warn(Constants.SEQUENCE_WARNING);
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
- }
-}