You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2016/01/11 17:49:13 UTC
[05/30] incubator-tinkerpop git commit: Greatly greatly simplified Hadoop OLTP and interactions with HDFS and SparkContext. The trend -- dir/~g for graphs and dir/x for memory. A consistent persistence schema makes everything so much simpler. I always as[... subject truncated; full commit message below]
Greatly, greatly simplified Hadoop OLTP and its interactions with HDFS and SparkContext. The convention is now dir/~g for graphs and dir/x for memory, where x is the memory key. A consistent persistence schema makes everything so much simpler. I always assumed this would all be generalized/blah/blah, but I never actually did it. So, hell, stick with a consistent schema and watch the code just fall away.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b4d8e960
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b4d8e960
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b4d8e960
Branch: refs/heads/TINKERPOP-320
Commit: b4d8e9608d4eca3ae177b28fe588518a9d77506c
Parents: 2c0d327
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 9 15:58:50 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Dec 9 15:58:50 2015 -0700
----------------------------------------------------------------------
.../tinkerpop/gremlin/structure/io/Storage.java | 44 +++-----
.../tinkerpop/gremlin/hadoop/Constants.java | 16 ++-
.../groovy/plugin/HadoopGremlinPlugin.java | 6 +-
.../gremlin/hadoop/structure/HadoopGraph.java | 4 +-
.../hadoop/structure/hdfs/HDFSTools.java | 113 -------------------
.../structure/hdfs/HadoopEdgeIterator.java | 83 --------------
.../structure/hdfs/HadoopElementIterator.java | 74 ------------
.../structure/hdfs/HadoopVertexIterator.java | 82 --------------
.../hadoop/structure/hdfs/HiddenFileFilter.java | 44 --------
.../hadoop/structure/hdfs/TextIterator.java | 91 ---------------
.../hadoop/structure/io/FileSystemStorage.java | 106 +++++++++++++----
.../hadoop/structure/io/HadoopEdgeIterator.java | 79 +++++++++++++
.../structure/io/HadoopElementIterator.java | 75 ++++++++++++
.../structure/io/HadoopVertexIterator.java | 78 +++++++++++++
.../hadoop/structure/io/HiddenFileFilter.java | 44 ++++++++
.../structure/io/ObjectWritableIterator.java | 12 +-
.../hadoop/structure/io/TextIterator.java | 91 +++++++++++++++
.../structure/io/VertexWritableIterator.java | 10 +-
.../groovy/plugin/FileSystemStorageCheck.java | 63 +++++++++++
.../groovy/plugin/GraphMemoryHDFSCheck.java | 64 -----------
.../groovy/plugin/HadoopGremlinPluginCheck.java | 4 +-
.../hadoop/groovy/plugin/HadoopPluginSuite.java | 2 +-
.../spark/groovy/plugin/SparkGremlinPlugin.java | 3 +-
.../process/computer/SparkGraphComputer.java | 4 +
.../spark/structure/io/PersistedOutputRDD.java | 3 +-
.../spark/structure/io/SparkContextStorage.java | 53 ++++++++-
.../structure/io/GraphMemorySparkTest.java | 75 ------------
.../io/PersistedInputOutputRDDTest.java | 14 +--
.../structure/io/SparkContextStorageTest.java | 74 ++++++++++++
29 files changed, 699 insertions(+), 712 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
index 1f1bcf4..3b69ff2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
@@ -19,6 +19,9 @@
package org.apache.tinkerpop.gremlin.structure.io;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
import java.util.Iterator;
import java.util.List;
@@ -41,42 +44,21 @@ public interface Storage {
public boolean rmr(final String location);
- public <V> Iterator<V> head(final String location, final int totalLines, final Class<V> objectClass);
+ public Iterator<String> head(final String location, final int totalLines);
- public default Iterator<Object> head(final String location) {
- return this.head(location, Object.class);
+ public default Iterator<String> head(final String location) {
+ return this.head(location, Integer.MAX_VALUE);
}
- public default Iterator<Object> head(final String location, final int totalLines) {
- return this.head(location, totalLines, Object.class);
- }
+ public Iterator<Vertex> headGraph(final String location, final int totalLines, final Class parserClass);
- public default <V> Iterator<V> head(final String location, final Class<V> objectClass) {
- return this.head(location, Integer.MAX_VALUE, objectClass);
+ public default Iterator<Vertex> headGraph(final String location, final Class parserClass) {
+ return this.headGraph(location, Integer.MAX_VALUE, parserClass);
}
- /*
-
- FileSystem.metaClass.copyToLocal = { final String from, final String to ->
- return ((FileSystem) delegate).copyToLocalFile(new Path(from), new Path(to));
- }
-
- FileSystem.metaClass.copyFromLocal = { final String from, final String to ->
- return ((FileSystem) delegate).copyFromLocalFile(new Path(from), new Path(to));
- }
+ public <K, V> Iterator<KeyValue<K, V>> headMemory(final String location, final String memoryKey, final int totalLines, final Class parserClass);
- FileSystem.metaClass.mergeToLocal = { final String from, final String to ->
- final FileSystem fs = (FileSystem) delegate;
- final FileSystem local = FileSystem.getLocal(new Configuration());
- final FSDataOutputStream outA = local.create(new Path(to));
-
- HDFSTools.getAllFilePaths(fs, new Path(from), HiddenFileFilter.instance()).each {
- final FSDataInputStream inA = fs.open(it);
- IOUtils.copyBytes(inA, outA, 8192);
- inA.close();
- }
- outA.close();
- }
-
- */
+ public default <K, V> Iterator<KeyValue<K, V>> headMemory(final String location, final String memoryKey, final Class parserClass) {
+ return this.headMemory(location, memoryKey, Integer.MAX_VALUE, parserClass);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 8678441..4a91106 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -19,6 +19,9 @@
package org.apache.tinkerpop.gremlin.hadoop;
import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+
+import java.util.Optional;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -54,10 +57,19 @@ public final class Constants {
public static final String GREMLIN_SPARK_PERSIST_CONTEXT = "gremlin.spark.persistContext";
public static String getGraphLocation(final String location) {
- return location + "/" + Constants.HIDDEN_G;
+ return location.endsWith("/") ? location + Constants.HIDDEN_G : location + "/" + Constants.HIDDEN_G;
}
public static String getMemoryLocation(final String location, final String memoryKey) {
- return location + "/" + memoryKey;
+ return location.endsWith("/") ? location + memoryKey : location + "/" + memoryKey;
+ }
+
+ public static Optional<String> getSearchGraphLocation(final String location, final Storage storage) {
+ if (storage.exists(getGraphLocation(location)))
+ return Optional.of(getGraphLocation(location));
+ else if (storage.exists(location))
+ return Optional.of(location);
+ else
+ return Optional.empty();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
index 55ba020..c50d226 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
@@ -29,7 +29,6 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.mapreduce.MapReduceGraphComputer;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
@@ -65,7 +64,6 @@ public final class HadoopGremlinPlugin extends AbstractGremlinPlugin {
add(IMPORT_SPACE + GryoInputFormat.class.getPackage().getName() + DOT_STAR);
add(IMPORT_SPACE + GraphSONInputFormat.class.getPackage().getName() + DOT_STAR);
add(IMPORT_SPACE + ScriptInputFormat.class.getPackage().getName() + DOT_STAR);
- add(IMPORT_SPACE + HDFSTools.class.getPackage().getName() + DOT_STAR);
////
add(IMPORT_SPACE + MapReduceGraphComputer.class.getPackage().getName() + DOT_STAR);
}};
@@ -79,8 +77,8 @@ public final class HadoopGremlinPlugin extends AbstractGremlinPlugin {
public void afterPluginTo(final PluginAcceptor pluginAcceptor) throws PluginInitializationException, IllegalEnvironmentException {
pluginAcceptor.addImports(IMPORTS);
try {
- pluginAcceptor.addBinding("hdfs", new FileSystemStorage(FileSystem.get(new Configuration())));
- pluginAcceptor.addBinding("local", new FileSystemStorage(FileSystem.getLocal(new Configuration())));
+ pluginAcceptor.addBinding("hdfs", FileSystemStorage.open(FileSystem.get(new Configuration())));
+ pluginAcceptor.addBinding("local", FileSystemStorage.open(FileSystem.getLocal(new Configuration())));
if (null == System.getenv(Constants.HADOOP_GREMLIN_LIBS))
HadoopGraph.LOGGER.warn("Be sure to set the environmental variable: " + Constants.HADOOP_GREMLIN_LIBS);
else
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index a9f758c..22f42f4 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.tinkerpop.gremlin.hadoop.Constants;
import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HadoopEdgeIterator;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HadoopVertexIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopEdgeIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopVertexIterator;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.structure.Edge;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HDFSTools.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HDFSTools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HDFSTools.java
deleted file mode 100644
index e4da530..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HDFSTools.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class HDFSTools {
-
- private static final String FORWARD_SLASH = "/";
- private static final String FORWARD_ASTERISK = "/*";
-
- private HDFSTools() {
- }
-
- public static long getFileSize(final FileSystem fs, final Path path, final PathFilter filter) throws IOException {
- long totalSize = 0l;
- for (final Path p : getAllFilePaths(fs, path, filter)) {
- totalSize = totalSize + fs.getFileStatus(p).getLen();
- }
- return totalSize;
- }
-
- public static List<Path> getAllFilePaths(final FileSystem fs, Path path, final PathFilter filter) throws IOException {
- if (null == path) path = fs.getHomeDirectory();
- if (path.toString().equals(FORWARD_SLASH)) path = new Path("");
-
- final List<Path> paths = new ArrayList<Path>();
- if (fs.isFile(path))
- paths.add(path);
- else {
- for (final FileStatus status : fs.globStatus(new Path(path + FORWARD_ASTERISK), filter)) {
- final Path next = status.getPath();
- paths.addAll(getAllFilePaths(fs, next, filter));
- }
- }
- return paths;
- }
-
-
- public static void decompressPath(final FileSystem fs, final String in, final String out, final String compressedFileSuffix, final boolean deletePrevious) throws IOException {
- final Path inPath = new Path(in);
-
- if (fs.isFile(inPath))
- HDFSTools.decompressFile(fs, in, out, deletePrevious);
- else {
- final Path outPath = new Path(out);
- if (!fs.exists(outPath))
- fs.mkdirs(outPath);
- for (final Path path : FileUtil.stat2Paths(fs.globStatus(new Path(in + FORWARD_ASTERISK)))) {
- if (path.getName().endsWith(compressedFileSuffix))
- HDFSTools.decompressFile(fs, path.toString(), outPath.toString() + FORWARD_SLASH + path.getName().split("\\.")[0], deletePrevious);
- }
- }
- }
-
- public static void decompressFile(final FileSystem fs, final String inFile, final String outFile, boolean deletePrevious) throws IOException {
- final Path inPath = new Path(inFile);
- final Path outPath = new Path(outFile);
- final CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
- final CompressionCodec codec = factory.getCodec(inPath);
- final OutputStream out = fs.create(outPath);
- final InputStream in = codec.createInputStream(fs.open(inPath));
- IOUtils.copyBytes(in, out, 8192);
- IOUtils.closeStream(in);
- IOUtils.closeStream(out);
-
- if (deletePrevious)
- fs.delete(new Path(inFile), true);
-
- }
-
- public static boolean globDelete(final FileSystem fs, final String path, final boolean recursive) throws IOException {
- boolean deleted = false;
- for (final Path p : FileUtil.stat2Paths(fs.globStatus(new Path(path)))) {
- fs.delete(p, recursive);
- deleted = true;
- }
- return deleted;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java
deleted file mode 100644
index 59a4d2c..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
-
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class HadoopEdgeIterator extends HadoopElementIterator<Edge> {
-
- private Iterator<Edge> edgeIterator = Collections.emptyIterator();
-
- public HadoopEdgeIterator(final HadoopGraph graph) throws IOException {
- super(graph);
- }
-
- @Override
- public Edge next() {
- try {
- while (true) {
- if (this.edgeIterator.hasNext())
- return new HadoopEdge(this.edgeIterator.next(), this.graph);
- if (this.readers.isEmpty())
- throw FastNoSuchElementException.instance();
- if (this.readers.peek().nextKeyValue()) {
- this.edgeIterator = this.readers.peek().getCurrentValue().get().edges(Direction.OUT);
- } else {
- this.readers.remove();
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- @Override
- public boolean hasNext() {
- try {
- while (true) {
- if (this.edgeIterator.hasNext())
- return true;
- if (this.readers.isEmpty())
- return false;
- if (this.readers.peek().nextKeyValue()) {
- this.edgeIterator = this.readers.peek().getCurrentValue().get().edges(Direction.OUT);
- } else {
- this.readers.remove();
- }
- }
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
deleted file mode 100644
index 45f3c55..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.structure.Element;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.UUID;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public abstract class HadoopElementIterator<E extends Element> implements Iterator<E> {
-
- protected final HadoopGraph graph;
- protected final Queue<RecordReader<NullWritable, VertexWritable>> readers = new LinkedList<>();
-
- public HadoopElementIterator(final HadoopGraph graph) throws IOException {
- try {
- this.graph = graph;
- final Configuration configuration = ConfUtil.makeHadoopConfiguration(this.graph.configuration());
- final InputFormat<NullWritable, VertexWritable> inputFormat = this.graph.configuration().getGraphInputFormat().getConstructor().newInstance();
- if (inputFormat instanceof FileInputFormat) {
- if (!this.graph.configuration().containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
- return; // there is no input location and thus, no data (empty graph)
- if (!FileSystem.get(configuration).exists(new Path(this.graph.configuration().getInputLocation())))
- return; // there is no data at the input location (empty graph)
- configuration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, this.graph.configuration().getInputLocation());
- }
- final List<InputSplit> splits = inputFormat.getSplits(new JobContextImpl(configuration, new JobID(UUID.randomUUID().toString(), 1)));
- for (final InputSplit split : splits) {
- this.readers.add(inputFormat.createRecordReader(split, new TaskAttemptContextImpl(configuration, new TaskAttemptID())));
- }
- } catch (Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java
deleted file mode 100644
index 8f13c59..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
-
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-
-import java.io.IOException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class HadoopVertexIterator extends HadoopElementIterator<Vertex> {
-
- private HadoopVertex nextVertex = null;
-
- public HadoopVertexIterator(final HadoopGraph graph) throws IOException {
- super(graph);
- }
-
- @Override
- public Vertex next() {
- try {
- if (this.nextVertex != null) {
- final Vertex temp = this.nextVertex;
- this.nextVertex = null;
- return temp;
- } else {
- while (!this.readers.isEmpty()) {
- if (this.readers.peek().nextKeyValue())
- return new HadoopVertex(this.readers.peek().getCurrentValue().get(), this.graph);
- else
- this.readers.remove();
- }
- }
- throw FastNoSuchElementException.instance();
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- @Override
- public boolean hasNext() {
- try {
- if (null != this.nextVertex) return true;
- else {
- while (!this.readers.isEmpty()) {
- if (this.readers.peek().nextKeyValue()) {
- this.nextVertex = new HadoopVertex(this.readers.peek().getCurrentValue().get(), this.graph);
- return true;
- } else
- this.readers.remove();
- }
- }
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- return false;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HiddenFileFilter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HiddenFileFilter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HiddenFileFilter.java
deleted file mode 100644
index 4ea0958..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HiddenFileFilter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class HiddenFileFilter implements PathFilter {
-
- private static final HiddenFileFilter INSTANCE = new HiddenFileFilter();
-
- private HiddenFileFilter() {
-
- }
-
- @Override
- public boolean accept(final Path path) {
- final String name = path.getName();
- return !name.startsWith("_") && !name.startsWith(".");
- }
-
- public static HiddenFileFilter instance() {
- return INSTANCE;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/TextIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/TextIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/TextIterator.java
deleted file mode 100644
index c8424f6..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/TextIterator.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
-
-import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Queue;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class TextIterator implements Iterator<String> {
-
- private String line;
- private boolean available = false;
- private final Queue<BufferedReader> readers = new LinkedList<>();
-
- public TextIterator(final Configuration configuration, final Path path) throws IOException {
- final FileSystem fs = FileSystem.get(configuration);
- for (final FileStatus status : fs.listStatus(path, HiddenFileFilter.instance())) {
- this.readers.add(new BufferedReader(new InputStreamReader(fs.open(status.getPath()))));
- }
- }
-
- @Override
- public boolean hasNext() {
- try {
- if (this.available) {
- return true;
- } else {
- while (true) {
- if (this.readers.isEmpty())
- return false;
- if ((this.line = this.readers.peek().readLine()) != null) {
- this.available = true;
- return true;
- } else
- this.readers.remove();
- }
- }
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- @Override
- public String next() {
- try {
- if (this.available) {
- this.available = false;
- return this.line;
- } else {
- while (true) {
- if (this.readers.isEmpty())
- throw FastNoSuchElementException.instance();
- if ((this.line = this.readers.peek().readLine()) != null) {
- return this.line;
- } else
- this.readers.remove();
- }
- }
- } catch (final IOException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
index 56dfe52..5d3995c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
@@ -19,6 +19,7 @@
package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+import org.apache.commons.configuration.BaseConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -26,16 +27,20 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HiddenFileFilter;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.TextIterator;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.io.Storage;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
@@ -48,13 +53,31 @@ public final class FileSystemStorage implements Storage {
private static final String SPACE = " ";
private static final String D_SPACE = "(D) ";
+ private static final String FORWARD_SLASH = "/";
+ private static final String FORWARD_ASTERISK = "/*";
private final FileSystem fs;
+    // private: instances are obtained through the static open(...) factories
- public FileSystemStorage(final FileSystem fileSystem) {
+ private FileSystemStorage(final FileSystem fileSystem) {
this.fs = fileSystem;
}
+    /**
+     * Opens a storage over the {@link FileSystem} obtained from a new default Hadoop {@link Configuration}.
+     */
+    public static FileSystemStorage open() {
+        final Configuration defaultConfiguration = new Configuration();
+        return open(defaultConfiguration);
+    }
+
+    /**
+     * Opens a storage over the {@link FileSystem} returned by {@link FileSystem#get(Configuration)}.
+     *
+     * @throws IllegalStateException wrapping the {@link IOException} if the file system cannot be obtained
+     */
+    public static FileSystemStorage open(final Configuration configuration) {
+        final FileSystem fileSystem;
+        try {
+            fileSystem = FileSystem.get(configuration);
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+        return open(fileSystem);
+    }
+
+    /**
+     * Wraps an already-obtained {@link FileSystem}.
+     */
+    public static FileSystemStorage open(final FileSystem fileSystem) {
+        return new FileSystemStorage(fileSystem);
+    }
+
private static String fileStatusString(final FileStatus status) {
StringBuilder s = new StringBuilder();
s.append(status.getPermission()).append(" ");
@@ -113,7 +136,7 @@ public final class FileSystemStorage implements Storage {
@Override
public boolean rm(final String location) {
try {
- return HDFSTools.globDelete(this.fs, location, false);
+ return FileSystemStorage.globDelete(this.fs, location, false);
} catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
@@ -122,35 +145,53 @@ public final class FileSystemStorage implements Storage {
+    /**
+     * Deletes every path matching {@code location} (which may contain glob characters), recursing into
+     * directories. I/O failures are rethrown as {@link IllegalStateException}.
+     */
@Override
public boolean rmr(final String location) {
try {
- return HDFSTools.globDelete(this.fs, location, true);
+ return FileSystemStorage.globDelete(this.fs, location, true);
} catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
+    /**
+     * Returns at most {@code totalLines} lines read from the non-hidden files at {@code location}.
+     *
+     * @throws IllegalStateException wrapping any {@link IOException} raised while opening the files
+     */
@Override
- public <V> Iterator<V> head(final String location, final int totalLines, final Class<V> objectClass) {
- return headMaker(this.fs, location, totalLines, (Class<? extends Writable>) objectClass);
+    public Iterator<String> head(final String location, final int totalLines) {
+        try {
+            // TextIterator is already an Iterator<String>; no raw-type cast is needed for the return type
+            return IteratorUtils.limit(new TextIterator(this.fs.getConf(), new Path(location)), totalLines);
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
}
+    /**
+     * Returns at most {@code totalLines} vertices of the graph stored under {@code location}, read with the
+     * {@link InputFormat} class {@code parserClass}.
+     *
+     * @throws IllegalArgumentException if {@code parserClass} is not an {@link InputFormat} or no graph data is
+     *                                  found at {@code location}
+     * @throws IllegalStateException    wrapping any {@link IOException} raised while opening the graph
+     */
@Override
- public String toString() {
- return StringFactory.storageString(this.fs.toString());
+    public Iterator<Vertex> headGraph(final String location, final int totalLines, final Class parserClass) {
+        // validate the argument before touching storage so a bad parser class is reported as such
+        if (!InputFormat.class.isAssignableFrom(parserClass))
+            throw new IllegalArgumentException("The provided parser class must be an " + InputFormat.class.getCanonicalName() + ": " + parserClass.getCanonicalName());
+        // a descriptive failure instead of the bare NoSuchElementException an unchecked Optional.get() would raise
+        final String graphLocation = Constants.getSearchGraphLocation(location, this)
+                .orElseThrow(() -> new IllegalArgumentException("No graph data found at location: " + location));
+        final org.apache.commons.configuration.Configuration configuration = new BaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, graphLocation);
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, parserClass.getCanonicalName());
+        try {
+            return IteratorUtils.limit(new HadoopVertexIterator(HadoopGraph.open(configuration)), totalLines);
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
}
- private static Iterator headMaker(final FileSystem fs, final String path, final int totalLines, final Class<? extends Writable> writableClass) {
+    /**
+     * Returns at most {@code totalLines} key/values of memory {@code memoryKey} stored under {@code location}.
+     *
+     * @throws IllegalArgumentException if {@code parserClass} is not {@link SequenceFileInputFormat}, the only
+     *                                  supported memory format
+     * @throws IllegalStateException    wrapping any {@link IOException} raised while opening the memory files
+     */
+    @Override
+    public <K, V> Iterator<KeyValue<K, V>> headMemory(final String location, final String memoryKey, final int totalLines, final Class parserClass) {
+        if (!parserClass.equals(SequenceFileInputFormat.class))
+            throw new IllegalArgumentException("Only " + SequenceFileInputFormat.class.getCanonicalName() + " memories are supported");
+        // use this storage's own configuration instead of a fresh default Configuration, which may resolve to a
+        // different file system than the one this storage was opened on
+        final Configuration configuration = this.fs.getConf();
try {
- if (writableClass.equals(ObjectWritable.class))
- return IteratorUtils.limit(new ObjectWritableIterator(fs.getConf(), new Path(path)), totalLines);
- else if (writableClass.equals(VertexWritable.class))
- return IteratorUtils.limit(new VertexWritableIterator(fs.getConf(), new Path(path)), totalLines);
- else
- return IteratorUtils.limit(new TextIterator(fs.getConf(), new Path(path)), totalLines);
+            // raw cast: ObjectWritableIterator iterates raw KeyValue instances
+            return IteratorUtils.limit((Iterator) new ObjectWritableIterator(configuration, new Path(Constants.getMemoryLocation(location, memoryKey))), totalLines);
} catch (final IOException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
+    /**
+     * Describes this storage via {@link StringFactory} using the wrapped file system's string form.
+     */
+    @Override
+    public String toString() {
+        final String fileSystemDescription = this.fs.toString();
+        return StringFactory.storageString(fileSystemDescription);
+    }
+
/////////
public void copyToLocal(final String fromLocation, final String toLocation) {
@@ -173,7 +214,7 @@ public final class FileSystemStorage implements Storage {
try {
final FileSystem local = FileSystem.getLocal(new Configuration());
final FSDataOutputStream outA = local.create(new Path(toLocation));
- for (final Path path : HDFSTools.getAllFilePaths(fs, new Path(fromLocation), HiddenFileFilter.instance())) {
+ for (final Path path : FileSystemStorage.getAllFilePaths(fs, new Path(fromLocation), HiddenFileFilter.instance())) {
final FSDataInputStream inA = fs.open(path);
IOUtils.copyBytes(inA, outA, 8192);
inA.close();
@@ -183,4 +224,31 @@ public final class FileSystemStorage implements Storage {
throw new IllegalStateException(e.getMessage(), e);
}
}
+
+ ////////////
+
+    /**
+     * Deletes every path matching the (possibly globbed) {@code path}.
+     *
+     * @param recursive whether directories are deleted together with their contents
+     * @return {@code true} if at least one matching path was deleted, {@code false} if nothing matched
+     */
+    private static boolean globDelete(final FileSystem fs, final String path, final boolean recursive) throws IOException {
+        // globStatus() returns null (not an empty array) when a non-glob path does not exist, and stat2Paths(null)
+        // returns null as well, so guard before iterating
+        final Path[] matches = FileUtil.stat2Paths(fs.globStatus(new Path(path)));
+        if (null == matches)
+            return false;
+        boolean deleted = false;
+        for (final Path match : matches) {
+            deleted |= fs.delete(match, recursive);
+        }
+        return deleted;
+    }
+
+    /**
+     * Recursively collects the files under {@code path} that pass {@code filter}. If {@code path} is itself a
+     * file it is returned as-is. A {@code null} path means the file system's home directory.
+     */
+    private static List<Path> getAllFilePaths(final FileSystem fs, Path path, final PathFilter filter) throws IOException {
+        if (null == path) path = fs.getHomeDirectory();
+        final List<Path> paths = new ArrayList<>();
+        if (fs.isFile(path)) {
+            paths.add(path);
+            return paths;
+        }
+        // build the child glob without doubling the separator: for the root "/", appending "/*" would give "//*",
+        // which Path parses as a URI authority; new Path("") is not an option because empty paths are rejected
+        final String location = path.toString();
+        final Path glob = new Path(location.endsWith(FORWARD_SLASH) ? location + "*" : location + FORWARD_ASTERISK);
+        final FileStatus[] children = fs.globStatus(glob, filter);
+        if (null != children) {
+            for (final FileStatus child : children) {
+                paths.addAll(getAllFilePaths(fs, child.getPath(), filter));
+            }
+        }
+        return paths;
+    }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopEdgeIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopEdgeIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopEdgeIterator.java
new file mode 100644
index 0000000..8f5452f
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopEdgeIterator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * Iterates the edges of a {@link HadoopGraph}: every vertex is read from the graph's input and its outgoing
+ * ({@link Direction#OUT}) edges are returned wrapped as {@link HadoopEdge}s.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class HadoopEdgeIterator extends HadoopElementIterator<Edge> {
+
+    // out-edges of the most recently read vertex that have not been returned yet
+    private Iterator<Edge> edgeIterator = Collections.emptyIterator();
+
+    public HadoopEdgeIterator(final HadoopGraph graph) throws IOException {
+        super(graph);
+    }
+
+    /**
+     * Returns the next out-edge.
+     *
+     * @throws java.util.NoSuchElementException if no edges remain; raised outside any try block so that it is
+     *                                          not converted into an {@link IllegalStateException}
+     */
+    @Override
+    public Edge next() {
+        if (!this.hasNext())
+            throw FastNoSuchElementException.instance();
+        return new HadoopEdge(this.edgeIterator.next(), this.graph);
+    }
+
+    /**
+     * Reads vertices until one with remaining out-edges is found, closing each record reader once it is
+     * exhausted.
+     *
+     * @throws IllegalStateException wrapping any failure raised while reading
+     */
+    @Override
+    public boolean hasNext() {
+        try {
+            while (!this.edgeIterator.hasNext()) {
+                if (this.readers.isEmpty())
+                    return false;
+                if (this.readers.peek().nextKeyValue())
+                    this.edgeIterator = this.readers.peek().getCurrentValue().get().edges(Direction.OUT);
+                else
+                    this.readers.remove().close();
+            }
+            return true;
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
+            throw new IllegalStateException(e.getMessage(), e);
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopElementIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopElementIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopElementIterator.java
new file mode 100644
index 0000000..9196ce3
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopElementIterator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.UUID;
+
+/**
+ * Base class for iterators that read the elements of a {@link HadoopGraph} directly from its graph
+ * {@link InputFormat}. One {@link RecordReader} is created per input split and queued in {@link #readers} for
+ * subclasses to drain.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public abstract class HadoopElementIterator<E extends Element> implements Iterator<E> {
+
+    protected final HadoopGraph graph;
+    protected final Queue<RecordReader<NullWritable, VertexWritable>> readers = new LinkedList<>();
+
+    /**
+     * Computes the input splits of the graph's input format and opens a record reader for each. For a
+     * {@link FileInputFormat}, a missing input location, or one that holds no graph data, produces no readers
+     * (an empty graph) rather than an error.
+     *
+     * @throws IllegalStateException wrapping any failure raised while creating the input format, splits or readers
+     */
+    public HadoopElementIterator(final HadoopGraph graph) {
+        this.graph = graph;
+        try {
+            final Configuration configuration = ConfUtil.makeHadoopConfiguration(this.graph.configuration());
+            final InputFormat<NullWritable, VertexWritable> inputFormat = this.graph.configuration().getGraphInputFormat().getConstructor().newInstance();
+            if (inputFormat instanceof FileInputFormat) {
+                if (!this.graph.configuration().containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
+                    return; // there is no input location and thus, no data (empty graph)
+                final Storage storage = FileSystemStorage.open(configuration);
+                // resolve the graph location once rather than searching the storage twice
+                final Optional<String> graphLocation = Constants.getSearchGraphLocation(this.graph.configuration().getInputLocation(), storage);
+                if (!graphLocation.isPresent())
+                    return; // there is no data at the input location (empty graph)
+                configuration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, graphLocation.get());
+            }
+            final List<InputSplit> splits = inputFormat.getSplits(new JobContextImpl(configuration, new JobID(UUID.randomUUID().toString(), 1)));
+            for (final InputSplit split : splits) {
+                // NOTE(review): Hadoop's RecordReader contract requires initialize(split, context) before
+                // nextKeyValue(); this relies on createRecordReader() doing that -- confirm for every input format used.
+                this.readers.add(inputFormat.createRecordReader(split, new TaskAttemptContextImpl(configuration, new TaskAttemptID())));
+            }
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
+            throw new IllegalStateException(e.getMessage(), e);
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopVertexIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopVertexIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopVertexIterator.java
new file mode 100644
index 0000000..45b0cad
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopVertexIterator.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
+import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import java.io.IOException;
+
+/**
+ * Iterates the vertices of a {@link HadoopGraph}, wrapping each vertex read from the graph's input as a
+ * {@link HadoopVertex}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class HadoopVertexIterator extends HadoopElementIterator<Vertex> {
+
+    // vertex read ahead by hasNext() and not yet returned by next(); null when nothing is buffered
+    private HadoopVertex nextVertex = null;
+
+    public HadoopVertexIterator(final HadoopGraph graph) throws IOException {
+        super(graph);
+    }
+
+    /**
+     * Returns the next vertex.
+     *
+     * @throws java.util.NoSuchElementException if no vertices remain; raised outside any try block so that it is
+     *                                          not converted into an {@link IllegalStateException}
+     */
+    @Override
+    public Vertex next() {
+        if (!this.hasNext())
+            throw FastNoSuchElementException.instance();
+        final Vertex vertex = this.nextVertex;
+        this.nextVertex = null;
+        return vertex;
+    }
+
+    /**
+     * Buffers the next vertex, if any, closing each record reader once it is exhausted.
+     *
+     * @throws IllegalStateException wrapping any failure raised while reading
+     */
+    @Override
+    public boolean hasNext() {
+        if (null != this.nextVertex)
+            return true;
+        try {
+            while (!this.readers.isEmpty()) {
+                if (this.readers.peek().nextKeyValue()) {
+                    this.nextVertex = new HadoopVertex(this.readers.peek().getCurrentValue().get(), this.graph);
+                    return true;
+                }
+                this.readers.remove().close();
+            }
+            return false;
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
+            throw new IllegalStateException(e.getMessage(), e);
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HiddenFileFilter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HiddenFileFilter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HiddenFileFilter.java
new file mode 100644
index 0000000..8b01571
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HiddenFileFilter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * Stateless {@link PathFilter} that rejects every path whose last name component starts with {@code _} or
+ * {@code .} and accepts all others. Obtain the shared instance via {@link #instance()}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class HiddenFileFilter implements PathFilter {
+
+    private static final HiddenFileFilter INSTANCE = new HiddenFileFilter();
+
+    private HiddenFileFilter() {
+        // singleton; use instance()
+    }
+
+    /**
+     * @return the shared filter
+     */
+    public static HiddenFileFilter instance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public boolean accept(final Path path) {
+        final String fileName = path.getName();
+        final boolean hidden = fileName.startsWith("_") || fileName.startsWith(".");
+        return !hidden;
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
index 60bf930..676ca07 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
@@ -19,11 +19,10 @@
package org.apache.tinkerpop.gremlin.hadoop.structure.io;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HiddenFileFilter;
import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
@@ -43,9 +42,8 @@ public final class ObjectWritableIterator implements Iterator<KeyValue> {
private final Queue<SequenceFile.Reader> readers = new LinkedList<>();
public ObjectWritableIterator(final Configuration configuration, final Path path) throws IOException {
- final FileSystem fs = FileSystem.get(configuration);
- for (final Path path2 : HDFSTools.getAllFilePaths(fs, path, HiddenFileFilter.instance())) {
- this.readers.add(new SequenceFile.Reader(configuration, SequenceFile.Reader.file(path2)));
+        // open one SequenceFile reader per entry returned by listStatus(path), skipping names that start with "_" or "."
+        // NOTE(review): readers opened before a failing SequenceFile.Reader construction are not closed if this
+        // constructor throws -- consider closing them on failure.
+ for (final FileStatus status : FileSystem.get(configuration).listStatus(path, HiddenFileFilter.instance())) {
+ this.readers.add(new SequenceFile.Reader(configuration, SequenceFile.Reader.file(status.getPath())));
 }
 }
@@ -62,7 +60,7 @@ public final class ObjectWritableIterator implements Iterator<KeyValue> {
this.available = true;
return true;
} else
- this.readers.remove();
+ this.readers.remove().close();
}
}
} catch (final IOException e) {
@@ -83,7 +81,7 @@ public final class ObjectWritableIterator implements Iterator<KeyValue> {
if (this.readers.peek().next(this.key, this.value)) {
return new KeyValue<>(this.key.get(), this.value.get());
} else
- this.readers.remove();
+ this.readers.remove().close();
}
}
} catch (final IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/TextIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/TextIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/TextIterator.java
new file mode 100644
index 0000000..ecb6ede
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/TextIterator.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * Iterates the lines of the files listed at a path (hidden files excluded by {@link HiddenFileFilter}), reading
+ * the files one after another and closing each reader once it is exhausted.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TextIterator implements Iterator<String> {
+
+    // line read ahead by hasNext(); valid only while available is true
+    private String line;
+    private boolean available = false;
+    private final Queue<BufferedReader> readers = new LinkedList<>();
+
+    public TextIterator(final Configuration configuration, final Path path) throws IOException {
+        // resolve the file system from the path itself so fully-qualified paths on a non-default file system work;
+        // unqualified paths still resolve to the configuration's default file system
+        final FileSystem fs = path.getFileSystem(configuration);
+        for (final FileStatus status : fs.listStatus(path, HiddenFileFilter.instance())) {
+            // decode explicitly as UTF-8 (the encoding of Hadoop's Text) instead of the platform default charset
+            this.readers.add(new BufferedReader(new InputStreamReader(fs.open(status.getPath()), StandardCharsets.UTF_8)));
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (this.available)
+            return true;
+        try {
+            while (!this.readers.isEmpty()) {
+                this.line = this.readers.peek().readLine();
+                if (null != this.line) {
+                    this.available = true;
+                    return true;
+                }
+                this.readers.remove().close();
+            }
+            return false;
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public String next() {
+        if (!this.hasNext())
+            throw FastNoSuchElementException.instance();
+        this.available = false;
+        return this.line;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
index daaffb8..d3e1fd0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HiddenFileFilter;
import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -42,9 +41,8 @@ public final class VertexWritableIterator implements Iterator<Vertex> {
private final Queue<SequenceFile.Reader> readers = new LinkedList<>();
public VertexWritableIterator(final Configuration configuration, final Path path) throws IOException {
- final FileSystem fs = FileSystem.get(configuration);
- for (final FileStatus status : fs.listStatus(path, HiddenFileFilter.instance())) {
- this.readers.add(new SequenceFile.Reader(fs, status.getPath(), configuration));
+        // open one SequenceFile reader per entry returned by listStatus(path), skipping names that start with "_" or "."
+        // NOTE(review): readers opened before a failing SequenceFile.Reader construction are not closed if this
+        // constructor throws -- consider closing them on failure.
+ for (final FileStatus status : FileSystem.get(configuration).listStatus(path, HiddenFileFilter.instance())) {
+ this.readers.add(new SequenceFile.Reader(configuration, SequenceFile.Reader.file(status.getPath())));
 }
 }
@@ -61,7 +59,7 @@ public final class VertexWritableIterator implements Iterator<Vertex> {
this.available = true;
return true;
} else
- this.readers.remove();
+ this.readers.remove().close();
}
}
} catch (final IOException e) {
@@ -82,7 +80,7 @@ public final class VertexWritableIterator implements Iterator<Vertex> {
if (this.readers.peek().next(this.value)) {
return this.value.get();
} else
- this.readers.remove();
+ this.readers.remove().close();
}
}
} catch (final IOException e) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/FileSystemStorageCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/FileSystemStorageCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/FileSystemStorageCheck.java
new file mode 100644
index 0000000..a8c5307
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/FileSystemStorageCheck.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.groovy.plugin;
+
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.tinkerpop.gremlin.AbstractGremlinTest;
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.ClusterCountMapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/** Verifies that a graph computer job persists both its output graph and a MapReduce memory key, and that {@link FileSystemStorage} can find both and read the memory back.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class FileSystemStorageCheck extends AbstractGremlinTest {
+
+ @Test
+ @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
+ public void shouldPersistGraphAndMemory() throws Exception {
+ final ComputerResult result = graph.compute(graphComputerClass.get()).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get(); // PeerPressure clustering; ClusterCountMapReduce writes its result to memory key "clusterCount"
+ /////
+ final Storage storage = FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration())); // storage opened from a Hadoop Configuration derived from the graph's configuration
+ // TEST GRAPH PERSISTENCE
+ assertTrue(storage.exists(Constants.getGraphLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)))); // graph location derived from the configured output location
+ assertEquals(6, result.graph().traversal().V().count().next().longValue()); // the remaining graph checks query result.graph(), not storage
+ assertEquals(0, result.graph().traversal().E().count().next().longValue()); // NOTE(review): expects no edges in the result graph; presumably the test configuration disables edge output — confirm
+ assertEquals(6, result.graph().traversal().V().values("name").count().next().longValue());
+ assertEquals(6, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().longValue()); // every vertex received a cluster property
+ assertEquals(2, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).dedup().count().next().longValue()); // two distinct cluster ids
+ /////
+ // TEST MEMORY PERSISTENCE
+ assertEquals(2, (int) result.memory().get("clusterCount")); // value held in the returned Memory
+ assertTrue(storage.exists(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount"))); // memory key persisted under the output location
+ assertEquals(1, IteratorUtils.count(storage.headMemory(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount", SequenceFileInputFormat.class))); // exactly one key/value pair stored for "clusterCount"
+ assertEquals(2, storage.headMemory(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount", SequenceFileInputFormat.class).next().getValue()); // a fresh headMemory iterator; the previous one was handed to IteratorUtils.count
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java
deleted file mode 100644
index d47ce43..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.hadoop.groovy.plugin;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.tinkerpop.gremlin.AbstractGremlinTest;
-import org.apache.tinkerpop.gremlin.LoadGraphWith;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.ClusterCountMapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
-import org.apache.tinkerpop.gremlin.structure.io.Storage;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class GraphMemoryHDFSCheck extends AbstractGremlinTest {
-
- @Test
- @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
- public void shouldPersistGraphAndMemory() throws Exception {
- final ComputerResult result = graph.compute(graphComputerClass.get()).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get();
- /////
- final Storage storage = new FileSystemStorage(FileSystem.get(ConfUtil.makeHadoopConfiguration(graph.configuration())));
- // TEST GRAPH PERSISTENCE
- assertTrue(storage.exists(Constants.getGraphLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))));
- assertEquals(6, result.graph().traversal().V().count().next().longValue());
- assertEquals(0, result.graph().traversal().E().count().next().longValue());
- assertEquals(6, result.graph().traversal().V().values("name").count().next().longValue());
- assertEquals(6, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().longValue());
- assertEquals(2, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).dedup().count().next().longValue());
- /////
- // TEST MEMORY PERSISTENCE
- assertEquals(2, (int) result.memory().get("clusterCount"));
- assertTrue(storage.exists(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount")));
- // System.out.println(IteratorUtils.list(storage.head(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount"))));
-// assertEquals(1, IteratorUtils.count(storage.head(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount"))));
- assertEquals(2, storage.head(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount")).next());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
index 4108e35..b558169 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
@@ -137,7 +137,7 @@ public class HadoopGremlinPluginCheck extends AbstractGremlinTest {
AbstractGremlinProcessTest.checkResults(Arrays.asList("ripple", "lop"), traversal);
assertTrue((Boolean) this.console.eval("hdfs.exists('target/test-output/m')"));
assertTrue((Boolean) this.console.eval("hdfs.exists('target/test-output/" + TraverserMapReduce.TRAVERSERS + "')"));
- final List<KeyValue<Integer, Collection<String>>> mList = IteratorUtils.asList(this.console.eval("hdfs.head('target/test-output/m',ObjectWritable)"));
+ final List<KeyValue<Integer, Collection<String>>> mList = IteratorUtils.asList(this.console.eval("hdfs.headMemory('target/test-output','m',SequenceFileInputFormat)"));
assertEquals(4, mList.size());
mList.forEach(keyValue -> {
if (keyValue.getKey().equals(29))
@@ -151,7 +151,7 @@ public class HadoopGremlinPluginCheck extends AbstractGremlinTest {
else
throw new IllegalStateException("The provided key/value is unknown: " + keyValue);
});
- final List<KeyValue<MapReduce.NullObject, Traverser<String>>> traversersList = IteratorUtils.asList(this.console.eval("hdfs.head('target/test-output/" + TraverserMapReduce.TRAVERSERS + "',ObjectWritable)"));
+ final List<KeyValue<MapReduce.NullObject, Traverser<String>>> traversersList = IteratorUtils.asList(this.console.eval("hdfs.headMemory('target/test-output/'," + "'" + TraverserMapReduce.TRAVERSERS + "',SequenceFileInputFormat)"));
assertEquals(2, traversersList.size());
traversersList.forEach(keyValue -> {
assertEquals(MapReduce.NullObject.instance(), keyValue.getKey());
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
index 7dc8143..16d654f 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
@@ -29,6 +29,6 @@ import org.junit.runners.model.RunnerBuilder;
*/
 public class HadoopPluginSuite extends AbstractGremlinSuite {
 public HadoopPluginSuite(final Class<?> klass, final RunnerBuilder builder) throws InitializationError {
- super(klass, builder, new Class<?>[]{HadoopGremlinPluginCheck.class, GraphMemoryHDFSCheck.class}, new Class<?>[]{HadoopGremlinPluginCheck.class, GraphMemoryHDFSCheck.class}, true, TraversalEngine.Type.COMPUTER);
+ super(klass, builder, new Class<?>[]{HadoopGremlinPluginCheck.class, FileSystemStorageCheck.class}, new Class<?>[]{HadoopGremlinPluginCheck.class, FileSystemStorageCheck.class}, true, TraversalEngine.Type.COMPUTER); // FileSystemStorageCheck replaces the deleted GraphMemoryHDFSCheck in both test-class arrays
 }
 }
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
index fcb234c..a7e333c 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
@@ -42,7 +42,6 @@ public final class SparkGremlinPlugin extends AbstractGremlinPlugin {
protected static final Set<String> IMPORTS = new HashSet<String>() {{
add(IMPORT_SPACE + SparkGraphComputer.class.getPackage().getName() + DOT_STAR);
add(IMPORT_SPACE + Spark.class.getPackage().getName() + DOT_STAR);
- add(IMPORT_SPACE + SparkContextStorage.class.getPackage().getName() + DOT_STAR);
}};
@Override
@@ -54,7 +53,7 @@ public final class SparkGremlinPlugin extends AbstractGremlinPlugin {
public void afterPluginTo(final PluginAcceptor pluginAcceptor) throws PluginInitializationException, IllegalEnvironmentException {
pluginAcceptor.addImports(IMPORTS);
try {
- pluginAcceptor.eval("spark = SparkContextStorage.open()");
+ pluginAcceptor.addBinding("spark", SparkContextStorage.open());
} catch (final Exception e) {
throw new PluginInitializationException(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 03b3016..f96fd15 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -239,6 +239,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
// unpersist the graphRDD if it will no longer be used
if (!PersistedOutputRDD.class.equals(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)) || this.persist.equals(GraphComputer.Persist.NOTHING)) {
graphRDD.unpersist();
+ if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)) {
+ Spark.removeRDD(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
+ Spark.removeRDD(Constants.getGraphLocation(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)));
+ }
}
// update runtime and return the newly computed graph
finalMemory.setRuntime(System.currentTimeMillis() - startTime);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
index 5cb9edf..b78caa9 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
@@ -46,6 +46,7 @@ public final class PersistedOutputRDD implements OutputRDD {
throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
final String graphRDDName = Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
Spark.removeRDD(graphRDDName); // this might be bad cause it unpersists the job RDD
+ Constants.getSearchGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), SparkContextStorage.open(configuration)).ifPresent(Spark::removeRDD); // this might be bad cause it unpersists the job RDD
if (!configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, true))
graphRDD.mapValues(vertex -> {
vertex.get().dropEdges();
@@ -62,7 +63,7 @@ public final class PersistedOutputRDD implements OutputRDD {
LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set " + Constants.GREMLIN_SPARK_PERSIST_CONTEXT + " to true");
if (!configuration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))
throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
- final String sideEffectRDDName = Constants.getMemoryLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey);
+ final String sideEffectRDDName = configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + memoryKey;
Spark.removeRDD(sideEffectRDDName);
memoryRDD.setName(sideEffectRDDName).cache();
return IteratorUtils.map(memoryRDD.toLocalIterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2()));