You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2015/11/03 17:48:54 UTC
[07/14] incubator-tinkerpop git commit: @RussellSpitzer has schooled
me in cache()/unpersist(). I now am smart to unpersist() RDDs that are just
dangling around (especially since we now have persistent contexts). Finally,
I did some refactoring of packag
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java
deleted file mode 100644
index b83314a..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputOutputRDDTest.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class InputOutputRDDTest {
-
- @Test
- public void shouldReadFromWriteToArbitraryRDD() throws Exception {
- final Configuration configuration = new BaseConfiguration();
- configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
- configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
- configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
- ////////
- Graph graph = GraphFactory.open(configuration);
- graph.compute(SparkGraphComputer.class)
- .result(GraphComputer.ResultGraph.NEW)
- .persist(GraphComputer.Persist.EDGES)
- .program(TraversalVertexProgram.build()
- .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
- "gremlin-groovy",
- "g.V()").create(graph)).submit().get();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java
deleted file mode 100644
index 3070fea..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/InputRDDTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class InputRDDTest {
-
- @Test
- public void shouldReadFromArbitraryRDD() {
- final Configuration configuration = new BaseConfiguration();
- configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
- configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
- configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
- ////////
- Graph graph = GraphFactory.open(configuration);
- assertEquals(Double.valueOf(123.0d), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().values("age").sum().next());
- assertEquals(Long.valueOf(4l), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java
deleted file mode 100644
index febece6..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/OutputRDDTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.junit.Test;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class OutputRDDTest {
-
- @Test
- public void shouldWriteToArbitraryRDD() throws Exception {
- final Configuration configuration = new BaseConfiguration();
- configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
- configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
- configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
- configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
- ////////
- Graph graph = GraphFactory.open(configuration);
- graph.compute(SparkGraphComputer.class)
- .result(GraphComputer.ResultGraph.NEW)
- .persist(GraphComputer.Persist.EDGES)
- .program(TraversalVertexProgram.build()
- .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
- "gremlin-groovy",
- "g.V()").create(graph)).submit().get();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
deleted file mode 100644
index a7d1ac0..0000000
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/PersistedInputOutputRDDTest.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.spark.process.computer.io;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.spark.SparkConf;
-import org.apache.spark.SparkContext;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
-import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
-import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.io.IoCore;
-import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
-import org.junit.Test;
-
-import java.util.UUID;
-
-import static org.junit.Assert.*;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class PersistedInputOutputRDDTest {
-
- @Test
- public void shouldNotPersistRDDAcrossJobs() throws Exception {
- final String rddName = "target/test-output/" + UUID.randomUUID();
- final Configuration configuration = new BaseConfiguration();
- configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
- configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
- configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
- configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
- configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false); // because the spark context is NOT persisted, neither is the RDD
- Graph graph = GraphFactory.open(configuration);
- graph.compute(SparkGraphComputer.class)
- .result(GraphComputer.ResultGraph.NEW)
- .persist(GraphComputer.Persist.EDGES)
- .program(TraversalVertexProgram.build()
- .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
- "gremlin-groovy",
- "g.V()").create(graph)).submit().get();
- ////////
- SparkConf sparkConfiguration = new SparkConf();
- sparkConfiguration.setAppName("shouldNotPersistRDDAcrossJobs");
- ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
- JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
- assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
- }
-
- @Test
- public void shouldPersistRDDAcrossJobs() throws Exception {
- final String rddName = "target/test-output/" + UUID.randomUUID();
- final Configuration configuration = new BaseConfiguration();
- configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
- configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
- configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
- configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
- configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
- Graph graph = GraphFactory.open(configuration);
- graph.compute(SparkGraphComputer.class)
- .result(GraphComputer.ResultGraph.NEW)
- .persist(GraphComputer.Persist.EDGES)
- .program(TraversalVertexProgram.build()
- .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
- "gremlin-groovy",
- "g.V()").create(graph)).submit().get();
- ////////
- SparkConf sparkConfiguration = new SparkConf();
- sparkConfiguration.setAppName("shouldPersistRDDAcrossJobs");
- ConfUtil.makeHadoopConfiguration(configuration).forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
- JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
- assertTrue(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
- ///////
- configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
- configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null);
- configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
- graph = GraphFactory.open(configuration);
- graph.compute(SparkGraphComputer.class)
- .result(GraphComputer.ResultGraph.NEW)
- .persist(GraphComputer.Persist.NOTHING)
- .program(TraversalVertexProgram.build()
- .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
- "gremlin-groovy",
- "g.V()").create(graph)).submit().get();
- }
-
- @Test
- public void testBulkLoaderVertexProgramChain() throws Exception {
- final String rddName = "target/test-output/" + UUID.randomUUID().toString();
- final Configuration readConfiguration = new BaseConfiguration();
- readConfiguration.setProperty("spark.master", "local[4]");
- readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
- readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
- readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
- readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
- readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
- readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
- readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
- readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
- Graph pageRankGraph = GraphFactory.open(readConfiguration);
- ///////////////
- final Configuration writeConfiguration = new BaseConfiguration();
- writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName());
- writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
- writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
- final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph();
- bulkLoaderGraph.compute(SparkGraphComputer.class)
- .persist(GraphComputer.Persist.NOTHING)
- .workers(1)
- .configure(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName())
- .configure(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName)
- .configure(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)
- .configure(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null)
- .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph))
- .submit().get();
- ////
- SparkConf sparkConfiguration = new SparkConf();
- sparkConfiguration.setAppName("testBulkLoaderVertexProgramChain");
- JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
- assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
- ////
- final Graph graph = TinkerGraph.open();
- final GraphTraversalSource g = graph.traversal();
- graph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
- assertEquals(6l, g.V().count().next().longValue());
- assertEquals(6l, g.E().count().next().longValue());
- assertEquals("marko", g.V().has("name", "marko").values("name").next());
- assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
- assertEquals(0l, g.E().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
- assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
- assertEquals(0l, g.E().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java
new file mode 100644
index 0000000..8417d1c
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleInputRDD.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
+import org.apache.tinkerpop.gremlin.structure.T;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.star.StarGraph;
+import scala.Tuple2;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ExampleInputRDD implements InputRDD {
+
+ @Override
+ public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+ final List<Vertex> list = new ArrayList<>();
+ list.add(StarGraph.open().addVertex(T.id, 1l, T.label,"person","age", 29));
+ list.add(StarGraph.open().addVertex(T.id, 2l, T.label,"person","age", 27));
+ list.add(StarGraph.open().addVertex(T.id, 4l, T.label,"person","age", 32));
+ list.add(StarGraph.open().addVertex(T.id, 6l, T.label,"person","age", 35));
+ return sparkContext.parallelize(list).mapToPair(vertex -> new Tuple2<>(vertex.id(), new VertexWritable(vertex)));
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java
new file mode 100644
index 0000000..9a9e5ef
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ExampleOutputRDD.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ExampleOutputRDD implements OutputRDD {
+ @Override
+ public void writeGraphRDD(final Configuration configuration, final JavaPairRDD<Object, VertexWritable> graphRDD) {
+ int totalAge = 0;
+ final Iterator<VertexWritable> iterator = graphRDD.values().toLocalIterator();
+ while (iterator.hasNext()) {
+ final Vertex vertex = iterator.next().get();
+ if (vertex.label().equals("person"))
+ totalAge = totalAge + vertex.<Integer>value("age");
+ }
+ assertEquals(123, totalAge);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
new file mode 100644
index 0000000..1feeb61
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputRDDTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class InputOutputRDDTest {
+
+ @Test
+ public void shouldReadFromWriteToArbitraryRDD() throws Exception {
+ final Configuration configuration = new BaseConfiguration();
+ configuration.setProperty("spark.master", "local[4]");
+ configuration.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+ configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+ configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+ configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+ ////////
+ Graph graph = GraphFactory.open(configuration);
+ graph.compute(SparkGraphComputer.class)
+ .result(GraphComputer.ResultGraph.NEW)
+ .persist(GraphComputer.Persist.EDGES)
+ .program(TraversalVertexProgram.build()
+ .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)),
+ "gremlin-groovy",
+ "g.V()").create(graph)).submit().get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
new file mode 100644
index 0000000..8d6c06e
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputRDDTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class InputRDDTest {
+
+    @Test
+    public void shouldReadFromArbitraryRDD() {
+        final Configuration conf = new BaseConfiguration();
+        conf.setProperty("spark.master", "local[4]");
+        conf.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        conf.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ExampleInputRDD.class.getCanonicalName());
+        conf.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
+        conf.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+        conf.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        final Graph graph = GraphFactory.open(conf); // the input RDD is ExampleInputRDD, as configured above
+        final Object ageSum = graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().values("age").sum().next();
+        assertEquals(Double.valueOf(123.0d), ageSum); // checked before the count query is issued
+        assertEquals(Long.valueOf(4L), graph.traversal(GraphTraversalSource.computer(SparkGraphComputer.class)).V().count().next());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
new file mode 100644
index 0000000..43dcdbd
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputRDDTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.junit.Test;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class OutputRDDTest {
+
+    // Reads "tinkerpop-modern.kryo" through GryoInputFormat and submits a "g.V()" program with ExampleOutputRDD as the output RDD.
+    @Test
+    public void shouldWriteToArbitraryRDD() throws Exception {
+        final Configuration conf = new BaseConfiguration();
+        conf.setProperty("spark.master", "local[4]");
+        conf.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
+        conf.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        conf.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        conf.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        conf.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, ExampleOutputRDD.class.getCanonicalName());
+        conf.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+        conf.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        // The test passes when the submitted job completes without throwing; nothing is asserted on the result.
+        final Graph graph = GraphFactory.open(conf);
+        final GraphComputer computer = graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES);
+        computer.program(TraversalVertexProgram.build()
+                .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)), "gremlin-groovy", "g.V()")
+                .create(graph)).submit().get();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/cb972387/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
new file mode 100644
index 0000000..6ee7aaa
--- /dev/null
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.structure.io;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.spark.SparkConf;
+import org.apache.spark.SparkContext;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.IoCore;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.junit.Assert.*;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class PersistedInputOutputRDDTest {
+
+    @Test
+    public void shouldNotPersistRDDAcrossJobs() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID();
+        final Configuration conf = new BaseConfiguration();
+        conf.setProperty("spark.master", "local[4]");
+        conf.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        conf.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        conf.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        conf.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        conf.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        conf.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        conf.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        conf.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, false); // the Spark context is NOT persisted, so neither is the RDD
+        // Job: run "g.V()" with PersistedOutputRDD as the output RDD and rddName as the output location.
+        final Graph graph = GraphFactory.open(conf);
+        final GraphComputer computer = graph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES);
+        computer.program(TraversalVertexProgram.build()
+                .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)), "gremlin-groovy", "g.V()")
+                .create(graph)).submit().get();
+        // Afterwards: get-or-create a context carrying the same settings and verify no RDD named rddName is present.
+        final SparkConf sparkConf = new SparkConf().setAppName("shouldNotPersistRDDAcrossJobs");
+        ConfUtil.makeHadoopConfiguration(conf).forEach(entry -> sparkConf.set(entry.getKey(), entry.getValue()));
+        final SparkContext context = SparkContext.getOrCreate(sparkConf);
+        final JavaSparkContext sparkContext = new JavaSparkContext(context);
+        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+    }
+
+    @Test
+    public void shouldPersistRDDAcrossJobs() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID();
+        final Configuration conf = new BaseConfiguration();
+        conf.setProperty("spark.master", "local[4]");
+        conf.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        conf.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        conf.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        conf.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        conf.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        conf.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        conf.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        conf.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        // First job: run "g.V()" with PersistedOutputRDD as the output RDD and rddName as the output location.
+        final Graph persistingGraph = GraphFactory.open(conf);
+        final GraphComputer computer = persistingGraph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.EDGES);
+        computer.program(TraversalVertexProgram.build()
+                .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)), "gremlin-groovy", "g.V()")
+                .create(persistingGraph)).submit().get();
+        // With the context persisted, an RDD named rddName must now be present.
+        final SparkConf sparkConf = new SparkConf().setAppName("shouldPersistRDDAcrossJobs");
+        ConfUtil.makeHadoopConfiguration(conf).forEach(entry -> sparkConf.set(entry.getKey(), entry.getValue()));
+        final SparkContext context = SparkContext.getOrCreate(sparkConf);
+        final JavaSparkContext sparkContext = new JavaSparkContext(context);
+        assertTrue(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+        // Second job: switch the input to PersistedInputRDD at rddName and null out the output RDD and output location.
+        conf.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
+        conf.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
+        conf.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null);
+        conf.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
+        final Graph readingGraph = GraphFactory.open(conf);
+        readingGraph.compute(SparkGraphComputer.class)
+                .result(GraphComputer.ResultGraph.NEW)
+                .persist(GraphComputer.Persist.NOTHING)
+                .program(TraversalVertexProgram.build()
+                        .traversal(GraphTraversalSource.build().engine(ComputerTraversalEngine.build().computer(SparkGraphComputer.class)), "gremlin-groovy", "g.V()")
+                        .create(readingGraph))
+                .submit().get();
+    }
+
+    @Test
+    public void testBulkLoaderVertexProgramChain() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID().toString();
+        final Configuration readConf = new BaseConfiguration();
+        readConf.setProperty("spark.master", "local[4]");
+        readConf.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        readConf.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        readConf.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        readConf.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        readConf.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        readConf.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        readConf.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        readConf.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        final Graph pageRankGraph = GraphFactory.open(readConf);
+        // Bulk-load target: a TinkerGraph configured with the "gryo" format and a file location under target/test-output.
+        final Configuration writeConf = new BaseConfiguration();
+        writeConf.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName());
+        writeConf.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
+        writeConf.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
+        final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES)
+                .program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph();
+        bulkLoaderGraph.compute(SparkGraphComputer.class)
+                .persist(GraphComputer.Persist.NOTHING).workers(1)
+                .configure(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName())
+                .configure(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName)
+                .configure(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)
+                .configure(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null)
+                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConf).create(bulkLoaderGraph))
+                .submit().get();
+        // Once both jobs have run, no persisted RDD named rddName is expected to be present.
+        final SparkConf sparkConf = new SparkConf().setAppName("testBulkLoaderVertexProgramChain");
+        final SparkContext context = SparkContext.getOrCreate(sparkConf);
+        final JavaSparkContext sparkContext = new JavaSparkContext(context);
+        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+        // Read the written gryo file into a fresh TinkerGraph and check element and property counts.
+        final Graph loaded = TinkerGraph.open();
+        final GraphTraversalSource g = loaded.traversal();
+        loaded.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
+        assertEquals(6L, g.V().count().next().longValue());
+        assertEquals(6L, g.E().count().next().longValue());
+        assertEquals("marko", g.V().has("name", "marko").values("name").next());
+        assertEquals(6L, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(0L, g.E().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(6L, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+        assertEquals(0L, g.E().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+    }
+
+    // Same PageRank -> BulkLoader chain as testBulkLoaderVertexProgramChain, except that the bulk-load job is submitted
+    // without explicit input/output RDD configure(...) calls. NOTE(review): presumably an input/output helper maps them; confirm.
+    @Test
+    public void testBulkLoaderVertexProgramChainWithInputOutputHelperMapping() throws Exception {
+        final String rddName = "target/test-output/" + UUID.randomUUID().toString();
+        final Configuration readConfiguration = new BaseConfiguration();
+        readConfiguration.setProperty("spark.master", "local[4]");
+        readConfiguration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
+        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        final Graph pageRankGraph = GraphFactory.open(readConfiguration);
+        // Bulk-load target: a TinkerGraph configured with the "gryo" format and a file location under target/test-output.
+        final Configuration writeConfiguration = new BaseConfiguration();
+        writeConfiguration.setProperty(Graph.GRAPH, TinkerGraph.class.getCanonicalName());
+        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_FORMAT, "gryo");
+        writeConfiguration.setProperty(TinkerGraph.GREMLIN_TINKERGRAPH_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
+        final Graph bulkLoaderGraph = pageRankGraph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().create(pageRankGraph)).submit().get().graph();
+        bulkLoaderGraph.compute(SparkGraphComputer.class)
+                .persist(GraphComputer.Persist.NOTHING).workers(1)
+                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(bulkLoaderGraph))
+                .submit().get();
+        // Once both jobs have run, no persisted RDD named rddName is expected to be present; the app name matches this test.
+        final SparkConf sparkConfiguration = new SparkConf().setAppName("testBulkLoaderVertexProgramChainWithInputOutputHelperMapping");
+        final JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+        assertFalse(PersistedInputRDD.getPersistedRDD(sparkContext, rddName).isPresent());
+        // Read the written gryo file into a fresh TinkerGraph and check element and property counts.
+        final Graph graph = TinkerGraph.open();
+        final GraphTraversalSource g = graph.traversal();
+        graph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
+        assertEquals(6L, g.V().count().next().longValue());
+        assertEquals(6L, g.E().count().next().longValue());
+        assertEquals("marko", g.V().has("name", "marko").values("name").next());
+        assertEquals(6L, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(0L, g.E().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
+        assertEquals(6L, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+        assertEquals(0L, g.E().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
+    }
+}