You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2016/01/11 17:49:14 UTC
[06/30] incubator-tinkerpop git commit: lots more clean up, tests,
and organization. She is a real beauty.
lots more clean up, tests, and organization. She is a real beauty.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/3fff8f54
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/3fff8f54
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/3fff8f54
Branch: refs/heads/TINKERPOP-320
Commit: 3fff8f546501d10a4c1d34762a626a2493e758be
Parents: b4d8e96
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 9 16:57:28 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Dec 9 16:57:28 2015 -0700
----------------------------------------------------------------------
.../GiraphHadoopGremlinPluginIntegrateTest.java | 33 --------------------
.../GiraphHadoopGremlinPluginIntegrateTest.java | 33 ++++++++++++++++++++
.../tinkerpop/gremlin/structure/io/Storage.java | 12 +++----
.../hadoop/structure/io/FileSystemStorage.java | 4 +--
.../groovy/plugin/FileSystemStorageCheck.java | 4 +--
.../groovy/plugin/HadoopGremlinPluginCheck.java | 4 +--
.../process/computer/SparkGraphComputer.java | 7 ++---
.../spark/structure/io/InputFormatRDD.java | 6 ++--
.../spark/structure/io/OutputFormatRDD.java | 6 ++--
.../spark/structure/io/PersistedInputRDD.java | 9 ++----
.../spark/structure/io/PersistedOutputRDD.java | 16 +++++-----
.../spark/structure/io/SparkContextStorage.java | 18 +++++------
.../structure/io/SparkContextStorageTest.java | 4 +--
13 files changed, 75 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3fff8f54/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphHadoopGremlinPluginIntegrateTest.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphHadoopGremlinPluginIntegrateTest.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphHadoopGremlinPluginIntegrateTest.java
deleted file mode 100644
index 947b776..0000000
--- a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/GiraphHadoopGremlinPluginIntegrateTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.giraph.process.computer.groovy;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphHadoopGraphProvider;
-import org.apache.tinkerpop.gremlin.hadoop.groovy.plugin.HadoopPluginSuite;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(HadoopPluginSuite.class)
-@GraphProviderClass(provider = GiraphHadoopGraphProvider.class, graph = HadoopGraph.class)
-public class GiraphHadoopGremlinPluginIntegrateTest {
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3fff8f54/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/plugin/GiraphHadoopGremlinPluginIntegrateTest.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/plugin/GiraphHadoopGremlinPluginIntegrateTest.java b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/plugin/GiraphHadoopGremlinPluginIntegrateTest.java
new file mode 100644
index 0000000..4660c42
--- /dev/null
+++ b/giraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/giraph/process/computer/groovy/plugin/GiraphHadoopGremlinPluginIntegrateTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.giraph.process.computer.groovy.plugin;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.giraph.process.computer.GiraphHadoopGraphProvider;
+import org.apache.tinkerpop.gremlin.hadoop.groovy.plugin.HadoopPluginSuite;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(HadoopPluginSuite.class)
+@GraphProviderClass(provider = GiraphHadoopGraphProvider.class, graph = HadoopGraph.class)
+public class GiraphHadoopGremlinPluginIntegrateTest {
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3fff8f54/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
index 3b69ff2..b6e6ebe 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
@@ -50,15 +50,15 @@ public interface Storage {
return this.head(location, Integer.MAX_VALUE);
}
- public Iterator<Vertex> headGraph(final String location, final int totalLines, final Class parserClass);
+ public Iterator<Vertex> head(final String location, final Class parserClass, final int totalLines);
- public default Iterator<Vertex> headGraph(final String location, final Class parserClass) {
- return this.headGraph(location, Integer.MAX_VALUE, parserClass);
+ public default Iterator<Vertex> head(final String location, final Class parserClass) {
+ return this.head(location, parserClass, Integer.MAX_VALUE);
}
- public <K, V> Iterator<KeyValue<K, V>> headMemory(final String location, final String memoryKey, final int totalLines, final Class parserClass);
+ public <K, V> Iterator<KeyValue<K, V>> head(final String location, final String memoryKey, final Class parserClass, final int totalLines);
- public default <K, V> Iterator<KeyValue<K, V>> headMemory(final String location, final String memoryKey, final Class parserClass) {
- return this.headMemory(location, memoryKey, Integer.MAX_VALUE, parserClass);
+ public default <K, V> Iterator<KeyValue<K, V>> head(final String location, final String memoryKey, final Class parserClass) {
+ return this.head(location, memoryKey, parserClass, Integer.MAX_VALUE);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3fff8f54/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
index 5d3995c..b34f7a3 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
@@ -161,7 +161,7 @@ public final class FileSystemStorage implements Storage {
}
@Override
- public Iterator<Vertex> headGraph(final String location, final int totalLines, final Class parserClass) {
+ public Iterator<Vertex> head(final String location, final Class parserClass, final int totalLines) {
final org.apache.commons.configuration.Configuration configuration = new BaseConfiguration();
configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, Constants.getSearchGraphLocation(location, this).get());
configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, parserClass.getCanonicalName());
@@ -176,7 +176,7 @@ public final class FileSystemStorage implements Storage {
}
@Override
- public <K, V> Iterator<KeyValue<K, V>> headMemory(final String location, final String memoryKey, final int totalLines, final Class parserClass) {
+ public <K, V> Iterator<KeyValue<K, V>> head(final String location, final String memoryKey, final Class parserClass, final int totalLines) {
if (!parserClass.equals(SequenceFileInputFormat.class))
throw new IllegalArgumentException("Only " + SequenceFileInputFormat.class.getCanonicalName() + " memories are supported");
final Configuration configuration = new Configuration();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3fff8f54/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/FileSystemStorageCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/FileSystemStorageCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/FileSystemStorageCheck.java
index a8c5307..b0517ad 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/FileSystemStorageCheck.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/FileSystemStorageCheck.java
@@ -57,7 +57,7 @@ public class FileSystemStorageCheck extends AbstractGremlinTest {
// TEST MEMORY PERSISTENCE
assertEquals(2, (int) result.memory().get("clusterCount"));
assertTrue(storage.exists(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount")));
- assertEquals(1, IteratorUtils.count(storage.headMemory(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount", SequenceFileInputFormat.class)));
- assertEquals(2, storage.headMemory(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount", SequenceFileInputFormat.class).next().getValue());
+ assertEquals(1, IteratorUtils.count(storage.head(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount", SequenceFileInputFormat.class)));
+ assertEquals(2, storage.head(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount", SequenceFileInputFormat.class).next().getValue());
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3fff8f54/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
index b558169..4e4353d 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
@@ -137,7 +137,7 @@ public class HadoopGremlinPluginCheck extends AbstractGremlinTest {
AbstractGremlinProcessTest.checkResults(Arrays.asList("ripple", "lop"), traversal);
assertTrue((Boolean) this.console.eval("hdfs.exists('target/test-output/m')"));
assertTrue((Boolean) this.console.eval("hdfs.exists('target/test-output/" + TraverserMapReduce.TRAVERSERS + "')"));
- final List<KeyValue<Integer, Collection<String>>> mList = IteratorUtils.asList(this.console.eval("hdfs.headMemory('target/test-output','m',SequenceFileInputFormat)"));
+ final List<KeyValue<Integer, Collection<String>>> mList = IteratorUtils.asList(this.console.eval("hdfs.head('target/test-output','m',SequenceFileInputFormat)"));
assertEquals(4, mList.size());
mList.forEach(keyValue -> {
if (keyValue.getKey().equals(29))
@@ -151,7 +151,7 @@ public class HadoopGremlinPluginCheck extends AbstractGremlinTest {
else
throw new IllegalStateException("The provided key/value is unknown: " + keyValue);
});
- final List<KeyValue<MapReduce.NullObject, Traverser<String>>> traversersList = IteratorUtils.asList(this.console.eval("hdfs.headMemory('target/test-output/'," + "'" + TraverserMapReduce.TRAVERSERS + "',SequenceFileInputFormat)"));
+ final List<KeyValue<MapReduce.NullObject, Traverser<String>>> traversersList = IteratorUtils.asList(this.console.eval("hdfs.head('target/test-output/'," + "'" + TraverserMapReduce.TRAVERSERS + "',SequenceFileInputFormat)"));
assertEquals(2, traversersList.size());
traversersList.forEach(keyValue -> {
assertEquals(MapReduce.NullObject.instance(), keyValue.getKey());
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3fff8f54/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index f96fd15..3f7efaa 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -53,6 +53,7 @@ import org.apache.tinkerpop.gremlin.spark.structure.io.InputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputFormatRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.OutputRDD;
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD;
+import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
import java.io.File;
import java.io.IOException;
@@ -239,10 +240,8 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
// unpersist the graphRDD if it will no longer be used
if (!PersistedOutputRDD.class.equals(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)) || this.persist.equals(GraphComputer.Persist.NOTHING)) {
graphRDD.unpersist();
- if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)) {
- Spark.removeRDD(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
- Spark.removeRDD(Constants.getGraphLocation(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)));
- }
+ if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))
+ SparkContextStorage.open().rmr(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
}
// update runtime and return the newly computed graph
finalMemory.setRuntime(System.currentTimeMillis() - startTime);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3fff8f54/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
index 12a8268..57d7080 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/InputFormatRDD.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
@@ -39,6 +40,7 @@ public final class InputFormatRDD implements InputRDD {
@Override
public JavaPairRDD<Object, VertexWritable> readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
+ hadoopConfiguration.set(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION), Constants.getSearchGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION), FileSystemStorage.open(hadoopConfiguration)).get());
return sparkContext.newAPIHadoopRDD(hadoopConfiguration,
(Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
NullWritable.class,
@@ -49,11 +51,11 @@ public final class InputFormatRDD implements InputRDD {
@Override
public <K, V> JavaPairRDD<K, V> readMemoryRDD(final Configuration configuration, final String memoryKey, final JavaSparkContext sparkContext) {
final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
- // use FileInput location
+ hadoopConfiguration.set(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION), Constants.getMemoryLocation(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION), memoryKey));
return sparkContext.newAPIHadoopRDD(hadoopConfiguration,
SequenceFileInputFormat.class,
ObjectWritable.class,
ObjectWritable.class)
- .mapToPair(tuple -> new Tuple2<>((K) ((Tuple2<ObjectWritable,ObjectWritable>)tuple)._1().get(), (V) ((Tuple2<ObjectWritable,ObjectWritable>)tuple)._2().get()));
+ .mapToPair(tuple -> new Tuple2<>((K) ((Tuple2<ObjectWritable, ObjectWritable>) tuple)._1().get(), (V) ((Tuple2<ObjectWritable, ObjectWritable>) tuple)._2().get()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3fff8f54/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java
index 92158af..edb541b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/OutputFormatRDD.java
@@ -49,7 +49,7 @@ public final class OutputFormatRDD implements OutputRDD {
if (null != outputLocation) {
// map back to a <nullwritable,vertexwritable> stream for output
graphRDD.mapToPair(tuple -> new Tuple2<>(NullWritable.get(), tuple._2()))
- .saveAsNewAPIHadoopFile(outputLocation + "/" + Constants.HIDDEN_G,
+ .saveAsNewAPIHadoopFile(Constants.getGraphLocation(outputLocation),
NullWritable.class,
VertexWritable.class,
(Class<OutputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, OutputFormat.class), hadoopConfiguration);
@@ -62,12 +62,12 @@ public final class OutputFormatRDD implements OutputRDD {
final String outputLocation = hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
if (null != outputLocation) {
// map back to a Hadoop stream for output
- memoryRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))).saveAsNewAPIHadoopFile(outputLocation + "/" + memoryKey,
+ memoryRDD.mapToPair(keyValue -> new Tuple2<>(new ObjectWritable<>(keyValue._1()), new ObjectWritable<>(keyValue._2()))).saveAsNewAPIHadoopFile(Constants.getMemoryLocation(outputLocation, memoryKey),
ObjectWritable.class,
ObjectWritable.class,
SequenceFileOutputFormat.class, hadoopConfiguration);
try {
- return (Iterator) new ObjectWritableIterator(hadoopConfiguration, new Path(outputLocation + "/" + memoryKey));
+ return (Iterator) new ObjectWritableIterator(hadoopConfiguration, new Path(Constants.getMemoryLocation(outputLocation, memoryKey)));
} catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3fff8f54/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
index 55bf53b..d926686 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedInputRDD.java
@@ -37,18 +37,13 @@ public final class PersistedInputRDD implements InputRDD {
if (!configuration.containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_INPUT_LOCATION + " to read the persisted RDD from");
Spark.create(sparkContext.sc());
- final String inputLocation = configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
- final String graphRDDName = Spark.hasRDD(inputLocation) ? inputLocation : Constants.getGraphLocation(inputLocation);
- return JavaPairRDD.fromJavaRDD((JavaRDD) Spark.getRDD(graphRDDName).toJavaRDD());
+ return JavaPairRDD.fromJavaRDD((JavaRDD) Spark.getRDD(Constants.getSearchGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION), SparkContextStorage.open(sparkContext.sc())).get()).toJavaRDD());
}
@Override
public <K, V> JavaPairRDD<K, V> readMemoryRDD(final Configuration configuration, final String memoryKey, final JavaSparkContext sparkContext) {
if (!configuration.containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_INPUT_LOCATION + " to read the persisted RDD from");
- final String inputLocation = configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION);
- final String memoryRDDName = Spark.hasRDD(inputLocation) ? inputLocation : Constants.getMemoryLocation(inputLocation, memoryKey);
- Spark.create(sparkContext.sc());
- return JavaPairRDD.fromJavaRDD((JavaRDD) Spark.getRDD(memoryRDDName).toJavaRDD());
+ return JavaPairRDD.fromJavaRDD((JavaRDD) Spark.getRDD(Constants.getMemoryLocation(configuration.getString(Constants.GREMLIN_HADOOP_INPUT_LOCATION), memoryKey)).toJavaRDD());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3fff8f54/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
index b78caa9..7833701 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
@@ -44,17 +44,15 @@ public final class PersistedOutputRDD implements OutputRDD {
LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set " + Constants.GREMLIN_SPARK_PERSIST_CONTEXT + " to true");
if (!configuration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))
throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
- final String graphRDDName = Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
- Spark.removeRDD(graphRDDName); // this might be bad cause it unpersists the job RDD
- Constants.getSearchGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), SparkContextStorage.open(configuration)).ifPresent(Spark::removeRDD); // this might be bad cause it unpersists the job RDD
+ SparkContextStorage.open(configuration).rmr(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)); // this might be bad cause it unpersists the job RDD
if (!configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, true))
graphRDD.mapValues(vertex -> {
vertex.get().dropEdges();
return vertex;
- }).setName(graphRDDName).cache();
+ }).setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).cache();
else
- graphRDD.setName(graphRDDName).cache();
- Spark.refresh();
+ graphRDD.setName(Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))).cache();
+ Spark.refresh(); // necessary to do really fast so the Spark GC doesn't clear out the RDD
}
@Override
@@ -63,9 +61,9 @@ public final class PersistedOutputRDD implements OutputRDD {
LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set " + Constants.GREMLIN_SPARK_PERSIST_CONTEXT + " to true");
if (!configuration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))
throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
- final String sideEffectRDDName = configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + memoryKey;
- Spark.removeRDD(sideEffectRDDName);
- memoryRDD.setName(sideEffectRDDName).cache();
+ final String memoryRDDName = Constants.getMemoryLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey);
+ Spark.removeRDD(memoryRDDName);
+ memoryRDD.setName(memoryRDDName).cache();
return IteratorUtils.map(memoryRDD.toLocalIterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3fff8f54/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
index 887e2f9..2ea0fc3 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
@@ -85,12 +85,12 @@ public final class SparkContextStorage implements Storage {
@Override
public boolean mkdir(final String location) {
- throw new UnsupportedOperationException("This operation does not make sense for a persited SparkContext");
+ throw new UnsupportedOperationException("This operation does not make sense for a persisted SparkContext");
}
@Override
public boolean cp(final String fromLocation, final String toLocation) {
- Spark.getRDD(fromLocation).setName(toLocation).cache();
+ Spark.getRDD(fromLocation).setName(toLocation).cache().count();
Spark.removeRDD(fromLocation);
return true;
}
@@ -111,7 +111,7 @@ public final class SparkContextStorage implements Storage {
@Override
public boolean rmr(final String location) {
final List<String> rdds = new ArrayList<>();
- final String wildCardLocation = location.replace(".", "\\.").replace("*", ".*");
+ final String wildCardLocation = (location.endsWith("*") ? location : location + "*").replace(".", "\\.").replace("*", ".*");
for (final RDD<?> rdd : Spark.getRDDs()) {
if (rdd.name().matches(wildCardLocation))
rdds.add(rdd.name());
@@ -121,9 +121,9 @@ public final class SparkContextStorage implements Storage {
}
@Override
- public Iterator<Vertex> headGraph(final String location, int totalLines, final Class parserClass) {
+ public Iterator<Vertex> head(final String location, final Class parserClass, final int totalLines) {
final Configuration configuration = new BaseConfiguration();
- configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, Constants.getSearchGraphLocation(location, this).get());
+ configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, location);
configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, parserClass.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, parserClass.getCanonicalName());
try {
@@ -135,13 +135,13 @@ public final class SparkContextStorage implements Storage {
} catch (final Exception e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
- throw new IllegalArgumentException("The provided parserClass must be an " + InputFormat.class.getCanonicalName() + " or a " + InputRDD.class.getCanonicalName() + ": " + parserClass.getCanonicalName());
+ throw new IllegalArgumentException("The provided parserClass must be an " + InputFormat.class.getCanonicalName() + " or an " + InputRDD.class.getCanonicalName() + ": " + parserClass.getCanonicalName());
}
@Override
- public <K, V> Iterator<KeyValue<K, V>> headMemory(final String location, final String memoryKey, int totalLines, Class parserClass) {
+ public <K, V> Iterator<KeyValue<K, V>> head(final String location, final String memoryKey, final Class parserClass, final int totalLines) {
final Configuration configuration = new BaseConfiguration();
- configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, Constants.getMemoryLocation(location, memoryKey));
+ configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, location);
configuration.setProperty(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, parserClass.getCanonicalName());
configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, parserClass.getCanonicalName());
try {
@@ -153,7 +153,7 @@ public final class SparkContextStorage implements Storage {
} catch (final Exception e) {
throw new IllegalArgumentException(e.getMessage(), e);
}
- throw new IllegalArgumentException("The provided parserClass must be an " + InputFormat.class.getCanonicalName() + " or a " + InputRDD.class.getCanonicalName() + ": " + parserClass.getCanonicalName());
+ throw new IllegalArgumentException("The provided parserClass must be an " + InputFormat.class.getCanonicalName() + " or an " + InputRDD.class.getCanonicalName() + ": " + parserClass.getCanonicalName());
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/3fff8f54/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageTest.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageTest.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageTest.java
index 43e8508..8c99c9e 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageTest.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageTest.java
@@ -59,7 +59,7 @@ public class SparkContextStorageTest extends AbstractSparkTest {
assertEquals(2, storage.ls().size());
// TEST GRAPH PERSISTENCE
assertTrue(storage.exists(Constants.getGraphLocation(outputLocation)));
- assertEquals(6, IteratorUtils.count(storage.headGraph(outputLocation, PersistedInputRDD.class)));
+ assertEquals(6, IteratorUtils.count(storage.head(outputLocation, PersistedInputRDD.class)));
assertEquals(6, result.graph().traversal().V().count().next().longValue());
assertEquals(0, result.graph().traversal().E().count().next().longValue());
assertEquals(6, result.graph().traversal().V().values("name").count().next().longValue());
@@ -68,7 +68,7 @@ public class SparkContextStorageTest extends AbstractSparkTest {
// TEST MEMORY PERSISTENCE
assertEquals(2, (int) result.memory().get("clusterCount"));
assertTrue(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount")));
- assertEquals(2, storage.headMemory(outputLocation, "clusterCount", PersistedInputRDD.class).next().getValue());
+ assertEquals(2, storage.head(outputLocation, "clusterCount", PersistedInputRDD.class).next().getValue());
}
}