You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/03 20:50:32 UTC
[3/3] incubator-tinkerpop git commit: Spark is now isolated into
spark-gremlin package. Tests are passing. A few hacks here and there just to
get things building. However,
it was pretty easy to split apart -- which is promising.
Spark is now isolated into spark-gremlin package. Tests are passing. A few hacks here and there just to get things building. However, it was pretty easy to split apart -- which is promising.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/04f5651e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/04f5651e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/04f5651e
Branch: refs/heads/hadoop_split
Commit: 04f5651e8d8e9108a4281d959b71ff161a52c208
Parents: 0977a25
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Sep 3 12:50:24 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Sep 3 12:50:24 2015 -0600
----------------------------------------------------------------------
hadoop-gremlin/pom.xml | 62 -----
.../groovy/plugin/HadoopGremlinPlugin.java | 3 -
.../process/computer/spark/RuleAccumulator.java | 55 ----
.../process/computer/spark/SparkExecutor.java | 222 -----------------
.../computer/spark/SparkGraphComputer.java | 216 ----------------
.../process/computer/spark/SparkMapEmitter.java | 45 ----
.../process/computer/spark/SparkMemory.java | 181 --------------
.../process/computer/spark/SparkMessenger.java | 83 -------
.../computer/spark/SparkReduceEmitter.java | 45 ----
.../computer/spark/io/InputFormatRDD.java | 46 ----
.../process/computer/spark/io/InputRDD.java | 41 ---
.../computer/spark/io/OutputFormatRDD.java | 48 ----
.../process/computer/spark/io/OutputRDD.java | 31 ---
.../computer/spark/payload/MessagePayload.java | 35 ---
.../process/computer/spark/payload/Payload.java | 27 --
.../spark/payload/ViewIncomingPayload.java | 95 -------
.../spark/payload/ViewOutgoingPayload.java | 46 ----
.../computer/spark/payload/ViewPayload.java | 39 ---
.../gremlin/hadoop/structure/HadoopGraph.java | 16 +-
.../spark/HadoopSparkGraphProvider.java | 37 ---
.../SparkGraphComputerProcessIntegrateTest.java | 32 ---
...GraphComputerGroovyProcessIntegrateTest.java | 33 ---
.../groovy/SparkHadoopGremlinPluginTest.java | 33 ---
.../computer/spark/io/ExampleInputRDD.java | 47 ----
.../computer/spark/io/ExampleOutputRDD.java | 45 ----
.../computer/spark/io/InputOutputRDDTest.java | 59 -----
.../process/computer/spark/io/InputRDDTest.java | 54 ----
.../computer/spark/io/OutputRDDTest.java | 62 -----
pom.xml | 1 +
spark-gremlin/pom.xml | 249 +++++++++++++++++++
.../spark/groovy/plugin/SparkGremlinPlugin.java | 26 ++
.../spark/process/computer/RuleAccumulator.java | 55 ++++
.../spark/process/computer/SparkExecutor.java | 202 +++++++++++++++
.../process/computer/SparkGraphComputer.java | 217 ++++++++++++++++
.../spark/process/computer/SparkMapEmitter.java | 45 ++++
.../spark/process/computer/SparkMemory.java | 181 ++++++++++++++
.../spark/process/computer/SparkMessenger.java | 83 +++++++
.../process/computer/SparkReduceEmitter.java | 45 ++++
.../process/computer/io/InputFormatRDD.java | 47 ++++
.../spark/process/computer/io/InputRDD.java | 41 +++
.../process/computer/io/OutputFormatRDD.java | 49 ++++
.../spark/process/computer/io/OutputRDD.java | 31 +++
.../computer/payload/MessagePayload.java | 35 +++
.../spark/process/computer/payload/Payload.java | 27 ++
.../computer/payload/ViewIncomingPayload.java | 95 +++++++
.../computer/payload/ViewOutgoingPayload.java | 46 ++++
.../process/computer/payload/ViewPayload.java | 39 +++
.../spark/process/HadoopGraphProvider.java | 166 +++++++++++++
.../computer/HadoopSparkGraphProvider.java | 36 +++
.../SparkGraphComputerProcessIntegrateTest.java | 32 +++
...GraphComputerGroovyProcessIntegrateTest.java | 33 +++
.../groovy/SparkHadoopGremlinPluginTest.java | 32 +++
.../process/computer/io/ExampleInputRDD.java | 47 ++++
.../process/computer/io/ExampleOutputRDD.java | 45 ++++
.../process/computer/io/InputOutputRDDTest.java | 59 +++++
.../spark/process/computer/io/InputRDDTest.java | 54 ++++
.../process/computer/io/OutputRDDTest.java | 62 +++++
57 files changed, 2091 insertions(+), 1727 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/pom.xml b/hadoop-gremlin/pom.xml
index cb6a050..2466da0 100644
--- a/hadoop-gremlin/pom.xml
+++ b/hadoop-gremlin/pom.xml
@@ -108,68 +108,6 @@ limitations under the License.
</exclusion>
</exclusions>
</dependency>
- <!-- SPARK GRAPH COMPUTER -->
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_2.10</artifactId>
- <version>1.2.1</version>
- <exclusions>
- <!-- self conflicts -->
- <exclusion>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- </exclusion>
- <exclusion>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <!-- gremlin-core conflicts -->
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
- </exclusion>
- <!-- gremlin-groovy conflicts -->
- <exclusion>
- <groupId>jline</groupId>
- <artifactId>jline</artifactId>
- </exclusion>
- <!-- hadoop conflicts -->
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </exclusion>
- <!-- giraph conflicts -->
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- </exclusion>
- <!-- lgpl conflicts -->
- <exclusion>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>findbugs</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<!-- consistent dependencies -->
<dependency>
<groupId>org.scala-lang</groupId>
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
index a34153e..18c4b32 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
@@ -30,7 +30,6 @@ import org.apache.tinkerpop.gremlin.groovy.plugin.RemoteAcceptor;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.mapreduce.MapReduceGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools;
@@ -71,7 +70,6 @@ public final class HadoopGremlinPlugin extends AbstractGremlinPlugin {
add(IMPORT_SPACE + HDFSTools.class.getPackage().getName() + DOT_STAR);
////
add(IMPORT_SPACE + GiraphGraphComputer.class.getPackage().getName() + DOT_STAR);
- add(IMPORT_SPACE + SparkGraphComputer.class.getPackage().getName() + DOT_STAR);
add(IMPORT_SPACE + MapReduceGraphComputer.class.getPackage().getName() + DOT_STAR);
}};
@@ -92,7 +90,6 @@ public final class HadoopGremlinPlugin extends AbstractGremlinPlugin {
pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", Job.class.getName()));
///
pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", GiraphGraphComputer.class.getName()));
- pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", SparkGraphComputer.class.getName()));
pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", MapReduceGraphComputer.class.getName()));
///
pluginAcceptor.eval(String.format("Logger.getLogger(%s).setLevel(Level.INFO)", HadoopGraph.class.getName()));
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
deleted file mode 100644
index 422b676..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/RuleAccumulator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.spark.AccumulatorParam;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class RuleAccumulator implements AccumulatorParam<Rule> {
-
- @Override
- public Rule addAccumulator(final Rule a, final Rule b) {
- if (a.getOperation().equals(Rule.Operation.NO_OP))
- return b;
- if (b.getOperation().equals(Rule.Operation.NO_OP))
- return a;
- else
- return new Rule(b.getOperation(), b.getOperation().compute(a.getObject(), b.getObject()));
- }
-
- @Override
- public Rule addInPlace(final Rule a, final Rule b) {
- if (a.getOperation().equals(Rule.Operation.NO_OP))
- return b;
- if (b.getOperation().equals(Rule.Operation.NO_OP))
- return a;
- else
- return new Rule(b.getOperation(), b.getOperation().compute(a.getObject(), b.getObject()));
- }
-
- @Override
- public Rule zero(final Rule rule) {
- return new Rule(Rule.Operation.NO_OP, null);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
deleted file mode 100644
index b7268b8..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import com.google.common.base.Optional;
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.MessagePayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.Payload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewIncomingPayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewOutgoingPayload;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewPayload;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
-import org.apache.tinkerpop.gremlin.structure.util.Attachable;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import scala.Tuple2;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkExecutor {
-
- private static final String[] EMPTY_ARRAY = new String[0];
-
- private SparkExecutor() {
- }
-
- ////////////////////
- // VERTEX PROGRAM //
- ////////////////////
-
- public static <M> JavaPairRDD<Object, ViewIncomingPayload<M>> executeVertexProgramIteration(
- final JavaPairRDD<Object, VertexWritable> graphRDD,
- final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD,
- final SparkMemory memory,
- final Configuration apacheConfiguration) {
-
- final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = (((null == viewIncomingRDD) ?
- graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
- graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
- // for each partition of vertices
- .mapPartitionsToPair(partitionIterator -> {
- HadoopPools.initialize(apacheConfiguration);
- final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
- final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
- final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? EMPTY_ARRAY : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
- final SparkMessenger<M> messenger = new SparkMessenger<>();
- workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
- return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
- final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
- // drop any compute properties that are cached in memory
- if (elementComputeKeysArray.length > 0)
- vertex.dropVertexProperties(elementComputeKeysArray);
- final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
- final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : Collections.emptyList();
- final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
- previousView.forEach(property -> property.attach(Attachable.Method.create(vertex))); // attach the view to the vertex
- ///
- messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
- workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
- ///
- final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ? // not all vertex programs have compute keys
- Collections.emptyList() :
- IteratorUtils.list(IteratorUtils.map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
- final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
- if (!partitionIterator.hasNext())
- workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration
- return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
- });
- })).setName("viewOutgoingRDD");
-
- // "message pass" by reducing on the vertex object id of the view and message payloads
- final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null);
- final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
- .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
- IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())), // emit the view payload
- IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))))) // emit the outgoing message payloads one by one
- .reduceByKey((a, b) -> { // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
- if (a instanceof ViewIncomingPayload) {
- ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
- return a;
- } else if (b instanceof ViewIncomingPayload) {
- ((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
- return b;
- } else {
- final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
- c.mergePayload(a, messageCombiner);
- c.mergePayload(b, messageCombiner);
- return c;
- }
- })
- .filter(payload -> !(payload._2() instanceof MessagePayload)) // this happens if there is a message to a vertex that does not exist
- .filter(payload -> !((payload._2() instanceof ViewIncomingPayload) && !((ViewIncomingPayload<M>) payload._2()).hasView())) // this happens if there are many messages to a vertex that does not exist
- .mapValues(payload -> payload instanceof ViewIncomingPayload ?
- (ViewIncomingPayload<M>) payload : // this happens if there is a vertex with incoming messages
- new ViewIncomingPayload<>((ViewPayload) payload)); // this happens if there is a vertex with no incoming messages
-
- newViewIncomingRDD.setName("viewIncomingRDD")
- .foreachPartition(partitionIterator -> {
- HadoopPools.initialize(apacheConfiguration);
- }); // need to complete a task so its BSP and the memory for this iteration is updated
- return newViewIncomingRDD;
- }
-
- /////////////////
- // MAP REDUCE //
- ////////////////
-
- public static <M> JavaPairRDD<Object, VertexWritable> prepareGraphRDDForMapReduce(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys) {
- return (null == viewIncomingRDD) ? // there was no vertex program
- graphRDD.mapValues(vertexWritable -> {
- vertexWritable.get().dropEdges();
- return vertexWritable;
- }) :
- graphRDD.leftOuterJoin(viewIncomingRDD)
- .mapValues(tuple -> {
- final StarGraph.StarVertex vertex = tuple._1().get();
- vertex.dropEdges();
- vertex.dropVertexProperties(elementComputeKeys);
- final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
- view.forEach(property -> property.attach(Attachable.Method.create(vertex)));
- return tuple._1();
- });
- }
-
- public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
- JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
- HadoopPools.initialize(apacheConfiguration);
- final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
- workerMapReduce.workerStart(MapReduce.Stage.MAP);
- final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
- return () -> IteratorUtils.flatMap(partitionIterator, vertexWritable -> {
- workerMapReduce.map(ComputerGraph.mapReduce(vertexWritable._2().get()), mapEmitter);
- if (!partitionIterator.hasNext())
- workerMapReduce.workerEnd(MapReduce.Stage.MAP);
- return mapEmitter.getEmissions();
- });
- });
- if (mapReduce.getMapKeySort().isPresent())
- mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
- return mapRDD;
- }
-
- // TODO: public static executeCombine() is this necessary? YES --- we groupByKey in reduce() where we want to combine first.
-
- public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
- JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
- HadoopPools.initialize(apacheConfiguration);
- final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration);
- workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
- final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
- return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
- workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter);
- if (!partitionIterator.hasNext())
- workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
- return reduceEmitter.getEmissions();
- });
- });
- if (mapReduce.getReduceKeySort().isPresent())
- reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
- return reduceRDD;
- }
-
- ///////////////////
- // Input/Output //
- //////////////////
-
- public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
- final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION,null);
- if (null != outputLocation) {
- // map back to a Hadoop stream for output
- mapReduceRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(),
- ObjectWritable.class,
- ObjectWritable.class,
- SequenceFileOutputFormat.class, hadoopConfiguration);
- // TODO: mapReduce.addResultToMemory(memory, mapReduceRDD.map(tuple -> new KeyValue<>(tuple._1(), tuple._2())).collect().iterator());
- try {
- mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
deleted file mode 100644
index eb1e411..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.commons.configuration.FileConfiguration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.spark.SparkConf;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.InputFormatRDD;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.InputRDD;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.OutputFormatRDD;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io.OutputRDD;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload.ViewIncomingPayload;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
-import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Stream;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
-
- public SparkGraphComputer(final HadoopGraph hadoopGraph) {
- super(hadoopGraph);
- }
-
- @Override
- public Future<ComputerResult> submit() {
- super.validateStatePriorToExecution();
- // apache and hadoop configurations that are used throughout the graph computer computation
- final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.hadoopGraph.configuration());
- apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(Persist.EDGES));
- final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
- if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
- try {
- final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
- apacheConfiguration.setProperty(Constants.MAPRED_INPUT_DIR, inputLocation);
- hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, inputLocation);
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- // create the completable future
- return CompletableFuture.<ComputerResult>supplyAsync(() -> {
- final long startTime = System.currentTimeMillis();
- SparkMemory memory = null;
- // delete output location
- final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
- if (null != outputLocation) {
- try {
- FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
- // wire up a spark context
- final SparkConf sparkConfiguration = new SparkConf();
- sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
- /*final List<Class> classes = new ArrayList<>();
- classes.addAll(IOClasses.getGryoClasses(GryoMapper.build().create()));
- classes.addAll(IOClasses.getSharedHadoopClasses());
- classes.add(ViewPayload.class);
- classes.add(MessagePayload.class);
- classes.add(ViewIncomingPayload.class);
- classes.add(ViewOutgoingPayload.class);
- sparkConfiguration.registerKryoClasses(classes.toArray(new Class[classes.size()]));*/ // TODO: fix for user submitted jars in Spark 1.3.0
-
- // create the spark configuration from the graph computer configuration
- hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
- // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
- try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
- // add the project jars to the cluster
- this.loadJars(sparkContext, hadoopConfiguration);
- // create a message-passing friendly rdd from the input rdd
- final JavaPairRDD<Object, VertexWritable> graphRDD;
- try {
- graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
- .newInstance()
- .readGraphRDD(apacheConfiguration, sparkContext)
- .setName("graphRDD")
- .cache();
- } catch (final InstantiationException | IllegalAccessException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
-
- ////////////////////////////////
- // process the vertex program //
- ////////////////////////////////
- if (null != this.vertexProgram) {
- // set up the vertex program and wire up configurations
- memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
- this.vertexProgram.setup(memory);
- memory.broadcastMemory(sparkContext);
- final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
- this.vertexProgram.storeState(vertexProgramConfiguration);
- ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
- ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
-
- // execute the vertex program
- while (true) {
- memory.setInTask(true);
- viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
- memory.setInTask(false);
- if (this.vertexProgram.terminate(memory))
- break;
- else {
- memory.incrIteration();
- memory.broadcastMemory(sparkContext);
- }
- }
- // write the graph rdd using the output rdd
- if (!this.persist.equals(Persist.NOTHING)) {
- try {
- hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
- .newInstance()
- .writeGraphRDD(apacheConfiguration, graphRDD);
- } catch (final InstantiationException | IllegalAccessException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
- }
-
- final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory);
-
- //////////////////////////////
- // process the map reducers //
- //////////////////////////////
- if (!this.mapReducers.isEmpty()) {
- final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
- final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD, elementComputeKeys).setName("mapReduceGraphRDD").cache();
- for (final MapReduce mapReduce : this.mapReducers) {
- // execute the map reduce job
- final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
- mapReduce.storeState(newApacheConfiguration);
- // map
- final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
- // combine TODO: is this really needed
- // reduce
- final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
- // write the map reduce output back to disk (memory)
- SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
- }
- }
- // update runtime and return the newly computed graph
- finalMemory.setRuntime(System.currentTimeMillis() - startTime);
- return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), finalMemory.asImmutable());
- }
- });
- }
-
- /////////////////
-
- private void loadJars(final JavaSparkContext sparkContext, final Configuration hadoopConfiguration) {
- if (hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true)) {
- final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
- if (null == hadoopGremlinLocalLibs)
- this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
- else {
- final String[] paths = hadoopGremlinLocalLibs.split(":");
- for (final String path : paths) {
- final File file = new File(path);
- if (file.exists())
- Stream.of(file.listFiles()).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
- else
- this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
- }
- }
- }
- }
-
- public static void main(final String[] args) throws Exception {
- final FileConfiguration configuration = new PropertiesConfiguration(args[0]);
- new SparkGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(HadoopGraph.open(configuration), configuration)).submit().get();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
deleted file mode 100644
index 7141259..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMapEmitter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
-
- private List<Tuple2<K, V>> emissions = new ArrayList<>();
-
- @Override
- public void emit(final K key, final V value) {
- this.emissions.add(new Tuple2<>(key, value));
- }
-
- public Iterator<Tuple2<K, V>> getEmissions() {
- final Iterator<Tuple2<K,V>> iterator = this.emissions.iterator();
- this.emissions = new ArrayList<>();
- return iterator;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
deleted file mode 100644
index e2de405..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMemory.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.spark.Accumulator;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.broadcast.Broadcast;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-
-import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkMemory implements Memory.Admin, Serializable {
-
- public final Set<String> memoryKeys = new HashSet<>();
- private final AtomicInteger iteration = new AtomicInteger(0); // do these need to be atomics?
- private final AtomicLong runtime = new AtomicLong(0l);
- private final Map<String, Accumulator<Rule>> memory = new HashMap<>();
- private Broadcast<Map<String, Object>> broadcast;
- private boolean inTask = false;
-
- public SparkMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers, final JavaSparkContext sparkContext) {
- if (null != vertexProgram) {
- for (final String key : vertexProgram.getMemoryComputeKeys()) {
- MemoryHelper.validateKey(key);
- this.memoryKeys.add(key);
- }
- }
- for (final MapReduce mapReduce : mapReducers) {
- this.memoryKeys.add(mapReduce.getMemoryKey());
- }
- for (final String key : this.memoryKeys) {
- this.memory.put(key, sparkContext.accumulator(new Rule(Rule.Operation.NO_OP, null), key, new RuleAccumulator()));
- }
- this.broadcast = sparkContext.broadcast(new HashMap<>());
- }
-
- @Override
- public Set<String> keys() {
- if (this.inTask)
- return this.broadcast.getValue().keySet();
- else {
- final Set<String> trueKeys = new HashSet<>();
- this.memory.forEach((key, value) -> {
- if (value.value().getObject() != null)
- trueKeys.add(key);
- });
- return Collections.unmodifiableSet(trueKeys);
- }
- }
-
- @Override
- public void incrIteration() {
- this.iteration.getAndIncrement();
- }
-
- @Override
- public void setIteration(final int iteration) {
- this.iteration.set(iteration);
- }
-
- @Override
- public int getIteration() {
- return this.iteration.get();
- }
-
- @Override
- public void setRuntime(final long runTime) {
- this.runtime.set(runTime);
- }
-
- @Override
- public long getRuntime() {
- return this.runtime.get();
- }
-
- @Override
- public <R> R get(final String key) throws IllegalArgumentException {
- final R r = this.getValue(key);
- if (null == r)
- throw Memory.Exceptions.memoryDoesNotExist(key);
- else
- return r;
- }
-
- @Override
- public void incr(final String key, final long delta) {
- checkKeyValue(key, delta);
- if (this.inTask)
- this.memory.get(key).add(new Rule(Rule.Operation.INCR, delta));
- else
- this.memory.get(key).setValue(new Rule(Rule.Operation.INCR, this.<Long>getValue(key) + delta));
- }
-
- @Override
- public void and(final String key, final boolean bool) {
- checkKeyValue(key, bool);
- if (this.inTask)
- this.memory.get(key).add(new Rule(Rule.Operation.AND, bool));
- else
- this.memory.get(key).setValue(new Rule(Rule.Operation.AND, this.<Boolean>getValue(key) && bool));
- }
-
- @Override
- public void or(final String key, final boolean bool) {
- checkKeyValue(key, bool);
- if (this.inTask)
- this.memory.get(key).add(new Rule(Rule.Operation.OR, bool));
- else
- this.memory.get(key).setValue(new Rule(Rule.Operation.OR, this.<Boolean>getValue(key) || bool));
- }
-
- @Override
- public void set(final String key, final Object value) {
- checkKeyValue(key, value);
- if (this.inTask)
- this.memory.get(key).add(new Rule(Rule.Operation.SET, value));
- else
- this.memory.get(key).setValue(new Rule(Rule.Operation.SET, value));
- }
-
- @Override
- public String toString() {
- return StringFactory.memoryString(this);
- }
-
- protected void setInTask(final boolean inTask) {
- this.inTask = inTask;
- }
-
- protected void broadcastMemory(final JavaSparkContext sparkContext) {
- this.broadcast.destroy(true); // do we need to block?
- final Map<String, Object> toBroadcast = new HashMap<>();
- this.memory.forEach((key, rule) -> {
- if (null != rule.value().getObject())
- toBroadcast.put(key, rule.value().getObject());
- });
- this.broadcast = sparkContext.broadcast(toBroadcast);
- }
-
- private void checkKeyValue(final String key, final Object value) {
- if (!this.memoryKeys.contains(key))
- throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
- MemoryHelper.validateValue(value);
- }
-
- private <R> R getValue(final String key) {
- return this.inTask ? (R) this.broadcast.value().get(key) : (R) this.memory.get(key).value().getObject();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
deleted file mode 100644
index f52843b..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
-import org.apache.tinkerpop.gremlin.process.computer.Messenger;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkMessenger<M> implements Messenger<M> {
-
- private Vertex vertex;
- private Iterable<M> incomingMessages;
- private List<Tuple2<Object, M>> outgoingMessages = new ArrayList<>();
-
- public void setVertexAndIncomingMessages(final Vertex vertex, final Iterable<M> incomingMessages) {
- this.vertex = vertex;
- this.incomingMessages = incomingMessages;
- this.outgoingMessages = new ArrayList<>();
- }
-
- public List<Tuple2<Object, M>> getOutgoingMessages() {
- return this.outgoingMessages;
- }
-
- @Override
- public Iterator<M> receiveMessages() {
- return this.incomingMessages.iterator();
- }
-
- @Override
- public void sendMessage(final MessageScope messageScope, final M message) {
- if (messageScope instanceof MessageScope.Local) {
- final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
- final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertex);
- final Direction direction = SparkMessenger.getOppositeDirection(incidentTraversal);
- incidentTraversal.forEachRemaining(edge -> this.outgoingMessages.add(new Tuple2<>(edge.vertices(direction).next().id(), message)));
- } else {
- ((MessageScope.Global) messageScope).vertices().forEach(v -> this.outgoingMessages.add(new Tuple2<>(v.id(), message)));
- }
- }
-
- ///////////
-
- private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
- incidentTraversal.asAdmin().addStep(0, new StartStep<>(incidentTraversal.asAdmin(), vertex));
- return (T) incidentTraversal;
- }
-
- private static Direction getOppositeDirection(final Traversal.Admin<Vertex, Edge> incidentTraversal) {
- final VertexStep step = TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get();
- return step.getDirection().opposite();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
deleted file mode 100644
index a5d0175..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkReduceEmitter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
-
- private List<Tuple2<OK, OV>> emissions = new ArrayList<>();
-
- @Override
- public void emit(final OK key, final OV value) {
- this.emissions.add(new Tuple2<>(key, value));
- }
-
- public Iterator<Tuple2<OK, OV>> getEmissions() {
- final Iterator<Tuple2<OK, OV>> iterator = this.emissions.iterator();
- this.emissions = new ArrayList<>();
- return iterator;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputFormatRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputFormatRDD.java
deleted file mode 100644
index 082c3b6..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputFormatRDD.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import scala.Tuple2;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class InputFormatRDD implements InputRDD {
-
- @Override
- public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
- final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
- return sparkContext.newAPIHadoopRDD(hadoopConfiguration,
- (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
- NullWritable.class,
- VertexWritable.class)
- .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
- .reduceByKey((a, b) -> a); // if this is not done, then the graph is partitioned and you can have duplicate vertices
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDD.java
deleted file mode 100644
index 75f602b..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDD.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-/**
- * An InputRDD is used to read data from the underlying graph system and yield the respective adjacency list.
- * Note that {@link InputFormatRDD} is a type of InputRDD that simply uses the specified {@link org.apache.hadoop.mapreduce.InputFormat} to generate the respective graphRDD.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface InputRDD {
-
- /**
- * Read the graphRDD from the underlying graph system.
- * @param configuration the configuration for the {@link org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer}.
- * @param sparkContext the Spark context with the requisite methods for generating a {@link JavaPairRDD}.
- * @return an adjacency list representation of the underlying graph system.
- */
- public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java
deleted file mode 100644
index 8d64fd2..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputFormatRDD.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import scala.Tuple2;
-
-/**
- * An {@link OutputRDD} that saves the graphRDD through the Hadoop {@link OutputFormat} class named by
- * {@link Constants#GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class OutputFormatRDD implements OutputRDD {
-
-    /**
-     * Saves the values of the graphRDD to {@code <output location>/} followed by {@link Constants#HIDDEN_G}.
-     * Nothing is written when {@link Constants#GREMLIN_HADOOP_OUTPUT_LOCATION} is not set.
-     *
-     * @param configuration the configuration handed to {@link ConfUtil#makeHadoopConfiguration} to look up the output settings
-     * @param graphRDD      the pairs whose {@link VertexWritable} values are saved; the keys are replaced by {@link NullWritable}
-     */
-    @Override
-    public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
-        final org.apache.hadoop.conf.Configuration hadoopConf = ConfUtil.makeHadoopConfiguration(configuration);
-        final String location = hadoopConf.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
-        if (null == location)
-            return; // no output location configured, so there is nothing to save
-
-        // map back to a <nullwritable,vertexwritable> stream for output
-        final JavaPairRDD<NullWritable, VertexWritable> vertexRDD =
-                graphRDD.mapToPair(entry -> new Tuple2<>(NullWritable.get(), entry._2()));
-        final String targetPath = location + "/" + Constants.HIDDEN_G;
-        final Class<OutputFormat<NullWritable, VertexWritable>> outputFormatClass =
-                (Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConf.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class);
-        vertexRDD.saveAsNewAPIHadoopFile(targetPath, NullWritable.class, VertexWritable.class, outputFormatClass, hadoopConf);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java
deleted file mode 100644
index 8f6ef7a..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDD.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-
-/**
- * Contract for writing a graphRDD; {@link OutputFormatRDD} is the implementation that saves it through a Hadoop
- * {@link org.apache.hadoop.mapreduce.OutputFormat}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface OutputRDD {
-
-    /**
-     * Writes the provided graphRDD.
-     *
-     * @param configuration the configuration the implementation reads its output settings from
-     * @param graphRDD      the graph, as pairs whose values are {@link VertexWritable}s
-     */
-    void writeGraphRDD(Configuration configuration, JavaPairRDD<Object, VertexWritable> graphRDD);
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
deleted file mode 100644
index 191c9ca..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/MessagePayload.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-/**
- * A {@link Payload} that wraps a single message.
- *
- * @param <M> the message type
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class MessagePayload<M> implements Payload {
-
-    /** The wrapped message, stored exactly as it was supplied. */
-    private final M message;
-
-    /**
-     * @param message the message to wrap
-     */
-    public MessagePayload(final M message) {
-        this.message = message;
-    }
-
-    /**
-     * @return the message given at construction
-     */
-    public M getMessage() {
-        return message;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
deleted file mode 100644
index d901df7..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/Payload.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-import java.io.Serializable;
-
-/**
- * Common {@link Serializable} super-type of the payload classes in this package ({@link MessagePayload},
- * {@link ViewPayload}, {@link ViewIncomingPayload} and {@link ViewOutgoingPayload}). It declares no methods.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface Payload extends Serializable {
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
deleted file mode 100644
index b236fe9..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * A {@link Payload} that holds an optional view (a list of {@link DetachedVertexProperty}) together with a list of
- * incoming messages, and that can merge other payloads into itself via
- * {@link #mergePayload(Payload, MessageCombiner)}.
- *
- * @param <M> the message type
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ViewIncomingPayload<M> implements Payload {
-
-    private List<DetachedVertexProperty<Object>> view = null;
-    // Not final: two constructors leave it null and the list is then created on the first merged message.
-    private List<M> incomingMessages;
-
-    /**
-     * Creates a payload with neither a view nor any incoming messages.
-     */
-    public ViewIncomingPayload() {
-        this.incomingMessages = null;
-    }
-
-    /**
-     * Creates a payload with no view and an empty, mutable message list.
-     *
-     * @param messageCombiner when non-null the list is presized to one element, since
-     *                        {@link #mergePayload(Payload, MessageCombiner)} then keeps a single combined message
-     */
-    public ViewIncomingPayload(final MessageCombiner<M> messageCombiner) {
-        this.incomingMessages = null == messageCombiner ? new ArrayList<>() : new ArrayList<>(1);
-    }
-
-    /**
-     * Creates a payload that takes over the view of the given {@link ViewPayload} and has no incoming messages.
-     *
-     * @param viewPayload the payload whose view list is adopted (the list itself is not copied)
-     */
-    public ViewIncomingPayload(final ViewPayload viewPayload) {
-        this.incomingMessages = null;
-        this.view = viewPayload.getView();
-    }
-
-    /**
-     * @return the view, or an empty list when no view has been set
-     */
-    public List<DetachedVertexProperty<Object>> getView() {
-        return null == this.view ? Collections.emptyList() : this.view;
-    }
-
-    /**
-     * @return the incoming messages, or an empty list when none have been stored
-     */
-    public List<M> getIncomingMessages() {
-        return null == this.incomingMessages ? Collections.emptyList() : this.incomingMessages;
-    }
-
-    /**
-     * @return {@code true} if a view has been set
-     */
-    public boolean hasView() {
-        return null != this.view;
-    }
-
-    ////////////////////
-
-    /**
-     * Adds the message to the list or, when a combiner is supplied and a message is already stored, replaces the
-     * first stored message with the combination of it and the new one.
-     */
-    private void mergeMessage(final M message, final MessageCombiner<M> messageCombiner) {
-        // The no-arg and ViewPayload constructors leave the list null; create it here instead of
-        // failing with a NullPointerException on the isEmpty() call below.
-        if (null == this.incomingMessages)
-            this.incomingMessages = new ArrayList<>();
-        if (this.incomingMessages.isEmpty() || null == messageCombiner)
-            this.incomingMessages.add(message);
-        else
-            this.incomingMessages.set(0, messageCombiner.combine(this.incomingMessages.get(0), message));
-    }
-
-    /**
-     * Merges the view and then every incoming message of the other payload into this one.
-     */
-    private void mergeViewIncomingPayload(final ViewIncomingPayload<M> viewIncomingPayload, final MessageCombiner<M> messageCombiner) {
-        // NOTE(review): when this payload has no view yet, the other payload's list is adopted by reference,
-        // so a later addAll here also changes the list held by the other payload -- confirm that is intended.
-        if (this.view == null)
-            this.view = viewIncomingPayload.view;
-        else
-            this.view.addAll(viewIncomingPayload.getView());
-
-        for (final M message : viewIncomingPayload.getIncomingMessages()) {
-            this.mergeMessage(message, messageCombiner);
-        }
-    }
-
-    /**
-     * Merges another payload into this one. A {@link ViewPayload} replaces the current view, a
-     * {@link MessagePayload} contributes its message, and a {@link ViewIncomingPayload} contributes both its view
-     * and its messages.
-     *
-     * @param payload         the payload to merge into this one
-     * @param messageCombiner the combiner used to fold messages into a single one, or {@code null} to keep them all
-     * @throws IllegalArgumentException if the payload is of any other type
-     */
-    public void mergePayload(final Payload payload, final MessageCombiner<M> messageCombiner) {
-        if (payload instanceof ViewPayload)
-            this.view = ((ViewPayload) payload).getView();
-        else if (payload instanceof MessagePayload)
-            this.mergeMessage(((MessagePayload<M>) payload).getMessage(), messageCombiner);
-        else if (payload instanceof ViewIncomingPayload)
-            this.mergeViewIncomingPayload((ViewIncomingPayload<M>) payload, messageCombiner);
-        else
-            throw new IllegalArgumentException("The provided payload is an unsupported merge payload: " + payload);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
deleted file mode 100644
index c4daf47..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewOutgoingPayload.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
-import scala.Tuple2;
-
-import java.util.List;
-
-/**
- * A {@link Payload} that pairs a view (a list of {@link DetachedVertexProperty}) with a list of outgoing messages,
- * each message being a {@link Tuple2} of an object and the message itself.
- *
- * @param <M> the message type
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ViewOutgoingPayload<M> implements Payload {
-
-    private final List<DetachedVertexProperty<Object>> view;
-    private final List<Tuple2<Object, M>> outgoingMessages;
-
-    /**
-     * @param view             the view list, stored by reference
-     * @param outgoingMessages the outgoing messages, stored by reference
-     */
-    public ViewOutgoingPayload(final List<DetachedVertexProperty<Object>> view, final List<Tuple2<Object, M>> outgoingMessages) {
-        this.outgoingMessages = outgoingMessages;
-        this.view = view;
-    }
-
-    /**
-     * @return a new {@link ViewPayload} wrapping this payload's view list
-     */
-    public ViewPayload getView() {
-        final ViewPayload viewPayload = new ViewPayload(this.view);
-        return viewPayload;
-    }
-
-    /**
-     * @return the outgoing messages given at construction
-     */
-    public List<Tuple2<Object, M>> getOutgoingMessages() {
-        return outgoingMessages;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
deleted file mode 100644
index 0ec5ef5..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewPayload.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.payload;
-
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
-
-import java.util.List;
-
-/**
- * A {@link Payload} that wraps a view, i.e. a list of {@link DetachedVertexProperty}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ViewPayload implements Payload {
-
-    /** The wrapped list, stored by reference. */
-    private final List<DetachedVertexProperty<Object>> view;
-
-    /**
-     * @param view the view list to wrap
-     */
-    public ViewPayload(final List<DetachedVertexProperty<Object>> view) {
-        this.view = view;
-    }
-
-    /**
-     * @return the list given at construction
-     */
-    public List<DetachedVertexProperty<Object>> getView() {
-        return view;
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index 18d515e..3c3c9b7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HadoopEdgeIterator;
import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HadoopVertexIterator;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
@@ -192,10 +191,17 @@ public final class HadoopGraph implements Graph {
public <C extends GraphComputer> C compute(final Class<C> graphComputerClass) {
if (graphComputerClass.equals(GiraphGraphComputer.class))
return (C) new GiraphGraphComputer(this);
- else if (graphComputerClass.equals(SparkGraphComputer.class))
- return (C) new SparkGraphComputer(this);
- else
- throw Graph.Exceptions.graphDoesNotSupportProvidedGraphComputer(graphComputerClass);
+ else {
+ try {
+ return graphComputerClass.getConstructor(HadoopGraph.class).newInstance(this);
+ } catch (final Exception e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
+ //else if (graphComputerClass.equals(SparkGraphComputer.class))
+ // return (C) new SparkGraphComputer(this);
+ //else
+ // throw Graph.Exceptions.graphDoesNotSupportProvidedGraphComputer(graphComputerClass);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/HadoopSparkGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/HadoopSparkGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/HadoopSparkGraphProvider.java
deleted file mode 100644
index af0d745..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/HadoopSparkGraphProvider.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.GraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.giraph.GiraphGraphComputer;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-
-/**
- * A {@link HadoopGraphProvider} whose traversal sources use {@link SparkGraphComputer}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@GraphProvider.Descriptor(computer = SparkGraphComputer.class)
-public final class HadoopSparkGraphProvider extends HadoopGraphProvider {
-
-    /**
-     * Builds a traversal source for the graph whose engine is a {@link ComputerTraversalEngine} configured with
-     * {@link SparkGraphComputer}.
-     *
-     * @param graph the graph to create the traversal source for
-     * @return the traversal source created for the graph
-     */
-    public GraphTraversalSource traversal(final Graph graph) {
-        return GraphTraversalSource.build()
-                .engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class))
-                .create(graph);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java
deleted file mode 100644
index ae729fd..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputerProcessIntegrateTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.ProcessComputerSuite;
-import org.junit.runner.RunWith;
-
-/**
- * Runs the {@link ProcessComputerSuite} with {@link HadoopGraph} as the graph and
- * {@link HadoopSparkGraphProvider} as the graph provider. The class body is intentionally empty; the
- * annotations carry all of the configuration.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(ProcessComputerSuite.class)
-@GraphProviderClass(provider = HadoopSparkGraphProvider.class, graph = HadoopGraph.class)
-public class SparkGraphComputerProcessIntegrateTest {
-}