You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/01/19 20:02:12 UTC

[09/11] incubator-tinkerpop git commit: cleaned up SparkGraphComputer file handling a bit. Just better code organization and not so many configuration.get() calls. Fixed a bug in PersistedInputOutputRDDTest. Moved the loadJars() line closer to context crea

cleaned up SparkGraphComputer file handling a bit. Just better code organization and not so many configuration.get() calls. Fixed a bug in PersistedInputOutputRDDTest. Moved the loadJars() line closer to context creation (@dkuppitz). CTR.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/1670e08f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/1670e08f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/1670e08f

Branch: refs/heads/TINKERPOP-998
Commit: 1670e08fb76387a52cbae3fb91e4789ab78b7e5c
Parents: cf2e3b1
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Sat Jan 16 10:18:41 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Sat Jan 16 10:18:53 2016 -0700

----------------------------------------------------------------------
 .../hadoop/structure/io/InputOutputHelper.java  |  4 +--
 .../process/computer/SparkGraphComputer.java    | 31 ++++++++++++--------
 .../io/PersistedInputOutputRDDTest.java         | 11 +++----
 3 files changed, 25 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1670e08f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
index 48c2ad4..11c579e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
@@ -80,11 +80,9 @@ public final class InputOutputHelper {
             newConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, hadoopConfiguration.getOutputLocation());
             if (hadoopConfiguration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT))
                 newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputOutputHelper.getInputFormat(hadoopConfiguration.getGraphOutputFormat()).getCanonicalName());
-            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopConfiguration.getOutputLocation() + "_");
             newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES, persist.equals(GraphComputer.Persist.EDGES));
-        } else {
-            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopConfiguration.getOutputLocation() + "_");
         }
+        newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopConfiguration.getOutputLocation() + "_");
         return HadoopGraph.open(newConfiguration);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1670e08f/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index a87f95f..bb7e8bb 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -57,6 +57,7 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
 import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
 
 import java.io.File;
 import java.io.IOException;
@@ -131,14 +132,18 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
         // create the completable future
         return CompletableFuture.<ComputerResult>supplyAsync(() -> {
             final long startTime = System.currentTimeMillis();
+            final Storage fileSystemStorage = FileSystemStorage.open(hadoopConfiguration);
+            final Storage sparkContextStorage = SparkContextStorage.open(apacheConfiguration);
+            final boolean outputToHDFS = FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, Object.class));
+            final boolean outputToSpark = PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, Object.class));
             SparkMemory memory = null;
             // delete output location
             final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, null);
-            try {
-                if (null != outputLocation && FileSystem.get(hadoopConfiguration).exists(new Path(outputLocation)))
-                    FileSystem.get(hadoopConfiguration).delete(new Path(outputLocation), true);
-            } catch (final IOException e) {
-                throw new IllegalStateException(e.getMessage(), e);
+            if (null != outputLocation) {
+                if (outputToHDFS && fileSystemStorage.exists(outputLocation))
+                    fileSystemStorage.rm(outputLocation);
+                if (outputToSpark && sparkContextStorage.exists(outputLocation))
+                    sparkContextStorage.rm(outputLocation);
             }
             // wire up a spark context
             final SparkConf sparkConfiguration = new SparkConf();
@@ -149,10 +154,9 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
             // execute the vertex program and map reducers and if there is a failure, auto-close the spark context
             try {
                 final JavaSparkContext sparkContext = new JavaSparkContext(SparkContext.getOrCreate(sparkConfiguration));
+                this.loadJars(sparkContext, hadoopConfiguration); // add the project jars to the cluster
                 Spark.create(sparkContext.sc()); // this is the context RDD holder that prevents GC
                 updateLocalConfiguration(sparkContext, sparkConfiguration);
-                // add the project jars to the cluster
-                this.loadJars(sparkContext, hadoopConfiguration);
                 // create a message-passing friendly rdd from the input rdd
                 JavaPairRDD<Object, VertexWritable> graphRDD;
                 try {
@@ -242,14 +246,15 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 }
 
                 // unpersist the graphRDD if it will no longer be used
-                if (!PersistedOutputRDD.class.equals(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)) || this.persist.equals(GraphComputer.Persist.NOTHING)) {
+                if (!outputToSpark || this.persist.equals(GraphComputer.Persist.NOTHING))
                     graphRDD.unpersist();
-                    if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))
-                        SparkContextStorage.open().rm(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
-                }
                 // delete any file system output if persist nothing
-                if (FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, FileInputFormat.class)) && this.persist.equals(GraphComputer.Persist.NOTHING))
-                    FileSystemStorage.open(hadoopConfiguration).rm(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
+                if (null != outputLocation && this.persist.equals(GraphComputer.Persist.NOTHING)) {
+                    if (outputToHDFS)
+                        fileSystemStorage.rm(outputLocation);
+                    if (outputToSpark)
+                        sparkContextStorage.rm(outputLocation);
+                }
                 // update runtime and return the newly computed graph
                 finalMemory.setRuntime(System.currentTimeMillis() - startTime);
                 return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1670e08f/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
index 5076e0b..895df01 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputOutputRDDTest.java
@@ -255,6 +255,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         Spark.create("local[4]");
 
         final String rddName = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, "testComplexChain", "graphRDD");
+        final String rddName2 = TestHelper.makeTestDataDirectory(PersistedInputOutputRDDTest.class, "testComplexChain", "graphRDD2");
         final Configuration configuration = new BaseConfiguration();
         configuration.setProperty("spark.master", "local[4]");
         configuration.setProperty("spark.serializer", GryoSerializer.class.getCanonicalName());
@@ -280,7 +281,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, PersistedInputRDD.class.getCanonicalName());
         configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, rddName);
         configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
-        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, rddName2);
         ////
         graph = GraphFactory.open(configuration);
         graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.EDGES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
@@ -290,7 +291,7 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
         assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
         ////
-        assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2)));
         ////
         graph = GraphFactory.open(configuration);
         graph = graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.VERTEX_PROPERTIES).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
@@ -300,18 +301,18 @@ public class PersistedInputOutputRDDTest extends AbstractSparkTest {
         assertEquals(6l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
         assertEquals(6l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
         ////
-        assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertTrue(Spark.hasRDD(Constants.getGraphLocation(rddName2)));
         ////
         graph = GraphFactory.open(configuration);
         graph.compute(SparkGraphComputer.class).persist(GraphComputer.Persist.NOTHING).program(PageRankVertexProgram.build().iterations(2).create(graph)).submit().get().graph();
-        assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName2)));
         g = graph.traversal();
         assertEquals(0l, g.V().count().next().longValue());
         assertEquals(0l, g.E().count().next().longValue());
         assertEquals(0l, g.V().values(PageRankVertexProgram.PAGE_RANK).count().next().longValue());
         assertEquals(0l, g.V().values(PageRankVertexProgram.EDGE_COUNT).count().next().longValue());
         ////
-        assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName)));
+        assertFalse(Spark.hasRDD(Constants.getGraphLocation(rddName2)));
         Spark.close();
     }
 }