You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2015/11/03 17:48:49 UTC

[02/14] incubator-tinkerpop git commit: Added a test case that verifies a PageRankVertexProgram-to-BulkLoaderVertexProgram load into Spark without touching HDFS. The GraphComputer.config() ticket still needs to be done to make this all pretty.

Added a test case that verifies a PageRankVertexProgram-to-BulkLoaderVertexProgram load into Spark without touching HDFS. The GraphComputer.config() ticket still needs to be done to make this all pretty.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3d2d6a69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3d2d6a69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3d2d6a69

Branch: refs/heads/TINKERPOP3-909
Commit: 3d2d6a69086166ebd34ee10ade656c0e61a1ac0c
Parents: 82bbc59
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Oct 27 16:02:42 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Oct 27 16:02:42 2015 -0600

----------------------------------------------------------------------
 .../process/computer/SparkGraphComputer.java    | 18 +++----
 .../io/SparkContextPersistenceTest.java         | 49 ++++++++++++++------
 2 files changed, 44 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3d2d6a69/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 3a5a8a7..7134f26 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -83,15 +83,17 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
         final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.hadoopGraph.configuration());
         apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
         final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
-        if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
-            try {
-                final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
-                apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
-                hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
-            } catch (final IOException e) {
-                throw new IllegalStateException(e.getMessage(), e);
+        if (null == hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, null)) {
+            if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
+                try {
+                    final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
+                    apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
+                    hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
+                } catch (final IOException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                }
             }
-        }
+        } // else WARN that both an INPUT_FORMAT and INPUT_RDD_NAME were provided?
 
         // create the completable future
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3d2d6a69/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
index 87bbf2b..e19d89f 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
@@ -30,6 +30,8 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
 import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
@@ -37,10 +39,13 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
 import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
 import org.apache.tinkerpop.gremlin.spark.process.computer.util.SparkHelper;
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.IoCore;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
 import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
 import org.junit.Test;
 
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -56,7 +61,6 @@ public class SparkContextPersistenceTest {
         configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
         configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
         configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
         configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
@@ -89,19 +93,34 @@ public class SparkContextPersistenceTest {
 
     @Test
     public void testBulkLoaderVertexProgramChain() throws Exception {
-        final Configuration configuration = new BaseConfiguration();
-        configuration.setProperty("spark.master", "local[4]");
-        configuration.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
-        configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
-        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
-        configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
-        configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
-        configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
-        Graph graph = GraphFactory.open(configuration);
+        final Configuration readConfiguration = new BaseConfiguration();
+        readConfiguration.setProperty("spark.master", "local[4]");
+        readConfiguration.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
+        readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+        readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
+        readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+        Graph graph = GraphFactory.open(readConfiguration);
 
-        //graph.compute().program(PageRankVertexProgram.build().create(graph)).submit().get().graph().compute().workers(1).program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(conf).create(graph)).submit().get()
+        ///////////////
+        final Configuration writeConfiguration = new BaseConfiguration();
+        writeConfiguration.setProperty(Graph.GRAPH, "org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph");
+        writeConfiguration.setProperty(TinkerGraph.CONFIG_GRAPH_FORMAT, "gryo");
+        writeConfiguration.setProperty(TinkerGraph.CONFIG_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
+        final Graph secondGraph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().create(graph)).submit().get().graph();
+        secondGraph.configuration().setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, "a-random-name-for-testing");
+        secondGraph.configuration().setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, null);
+        secondGraph.compute(SparkGraphComputer.class)
+                .persist(GraphComputer.Persist.NOTHING)
+                .workers(1)
+                .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(secondGraph))
+                .submit().get();
+        final Graph finalGraph = TinkerGraph.open();
+        finalGraph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
+        assertEquals(6l,finalGraph.traversal().V().count().next().longValue());
     }
 }