Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/06 23:58:25 UTC
incubator-tinkerpop git commit: added documentation, upgrade docs,
JavaDoc, more test cases,
and fixed up some random inconsistencies in BulkLoaderVertexProgram
documentation examples.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1033 b0f3e4a96 -> 5c9e81b0c
added documentation, upgrade docs, JavaDoc, more test cases, and fixed up some random inconsistencies in BulkLoaderVertexProgram documentation examples.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/5c9e81b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/5c9e81b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/5c9e81b0
Branch: refs/heads/TINKERPOP-1033
Commit: 5c9e81b0cebd8c3841e2442a8ef13b3d23d44295
Parents: b0f3e4a
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 6 15:58:18 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 6 15:58:18 2016 -0700
----------------------------------------------------------------------
CHANGELOG.asciidoc | 5 +
docs/src/reference/implementations.asciidoc | 107 +++++++++----------
.../upgrade/release-3.1.x-incubating.asciidoc | 24 +++++
.../process/computer/GiraphGraphComputer.java | 13 ++-
.../tinkerpop/gremlin/structure/io/Storage.java | 96 ++++++++++++++---
.../conf/hadoop-grateful-gryo.properties | 6 +-
.../hadoop/structure/HadoopConfiguration.java | 5 +
.../hadoop/structure/io/FileSystemStorage.java | 16 +--
.../structure/io/AbstractStorageCheck.java | 16 ++-
.../structure/io/FileSystemStorageCheck.java | 4 +-
.../spark/groovy/plugin/SparkGremlinPlugin.java | 1 +
.../process/computer/SparkGraphComputer.java | 4 +-
.../spark/structure/io/PersistedOutputRDD.java | 2 +-
.../spark/structure/io/SparkContextStorage.java | 19 +---
.../structure/io/SparkContextStorageCheck.java | 4 +-
15 files changed, 197 insertions(+), 125 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/CHANGELOG.asciidoc
----------------------------------------------------------------------
diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc
index 43fb4b6..fad0630 100644
--- a/CHANGELOG.asciidoc
+++ b/CHANGELOG.asciidoc
@@ -26,6 +26,11 @@ image::https://raw.githubusercontent.com/apache/incubator-tinkerpop/master/docs/
TinkerPop 3.1.1 (NOT OFFICIALLY RELEASED YET)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+* It is possible to completely avoid using HDFS with Spark if `PersistedInputRDD` and `PersistedOutputRDD` are leveraged.
+* `InputRDD` and `OutputRDD` can now process both graphs and memory (i.e. sideEffects).
+* Removed Groovy-specific meta-programming overloads for handling Hadoop `FileSystem` (instead, it's all accessible via `FileSystemStorage`).
+* Added `FileSystemStorage` and `SparkContextStorage` which both implement the new `Storage` API.
+* Added `Storage` to the gremlin-core io-package which providers can implement to allow conventional access to data sources (e.g. `ls()`, `rm()`, `cp()`, etc.).
* Execute the `LifeCycle.beforeEval()` in the same thread that `eval()` is executed in for `GremlinExecutor`.
* Improved error handling of Gremlin Console initialization scripts to better separate errors in initialization script I/O versus execution of the script itself.
* Fixed a bug in `Graph.OptOut` when trying to opt-out of certain test cases with the `method` property set to "*".
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/docs/src/reference/implementations.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/reference/implementations.asciidoc b/docs/src/reference/implementations.asciidoc
index c3a1df6..add8555 100644
--- a/docs/src/reference/implementations.asciidoc
+++ b/docs/src/reference/implementations.asciidoc
@@ -1213,30 +1213,8 @@ Using a Persisted Context
It is possible to persist the graph RDD between jobs within the `SparkContext` (e.g. SparkServer) by leveraging `PersistedOutputRDD`.
Note that `gremlin.spark.persistContext` should be set to `true` or else the persisted RDD will be destroyed when the `SparkContext` closes.
-The persisted RDD is named by the `gremlin.hadoop.outputLocation` configuration.
-Similarly, `PersistedInputRDD` is used with respective `gremlin.hadoop.inputLocation` to retrieve the persisted RDD from the `SparkContext`.
-
-There is a static `spark` object that can be used to manage persisted RDDs much like `hdfs` is used to manage HDFS files (see <<interacting-with-hdfs, Interacting with HDFS>>).
-
-[gremlin-groovy]
-----
-spark.create('local[4]') // the SparkContext location (master)
-graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
-graph.configuration().setProperty('gremlin.spark.persistContext',true)
-graph.configuration().setProperty('gremlin.spark.graphOutputRDD','org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD')
-graph.configuration().setProperty('gremlin.hadoop.outputLocation','pageRankGraph')
-graph.compute(SparkGraphComputer).program(PageRankVertexProgram.build().create()).submit().get()
-spark.ls()
-graph.configuration().setProperty('gremlin.hadoop.outputLocation','peerPressureGraph')
-graph.compute(SparkGraphComputer).program(PeerPressureVertexProgram.build().create()).submit().get()
-spark.ls()
-spark.rm('pageRankGraph')
-spark.head('peerPressureGraph')
-spark.describe('peerPressureGraph')
-spark.rm('peerPressureGraph')
-spark.ls()
-spark.close()
-----
+The persisted RDD is named by the `gremlin.hadoop.outputLocation` configuration. Similarly, `PersistedInputRDD` is used with respective
+`gremlin.hadoop.inputLocation` to retrieve the persisted RDD from the `SparkContext`.
When using a persistent `SparkContext` the configuration used by the original Spark Configuration will be inherited by all threaded
references to that Spark Context. The exception to this rule are those properties which have a specific thread local effect.
@@ -1247,6 +1225,8 @@ references to that Spark Context. The exception to this rule are those propertie
. spark.job.interruptOnCancel
. spark.scheduler.pool
+Finally, there is a `spark` object that can be used to manage persisted RDDs (see <<interacting-with-spark, Interacting with Spark>>).
+
Loading with BulkLoaderVertexProgram
++++++++++++++++++++++++++++++++++++
@@ -1256,7 +1236,7 @@ Grateful Dead graph from HadoopGraph into TinkerGraph over Spark:
[gremlin-groovy]
----
-hdfs.copyFromLocal('data/grateful-dead.kryo', 'data/grateful-dead.kryo')
+hdfs.copyFromLocal('data/grateful-dead.kryo', 'grateful-dead.kryo')
readGraph = GraphFactory.open('conf/hadoop/hadoop-grateful-gryo.properties')
writeGraph = 'conf/tinkergraph-gryo.properties'
blvp = BulkLoaderVertexProgram.build().
@@ -1279,10 +1259,8 @@ graph.close()
#
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
-gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-gremlin.hadoop.inputLocation=data/grateful-dead.kryo
+gremlin.hadoop.inputLocation=grateful-dead.kryo
gremlin.hadoop.outputLocation=output
-gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true
#
@@ -1385,7 +1363,7 @@ the Grateful Dead graph from HadoopGraph into TinkerGraph over Giraph:
[gremlin-groovy]
----
-hdfs.copyFromLocal('data/grateful-dead.kryo', 'data/grateful-dead.kryo')
+hdfs.copyFromLocal('data/grateful-dead.kryo', 'grateful-dead.kryo')
readGraph = GraphFactory.open('conf/hadoop/hadoop-grateful-gryo.properties')
writeGraph = 'conf/tinkergraph-gryo.properties'
blvp = BulkLoaderVertexProgram.build().
@@ -1409,10 +1387,8 @@ graph.close()
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.hadoop.mapreduce.lib.output.NullOutputFormat
-gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-gremlin.hadoop.inputLocation=data/grateful-dead.kryo
+gremlin.hadoop.inputLocation=grateful-dead.kryo
gremlin.hadoop.outputLocation=output
-gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true
#
@@ -1477,12 +1453,14 @@ simple (easy to create and parse).
The data below represents an adjacency list representation of the classic TinkerGraph toy graph in GraphSON format.
[source,json]
+----
{"id":1,"label":"person","outE":{"created":[{"id":9,"inV":3,"properties":{"weight":0.4}}],"knows":[{"id":7,"inV":2,"properties":{"weight":0.5}},{"id":8,"inV":4,"properties":{"weight":1.0}}]},"properties":{"name":[{"id":0,"value":"marko"}],"age":[{"id":1,"value":29}]}}
{"id":2,"label":"person","inE":{"knows":[{"id":7,"outV":1,"properties":{"weight":0.5}}]},"properties":{"name":[{"id":2,"value":"vadas"}],"age":[{"id":3,"value":27}]}}
{"id":3,"label":"software","inE":{"created":[{"id":9,"outV":1,"properties":{"weight":0.4}},{"id":11,"outV":4,"properties":{"weight":0.4}},{"id":12,"outV":6,"properties":{"weight":0.2}}]},"properties":{"name":[{"id":4,"value":"lop"}],"lang":[{"id":5,"value":"java"}]}}
{"id":4,"label":"person","inE":{"knows":[{"id":8,"outV":1,"properties":{"weight":1.0}}]},"outE":{"created":[{"id":10,"inV":5,"properties":{"weight":1.0}},{"id":11,"inV":3,"properties":{"weight":0.4}}]},"properties":{"name":[{"id":6,"value":"josh"}],"age":[{"id":7,"value":32}]}}
{"id":5,"label":"software","inE":{"created":[{"id":10,"outV":4,"properties":{"weight":1.0}}]},"properties":{"name":[{"id":8,"value":"ripple"}],"lang":[{"id":9,"value":"java"}]}}
{"id":6,"label":"person","outE":{"created":[{"id":12,"inV":3,"properties":{"weight":0.2}}]},"properties":{"name":[{"id":10,"value":"peter"}],"age":[{"id":11,"value":35}]}}
+----
[[script-io-format]]
Script I/O Format
@@ -1575,45 +1553,58 @@ def stringify(vertex) {
return [v, outE].join('\t')
}
+
+
+Storage Systems
+~~~~~~~~~~~~~~~
+
+Hadoop-Gremlin provides two implementations of the `Storage` API:
+
+* `FileSystemStorage`: Access HDFS and local file system data.
+* `SparkContextStorage`: Access Spark persisted RDD data.
+
[[interacting-with-hdfs]]
Interacting with HDFS
-~~~~~~~~~~~~~~~~~~~~~
+^^^^^^^^^^^^^^^^^^^^^
The distributed file system of Hadoop is called link:http://en.wikipedia.org/wiki/Apache_Hadoop#Hadoop_distributed_file_system[HDFS].
-The results of any OLAP operation are stored in HDFS accessible via `hdfs`.
+The results of any OLAP operation are stored in HDFS accessible via `hdfs`. For local file system access, there is `local`.
[gremlin-groovy]
----
graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
-g = graph.traversal(computer(SparkGraphComputer))
-:remote connect tinkerpop.hadoop graph g
-:> g.V().group().by{it.value('name')[1]}.by('name')
+graph.compute(SparkGraphComputer).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey('clusterCount').create()).submit().get();
hdfs.ls()
hdfs.ls('output')
-hdfs.ls('output/~reducing')
-hdfs.head('output/~reducing', SequenceFileInputFormat)
+hdfs.head('output', GryoInputFormat)
+hdfs.head('output', 'clusterCount', SequenceFileInputFormat)
+hdfs.rm('output')
+hdfs.ls()
----
-A list of the HDFS methods available are itemized below. Note that these methods are also available for the 'local' variable:
+[[interacting-with-spark]]
+Interacting with Spark
+^^^^^^^^^^^^^^^^^^^^^^
-[width="100%",cols="13,10",options="header"]
-|=========================================================
-| Method| Description
-|hdfs.ls(String path)| List the contents of the supplied directory.
-|hdfs.cp(String from, String to)| Copy the specified path to the specified path.
-|hdfs.exists(String path)| Whether the specified path exists.
-|hdfs.rm(String path)| Remove the specified path.
-|hdfs.rmr(String path)| Remove the specified path and its contents recurssively.
-|hdfs.copyToLocal(String from, String to)| Copy the specified HDFS path to the specified local path.
-|hdfs.copyFromLocal(String from, String to)| Copy the specified local path to the specified HDFS path.
-|hdfs.mergeToLocal(String from, String to)| Merge the files in path to the specified local path.
-|hdfs.head(String path)| Display the data in the path as text.
-|hdfs.head(String path, int lineCount)| Text display only the first `lineCount`-number of lines in the path.
-|hdfs.head(String path, int totalKeyValues, Class<InputFormat> inputFormatClass)| Parse and display the data using the InputFormat.
-|hdfs.head(String path, Class<InputFormat> inputFormatClass)| Parse and display the data using the InputFormat.
-|hdfs.head(String path, String memoryKey, Class<InputFormat> inputFormatClass, int totalKeyValues) | Parse and display memory data using the InputFormat.
-|hdfs.head(String path, String memoryKey, Class<InputFormat> inputFormatClass) | Parse and display memory data using the InputFormat.
-|=========================================================
+If a Spark context is persisted, then Spark RDDs will remain in the Spark cache and be accessible across subsequent jobs.
+RDDs are retrieved from and saved to the `SparkContext` via `PersistedInputRDD` and `PersistedOutputRDD`, respectively.
+Persisted RDDs can be accessed using `spark`.
+
+[gremlin-groovy]
+----
+Spark.create('local[4]')
+graph = GraphFactory.open('conf/hadoop/hadoop-gryo.properties')
+graph.configuration().setProperty('gremlin.spark.graphOutputRDD', PersistedOutputRDD.class.getCanonicalName())
+graph.configuration().clearProperty('gremlin.hadoop.graphOutputFormat')
+graph.configuration().setProperty('gremlin.spark.persistContext',true)
+graph.compute(SparkGraphComputer).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey('clusterCount').create()).submit().get();
+spark.ls()
+spark.ls('output')
+spark.head('output', PersistedInputRDD)
+spark.head('output', 'clusterCount', PersistedInputRDD)
+spark.rm('output')
+spark.ls()
+----
A Command Line Example
~~~~~~~~~~~~~~~~~~~~~~
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/docs/src/upgrade/release-3.1.x-incubating.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/upgrade/release-3.1.x-incubating.asciidoc b/docs/src/upgrade/release-3.1.x-incubating.asciidoc
index 8f487a2..f026d02 100644
--- a/docs/src/upgrade/release-3.1.x-incubating.asciidoc
+++ b/docs/src/upgrade/release-3.1.x-incubating.asciidoc
@@ -32,6 +32,30 @@ Please see the link:https://github.com/apache/incubator-tinkerpop/blob/3.1.1-inc
Upgrading for Users
~~~~~~~~~~~~~~~~~~~
+Storage I/O
+^^^^^^^^^^^
+
+The `gremlin-core` io-package now has a `Storage` interface. The methods that were available via `hdfs`
+(e.g. `rm()`, `ls()`, `head()`, etc.) are now part of `Storage`. Both HDFS and Spark implement `Storage` via
+`FileSystemStorage` and `SparkContextStorage`, respectively. `SparkContextStorage` adds support for interacting with
+persisted RDDs in the Spark cache.
+
+This update changed a few of the file handling methods. As it stands, these changes only affect manual Gremlin Console
+usage as HDFS support was previously provided via Groovy meta-programming. Thus, these are not "code-based" breaking changes.
+
+* `hdfs.rmr()` no longer exists. `hdfs.rm()` is now recursive. Simply change all references to `rmr()` to `rm()` for identical behavior.
+* `hdfs.head(location,lines,writableClass)` no longer exists.
+** For graph locations, use `hdfs.head(location,writableClass,lines)`.
+** For memory locations, use `hdfs.head(location,memoryKey,writableClass,lines)`.
+* `hdfs.head(...,ObjectWritable)` no longer exists. Use `SequenceFileInputFormat` instead, since the parsing class must now be an input format.
+
+Given that HDFS (and now Spark) interactions are possible via `Storage` and no longer via Groovy meta-programming,
+developers can use these `Storage` implementations in their Java code. In fact, `Storage` has greatly simplified
+complex file/RDD operations in both `GiraphGraphComputer` and `SparkGraphComputer`.
+
+See: link:https://issues.apache.org/jira/browse/TINKERPOP-1033[TINKERPOP-1033],
+link:https://issues.apache.org/jira/browse/TINKERPOP-1023[TINKERPOP-1023]
+
Gremlin Server Transaction Management
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
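
The `rmr()` to `rm()` change described in the Storage I/O notes of this file means that a single call now removes a location and everything beneath it. As a rough illustration of that contract only, the sketch below applies the same semantics to a local directory using `java.nio.file`. `RecursiveRm`, `rm(Path)` and `demoTree()` are hypothetical names invented for this example; this is not how `FileSystemStorage` works, which per the diff later in this commit delegates to `globDelete` on a Hadoop `FileSystem`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical helper: mimics "rm is recursive" on the local file system only.
final class RecursiveRm {

    // Removes the location and everything under it; returns whether anything was removed.
    static boolean rm(final Path location) {
        if (!Files.exists(location))
            return false;
        try (Stream<Path> paths = Files.walk(location)) {
            // Reverse order visits children before their parent directories.
            paths.sorted(Comparator.reverseOrder()).forEach(path -> {
                try {
                    Files.delete(path);
                } catch (final IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            return true;
        } catch (final IOException e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    // Builds a small nested directory tree in a temp folder for demonstration.
    static Path demoTree() {
        try {
            final Path root = Files.createTempDirectory("storage-rm-demo");
            final Path nested = Files.createDirectories(root.resolve("output").resolve("graph"));
            Files.write(nested.resolve("part-0"), "v1\n".getBytes(StandardCharsets.UTF_8));
            return root;
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(final String[] args) {
        final Path root = demoTree();
        System.out.println("removed: " + rm(root));
        System.out.println("still exists: " + Files.exists(root));
    }
}
```

Returning `false` for a missing location mirrors the "whether a data source was removed" wording of the `Storage.rm()` JavaDoc, so callers that previously guarded `rmr()` with `exists()` can keep or drop that guard.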
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 7e3de5e..dfe8e8c 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -117,7 +117,6 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
@Override
public Future<ComputerResult> submit() {
super.validateStatePriorToExecution();
-
return ComputerSubmissionHelper.runWithBackgroundThread(this::submitWithExecutor, "GiraphSubmitter");
}
@@ -142,7 +141,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
@Override
public int run(final String[] args) {
final Storage storage = FileSystemStorage.open(this.giraphConfiguration);
- storage.rmr(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
+ storage.rm(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
this.giraphConfiguration.setBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(Persist.EDGES));
try {
// it is possible to run graph computer without a vertex program (and thus, only map reduce jobs if they exist)
@@ -190,12 +189,12 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
if (iterator.hasNext()) {
this.memory.set(memoryKey, iterator.next().getValue());
}
- storage.rmr(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey));
+ storage.rm(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey));
}
}
final Path path = new Path(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), Constants.HIDDEN_ITERATION));
this.memory.setIteration((Integer) new ObjectWritableIterator(this.giraphConfiguration, path).next().getValue());
- storage.rmr(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), Constants.HIDDEN_ITERATION));
+ storage.rm(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), Constants.HIDDEN_ITERATION));
}
// do map reduce jobs
this.giraphConfiguration.setBoolean(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES, this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, true));
@@ -204,9 +203,9 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
MapReduceHelper.executeMapReduceJob(mapReduce, this.memory, this.giraphConfiguration);
}
- // if no persistence, delete the map reduce output
+ // if no persistence, delete the graph output
if (this.persist.equals(Persist.NOTHING))
- storage.rmr(Constants.getGraphLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)));
+ storage.rm(Constants.getGraphLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)));
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
@@ -244,7 +243,7 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
} catch (final Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
- } catch (Exception e) {
+ } catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
});
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
index d9c6927..1779b38 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
@@ -26,43 +26,113 @@ import java.util.Iterator;
import java.util.List;
/**
+ * Storage is a standard API that providers can implement to allow "file-system"-based access to data sources.
+ * The methods provided by Storage are similar in form and behavior to standard Linux operating system commands.
+ *
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public interface Storage {
+ /**
+ * List all the data sources in the root directory.
+ *
+ * @return the data sources in the root directory
+ */
public List<String> ls();
+ /**
+ * List all the data sources at the specified location.
+ *
+ * @param location a location
+ * @return the data sources at the specified location
+ */
public List<String> ls(final String location);
- public boolean mkdir(final String location);
-
- public boolean cp(final String fromLocation, final String toLocation);
-
+ /**
+ * Recursively copy all the data sources from the source location to the target location.
+ *
+ * @param sourceLocation the source location
+ * @param targetLocation the target location
+ * @return whether data sources were copied
+ */
+ public boolean cp(final String sourceLocation, final String targetLocation);
+
+ /**
+ * Determine whether the specified location has a data source.
+ *
+ * @param location a location to check
+ * @return whether that location has a data source.
+ */
public boolean exists(final String location);
+ /**
+ * Recursively remove the data source at the specified location.
+ *
+ * @param location the location of the data source
+ * @return whether a data source was removed.
+ */
public boolean rm(final String location);
- public boolean rmr(final String location);
-
- public Iterator<String> head(final String location, final int totalLines);
-
+ /**
+ * Get a string representation of all the lines at the data source location.
+ *
+ * @param location the data source location
+ * @return an iterator of lines
+ */
public default Iterator<String> head(final String location) {
return this.head(location, Integer.MAX_VALUE);
}
- public Iterator<Vertex> head(final String location, final Class parserClass, final int totalLines);
+ /**
+ * Get a string representation of the specified number of lines at the data source location.
+ *
+ * @param location the data source location
+ * @param totalLines the total number of lines to retrieve
+ * @return an iterator of lines.
+ */
+ public Iterator<String> head(final String location, final int totalLines);
- @Deprecated
- public default Iterator<Vertex> head(final String location, final int totalLines, final Class parserClass) {
- return this.head(location,parserClass,totalLines);
- }
+ /**
+ * Get the vertices at the specified graph location.
+ *
+ * @param location the location of the graph (or the root location and search will be made)
+ * @param parserClass the class of the parser that understands the graph format
+ * @param totalLines the total number of lines of the graph to return
+ * @return an iterator of vertices.
+ */
+ public Iterator<Vertex> head(final String location, final Class parserClass, final int totalLines);
+ /**
+ * Get the vertices at the specified graph location.
+ *
+ * @param location the location of the graph (or the root location and search will be made)
+ * @param parserClass the class of the parser that understands the graph format
+ * @return an iterator of vertices.
+ */
public default Iterator<Vertex> head(final String location, final Class parserClass) {
return this.head(location, parserClass, Integer.MAX_VALUE);
}
+ /**
+ * Get the {@link KeyValue} data at the specified memory location.
+ *
+ * @param location the root location of the data
+ * @param memoryKey the memory key
+ * @param parserClass the class of the parser that understands the memory format
+ * @param totalLines the total number of key-values to return
+ * @return an iterator of key-values.
+ */
public <K, V> Iterator<KeyValue<K, V>> head(final String location, final String memoryKey, final Class parserClass, final int totalLines);
+
+ /**
+ * Get the {@link KeyValue} data at the specified memory location.
+ *
+ * @param location the root location of the data
+ * @param memoryKey the memory key
+ * @param parserClass the class of the parser that understands the memory format
+ * @return an iterator of key-values.
+ */
public default <K, V> Iterator<KeyValue<K, V>> head(final String location, final String memoryKey, final Class parserClass) {
return this.head(location, memoryKey, parserClass, Integer.MAX_VALUE);
}
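
To make the contract documented in the JavaDoc above more concrete, here is a minimal in-memory sketch. It is an assumption-laden illustration, not part of the commit: `MiniStorage` is a hypothetical, trimmed-down stand-in for `Storage` that keeps only the string-based methods (so it needs no TinkerPop classes), and `InMemoryStorage` with its `put()` helper is invented for the example. Locations are treated as flat `/`-separated keys, and `ls()` simply returns every stored key rather than only root-level entries.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-in for Storage: only the String-based methods are modeled.
interface MiniStorage {
    List<String> ls();
    List<String> ls(String location);
    boolean cp(String sourceLocation, String targetLocation);
    boolean exists(String location);
    boolean rm(String location);
    Iterator<String> head(String location, int totalLines);

    // Same delegation pattern as the default head() methods in the diff.
    default Iterator<String> head(final String location) {
        return this.head(location, Integer.MAX_VALUE);
    }
}

final class InMemoryStorage implements MiniStorage {
    // location -> lines; a sorted map keeps listings deterministic
    private final TreeMap<String, List<String>> data = new TreeMap<>();

    // Hypothetical helper for seeding data; not part of the Storage API.
    void put(final String location, final List<String> lines) {
        this.data.put(location, new ArrayList<>(lines));
    }

    private static boolean under(final String key, final String location) {
        return key.equals(location) || key.startsWith(location + "/");
    }

    @Override
    public List<String> ls() {
        return new ArrayList<>(this.data.keySet());
    }

    @Override
    public List<String> ls(final String location) {
        final List<String> result = new ArrayList<>();
        for (final String key : this.data.keySet())
            if (under(key, location)) result.add(key);
        return result;
    }

    @Override
    public boolean exists(final String location) {
        return !this.ls(location).isEmpty();
    }

    @Override
    public boolean cp(final String sourceLocation, final String targetLocation) {
        final List<String> keys = this.ls(sourceLocation);
        // Recursive copy: every key under the source is re-rooted at the target.
        for (final String key : keys)
            this.data.put(targetLocation + key.substring(sourceLocation.length()), new ArrayList<>(this.data.get(key)));
        return !keys.isEmpty();
    }

    @Override
    public boolean rm(final String location) {
        final List<String> keys = this.ls(location);
        keys.forEach(this.data::remove); // recursive: removes everything under location
        return !keys.isEmpty();
    }

    @Override
    public Iterator<String> head(final String location, final int totalLines) {
        final List<String> lines = new ArrayList<>();
        for (final String key : this.ls(location))
            lines.addAll(this.data.get(key));
        return lines.stream().limit(totalLines).iterator();
    }
}
```

A caller would seed two locations under `output`, copy them with `cp("output", "copy")`, and then remove the original with a single `rm("output")`, much as `AbstractStorageCheck` exercises the real implementations.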
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/hadoop-gremlin/conf/hadoop-grateful-gryo.properties
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/conf/hadoop-grateful-gryo.properties b/hadoop-gremlin/conf/hadoop-grateful-gryo.properties
index 0554fcc..e247c2a 100644
--- a/hadoop-gremlin/conf/hadoop-grateful-gryo.properties
+++ b/hadoop-gremlin/conf/hadoop-grateful-gryo.properties
@@ -21,10 +21,8 @@
gremlin.graph=org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph
gremlin.hadoop.graphInputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat
gremlin.hadoop.graphOutputFormat=org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoOutputFormat
-gremlin.hadoop.memoryOutputFormat=org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-gremlin.hadoop.inputLocation=data/grateful-dead.kryo
+gremlin.hadoop.inputLocation=grateful-dead.kryo
gremlin.hadoop.outputLocation=output
-gremlin.hadoop.deriveMemory=false
gremlin.hadoop.jarsInDistributedCache=true
#
@@ -45,5 +43,5 @@ giraph.maxMessagesInMemory=100000
#
spark.master=local[1]
spark.executor.memory=1g
-spark.serializer=org.apache.spark.serializer.KryoSerializer
+spark.serializer=org.apache.tinkerpop.gremlin.spark.structure.io.gryo.GryoSerializer
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
index 25b01ff..d4578b4 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopConfiguration.java
@@ -50,6 +50,11 @@ public final class HadoopConfiguration extends AbstractConfiguration implements
this.properties.put(key, value);
}
+ @Override
+ protected void clearPropertyDirect(final String key) {
+ this.properties.remove(key);
+ }
+
public HadoopConfiguration(final Configuration configuration) {
this();
this.copy(configuration);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
index 4f648ee..ed112f7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
@@ -106,7 +106,6 @@ public final class FileSystemStorage implements Storage {
}
}
- @Override
public boolean mkdir(final String location) {
try {
return this.fs.mkdirs(new Path(location));
@@ -116,9 +115,9 @@ public final class FileSystemStorage implements Storage {
}
@Override
- public boolean cp(final String fromLocation, final String toLocation) {
+ public boolean cp(final String sourceLocation, final String targetLocation) {
try {
- return FileUtil.copy(this.fs, new Path(fromLocation), this.fs, new Path(toLocation), false, new Configuration());
+ return FileUtil.copy(this.fs, new Path(sourceLocation), this.fs, new Path(targetLocation), false, new Configuration());
} catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
@@ -136,15 +135,6 @@ public final class FileSystemStorage implements Storage {
@Override
public boolean rm(final String location) {
try {
- return FileSystemStorage.globDelete(this.fs, location, false);
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- @Override
- public boolean rmr(final String location) {
- try {
return FileSystemStorage.globDelete(this.fs, location, true);
} catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -177,7 +167,7 @@ public final class FileSystemStorage implements Storage {
@Override
public <K, V> Iterator<KeyValue<K, V>> head(final String location, final String memoryKey, final Class parserClass, final int totalLines) {
- if (!parserClass.equals(SequenceFileInputFormat.class) && !parserClass.equals(ObjectWritable.class)) // object writable support for backwards compatibility
+ if (!parserClass.equals(SequenceFileInputFormat.class))
throw new IllegalArgumentException("Only " + SequenceFileInputFormat.class.getCanonicalName() + " memories are supported");
final Configuration configuration = new Configuration();
try {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractStorageCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractStorageCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractStorageCheck.java
index 195f50d..bec9c72 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractStorageCheck.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractStorageCheck.java
@@ -77,14 +77,14 @@ public abstract class AbstractStorageCheck extends AbstractGremlinTest {
assertTrue(storage.exists(Constants.getGraphLocation(outputLocation)));
assertTrue(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount")));
assertEquals(2, storage.ls(outputLocation).size());
- assertTrue(storage.rmr(Constants.getGraphLocation(outputLocation)));
+ assertTrue(storage.rm(Constants.getGraphLocation(outputLocation)));
assertEquals(1, storage.ls(outputLocation).size());
- assertTrue(storage.rmr(Constants.getMemoryLocation(outputLocation, "clusterCount")));
+ assertTrue(storage.rm(Constants.getMemoryLocation(outputLocation, "clusterCount")));
assertEquals(0, storage.ls(outputLocation).size());
assertFalse(storage.exists(Constants.getGraphLocation(outputLocation)));
assertFalse(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount")));
if (storage.exists(outputLocation))
- assertTrue(storage.rmr(outputLocation));
+ assertTrue(storage.rm(outputLocation));
assertFalse(storage.exists(outputLocation));
////////////////
@@ -94,12 +94,12 @@ public abstract class AbstractStorageCheck extends AbstractGremlinTest {
assertTrue(storage.exists(Constants.getGraphLocation(outputLocation)));
assertTrue(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount")));
assertEquals(2, storage.ls(outputLocation).size());
- assertTrue(storage.rmr(outputLocation));
+ assertTrue(storage.rm(outputLocation));
assertFalse(storage.exists(outputLocation));
assertEquals(0, storage.ls(outputLocation).size());
}
- public void checkCopyMethods(final Storage storage, final String outputLocation, final String newOutputLocation) throws Exception {
+ public void checkCopyMethods(final Storage storage, final String outputLocation, final String newOutputLocation, final Class outputGraphParserClass, final Class outputMemoryParserClass) throws Exception {
graph.compute(graphComputerClass.get()).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get();
assertTrue(storage.exists(outputLocation));
assertTrue(storage.exists(Constants.getGraphLocation(outputLocation)));
@@ -113,5 +113,11 @@ public abstract class AbstractStorageCheck extends AbstractGremlinTest {
assertTrue(storage.exists(newOutputLocation));
assertTrue(storage.exists(Constants.getGraphLocation(newOutputLocation)));
assertTrue(storage.exists(Constants.getMemoryLocation(newOutputLocation, "clusterCount")));
+
+ assertEquals(2, storage.ls(newOutputLocation).size());
+ assertEquals(6, IteratorUtils.count(storage.head(outputLocation, outputGraphParserClass)));
+ assertEquals(6, IteratorUtils.count(storage.head(newOutputLocation, outputGraphParserClass)));
+ assertEquals(1, IteratorUtils.count(storage.head(outputLocation, "clusterCount", outputMemoryParserClass)));
+ assertEquals(1, IteratorUtils.count(storage.head(newOutputLocation, "clusterCount", outputMemoryParserClass)));
}
}
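
Note on the new count assertions in checkCopyMethods: they compare the source and target locations after a copy, so they would fail on a copy that produces the right names but no data. The SparkContextStorage.cp() hunk in this commit changes the filter predicate from `a -> false` to `a -> true`, which is the kind of bug those assertions catch. The sketch below is only an illustration under stated assumptions: a plain java.util.List stands in for the RDD, and the class name CopyPredicateSketch and the helper copyWith are hypothetical, not part of TinkerPop or Spark.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class CopyPredicateSketch {

    // Hypothetical stand-in for "copy by filtering": a List plays the role of the RDD.
    // Every element for which the predicate returns true ends up in the copy.
    static <T> List<T> copyWith(final List<T> source, final Predicate<T> keep) {
        return source.stream().filter(keep).collect(Collectors.toList());
    }

    public static void main(final String[] args) {
        // Six placeholder elements, chosen to match the count asserted in the test.
        final List<Integer> elements = Arrays.asList(1, 2, 3, 4, 5, 6);

        // A predicate that rejects everything yields an empty "copy".
        System.out.println(copyWith(elements, a -> false).size()); // prints 0

        // A predicate that accepts everything yields a full copy.
        System.out.println(copyWith(elements, a -> true).size()); // prints 6
    }
}
```

With an always-false predicate the target exists but is empty, so an exists()-only check passes while a count check does not.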
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorageCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorageCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorageCheck.java
index f528df9..1b2c04e 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorageCheck.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorageCheck.java
@@ -47,8 +47,6 @@ public class FileSystemStorageCheck extends AbstractStorageCheck {
final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
// TestHelper creates the directory and we need it not to exist
deleteDirectory(outputLocation);
- super.checkHeadMethods(storage, inputLocation, outputLocation, InputOutputHelper.getInputFormat((Class) Class.forName(graph.configuration().getString(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT))), ObjectWritable.class);
- deleteDirectory(outputLocation);
super.checkHeadMethods(storage, inputLocation, outputLocation, InputOutputHelper.getInputFormat((Class) Class.forName(graph.configuration().getString(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT))), SequenceFileInputFormat.class);
}
@@ -68,7 +66,7 @@ public class FileSystemStorageCheck extends AbstractStorageCheck {
final String newOutputLocation = TestHelper.makeTestDataDirectory(FileSystemStorageCheck.class, "new-location-for-copy");
// TestHelper creates the directory and we need it not to exist
deleteDirectory(newOutputLocation);
- super.checkCopyMethods(storage, outputLocation, newOutputLocation);
+ super.checkCopyMethods(storage, outputLocation, newOutputLocation, InputOutputHelper.getInputFormat((Class) Class.forName(graph.configuration().getString(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT))), SequenceFileInputFormat.class);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
index a7e333c..1fe23e3 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
@@ -42,6 +42,7 @@ public final class SparkGremlinPlugin extends AbstractGremlinPlugin {
protected static final Set<String> IMPORTS = new HashSet<String>() {{
add(IMPORT_SPACE + SparkGraphComputer.class.getPackage().getName() + DOT_STAR);
add(IMPORT_SPACE + Spark.class.getPackage().getName() + DOT_STAR);
+ add(IMPORT_SPACE + SparkContextStorage.class.getPackage().getName() + DOT_STAR);
}};
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 82c4331..b48fac5 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -243,11 +243,11 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
if (!PersistedOutputRDD.class.equals(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)) || this.persist.equals(GraphComputer.Persist.NOTHING)) {
graphRDD.unpersist();
if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))
- SparkContextStorage.open().rmr(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
+ SparkContextStorage.open().rm(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
}
// delete any file system output if persist nothing
if (FileOutputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT, FileInputFormat.class)) && this.persist.equals(GraphComputer.Persist.NOTHING))
- FileSystemStorage.open(hadoopConfiguration).rmr(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
+ FileSystemStorage.open(hadoopConfiguration).rm(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
// update runtime and return the newly computed graph
finalMemory.setRuntime(System.currentTimeMillis() - startTime);
return new DefaultComputerResult(InputOutputHelper.getOutputGraph(apacheConfiguration, this.resultGraph, this.persist), finalMemory.asImmutable());
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
index 7833701..27b87f5 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
@@ -44,7 +44,7 @@ public final class PersistedOutputRDD implements OutputRDD {
LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set " + Constants.GREMLIN_SPARK_PERSIST_CONTEXT + " to true");
if (!configuration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))
throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
- SparkContextStorage.open(configuration).rmr(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)); // this might be bad cause it unpersists the job RDD
+ SparkContextStorage.open(configuration).rm(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)); // this might be bad cause it unpersists the job RDD
if (!configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, true))
graphRDD.mapValues(vertex -> {
vertex.get().dropEdges();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
index d0fc984..6801304 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
@@ -84,17 +84,12 @@ public final class SparkContextStorage implements Storage {
}
@Override
- public boolean mkdir(final String location) {
- throw new UnsupportedOperationException("This operation does not make sense for a persisted SparkContext");
- }
-
- @Override
- public boolean cp(final String fromLocation, final String toLocation) {
- final List<String> rdds = Spark.getRDDs().stream().filter(r -> r.name().startsWith(fromLocation)).map(RDD::name).collect(Collectors.toList());
+ public boolean cp(final String sourceLocation, final String targetLocation) {
+ final List<String> rdds = Spark.getRDDs().stream().filter(r -> r.name().startsWith(sourceLocation)).map(RDD::name).collect(Collectors.toList());
if (rdds.size() == 0)
return false;
for (final String rdd : rdds) {
- Spark.getRDD(rdd).toJavaRDD().filter(a -> false).setName(rdd.equals(fromLocation) ? toLocation : rdd.replace(fromLocation, toLocation)).cache().count();
+ Spark.getRDD(rdd).toJavaRDD().filter(a -> true).setName(rdd.equals(sourceLocation) ? targetLocation : rdd.replace(sourceLocation, targetLocation)).cache().count();
}
return true;
}
@@ -106,14 +101,6 @@ public final class SparkContextStorage implements Storage {
@Override
public boolean rm(final String location) {
- if (!Spark.hasRDD(location))
- return false;
- Spark.removeRDD(location);
- return true;
- }
-
- @Override
- public boolean rmr(final String location) {
final List<String> rdds = new ArrayList<>();
final String wildCardLocation = (location.endsWith("*") ? location : location + "*").replace(".", "\\.").replace("*", ".*");
for (final RDD<?> rdd : Spark.getRDDs()) {
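
Note on the wildcard handling that rm() now carries (formerly in rmr()): the location is turned into a regular expression by appending a trailing `*` if one is missing, escaping literal dots, and replacing `*` with `.*`. The sketch below reproduces only that string transformation, copied from the line above. The matching step is an assumption, since the rest of the loop is not shown in this hunk. The class name WildcardLocationSketch, the helper matching, and the sample names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WildcardLocationSketch {

    // Same transformation as the wildCardLocation line in the diff:
    // ensure a trailing '*', escape literal dots, then expand '*' to '.*'.
    static String toRegex(final String location) {
        return (location.endsWith("*") ? location : location + "*")
                .replace(".", "\\.")
                .replace("*", ".*");
    }

    // Hypothetical helper (an assumption, not taken from the commit):
    // selects the names that fully match the generated pattern.
    static List<String> matching(final String location, final List<String> names) {
        final String regex = toRegex(location);
        final List<String> selected = new ArrayList<>();
        for (final String name : names) {
            if (name.matches(regex))
                selected.add(name);
        }
        return selected;
    }

    public static void main(final String[] args) {
        // Made-up names standing in for persisted RDD names.
        final List<String> names = Arrays.asList("output/graph", "output/clusterCount", "other/graph");

        System.out.println(toRegex("output"));          // prints output.*
        System.out.println(toRegex("data.v1"));         // prints data\.v1.*
        System.out.println(matching("output", names));  // prints [output/graph, output/clusterCount]
    }
}
```

Because a bare location is treated as a prefix pattern, a single rm(location) call can cover both an exact name and everything beneath it, which is consistent with rmr() being dropped in this commit. Only `.` and `*` are handled by the transformation, so other regex metacharacters in a location would be interpreted as pattern syntax.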
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5c9e81b0/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
index 60b44ef..9d9fa37 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
@@ -23,8 +23,6 @@ import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.AbstractStorageCheck;
import org.apache.tinkerpop.gremlin.spark.structure.Spark;
-import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedInputRDD;
-import org.apache.tinkerpop.gremlin.spark.structure.io.SparkContextStorage;
import org.apache.tinkerpop.gremlin.structure.io.Storage;
import org.junit.Before;
import org.junit.Test;
@@ -66,6 +64,6 @@ public class SparkContextStorageCheck extends AbstractStorageCheck {
final Storage storage = SparkContextStorage.open("local[4]");
final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
final String newOutputLocation = "new-location-for-copy";
- super.checkCopyMethods(storage, outputLocation, newOutputLocation);
+ super.checkCopyMethods(storage, outputLocation, newOutputLocation, PersistedInputRDD.class, PersistedInputRDD.class);
}
}
\ No newline at end of file