You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/12 20:01:17 UTC
incubator-tinkerpop git commit: HadoopGraph is smart about toString()
with RDDs. Did some cleanup/simplification to SparkGraphComputer. Made it so
we don't get the WARN messages for having both RDD and Formats declared in
the test suite by updating the SparkHadoopGraphProvider.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1073 [created] ac08360e7
HadoopGraph is smart about toString() with RDDs. Did some cleanup/simplification to SparkGraphComputer. Made it so we don't get the WARN messages for having both RDD and Formats declared in the test suite by updating the SparkHadoopGraphProvider. Fixed up a bunch of awkward issues around double declaration of input_format and input_rdd. I think in the future (3.2.0) we should just have ONE declaration -- an InputFormat that works for both RDDs and Formats.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ac08360e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ac08360e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ac08360e
Branch: refs/heads/TINKERPOP-1073
Commit: ac08360e72be41a0519ab1da0dace9c628d4f040
Parents: 1d36fea
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 12 12:01:05 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 12 12:01:05 2016 -0700
----------------------------------------------------------------------
.../computer/AbstractHadoopGraphComputer.java | 28 ++++++------
.../hadoop/structure/HadoopConfiguration.java | 4 +-
.../gremlin/hadoop/structure/HadoopGraph.java | 19 +++++---
.../gremlin/hadoop/HadoopGraphProvider.java | 1 +
.../hadoop/structure/HadoopGraphTest.java | 48 ++++++++++++++++++++
.../process/computer/SparkGraphComputer.java | 15 +++---
.../spark/structure/io/InputOutputHelper.java | 10 ++--
.../spark/structure/io/PersistedOutputRDD.java | 9 +++-
.../computer/SparkHadoopGraphProvider.java | 6 ++-
.../spark/structure/io/ToyGraphInputRDD.java | 12 ++---
10 files changed, 112 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
index 1553240..02d23f6 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
@@ -18,13 +18,8 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.process.computer;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
@@ -161,18 +156,21 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
@Override
public boolean supportsResultGraphPersistCombination(final ResultGraph resultGraph, final Persist persist) {
- if (hadoopGraph.configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT)) {
- final OutputFormat<NullWritable, VertexWritable> outputFormat = ReflectionUtils.newInstance(hadoopGraph.configuration().getGraphOutputFormat(), ConfUtil.makeHadoopConfiguration(hadoopGraph.configuration()));
- if (outputFormat instanceof PersistResultGraphAware)
- return ((PersistResultGraphAware) outputFormat).supportsResultGraphPersistCombination(resultGraph, persist);
- else {
- logger.warn(outputFormat.getClass() + " does not implement " + PersistResultGraphAware.class.getSimpleName() + " and thus, persistence options are unknown -- assuming all options are possible");
- return true;
+ final String outputClass = hadoopGraph.configuration().containsKey(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD) ?
+ hadoopGraph.configuration().getString(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD) :
+ hadoopGraph.configuration().containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT) ?
+ hadoopGraph.configuration().getString(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT) : null;
+ if (null != outputClass) {
+ try {
+ final Object outputObject = Class.forName(outputClass).newInstance();
+ if (outputObject instanceof PersistResultGraphAware)
+ return ((PersistResultGraphAware) outputObject).supportsResultGraphPersistCombination(resultGraph, persist);
+ } catch (final Exception e) {
+ // do nothing -- unknown output class so assume everything works
}
- } else {
- logger.warn("Unknown OutputFormat class and thus, persistence options are unknown -- assuming all options are possible");
- return true;
}
+ logger.warn("Unknown output class and thus, persistence options are unknown -- assuming all options are possible");
+ return true;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
index 244ead9..2453829 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
@@ -84,7 +84,9 @@ public final class HadoopConfiguration extends AbstractConfiguration implements
public Class<InputFormat<NullWritable, VertexWritable>> getGraphInputFormat() {
try {
- return (Class) Class.forName(this.getString(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT));
+ return this.containsKey(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD) ?
+ (Class) Class.forName("org.apache.tinkerpop.gremlin.spark.structure.io.InputRDDFormat") :
+ (Class) Class.forName(this.getString(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT));
} catch (final ClassNotFoundException e) {
throw new RuntimeException(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index 22f42f4..7273a6e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -235,14 +235,21 @@ public final class HadoopGraph implements Graph {
return this.configuration;
}
+ @Override
public String toString() {
final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(this.configuration);
- final String fromString = this.configuration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT) ?
- hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class).getSimpleName() :
- "no-input";
- final String toString = this.configuration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT) ?
- hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class).getSimpleName() :
- "no-output";
+ final String fromString =
+ this.configuration.containsKey(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD) ?
+ hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, Object.class).getSimpleName() :
+ this.configuration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT) ?
+ hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class).getSimpleName() :
+ "no-input";
+ final String toString =
+ this.configuration.containsKey(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD) ?
+ hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, Object.class).getSimpleName() :
+ this.configuration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT) ?
+ hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class).getSimpleName() :
+ "no-output";
return StringFactory.graphString(this, fromString.toLowerCase() + "->" + toString.toLowerCase());
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
index 90f5132..2e79bad 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/HadoopGraphProvider.java
@@ -108,6 +108,7 @@ public class HadoopGraphProvider extends AbstractGraphProvider {
put(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GryoOutputFormat.class.getCanonicalName());
put(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, getWorkingDirectory());
put(Constants.GREMLIN_HADOOP_JARS_IN_DISTRIBUTED_CACHE, false);
+ if (null != loadGraphWith) put("gremlin.hadoop.loadGraphWith", loadGraphWith.name().toLowerCase());
}};
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphTest.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphTest.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphTest.java
new file mode 100644
index 0000000..16ded44
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraphTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONOutputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
+import org.junit.Test;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class HadoopGraphTest {
+
+ @Test
+ public void shouldHaveProperToString() {
+ // the overwrites are what important not that these are legal RDD and Format classes
+ final Configuration configuration = new BaseConfiguration();
+ configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, GryoInputFormat.class.getCanonicalName());
+ configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, GraphSONOutputFormat.class.getCanonicalName());
+ assertTrue(HadoopGraph.open(configuration).toString().contains((GryoInputFormat.class.getSimpleName() + "->" + GraphSONOutputFormat.class.getSimpleName()).toLowerCase()));
+ configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, String.class.getCanonicalName());
+ assertTrue(HadoopGraph.open(configuration).toString().contains((String.class.getSimpleName() + "->" + GraphSONOutputFormat.class.getSimpleName()).toLowerCase()));
+ configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, Long.class.getCanonicalName());
+ assertTrue(HadoopGraph.open(configuration).toString().contains((String.class.getSimpleName() + "->" + Long.class.getSimpleName()).toLowerCase()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index a87f95f..40cc923 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -55,6 +55,7 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
@@ -107,7 +108,6 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
@Override
public Future<ComputerResult> submit() {
this.validateStatePriorToExecution();
-
return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "SparkSubmitter");
}
@@ -194,7 +194,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
}
}
// write the graph rdd using the output rdd
- final String[] elementComputeKeys = this.vertexProgram == null ? new String[0] : this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
+ final String[] elementComputeKeys = this.vertexProgram.getElementComputeKeys().toArray(new String[this.vertexProgram.getElementComputeKeys().size()]);
graphRDD = SparkExecutor.prepareFinalGraphRDD(graphRDD, viewIncomingRDD, elementComputeKeys);
if ((hadoopConfiguration.get(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, null) != null ||
hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null) != null) &&
@@ -244,12 +244,15 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
// unpersist the graphRDD if it will no longer be used
if (!PersistedOutputRDD.class.equals(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)) || this.persist.equals(GraphComputer.Persist.NOTHING)) {
graphRDD.unpersist();
- if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))
+ }
+ // remove all output rdds and files if they will no longer be used
+ if (this.persist.equals(GraphComputer.Persist.NOTHING) && apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)) {
+ if (PersistedOutputRDD.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedInputRDD.class)))
SparkContextStorage.open().rm(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
+ // delete any file system output if persist nothing
+ if (FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, FileInputFormat.class)))
+ FileSystemStorage.open(hadoopConfiguration).rm(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
}
- // delete any file system output if persist nothing
- if (FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, FileInputFormat.class)) && this.persist.equals(GraphComputer.Persist.NOTHING))
- FileSystemStorage.open(hadoopConfiguration).rm(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
// update runtime and return the newly computed graph
finalMemory.setRuntime(System.currentTimeMillis() - startTime);
return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
index 4753028..0beb748 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputOutputHelper.java
@@ -67,10 +67,12 @@ public final class InputOutputHelper {
final HadoopConfiguration hadoopConfiguration = new HadoopConfiguration(configuration);
final BaseConfiguration newConfiguration = new BaseConfiguration();
newConfiguration.copy(org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper.getOutputGraph(configuration, resultGraph, persist).configuration());
- if (resultGraph.equals(GraphComputer.ResultGraph.NEW) && hadoopConfiguration.containsKey(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD)) {
- newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputRDDFormat.class.getCanonicalName());
- //newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputRDDFormat.class.getCanonicalName());
- newConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputOutputHelper.getInputFormat((Class) Class.forName(hadoopConfiguration.getString(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD))).getCanonicalName());
+ if (resultGraph.equals(GraphComputer.ResultGraph.NEW)) {
+ if (newConfiguration.containsKey(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD)) {
+ newConfiguration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputOutputHelper.getInputFormat((Class) Class.forName(hadoopConfiguration.getString(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD))).getCanonicalName());
+ } else {
+ newConfiguration.clearProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD);
+ }
}
return HadoopGraph.open(newConfiguration);
} catch (final ClassNotFoundException e) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
index 4ae6248..b1b8b46 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
@@ -23,7 +23,9 @@ import org.apache.commons.configuration.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.storage.StorageLevel;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.process.computer.PersistResultGraphAware;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -35,7 +37,7 @@ import java.util.Iterator;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class PersistedOutputRDD implements OutputRDD {
+public final class PersistedOutputRDD implements OutputRDD, PersistResultGraphAware {
private static final Logger LOGGER = LoggerFactory.getLogger(PersistedOutputRDD.class);
@@ -70,4 +72,9 @@ public final class PersistedOutputRDD implements OutputRDD {
Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
return IteratorUtils.map(memoryRDD.toLocalIterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2()));
}
+
+ @Override
+ public boolean supportsResultGraphPersistCombination(final GraphComputer.ResultGraph resultGraph, final GraphComputer.Persist persist) {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
index 108d0ed..c09185b 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkHadoopGraphProvider.java
@@ -25,8 +25,9 @@ import org.apache.tinkerpop.gremlin.hadoop.HadoopGraphProvider;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorageCheck;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
import org.apache.tinkerpop.gremlin.process.traversal.engine.ComputerTraversalEngine;
-import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
+import org.apache.tinkerpop.gremlin.spark.structure.io.InputOutputHelper;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorageCheck;
import org.apache.tinkerpop.gremlin.spark.structure.io.ToyGraphInputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer;
import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -48,12 +49,15 @@ public final class SparkHadoopGraphProvider extends HadoopGraphProvider {
config.put(Constants.GREMLIN_SPARK_PERSIST_CONTEXT, true); // this makes the test suite go really fast
if (!test.equals(FileSystemStorageCheck.class) && null != loadGraphWith && RANDOM.nextBoolean()) {
config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ToyGraphInputRDD.class.getCanonicalName());
+ config.remove(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT);
}
// tests persisted RDDs
if (test.equals(SparkContextStorageCheck.class)) {
config.put(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, ToyGraphInputRDD.class.getCanonicalName());
config.put(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, PersistedOutputRDD.class.getCanonicalName());
+ config.remove(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT);
+ config.remove(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT);
}
/// spark configuration
config.put("spark.master", "local[4]");
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac08360e/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
index ea3636f..677e6d0 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/ToyGraphInputRDD.java
@@ -22,7 +22,6 @@ package org.apache.tinkerpop.gremlin.spark.structure.io;
import org.apache.commons.configuration.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopPools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.structure.Graph;
@@ -46,15 +45,16 @@ public final class ToyGraphInputRDD implements InputRDD {
@Override
public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
+ final String loadGraphWith = configuration.getString("gremlin.hadoop.loadGraphWith", "nothing");
HadoopPools.initialize(TinkerGraph.open().configuration());
final List<VertexWritable> vertices;
- if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("modern"))
+ if (loadGraphWith.contains("modern"))
vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createModern().vertices(), VertexWritable::new));
- else if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("classic"))
+ else if (loadGraphWith.contains("classic"))
vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createClassic().vertices(), VertexWritable::new));
- else if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("crew"))
+ else if (loadGraphWith.contains("crew"))
vertices = IteratorUtils.list(IteratorUtils.map(TinkerFactory.createTheCrew().vertices(), VertexWritable::new));
- else if (configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION).contains("grateful")) {
+ else if (loadGraphWith.contains("grateful")) {
try {
final Graph graph = TinkerGraph.open();
final GraphReader reader = GryoReader.build().mapper(graph.io(GryoIo.build()).mapper().create()).create();
@@ -66,7 +66,7 @@ public final class ToyGraphInputRDD implements InputRDD {
throw new IllegalStateException(e.getMessage(), e);
}
} else
- throw new IllegalArgumentException("No legal toy graph was provided to load: " + configuration.getProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
+ throw new IllegalArgumentException("No legal toy graph was provided to load: " + loadGraphWith);
return sparkContext.parallelize(vertices).mapToPair(vertex -> new Tuple2<>(vertex.get().id(), vertex));
}