You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/11/02 16:52:04 UTC
[02/12] incubator-tinkerpop git commit: Added a test case that
verifies a PageRankVertexProgram to BulkLoaderVertexProgram load into Spark
without touching HDFS. Need to do the GraphComputer.config() ticket to make
this all pretty.
Added a test case that verifies that a PageRankVertexProgram result can be chained into a BulkLoaderVertexProgram load within Spark without touching HDFS. The GraphComputer.config() ticket still needs to be done to make this all pretty.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3d2d6a69
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3d2d6a69
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3d2d6a69
Branch: refs/heads/master
Commit: 3d2d6a69086166ebd34ee10ade656c0e61a1ac0c
Parents: 82bbc59
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Oct 27 16:02:42 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Oct 27 16:02:42 2015 -0600
----------------------------------------------------------------------
.../process/computer/SparkGraphComputer.java | 18 +++----
.../io/SparkContextPersistenceTest.java | 49 ++++++++++++++------
2 files changed, 44 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3d2d6a69/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 3a5a8a7..7134f26 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -83,15 +83,17 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
final org.apache.commons.configuration.Configuration apacheConfiguration = new HadoopConfiguration(this.hadoopGraph.configuration());
apacheConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(GraphComputer.Persist.EDGES));
final Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(apacheConfiguration);
- if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
- try {
- final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
- apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
- hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
+ if (null == hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, null)) {
+ if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
+ try {
+ final String inputLocation = FileSystem.get(hadoopConfiguration).getFileStatus(new Path(hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION))).getPath().toString();
+ apacheConfiguration.setProperty(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
+ hadoopConfiguration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, inputLocation);
+ } catch (final IOException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
}
- }
+ } // else WARN that both an INPUT_FORMAT and INPUT_RDD_NAME were provided?
// create the completable future
return CompletableFuture.<ComputerResult>supplyAsync(() -> {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3d2d6a69/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
index 87bbf2b..e19d89f 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/io/SparkContextPersistenceTest.java
@@ -30,6 +30,8 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
+import org.apache.tinkerpop.gremlin.process.computer.bulkloading.BulkLoaderVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
@@ -37,10 +39,13 @@ import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkHadoopGraphProvider;
import org.apache.tinkerpop.gremlin.spark.process.computer.util.SparkHelper;
import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.IoCore;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
import org.junit.Test;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -56,7 +61,6 @@ public class SparkContextPersistenceTest {
configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
@@ -89,19 +93,34 @@ public class SparkContextPersistenceTest {
@Test
public void testBulkLoaderVertexProgramChain() throws Exception {
- final Configuration configuration = new BaseConfiguration();
- configuration.setProperty("spark.master", "local[4]");
- configuration.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
- configuration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
- configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
- configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
- configuration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
- configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
- configuration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
- Graph graph = GraphFactory.open(configuration);
+ final Configuration readConfiguration = new BaseConfiguration();
+ readConfiguration.setProperty("spark.master", "local[4]");
+ readConfiguration.setProperty("spark.serializer", "org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer");
+ readConfiguration.setProperty(Graph.GRAPH, HadoopGraph.class.getName());
+ readConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, SparkHadoopGraphProvider.PATHS.get("tinkerpop-modern.kryo"));
+ readConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+ readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, NullOutputFormat.class.getCanonicalName());
+ readConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, "target/test-output");
+ readConfiguration.setProperty(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+ readConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, "a-random-name-for-testing");
+ readConfiguration.setProperty(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true);
+ Graph graph = GraphFactory.open(readConfiguration);
- //graph.compute().program(PageRankVertexProgram.build().create(graph)).submit().get().graph().compute().workers(1).program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(conf).create(graph)).submit().get()
+ ///////////////
+ final Configuration writeConfiguration = new BaseConfiguration();
+ writeConfiguration.setProperty(Graph.GRAPH, "org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph");
+ writeConfiguration.setProperty(TinkerGraph.CONFIG_GRAPH_FORMAT, "gryo");
+ writeConfiguration.setProperty(TinkerGraph.CONFIG_GRAPH_LOCATION, "target/test-output/tinkergraph.kryo");
+ final Graph secondGraph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().create(graph)).submit().get().graph();
+ secondGraph.configuration().setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD_NAME, "a-random-name-for-testing");
+ secondGraph.configuration().setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD_NAME, null);
+ secondGraph.compute(SparkGraphComputer.class)
+ .persist(GraphComputer.Persist.NOTHING)
+ .workers(1)
+ .program(BulkLoaderVertexProgram.build().userSuppliedIds(true).writeGraph(writeConfiguration).create(secondGraph))
+ .submit().get();
+ final Graph finalGraph = TinkerGraph.open();
+ finalGraph.io(IoCore.gryo()).readGraph("target/test-output/tinkergraph.kryo");
+ assertEquals(6l,finalGraph.traversal().V().count().next().longValue());
}
}