You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/09/09 22:20:32 UTC
[02/18] incubator-tinkerpop git commit: Spark is now isolated into
spark-gremlin package. Tests are passing. A few hacks here and there just to
get things building. However,
it was pretty easy to split apart -- which is promising.
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
deleted file mode 100644
index cc07bac..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkGraphComputerGroovyProcessIntegrateTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.groovy;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.HadoopSparkGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.GroovyProcessComputerSuite;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(GroovyProcessComputerSuite.class)
-@GraphProviderClass(provider = HadoopSparkGraphProvider.class, graph = HadoopGraph.class)
-public class SparkGraphComputerGroovyProcessIntegrateTest {
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkHadoopGremlinPluginTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkHadoopGremlinPluginTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkHadoopGremlinPluginTest.java
deleted file mode 100644
index 3d83142..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/groovy/SparkHadoopGremlinPluginTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.groovy;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.hadoop.groovy.plugin.HadoopPluginSuite;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.HadoopSparkGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(HadoopPluginSuite.class)
-@GraphProviderClass(provider = HadoopSparkGraphProvider.class, graph = HadoopGraph.class)
-public class SparkHadoopGremlinPluginTest {
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleInputRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleInputRDD.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleInputRDD.java
deleted file mode 100644
index d456808..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleInputRDD.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.T;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
-import scala.Tuple2;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ExampleInputRDD implements InputRDD {
-
- @Override
- public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
- final List<Vertex> list = new ArrayList<>();
- list.add(StarGraph.open().addVertex(T.id, 1l, T.label,"person","age", 29));
- list.add(StarGraph.open().addVertex(T.id, 2l, T.label,"person","age", 27));
- list.add(StarGraph.open().addVertex(T.id, 4l, T.label,"person","age", 32));
- list.add(StarGraph.open().addVertex(T.id, 6l, T.label,"person","age", 35));
- return sparkContext.parallelize(list).mapToPair(vertex -> new Tuple2<>(vertex.id(), new VertexWritable(vertex)));
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleOutputRDD.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleOutputRDD.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleOutputRDD.java
deleted file mode 100644
index 45ed114..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/ExampleOutputRDD.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-
-import java.util.Iterator;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ExampleOutputRDD implements OutputRDD {
- @Override
- public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
- int totalAge = 0;
- final Iterator<VertexWritable> iterator = graphRDD.values().toLocalIterator();
- while (iterator.hasNext()) {
- final Vertex vertex = iterator.next().get();
- if (vertex.label().equals("person"))
- totalAge = totalAge + vertex.<Integer>value("age");
- }
- assertEquals(123, totalAge);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputOutputRDDTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputOutputRDDTest.java
deleted file mode 100644
index ea1cb87..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputOutputRDDTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class InputOutputRDDTest {
-
- @Test
- public void shouldReadFromWriteToArbitraryRDD() throws Exception {
- final Configuration configuration = new BaseConfiguration();
- configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
- configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
- ////////
- Graph graph = GraphFactory.open(configuration);
- graph.compute(SparkGraphComputer.class)
- .result(GraphComputer.ResultGraph.NEW)
- .persist(GraphComputer.Persist.EDGES)
- .program(TraversalVertexProgram.build()
- .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
- "gremlin-groovy",
- "g.V()").create(graph)).submit().get();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java
deleted file mode 100644
index c600327..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/InputRDDTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class InputRDDTest {
-
- @Test
- public void shouldReadFromArbitraryRDD() {
- final Configuration configuration = new BaseConfiguration();
- configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
- configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
- ////////
- Graph graph = GraphFactory.open(configuration);
- assertEquals(Double.valueOf(123.0d), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().values("age").sum().next());
- assertEquals(Long.valueOf(4l), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDDTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDDTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDDTest.java
deleted file mode 100644
index deea6ab..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/io/OutputRDDTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.process.computer.spark.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class OutputRDDTest {
-
- @Test
- public void shouldWriteToArbitraryRDD() throws Exception {
- final Configuration configuration = new BaseConfiguration();
- configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, HadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
- configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "hadoop-gremlin/target/test-output");
- configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
- ////////
- Graph graph = GraphFactory.open(configuration);
- graph.compute(SparkGraphComputer.class)
- .result(GraphComputer.ResultGraph.NEW)
- .persist(GraphComputer.Persist.EDGES)
- .program(TraversalVertexProgram.build()
- .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
- "gremlin-groovy",
- "g.V()").create(graph)).submit().get();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 769f84f..7a4411c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,6 +116,7 @@ limitations under the License.
<module>gremlin-driver</module>
<module>gremlin-console</module>
<module>gremlin-server</module>
+ <module>spark-gremlin</module>
</modules>
<scm>
<connection>scm:git:git@git-wip-us.apache.org:repos/asf/incubator-tinkerpop.git</connection>
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/spark-gremlin/pom.xml b/spark-gremlin/pom.xml
new file mode 100644
index 0000000..277ce11
--- /dev/null
+++ b/spark-gremlin/pom.xml
@@ -0,0 +1,249 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>tinkerpop</artifactId>
+ <version>3.1.0-SNAPSHOT</version>
+ </parent>
+ <artifactId>spark-gremlin</artifactId>
+ <name>Apache TinkerPop :: Spark Gremlin</name>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>gremlin-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>gremlin-groovy</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>hadoop-gremlin</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- SPARK GRAPH COMPUTER -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_2.10</artifactId>
+ <version>1.2.1</version>
+ <exclusions>
+ <!-- self conflicts -->
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-codec</groupId>
+ <artifactId>commons-codec</artifactId>
+ </exclusion>
+ <!-- gremlin-core conflicts -->
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ </exclusion>
+ <!-- gremlin-groovy conflicts -->
+ <exclusion>
+ <groupId>jline</groupId>
+ <artifactId>jline</artifactId>
+ </exclusion>
+ <!-- hadoop conflicts -->
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </exclusion>
+ <!-- giraph conflicts -->
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ <!-- lgpl conflicts -->
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>findbugs</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <!-- consistent dependencies -->
+<!-- <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>2.10.3</version>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ <version>3.5.13.Final</version>
+ </dependency>
+-->
+ <!-- TEST -->
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>gremlin-test</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>gremlin-groovy-test</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tinkerpop</groupId>
+ <artifactId>tinkergraph-gremlin</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <repositories>
+ <repository>
+ <id>hyracks-releases</id>
+ <url>http://obelix.ics.uci.edu/nexus/content/groups/hyracks-public-releases/</url>
+ </repository>
+ </repositories>
+ <build>
+ <directory>${basedir}/target</directory>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ <resources>
+ <resource>
+ <directory>${basedir}/src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>${basedir}/src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>2.4</version>
+ <executions>
+ <execution>
+ <id>build-detached-assemblies</id>
+ <phase>package</phase>
+ <goals>
+ <goal>single</goal>
+ </goals>
+ <configuration>
+ <attach>false</attach>
+ <descriptors>
+ <descriptor>src/assembly/standalone.xml</descriptor>
+ <descriptor>src/assembly/hadoop-job.xml</descriptor>
+ </descriptors>
+ <finalName>${project.artifactId}-${project.version}</finalName>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>org.codehaus.gmavenplus</groupId>
+ <artifactId>gmavenplus-plugin</artifactId>
+ <version>1.2</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>addSources</goal>
+ <goal>addTestSources</goal>
+ <goal>generateStubs</goal>
+ <goal>compile</goal>
+ <goal>testGenerateStubs</goal>
+ <goal>testCompile</goal>
+ <goal>removeStubs</goal>
+ <goal>removeTestStubs</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <invokeDynamic>true</invokeDynamic>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.5</version>
+ <configuration>
+ <archive>
+ <manifestEntries>
+ <Gremlin-Plugin-Dependencies>org.apache.hadoop:hadoop-core:1.2.1
+ </Gremlin-Plugin-Dependencies>
+ <!-- deletes the servlet-api jar from the path after install - causes conflicts -->
+ <Gremlin-Plugin-Paths>servlet-api-2.5-6.1.14.jar=;servlet-api-2.5-20081211.jar=
+ </Gremlin-Plugin-Paths>
+ <Gremlin-Lib-Paths>servlet-api-2.5-6.1.14.jar=;servlet-api-2.5-20081211.jar=
+ </Gremlin-Lib-Paths>
+ </manifestEntries>
+ </archive>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <!-- log4j.configuration>log4j-core.properties</log4j.configuration> -->
+ <!--<argLine>-Xmx2048M</argLine>-->
+ <excludes>
+ <exclude>**/*IntegrateTest.java</exclude>
+ <exclude>**/*PerformanceTest.java</exclude>
+ <!-- this is technically a member of the integration test suite -->
+ <exclude>**/HadoopGremlinPluginTest.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
new file mode 100644
index 0000000..0b04300
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.groovy.plugin;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class SparkGremlinPlugin {
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java
new file mode 100644
index 0000000..bebd283
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/RuleAccumulator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.spark.AccumulatorParam;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class RuleAccumulator implements AccumulatorParam<Rule> {
+
+    // a NO_OP side yields the other rule unchanged; otherwise b's operation combines both objects
+    @Override
+    public Rule addAccumulator(final Rule a, final Rule b) {
+        return combine(a, b);
+    }
+
+    // applies exactly the same merge as addAccumulator
+    @Override
+    public Rule addInPlace(final Rule a, final Rule b) {
+        return combine(a, b);
+    }
+
+    // the zero element is a NO_OP rule carrying a null object; the argument is not consulted
+    @Override
+    public Rule zero(final Rule rule) {
+        return new Rule(Rule.Operation.NO_OP, null);
+    }
+
+    // shared merge logic behind both accumulator callbacks
+    private static Rule combine(final Rule left, final Rule right) {
+        if (left.getOperation().equals(Rule.Operation.NO_OP))
+            return right;
+        if (right.getOperation().equals(Rule.Operation.NO_OP))
+            return left;
+        return new Rule(right.getOperation(), right.getOperation().compute(left.getObject(), right.getObject()));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
new file mode 100644
index 0000000..288538f
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -0,0 +1,202 @@
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import com.google.common.base.Optional;
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.ComputerGraph;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.MessagePayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.Payload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewOutgoingPayload;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewPayload;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
+import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import scala.Tuple2;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkExecutor {
+
+ private static final String[] EMPTY_ARRAY = new String[0];
+
+ private SparkExecutor() {
+ }
+
+ ////////////////////
+ // VERTEX PROGRAM //
+ ////////////////////
+
+ public static <M> JavaPairRDD<Object, ViewIncomingPayload<M>> executeVertexProgramIteration( // runs one vertex program iteration and returns the view + incoming messages per vertex for the next one
+ final JavaPairRDD<Object, VertexWritable> graphRDD, // the vertices; joined by key against viewIncomingRDD (presumably keyed by vertex id -- confirm against the InputRDD)
+ final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, // result of the previous iteration, or null on the first iteration
+ final SparkMemory memory, // handed to the vertex program; the worker start/end hooks receive memory.asImmutable()
+ final Configuration apacheConfiguration) { // used to initialize HadoopPools and to re-create the vertex program on every partition
+ // step 1: execute the vertex program over every vertex; step 2: route the produced views and messages by key
+ final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = (((null == viewIncomingRDD) ?
+ graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
+ graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
+ // for each partition of vertices
+ .mapPartitionsToPair(partitionIterator -> {
+ HadoopPools.initialize(apacheConfiguration);
+ final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
+ final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
+ final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? EMPTY_ARRAY : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
+ final SparkMessenger<M> messenger = new SparkMessenger<>();
+ workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
+ return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
+ final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
+ // drop any compute properties that are cached in memory
+ if (elementComputeKeysArray.length > 0)
+ vertex.dropVertexProperties(elementComputeKeysArray);
+ final boolean hasViewAndMessages = vertexViewIncoming._2()._2().isPresent(); // if this is the first iteration, then there are no views or messages
+ final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : Collections.emptyList();
+ final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
+ previousView.forEach(property -> property.attach(Attachable.Method.create(vertex))); // attach the view to the vertex
+ /// run the vertex program on this vertex with its incoming messages
+ messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
+ workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
+ /// collect the compute-key properties as the next view, plus the messages that were sent
+ final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ? // not all vertex programs have compute keys
+ Collections.emptyList() :
+ IteratorUtils.list(IteratorUtils.map(vertex.properties(elementComputeKeysArray), property -> DetachedFactory.detach(property, true)));
+ final List<Tuple2<Object, M>> outgoingMessages = messenger.getOutgoingMessages(); // get the outgoing messages
+ if (!partitionIterator.hasNext())
+ workerVertexProgram.workerIterationEnd(memory.asImmutable()); // if no more vertices in the partition, end the worker's iteration -- NOTE(review): only reachable while a vertex is being processed, so a partition without vertices starts a worker iteration but never ends it; confirm this is intended
+ return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
+ });
+ })).setName("viewOutgoingRDD");
+
+ // "message pass" by reducing on the vertex object id of the view and message payloads
+ final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(HadoopGraph.open(apacheConfiguration), apacheConfiguration).getMessageCombiner().orElse(null); // null when the vertex program provides no combiner
+ final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
+ .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
+ IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())), // emit the view payload
+ IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2()))))) // emit the outgoing message payloads one by one
+ .reduceByKey((a, b) -> { // reduce the view and outgoing messages into a single payload object representing the new view and incoming messages for a vertex
+ if (a instanceof ViewIncomingPayload) {
+ ((ViewIncomingPayload<M>) a).mergePayload(b, messageCombiner);
+ return a;
+ } else if (b instanceof ViewIncomingPayload) {
+ ((ViewIncomingPayload<M>) b).mergePayload(a, messageCombiner);
+ return b;
+ } else {
+ final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
+ c.mergePayload(a, messageCombiner);
+ c.mergePayload(b, messageCombiner);
+ return c;
+ }
+ })
+ .filter(payload -> !(payload._2() instanceof MessagePayload)) // this happens if there is a message to a vertex that does not exist
+ .filter(payload -> !((payload._2() instanceof ViewIncomingPayload) && !((ViewIncomingPayload<M>) payload._2()).hasView())) // this happens if there are many messages to a vertex that does not exist
+ .mapValues(payload -> payload instanceof ViewIncomingPayload ?
+ (ViewIncomingPayload<M>) payload : // this happens if there is a vertex with incoming messages
+ new ViewIncomingPayload<>((ViewPayload) payload)); // this happens if there is a vertex with no incoming messages
+
+ newViewIncomingRDD.setName("viewIncomingRDD")
+ .foreachPartition(partitionIterator -> {
+ HadoopPools.initialize(apacheConfiguration);
+ }); // need to complete a task so that it's BSP and the memory for this iteration is updated
+ return newViewIncomingRDD;
+ }
+
+ /////////////////
+ // MAP REDUCE //
+ ////////////////
+
+    public static <M> JavaPairRDD<Object, VertexWritable> prepareGraphRDDForMapReduce(final JavaPairRDD<Object, VertexWritable> graphRDD, final JavaPairRDD<Object, ViewIncomingPayload<M>> viewIncomingRDD, final String[] elementComputeKeys) {
+        if (null == viewIncomingRDD) // there was no vertex program, so only the edges are dropped
+            return graphRDD.mapValues(vertexWritable -> {
+                vertexWritable.get().dropEdges();
+                return vertexWritable;
+            });
+        return graphRDD.leftOuterJoin(viewIncomingRDD).mapValues(vertexAndView -> { // replace the compute-key properties with the joined view
+            final VertexWritable writable = vertexAndView._1();
+            final StarGraph.StarVertex vertex = writable.get();
+            vertex.dropEdges();
+            vertex.dropVertexProperties(elementComputeKeys);
+            if (vertexAndView._2().isPresent()) // a vertex without a joined payload has no view to attach
+                vertexAndView._2().get().getView().forEach(property -> property.attach(Attachable.Method.create(vertex)));
+            return writable;
+        });
+    }
+
+ public static <K, V> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, VertexWritable> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) { // runs the MAP stage over every vertex and returns the emitted key/value pairs
+ JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
+ HadoopPools.initialize(apacheConfiguration);
+ final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition re-creates its own map reduce instance from the configuration
+ workerMapReduce.workerStart(MapReduce.Stage.MAP);
+ final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>(); // one emitter per partition; getEmissions() hands over and resets its buffer after every vertex
+ return () -> IteratorUtils.flatMap(partitionIterator, vertexWritable -> {
+ workerMapReduce.map(ComputerGraph.mapReduce(vertexWritable._2().get()), mapEmitter);
+ if (!partitionIterator.hasNext()) // NOTE(review): only reachable while a vertex is being mapped, so a partition without vertices calls workerStart but never workerEnd -- confirm this is intended
+ workerMapReduce.workerEnd(MapReduce.Stage.MAP);
+ return mapEmitter.getEmissions();
+ });
+ });
+ if (mapReduce.getMapKeySort().isPresent()) // the key sort is optional and is taken from the caller's instance, not the per-partition copy
+ mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
+ return mapRDD;
+ }
+
+ // TODO: public static executeCombine() is this necessary? YES --- we groupByKey in reduce() where we want to combine first.
+
+ public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) { // groups the map output by key and runs the REDUCE stage once per key
+ JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
+ HadoopPools.initialize(apacheConfiguration);
+ final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(HadoopGraph.open(apacheConfiguration), apacheConfiguration); // each partition re-creates its own map reduce instance from the configuration
+ workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
+ final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>(); // one emitter per partition, read via getEmissions() after every key
+ return () -> IteratorUtils.flatMap(partitionIterator, keyValue -> {
+ workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter);
+ if (!partitionIterator.hasNext()) // NOTE(review): only reachable while a key is being reduced, so a partition without keys calls workerStart but never workerEnd -- confirm this is intended
+ workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
+ return reduceEmitter.getEmissions();
+ });
+ });
+ if (mapReduce.getReduceKeySort().isPresent()) // the key sort is optional and is taken from the caller's instance, not the per-partition copy
+ reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
+ return reduceRDD;
+ }
+
+ ///////////////////
+ // Input/Output //
+ //////////////////
+
+    public static void saveMapReduceRDD(final JavaPairRDD<Object, Object> mapReduceRDD, final MapReduce mapReduce, final Memory.Admin memory, final org.apache.hadoop.conf.Configuration hadoopConfiguration) {
+        // without a configured output location nothing is written and nothing is added to memory
+        final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+        if (null == outputLocation)
+            return;
+        // wrap keys and values as ObjectWritables and write them as a sequence file under <outputLocation>/<memoryKey>
+        mapReduceRDD
+                .mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2())))
+                .saveAsNewAPIHadoopFile(outputLocation + "/" + mapReduce.getMemoryKey(), ObjectWritable.class, ObjectWritable.class, SequenceFileOutputFormat.class, hadoopConfiguration);
+        // TODO: mapReduce.addResultToMemory(memory, mapReduceRDD.map(tuple -> new KeyValue<>(tuple._1(), tuple._2())).collect().iterator());
+        try { // read the file that was just written and pass its entries to the map reduce job's memory hook
+            mapReduce.addResultToMemory(memory, new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + mapReduce.getMemoryKey())));
+        } catch (final IOException ioException) {
+            throw new IllegalStateException(ioException.getMessage(), ioException);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
new file mode 100644
index 0000000..3aa4383
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.commons.configuration.FileConfiguration;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.InputRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputFormatRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.io.OutputRDD;
+import org.apache.tinkerpop.gremlin.spark.process.computer.payload.ViewIncomingPayload;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.HadoopHelper;
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.stream.Stream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
+
+ public SparkGraphComputer(final HadoopGraph hadoopGraph) { // hadoopGraph: the graph this computer is created for
+ super(hadoopGraph); // no state of its own: everything is delegated to the AbstractHadoopGraphComputer constructor
+ }
+
+ @Override
+ public Future<ComputerResult> submit() { // validates state and prepares the configurations on the calling thread; the Spark work itself runs inside the returned future
+ super.validateStatePriorToExecution();
+ // apache and hadoop configurations that are used throughout the graph computer computation
+ final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.hadoopGraph.configuration());
+ apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
+ final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
+ if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) { // file-based input: store the path reported by the file system as the input directory in both configurations
+ try {
+ final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
+ apacheConfiguration.setProperty(Constants.MAPRED_INPUT_DIR, inputLocation);
+ hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, inputLocation);
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ // create the completable future
+ return CompletableFuture.<ComputerResult>supplyAsync(() -> { // no executor is passed, so CompletableFuture's default asynchronous pool runs the job
+ final long startTime = System.currentTimeMillis();
+ SparkMemory memory = null; // only assigned when a vertex program is present
+ // delete output location
+ final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+ if (null != outputLocation) {
+ try {
+ FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+ // wire up a spark context
+ final SparkConf sparkConfiguration = new SparkConf();
+ sparkConfiguration.setAppName(Constants.GREMLIN_HADOOP_SPARK_JOB_PREFIX + (null == this.vertexProgram ? "No VertexProgram" : this.vertexProgram) + "[" + this.mapReducers + "]");
+ /*final List<Class> classes = new ArrayList<>();
+ classes.addAll(IOClasses.getGryoClasses(GryoMapper.build().create()));
+ classes.addAll(IOClasses.getSharedHadoopClasses());
+ classes.add(ViewPayload.class);
+ classes.add(MessagePayload.class);
+ classes.add(ViewIncomingPayload.class);
+ classes.add(ViewOutgoingPayload.class);
+ sparkConfiguration.registerKryoClasses(classes.toArray(new Class[classes.size()]));*/ // TODO: fix for user submitted jars in Spark 1.3.0
+
+ // create the spark configuration from the graph computer configuration
+ hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
+ // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
+ try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
+ // add the project jars to the cluster
+ this.loadJars(sparkContext, hadoopConfiguration);
+ // create a message-passing friendly rdd from the input rdd
+ final JavaPairRDD<Object, VertexWritable> graphRDD;
+ try {
+ graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
+ .newInstance()
+ .readGraphRDD(apacheConfiguration, sparkContext)
+ .setName("graphRDD")
+ .cache();
+ } catch (final InstantiationException | IllegalAccessException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null; // stays null when there is no vertex program
+
+ ////////////////////////////////
+ // process the vertex program //
+ ////////////////////////////////
+ if (null != this.vertexProgram) {
+ // set up the vertex program and wire up configurations
+ memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
+ this.vertexProgram.setup(memory);
+ memory.broadcastMemory(sparkContext);
+ final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
+ this.vertexProgram.storeState(vertexProgramConfiguration);
+ ConfigurationUtils.copy(vertexProgramConfiguration, apacheConfiguration);
+ ConfUtil.mergeApacheIntoHadoopConfiguration(vertexProgramConfiguration, hadoopConfiguration);
+
+ // execute the vertex program
+ while (true) { // one pass per iteration, until the vertex program's terminate(memory) returns true
+ memory.setInTask(true);
+ viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(graphRDD, viewIncomingRDD, memory, vertexProgramConfiguration);
+ memory.setInTask(false);
+ if (this.vertexProgram.terminate(memory))
+ break;
+ else {
+ memory.incrIteration();
+ memory.broadcastMemory(sparkContext);
+ }
+ }
+ // write the graph rdd using the output rdd
+ if (!this.persist.equals(GraphComputer.Persist.NOTHING)) {
+ try {
+ hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_RDD, OutputFormatRDD.class, OutputRDD.class)
+ .newInstance()
+ .writeGraphRDD(apacheConfiguration, graphRDD);
+ } catch (final InstantiationException | IllegalAccessException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+ }
+
+ final Memory.Admin finalMemory = null == memory ? new MapMemory() : new MapMemory(memory); // an empty MapMemory when no vertex program ran, otherwise one built from the SparkMemory
+
+ //////////////////////////////
+ // process the map reducers //
+ //////////////////////////////
+ if (!this.mapReducers.isEmpty()) {
+ final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
+ final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD, elementComputeKeys).setName("mapReduceGraphRDD").cache();
+ for (final MapReduce mapReduce : this.mapReducers) {
+ // execute the map reduce job
+ final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
+ mapReduce.storeState(newApacheConfiguration);
+ // map
+ final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
+ // combine TODO: is this really needed
+ // reduce
+ final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
+ // write the map reduce output back to disk (memory)
+ SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
+ }
+ }
+ // update runtime and return the newly computed graph
+ finalMemory.setRuntime(System.currentTimeMillis() - startTime);
+ return new DefaultComputerResult(HadoopHelper.getOutputGraph(this.hadoopGraph, this.resultGraph, this.persist), finalMemory.asImmutable());
+ }
+ });
+ }
+
+    /////////////////
+    // adds every file ending in Constants.DOT_JAR from the ":"-separated directories in the Constants.HADOOP_GREMLIN_LIBS environment variable to the Spark context
+    private void loadJars(final JavaSparkContext sparkContext, final Configuration hadoopConfiguration) {
+        if (!hadoopConfiguration.getBoolean(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, true))
+            return; // jar loading has been switched off in the configuration
+        final String hadoopGremlinLocalLibs = System.getenv(Constants.HADOOP_GREMLIN_LIBS);
+        if (null == hadoopGremlinLocalLibs) {
+            this.logger.warn(Constants.HADOOP_GREMLIN_LIBS + " is not set -- proceeding regardless");
+            return;
+        }
+        for (final String path : hadoopGremlinLocalLibs.split(":")) {
+            // listFiles() returns null for a missing path, a regular file, or an unreadable directory; testing its result avoids the NullPointerException an exists() check let through
+            final File[] files = new File(path).listFiles();
+            if (null == files)
+                this.logger.warn(path + " does not reference a valid directory -- proceeding regardless");
+            else
+                Stream.of(files).filter(f -> f.getName().endsWith(Constants.DOT_JAR)).forEach(f -> sparkContext.addJar(f.getAbsolutePath()));
+        }
+    }
+
+ public static void main(final String[] args) throws Exception { // command-line entry point: args[0] is the properties file used for both the graph and the vertex program
+ final FileConfiguration configuration = new PropertiesConfiguration(args[0]); // NOTE(review): args[0] is read without a length check
+ new SparkGraphComputer(HadoopGraph.open(configuration)).program(VertexProgram.createVertexProgram(HadoopGraph.open(configuration), configuration)).submit().get(); // get() blocks until the submitted job has finished
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
new file mode 100644
index 0000000..cf31249
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMapEmitter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
+
+    private List<Tuple2<K, V>> emissions = new ArrayList<>(); // pairs emitted since the last getEmissions() call
+
+    @Override
+    public void emit(final K key, final V value) {
+        this.emissions.add(new Tuple2<>(key, value));
+    }
+
+    public Iterator<Tuple2<K, V>> getEmissions() { // hands over the buffered pairs and starts a fresh, empty buffer
+        final List<Tuple2<K, V>> drained = this.emissions;
+        this.emissions = new ArrayList<>();
+        return drained.iterator();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
new file mode 100644
index 0000000..dbfadcc
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMemory.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.broadcast.Broadcast;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.Rule;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.util.MemoryHelper;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * {@link Memory.Admin} implementation built on Spark primitives: one {@link Accumulator} of {@link Rule} per
+ * memory key, plus a {@link Broadcast} map holding a snapshot of the values that are currently set.
+ * <p>
+ * Behaviour is switched by the {@code inTask} flag (see {@link #setInTask(boolean)}). While it is {@code true},
+ * reads are answered from the broadcast snapshot and mutations are added to the accumulators as rules. While it
+ * is {@code false}, reads and mutations operate directly on the accumulator values.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkMemory implements Memory.Admin, Serializable {
+
+    /** Every key this memory accepts; mutators reject any key that is not in this set. */
+    public final Set<String> memoryKeys = new HashSet<>();
+    private final AtomicInteger iteration = new AtomicInteger(0);   // do these need to be atomics?
+    private final AtomicLong runtime = new AtomicLong(0L);
+    /** One accumulator per memory key; a rule whose object is {@code null} means the key has not been set. */
+    private final Map<String, Accumulator<Rule>> memory = new HashMap<>();
+    /** Snapshot of the set values, read while {@code inTask} is {@code true}. */
+    private Broadcast<Map<String, Object>> broadcast;
+    private boolean inTask = false;
+
+    /**
+     * Collects the memory keys and registers one accumulator per key.
+     * <p>
+     * Keys come from the vertex program's memory compute keys (each validated with
+     * {@link MemoryHelper#validateKey}) and from the memory key of every map reducer. Each accumulator starts
+     * with a {@code NO_OP} rule holding {@code null}, and the broadcast starts as an empty map.
+     *
+     * @param vertexProgram the vertex program supplying memory compute keys, or {@code null} if there is none
+     * @param mapReducers   the map reducers whose memory keys are registered
+     * @param sparkContext  the context used to create the accumulators and the initial broadcast
+     */
+    public SparkMemory(final VertexProgram<?> vertexProgram, final Set<MapReduce> mapReducers, final JavaSparkContext sparkContext) {
+        if (null != vertexProgram) {
+            for (final String key : vertexProgram.getMemoryComputeKeys()) {
+                MemoryHelper.validateKey(key);
+                this.memoryKeys.add(key);
+            }
+        }
+        for (final MapReduce mapReduce : mapReducers) {
+            this.memoryKeys.add(mapReduce.getMemoryKey());
+        }
+        for (final String key : this.memoryKeys) {
+            this.memory.put(key, sparkContext.accumulator(new Rule(Rule.Operation.NO_OP, null), key, new RuleAccumulator()));
+        }
+        this.broadcast = sparkContext.broadcast(new HashMap<>());
+    }
+
+    /**
+     * Returns the keys that currently have a value.
+     *
+     * @return an unmodifiable set: the keys of the broadcast snapshot while in a task, otherwise the keys whose
+     * accumulator holds a non-null object
+     */
+    @Override
+    public Set<String> keys() {
+        if (this.inTask) {
+            // wrapped so callers cannot mutate the broadcast map through its key view
+            return Collections.unmodifiableSet(this.broadcast.getValue().keySet());
+        }
+        final Set<String> trueKeys = new HashSet<>();
+        this.memory.forEach((key, value) -> {
+            if (value.value().getObject() != null) {
+                trueKeys.add(key);
+            }
+        });
+        return Collections.unmodifiableSet(trueKeys);
+    }
+
+    @Override
+    public void incrIteration() {
+        this.iteration.getAndIncrement();
+    }
+
+    @Override
+    public void setIteration(final int iteration) {
+        this.iteration.set(iteration);
+    }
+
+    @Override
+    public int getIteration() {
+        return this.iteration.get();
+    }
+
+    @Override
+    public void setRuntime(final long runTime) {
+        this.runtime.set(runTime);
+    }
+
+    @Override
+    public long getRuntime() {
+        return this.runtime.get();
+    }
+
+    /**
+     * Looks up the value stored under the key.
+     *
+     * @throws IllegalArgumentException if no value is currently stored under the key
+     */
+    @Override
+    public <R> R get(final String key) throws IllegalArgumentException {
+        final R r = this.getValue(key);
+        if (null == r) {
+            throw Memory.Exceptions.memoryDoesNotExist(key);
+        }
+        return r;
+    }
+
+    /**
+     * Adds {@code delta} to the value under the key. In a task the delta is added to the accumulator as an
+     * {@code INCR} rule. Outside a task the accumulator value is replaced with the new total, and a key that has
+     * never been set is treated as starting from {@code delta}.
+     */
+    @Override
+    public void incr(final String key, final long delta) {
+        checkKeyValue(key, delta);
+        if (this.inTask) {
+            this.memory.get(key).add(new Rule(Rule.Operation.INCR, delta));
+        } else {
+            // an unset key reads as null; unboxing it would throw a NullPointerException
+            final Long current = this.getValue(key);
+            this.memory.get(key).setValue(new Rule(Rule.Operation.INCR, null == current ? delta : current + delta));
+        }
+    }
+
+    /**
+     * Logically ANDs {@code bool} into the value under the key. In a task an {@code AND} rule is added to the
+     * accumulator. Outside a task the accumulator value is replaced with the result, and a key that has never
+     * been set takes the value {@code bool}.
+     */
+    @Override
+    public void and(final String key, final boolean bool) {
+        checkKeyValue(key, bool);
+        if (this.inTask) {
+            this.memory.get(key).add(new Rule(Rule.Operation.AND, bool));
+        } else {
+            // an unset key reads as null; unboxing it would throw a NullPointerException
+            final Boolean current = this.getValue(key);
+            this.memory.get(key).setValue(new Rule(Rule.Operation.AND, null == current ? bool : (current && bool)));
+        }
+    }
+
+    /**
+     * Logically ORs {@code bool} into the value under the key. In a task an {@code OR} rule is added to the
+     * accumulator. Outside a task the accumulator value is replaced with the result, and a key that has never
+     * been set takes the value {@code bool}.
+     */
+    @Override
+    public void or(final String key, final boolean bool) {
+        checkKeyValue(key, bool);
+        if (this.inTask) {
+            this.memory.get(key).add(new Rule(Rule.Operation.OR, bool));
+        } else {
+            // an unset key reads as null; unboxing it would throw a NullPointerException
+            final Boolean current = this.getValue(key);
+            this.memory.get(key).setValue(new Rule(Rule.Operation.OR, null == current ? bool : (current || bool)));
+        }
+    }
+
+    /**
+     * Stores {@code value} under the key: added as a {@code SET} rule in a task, otherwise written directly as
+     * the accumulator value.
+     */
+    @Override
+    public void set(final String key, final Object value) {
+        checkKeyValue(key, value);
+        if (this.inTask) {
+            this.memory.get(key).add(new Rule(Rule.Operation.SET, value));
+        } else {
+            this.memory.get(key).setValue(new Rule(Rule.Operation.SET, value));
+        }
+    }
+
+    @Override
+    public String toString() {
+        return StringFactory.memoryString(this);
+    }
+
+    /**
+     * Switches between reading the broadcast snapshot and adding rules ({@code true}) and operating directly on
+     * the accumulator values ({@code false}).
+     */
+    protected void setInTask(final boolean inTask) {
+        this.inTask = inTask;
+    }
+
+    /**
+     * Destroys the previous broadcast and publishes a new one containing every key whose accumulator currently
+     * holds a non-null object.
+     *
+     * @param sparkContext the context used to create the new broadcast
+     */
+    protected void broadcastMemory(final JavaSparkContext sparkContext) {
+        this.broadcast.destroy(true); // do we need to block?
+        final Map<String, Object> toBroadcast = new HashMap<>();
+        this.memory.forEach((key, rule) -> {
+            if (null != rule.value().getObject()) {
+                toBroadcast.put(key, rule.value().getObject());
+            }
+        });
+        this.broadcast = sparkContext.broadcast(toBroadcast);
+    }
+
+    /** Rejects keys that were not registered at construction and values that fail {@link MemoryHelper#validateValue}. */
+    private void checkKeyValue(final String key, final Object value) {
+        if (!this.memoryKeys.contains(key)) {
+            throw GraphComputer.Exceptions.providedKeyIsNotAMemoryComputeKey(key);
+        }
+        MemoryHelper.validateValue(value);
+    }
+
+    /**
+     * Reads the raw value for the key from the broadcast snapshot (in a task) or from the accumulator (outside a
+     * task); the result is {@code null} when nothing is stored under the key.
+     */
+    @SuppressWarnings("unchecked") // values are stored as Object; the caller chooses the expected type
+    private <R> R getValue(final String key) {
+        return this.inTask ? (R) this.broadcast.value().get(key) : (R) this.memory.get(key).value().getObject();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
new file mode 100644
index 0000000..f32c684
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkMessenger.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.VertexStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.StartStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * {@link Messenger} that exposes the incoming messages of the vertex currently being processed and collects
+ * the messages that vertex sends as {@code (target vertex id, message)} tuples.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkMessenger<M> implements Messenger<M> {
+
+    private Vertex vertex;
+    private Iterable<M> incomingMessages;
+    private List<Tuple2<Object, M>> outgoingMessages = new ArrayList<>();
+
+    /**
+     * Points this messenger at a vertex and its incoming messages, and starts a fresh, empty outgoing list.
+     */
+    public void setVertexAndIncomingMessages(final Vertex vertex, final Iterable<M> incomingMessages) {
+        this.outgoingMessages = new ArrayList<>();
+        this.incomingMessages = incomingMessages;
+        this.vertex = vertex;
+    }
+
+    /**
+     * @return the list of messages sent since the vertex was last set (the internal list, not a copy)
+     */
+    public List<Tuple2<Object, M>> getOutgoingMessages() {
+        return this.outgoingMessages;
+    }
+
+    @Override
+    public Iterator<M> receiveMessages() {
+        return this.incomingMessages.iterator();
+    }
+
+    /**
+     * Queues {@code message} for every vertex addressed by the scope. A local scope is resolved by running its
+     * incident traversal from the current vertex and addressing the vertex at the other end of each edge; any
+     * other scope is cast to a global scope and each of its vertices is addressed by id.
+     */
+    @Override
+    public void sendMessage(final MessageScope messageScope, final M message) {
+        if (!(messageScope instanceof MessageScope.Local)) {
+            ((MessageScope.Global) messageScope).vertices().forEach(target -> {
+                final Object targetId = target.id();
+                this.outgoingMessages.add(new Tuple2<>(targetId, message));
+            });
+            return;
+        }
+        final MessageScope.Local<M> localScope = (MessageScope.Local) messageScope;
+        final Traversal.Admin<Vertex, Edge> edgeTraversal = SparkMessenger.setVertexStart(localScope.getIncidentTraversal().get(), this.vertex);
+        final Direction receiverSide = SparkMessenger.getOppositeDirection(edgeTraversal);
+        // NOTE(review): the message is forwarded unchanged along every edge; if the local scope is meant to
+        // transform messages per edge, that is not applied here -- confirm this is intended.
+        edgeTraversal.forEachRemaining(edge -> {
+            final Object targetId = edge.vertices(receiverSide).next().id();
+            this.outgoingMessages.add(new Tuple2<>(targetId, message));
+        });
+    }
+
+    ///////////
+
+    /** Prepends a start step holding {@code vertex} to the traversal and returns that same traversal. */
+    private static <T extends Traversal.Admin<Vertex, Edge>> T setVertexStart(final Traversal<Vertex, Edge> incidentTraversal, final Vertex vertex) {
+        final StartStep<Vertex> startStep = new StartStep<>(incidentTraversal.asAdmin(), vertex);
+        incidentTraversal.asAdmin().addStep(0, startStep);
+        return (T) incidentTraversal;
+    }
+
+    /** Returns the opposite of the direction of the last {@link VertexStep} in the traversal. */
+    private static Direction getOppositeDirection(final Traversal.Admin<Vertex, Edge> incidentTraversal) {
+        final VertexStep lastVertexStep = TraversalHelper.getLastStepOfAssignableClass(VertexStep.class, incidentTraversal).get();
+        final Direction traversed = lastVertexStep.getDirection();
+        return traversed.opposite();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
new file mode 100644
index 0000000..e2252fa
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkReduceEmitter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer;
+
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * {@link MapReduce.ReduceEmitter} that buffers every emitted key/value pair as a {@link Tuple2} until the
+ * pairs are drained through {@link #getEmissions()}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkReduceEmitter<OK, OV> implements MapReduce.ReduceEmitter<OK, OV> {
+
+    private List<Tuple2<OK, OV>> emissions = new ArrayList<>();
+
+    /**
+     * Appends the pair to the current buffer.
+     */
+    @Override
+    public void emit(final OK key, final OV value) {
+        final Tuple2<OK, OV> pair = new Tuple2<>(key, value);
+        this.emissions.add(pair);
+    }
+
+    /**
+     * Hands out the pairs buffered so far and starts a new, empty buffer.
+     *
+     * @return an iterator over the pairs emitted since the previous call (or since construction)
+     */
+    public Iterator<Tuple2<OK, OV>> getEmissions() {
+        final List<Tuple2<OK, OV>> drained = this.emissions;
+        this.emissions = new ArrayList<>();
+        return drained.iterator();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
new file mode 100644
index 0000000..adb080b
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputFormatRDD.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import scala.Tuple2;
+
+/**
+ * {@link InputRDD} that loads the graph through the Hadoop {@link InputFormat} class named by
+ * {@link Constants#GREMLIN_HADOOP_GRAPH_INPUT_FORMAT} in the configuration.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class InputFormatRDD implements InputRDD {
+
+    /**
+     * Reads {@code (NullWritable, VertexWritable)} records with the configured input format, re-keys each
+     * record by its vertex id and keeps a single record per id.
+     */
+    @Override
+    public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+        final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
+        final Class<InputFormat<NullWritable, VertexWritable>> inputFormatClass =
+                (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class);
+        final JavaPairRDD<NullWritable, VertexWritable> records =
+                sparkContext.newAPIHadoopRDD(hadoopConfiguration, inputFormatClass, NullWritable.class, VertexWritable.class);
+        return records
+                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
+                .reduceByKey((a, b) -> a); // if this is not done, then the graph is partitioned and you can have duplicate vertices
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/04f5651e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
new file mode 100644
index 0000000..19d79a8
--- /dev/null
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDD.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.process.computer.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+
+/**
+ * Strategy for reading data from the underlying graph system and yielding the respective adjacency list.
+ * {@link InputFormatRDD} is the implementation that generates the graphRDD from the specified
+ * {@link org.apache.hadoop.mapreduce.InputFormat}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface InputRDD {
+
+    /**
+     * Reads the graphRDD from the underlying graph system.
+     *
+     * @param configuration the configuration for the {@code SparkGraphComputer}
+     * @param sparkContext  the Spark context with the requisite methods for generating a {@link JavaPairRDD}
+     * @return an adjacency list representation of the underlying graph system
+     */
+    JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext);
+}