Posted to commits@tinkerpop.apache.org by dk...@apache.org on 2016/01/11 17:49:13 UTC

[05/30] incubator-tinkerpop git commit: Greatly simplified Hadoop OLTP and its interactions with HDFS and the SparkContext. The convention: dir/~g for graph data and dir/<memoryKey> for memory values. A consistent persistence schema makes everything much simpler.

Greatly simplified Hadoop OLTP and its interactions with HDFS and the SparkContext. The convention: dir/~g for graph data and dir/<memoryKey> for memory values. A consistent persistence schema makes everything much simpler. I had always assumed this layout would eventually be generalized; it never was, so sticking with one consistent schema lets a large amount of code simply fall away.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b4d8e960
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b4d8e960
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b4d8e960

Branch: refs/heads/TINKERPOP-320
Commit: b4d8e9608d4eca3ae177b28fe588518a9d77506c
Parents: 2c0d327
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 9 15:58:50 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Dec 9 15:58:50 2015 -0700

----------------------------------------------------------------------
 .../tinkerpop/gremlin/structure/io/Storage.java |  44 +++-----
 .../tinkerpop/gremlin/hadoop/Constants.java     |  16 ++-
 .../groovy/plugin/HadoopGremlinPlugin.java      |   6 +-
 .../gremlin/hadoop/structure/HadoopGraph.java   |   4 +-
 .../hadoop/structure/hdfs/HDFSTools.java        | 113 -------------------
 .../structure/hdfs/HadoopEdgeIterator.java      |  83 --------------
 .../structure/hdfs/HadoopElementIterator.java   |  74 ------------
 .../structure/hdfs/HadoopVertexIterator.java    |  82 --------------
 .../hadoop/structure/hdfs/HiddenFileFilter.java |  44 --------
 .../hadoop/structure/hdfs/TextIterator.java     |  91 ---------------
 .../hadoop/structure/io/FileSystemStorage.java  | 106 +++++++++++++----
 .../hadoop/structure/io/HadoopEdgeIterator.java |  79 +++++++++++++
 .../structure/io/HadoopElementIterator.java     |  75 ++++++++++++
 .../structure/io/HadoopVertexIterator.java      |  78 +++++++++++++
 .../hadoop/structure/io/HiddenFileFilter.java   |  44 ++++++++
 .../structure/io/ObjectWritableIterator.java    |  12 +-
 .../hadoop/structure/io/TextIterator.java       |  91 +++++++++++++++
 .../structure/io/VertexWritableIterator.java    |  10 +-
 .../groovy/plugin/FileSystemStorageCheck.java   |  63 +++++++++++
 .../groovy/plugin/GraphMemoryHDFSCheck.java     |  64 -----------
 .../groovy/plugin/HadoopGremlinPluginCheck.java |   4 +-
 .../hadoop/groovy/plugin/HadoopPluginSuite.java |   2 +-
 .../spark/groovy/plugin/SparkGremlinPlugin.java |   3 +-
 .../process/computer/SparkGraphComputer.java    |   4 +
 .../spark/structure/io/PersistedOutputRDD.java  |   3 +-
 .../spark/structure/io/SparkContextStorage.java |  53 ++++++++-
 .../structure/io/GraphMemorySparkTest.java      |  75 ------------
 .../io/PersistedInputOutputRDDTest.java         |  14 +--
 .../structure/io/SparkContextStorageTest.java   |  74 ++++++++++++
 29 files changed, 699 insertions(+), 712 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
index 1f1bcf4..3b69ff2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/io/Storage.java
@@ -19,6 +19,9 @@
 
 package org.apache.tinkerpop.gremlin.structure.io;
 
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
 import java.util.Iterator;
 import java.util.List;
 
@@ -41,42 +44,21 @@ public interface Storage {
 
     public boolean rmr(final String location);
 
-    public <V> Iterator<V> head(final String location, final int totalLines, final Class<V> objectClass);
+    public Iterator<String> head(final String location, final int totalLines);
 
-    public default Iterator<Object> head(final String location) {
-        return this.head(location, Object.class);
+    public default Iterator<String> head(final String location) {
+        return this.head(location, Integer.MAX_VALUE);
     }
 
-    public default Iterator<Object> head(final String location, final int totalLines) {
-        return this.head(location, totalLines, Object.class);
-    }
+    public Iterator<Vertex> headGraph(final String location, final int totalLines, final Class parserClass);
 
-    public default <V> Iterator<V> head(final String location, final Class<V> objectClass) {
-        return this.head(location, Integer.MAX_VALUE, objectClass);
+    public default Iterator<Vertex> headGraph(final String location, final Class parserClass) {
+        return this.headGraph(location, Integer.MAX_VALUE, parserClass);
     }
 
-  /*
-
-        FileSystem.metaClass.copyToLocal = { final String from, final String to ->
-            return ((FileSystem) delegate).copyToLocalFile(new Path(from), new Path(to));
-        }
-
-        FileSystem.metaClass.copyFromLocal = { final String from, final String to ->
-            return ((FileSystem) delegate).copyFromLocalFile(new Path(from), new Path(to));
-        }
+    public <K, V> Iterator<KeyValue<K, V>> headMemory(final String location, final String memoryKey, final int totalLines, final Class parserClass);
 
-        FileSystem.metaClass.mergeToLocal = { final String from, final String to ->
-            final FileSystem fs = (FileSystem) delegate;
-            final FileSystem local = FileSystem.getLocal(new Configuration());
-            final FSDataOutputStream outA = local.create(new Path(to));
-
-            HDFSTools.getAllFilePaths(fs, new Path(from), HiddenFileFilter.instance()).each {
-                final FSDataInputStream inA = fs.open(it);
-                IOUtils.copyBytes(inA, outA, 8192);
-                inA.close();
-            }
-            outA.close();
-        }
-
-     */
+    public default <K, V> Iterator<KeyValue<K, V>> headMemory(final String location, final String memoryKey, final Class parserClass) {
+        return this.headMemory(location, memoryKey, Integer.MAX_VALUE, parserClass);
+    }
 }
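
For context, here is a minimal sketch of how the reworked Storage contract above might be driven: head() now always yields raw text lines, while headGraph() and headMemory() take the parser class explicitly instead of the old objectClass overloads. The "output" directory and the StorageHeadSketch class name are illustrative only; FileSystemStorage.open() and GryoInputFormat come from hadoop-gremlin and appear elsewhere in this diff.

    import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
    import org.apache.tinkerpop.gremlin.hadoop.structure.io.gryo.GryoInputFormat;
    import org.apache.tinkerpop.gremlin.structure.Vertex;
    import org.apache.tinkerpop.gremlin.structure.io.Storage;

    import java.util.Iterator;

    public class StorageHeadSketch {
        public static void main(final String[] args) {
            // default Hadoop configuration -> whatever fs.defaultFS points at
            final Storage storage = FileSystemStorage.open();
            // head(): raw text lines at the location, capped at five here
            final Iterator<String> lines = storage.head("output", 5);
            while (lines.hasNext())
                System.out.println(lines.next());
            // headGraph(): parses output/~g (or output/ itself) with the given InputFormat
            final Iterator<Vertex> vertices = storage.headGraph("output", 5, GryoInputFormat.class);
            while (vertices.hasNext())
                System.out.println(vertices.next());
        }
    }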

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
index 8678441..4a91106 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/Constants.java
@@ -19,6 +19,9 @@
 package org.apache.tinkerpop.gremlin.hadoop;
 
 import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+
+import java.util.Optional;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -54,10 +57,19 @@ public final class Constants {
     public static final String GREMLIN_SPARK_PERSIST_CONTEXT = "gremlin.spark.persistContext";
 
     public static String getGraphLocation(final String location) {
-        return location + "/" + Constants.HIDDEN_G;
+        return location.endsWith("/") ? location + Constants.HIDDEN_G : location + "/" + Constants.HIDDEN_G;
     }
 
     public static String getMemoryLocation(final String location, final String memoryKey) {
-        return location + "/" + memoryKey;
+        return location.endsWith("/") ? location + memoryKey : location + "/" + memoryKey;
+    }
+
+    public static Optional<String> getSearchGraphLocation(final String location, final Storage storage) {
+        if (storage.exists(getGraphLocation(location)))
+            return Optional.of(getGraphLocation(location));
+        else if (storage.exists(location))
+            return Optional.of(location);
+        else
+            return Optional.empty();
     }
 }
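
A quick illustration of the persistence convention these helpers encode (the "output" directory and "myMemoryKey" name are made up for the example). getSearchGraphLocation() then prefers <location>/~g, falls back to <location> itself, and returns empty when neither exists in the given Storage.

    import org.apache.tinkerpop.gremlin.hadoop.Constants;

    public class LocationConventionSketch {
        public static void main(final String[] args) {
            // graph data lives under <dir>/~g (Constants.HIDDEN_G)
            System.out.println(Constants.getGraphLocation("output"));                 // output/~g
            // memory (sideEffect) data lives under <dir>/<memoryKey>
            System.out.println(Constants.getMemoryLocation("output", "myMemoryKey")); // output/myMemoryKey
        }
    }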

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
index 55ba020..c50d226 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPlugin.java
@@ -29,7 +29,6 @@ import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.mapreduce.MapReduceGraphComputer;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopConfiguration;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat;
@@ -65,7 +64,6 @@ public final class HadoopGremlinPlugin extends AbstractGremlinPlugin {
         add(IMPORT_SPACE + GryoInputFormat.class.getPackage().getName() + DOT_STAR);
         add(IMPORT_SPACE + GraphSONInputFormat.class.getPackage().getName() + DOT_STAR);
         add(IMPORT_SPACE + ScriptInputFormat.class.getPackage().getName() + DOT_STAR);
-        add(IMPORT_SPACE + HDFSTools.class.getPackage().getName() + DOT_STAR);
         ////
         add(IMPORT_SPACE + MapReduceGraphComputer.class.getPackage().getName() + DOT_STAR);
     }};
@@ -79,8 +77,8 @@ public final class HadoopGremlinPlugin extends AbstractGremlinPlugin {
     public void afterPluginTo(final PluginAcceptor pluginAcceptor) throws PluginInitializationException, IllegalEnvironmentException {
         pluginAcceptor.addImports(IMPORTS);
         try {
-            pluginAcceptor.addBinding("hdfs", new FileSystemStorage(FileSystem.get(new Configuration())));
-            pluginAcceptor.addBinding("local", new FileSystemStorage(FileSystem.getLocal(new Configuration())));
+            pluginAcceptor.addBinding("hdfs", FileSystemStorage.open(FileSystem.get(new Configuration())));
+            pluginAcceptor.addBinding("local", FileSystemStorage.open(FileSystem.getLocal(new Configuration())));
             if (null == System.getenv(Constants.HADOOP_GREMLIN_LIBS))
                 HadoopGraph.LOGGER.warn("Be sure to set the environmental variable: " + Constants.HADOOP_GREMLIN_LIBS);
             else
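
Outside of the console, the same two storages that the plugin now binds as "hdfs" and "local" can be obtained through the new factory methods (the constructor is no longer public). A small sketch; the "/tmp" path is purely illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;

    public class StorageBindingsSketch {
        public static void main(final String[] args) throws Exception {
            // equivalent of the console's "hdfs" binding
            final FileSystemStorage hdfs = FileSystemStorage.open(FileSystem.get(new Configuration()));
            // equivalent of the console's "local" binding
            final FileSystemStorage local = FileSystemStorage.open(FileSystem.getLocal(new Configuration()));
            System.out.println(hdfs);
            System.out.println(local.exists("/tmp")); // Storage.exists(), as used by Constants.getSearchGraphLocation()
        }
    }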

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
index a9f758c..22f42f4 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/HadoopGraph.java
@@ -26,8 +26,8 @@ import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphComputer;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HadoopEdgeIterator;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HadoopVertexIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopEdgeIterator;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopVertexIterator;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
 import org.apache.tinkerpop.gremlin.structure.Edge;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HDFSTools.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HDFSTools.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HDFSTools.java
deleted file mode 100644
index e4da530..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HDFSTools.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionCodecFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class HDFSTools {
-
-    private static final String FORWARD_SLASH = "/";
-    private static final String FORWARD_ASTERISK = "/*";
-
-    private HDFSTools() {
-    }
-
-    public static long getFileSize(final FileSystem fs, final Path path, final PathFilter filter) throws IOException {
-        long totalSize = 0l;
-        for (final Path p : getAllFilePaths(fs, path, filter)) {
-            totalSize = totalSize + fs.getFileStatus(p).getLen();
-        }
-        return totalSize;
-    }
-
-    public static List<Path> getAllFilePaths(final FileSystem fs, Path path, final PathFilter filter) throws IOException {
-        if (null == path) path = fs.getHomeDirectory();
-        if (path.toString().equals(FORWARD_SLASH)) path = new Path("");
-
-        final List<Path> paths = new ArrayList<Path>();
-        if (fs.isFile(path))
-            paths.add(path);
-        else {
-            for (final FileStatus status : fs.globStatus(new Path(path + FORWARD_ASTERISK), filter)) {
-                final Path next = status.getPath();
-                paths.addAll(getAllFilePaths(fs, next, filter));
-            }
-        }
-        return paths;
-    }
-
-
-    public static void decompressPath(final FileSystem fs, final String in, final String out, final String compressedFileSuffix, final boolean deletePrevious) throws IOException {
-        final Path inPath = new Path(in);
-
-        if (fs.isFile(inPath))
-            HDFSTools.decompressFile(fs, in, out, deletePrevious);
-        else {
-            final Path outPath = new Path(out);
-            if (!fs.exists(outPath))
-                fs.mkdirs(outPath);
-            for (final Path path : FileUtil.stat2Paths(fs.globStatus(new Path(in + FORWARD_ASTERISK)))) {
-                if (path.getName().endsWith(compressedFileSuffix))
-                    HDFSTools.decompressFile(fs, path.toString(), outPath.toString() + FORWARD_SLASH + path.getName().split("\\.")[0], deletePrevious);
-            }
-        }
-    }
-
-    public static void decompressFile(final FileSystem fs, final String inFile, final String outFile, boolean deletePrevious) throws IOException {
-        final Path inPath = new Path(inFile);
-        final Path outPath = new Path(outFile);
-        final CompressionCodecFactory factory = new CompressionCodecFactory(new Configuration());
-        final CompressionCodec codec = factory.getCodec(inPath);
-        final OutputStream out = fs.create(outPath);
-        final InputStream in = codec.createInputStream(fs.open(inPath));
-        IOUtils.copyBytes(in, out, 8192);
-        IOUtils.closeStream(in);
-        IOUtils.closeStream(out);
-
-        if (deletePrevious)
-            fs.delete(new Path(inFile), true);
-
-    }
-
-    public static boolean globDelete(final FileSystem fs, final String path, final boolean recursive) throws IOException {
-        boolean deleted = false;
-        for (final Path p : FileUtil.stat2Paths(fs.globStatus(new Path(path)))) {
-            fs.delete(p, recursive);
-            deleted = true;
-        }
-        return deleted;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java
deleted file mode 100644
index 59a4d2c..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopEdgeIterator.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
-
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
-import org.apache.tinkerpop.gremlin.structure.Direction;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class HadoopEdgeIterator extends HadoopElementIterator<Edge> {
-
-    private Iterator<Edge> edgeIterator = Collections.emptyIterator();
-
-    public HadoopEdgeIterator(final HadoopGraph graph) throws IOException {
-        super(graph);
-    }
-
-    @Override
-    public Edge next() {
-        try {
-            while (true) {
-                if (this.edgeIterator.hasNext())
-                    return new HadoopEdge(this.edgeIterator.next(), this.graph);
-                if (this.readers.isEmpty())
-                    throw FastNoSuchElementException.instance();
-                if (this.readers.peek().nextKeyValue()) {
-                    this.edgeIterator = this.readers.peek().getCurrentValue().get().edges(Direction.OUT);
-                } else {
-                    this.readers.remove();
-                }
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public boolean hasNext() {
-        try {
-            while (true) {
-                if (this.edgeIterator.hasNext())
-                    return true;
-                if (this.readers.isEmpty())
-                    return false;
-                if (this.readers.peek().nextKeyValue()) {
-                    this.edgeIterator = this.readers.peek().getCurrentValue().get().edges(Direction.OUT);
-                } else {
-                    this.readers.remove();
-                }
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e.getMessage(), e);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
deleted file mode 100644
index 45f3c55..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopElementIterator.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.InputSplit;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.structure.Element;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.UUID;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public abstract class HadoopElementIterator<E extends Element> implements Iterator<E> {
-
-    protected final HadoopGraph graph;
-    protected final Queue<RecordReader<NullWritable, VertexWritable>> readers = new LinkedList<>();
-
-    public HadoopElementIterator(final HadoopGraph graph) throws IOException {
-        try {
-            this.graph = graph;
-            final Configuration configuration = ConfUtil.makeHadoopConfiguration(this.graph.configuration());
-            final InputFormat<NullWritable, VertexWritable> inputFormat = this.graph.configuration().getGraphInputFormat().getConstructor().newInstance();
-            if (inputFormat instanceof FileInputFormat) {
-                if (!this.graph.configuration().containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
-                    return; // there is no input location and thus, no data (empty graph)
-                if (!FileSystem.get(configuration).exists(new Path(this.graph.configuration().getInputLocation())))
-                    return; // there is no data at the input location (empty graph)
-                configuration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, this.graph.configuration().getInputLocation());
-            }
-            final List<InputSplit> splits = inputFormat.getSplits(new JobContextImpl(configuration, new JobID(UUID.randomUUID().toString(), 1)));
-            for (final InputSplit split : splits) {
-                this.readers.add(inputFormat.createRecordReader(split, new TaskAttemptContextImpl(configuration, new TaskAttemptID())));
-            }
-        } catch (Exception e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java
deleted file mode 100644
index 8f13c59..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HadoopVertexIterator.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
-
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
-import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.InputFormat;
-
-import java.io.IOException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class HadoopVertexIterator extends HadoopElementIterator<Vertex> {
-
-    private HadoopVertex nextVertex = null;
-
-    public HadoopVertexIterator(final HadoopGraph graph) throws IOException {
-        super(graph);
-    }
-
-    @Override
-    public Vertex next() {
-        try {
-            if (this.nextVertex != null) {
-                final Vertex temp = this.nextVertex;
-                this.nextVertex = null;
-                return temp;
-            } else {
-                while (!this.readers.isEmpty()) {
-                    if (this.readers.peek().nextKeyValue())
-                        return new HadoopVertex(this.readers.peek().getCurrentValue().get(), this.graph);
-                    else
-                        this.readers.remove();
-                }
-            }
-            throw FastNoSuchElementException.instance();
-        } catch (final Exception e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public boolean hasNext() {
-        try {
-            if (null != this.nextVertex) return true;
-            else {
-                while (!this.readers.isEmpty()) {
-                    if (this.readers.peek().nextKeyValue()) {
-                        this.nextVertex = new HadoopVertex(this.readers.peek().getCurrentValue().get(), this.graph);
-                        return true;
-                    } else
-                        this.readers.remove();
-                }
-            }
-        } catch (final Exception e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-        return false;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HiddenFileFilter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HiddenFileFilter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HiddenFileFilter.java
deleted file mode 100644
index 4ea0958..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/HiddenFileFilter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathFilter;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class HiddenFileFilter implements PathFilter {
-
-    private static final HiddenFileFilter INSTANCE = new HiddenFileFilter();
-
-    private HiddenFileFilter() {
-
-    }
-
-    @Override
-    public boolean accept(final Path path) {
-        final String name = path.getName();
-        return !name.startsWith("_") && !name.startsWith(".");
-    }
-
-    public static HiddenFileFilter instance() {
-        return INSTANCE;
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/TextIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/TextIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/TextIterator.java
deleted file mode 100644
index c8424f6..0000000
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/hdfs/TextIterator.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.hadoop.structure.hdfs;
-
-import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.Queue;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class TextIterator implements Iterator<String> {
-
-    private String line;
-    private boolean available = false;
-    private final Queue<BufferedReader> readers = new LinkedList<>();
-
-    public TextIterator(final Configuration configuration, final Path path) throws IOException {
-        final FileSystem fs = FileSystem.get(configuration);
-        for (final FileStatus status : fs.listStatus(path, HiddenFileFilter.instance())) {
-            this.readers.add(new BufferedReader(new InputStreamReader(fs.open(status.getPath()))));
-        }
-    }
-
-    @Override
-    public boolean hasNext() {
-        try {
-            if (this.available) {
-                return true;
-            } else {
-                while (true) {
-                    if (this.readers.isEmpty())
-                        return false;
-                    if ((this.line = this.readers.peek().readLine()) != null) {
-                        this.available = true;
-                        return true;
-                    } else
-                        this.readers.remove();
-                }
-            }
-        } catch (final IOException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-
-    @Override
-    public String next() {
-        try {
-            if (this.available) {
-                this.available = false;
-                return this.line;
-            } else {
-                while (true) {
-                    if (this.readers.isEmpty())
-                        throw FastNoSuchElementException.instance();
-                    if ((this.line = this.readers.peek().readLine()) != null) {
-                        return this.line;
-                    } else
-                        this.readers.remove();
-                }
-            }
-        } catch (final IOException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
index 56dfe52..5d3995c 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
@@ -19,6 +19,7 @@
 
 package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 
+import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -26,16 +27,20 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.Writable;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HiddenFileFilter;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.TextIterator;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -48,13 +53,31 @@ public final class FileSystemStorage implements Storage {
 
     private static final String SPACE = " ";
     private static final String D_SPACE = "(D) ";
+    private static final String FORWARD_SLASH = "/";
+    private static final String FORWARD_ASTERISK = "/*";
 
     private final FileSystem fs;
 
-    public FileSystemStorage(final FileSystem fileSystem) {
+    private FileSystemStorage(final FileSystem fileSystem) {
         this.fs = fileSystem;
     }
 
+    public static FileSystemStorage open() {
+        return FileSystemStorage.open(new Configuration());
+    }
+
+    public static FileSystemStorage open(final Configuration configuration) {
+        try {
+            return new FileSystemStorage(FileSystem.get(configuration));
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    public static FileSystemStorage open(final FileSystem fileSystem) {
+        return new FileSystemStorage(fileSystem);
+    }
+
     private static String fileStatusString(final FileStatus status) {
         StringBuilder s = new StringBuilder();
         s.append(status.getPermission()).append(" ");
@@ -113,7 +136,7 @@ public final class FileSystemStorage implements Storage {
     @Override
     public boolean rm(final String location) {
         try {
-            return HDFSTools.globDelete(this.fs, location, false);
+            return FileSystemStorage.globDelete(this.fs, location, false);
         } catch (final IOException e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
@@ -122,35 +145,53 @@ public final class FileSystemStorage implements Storage {
     @Override
     public boolean rmr(final String location) {
         try {
-            return HDFSTools.globDelete(this.fs, location, true);
+            return FileSystemStorage.globDelete(this.fs, location, true);
         } catch (final IOException e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
     }
 
     @Override
-    public <V> Iterator<V> head(final String location, final int totalLines, final Class<V> objectClass) {
-        return headMaker(this.fs, location, totalLines, (Class<? extends Writable>) objectClass);
+    public Iterator<String> head(final String location, final int totalLines) {
+        try {
+            return IteratorUtils.limit((Iterator) new TextIterator(fs.getConf(), new Path(location)), totalLines);
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
     }
 
     @Override
-    public String toString() {
-        return StringFactory.storageString(this.fs.toString());
+    public Iterator<Vertex> headGraph(final String location, final int totalLines, final Class parserClass) {
+        final org.apache.commons.configuration.Configuration configuration = new BaseConfiguration();
+        configuration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, Constants.getSearchGraphLocation(location, this).get());
+        configuration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, parserClass.getCanonicalName());
+        try {
+            if (InputFormat.class.isAssignableFrom(parserClass))
+                return IteratorUtils.limit(new HadoopVertexIterator(HadoopGraph.open(configuration)), totalLines);
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+        throw new IllegalArgumentException("The provided parser class must be an " + InputFormat.class.getCanonicalName() + ": " + parserClass.getCanonicalName());
+
     }
 
-    private static Iterator headMaker(final FileSystem fs, final String path, final int totalLines, final Class<? extends Writable> writableClass) {
+    @Override
+    public <K, V> Iterator<KeyValue<K, V>> headMemory(final String location, final String memoryKey, final int totalLines, final Class parserClass) {
+        if (!parserClass.equals(SequenceFileInputFormat.class))
+            throw new IllegalArgumentException("Only " + SequenceFileInputFormat.class.getCanonicalName() + " memories are supported");
+        final Configuration configuration = new Configuration();
         try {
-            if (writableClass.equals(ObjectWritable.class))
-                return IteratorUtils.limit(new ObjectWritableIterator(fs.getConf(), new Path(path)), totalLines);
-            else if (writableClass.equals(VertexWritable.class))
-                return IteratorUtils.limit(new VertexWritableIterator(fs.getConf(), new Path(path)), totalLines);
-            else
-                return IteratorUtils.limit(new TextIterator(fs.getConf(), new Path(path)), totalLines);
+            return IteratorUtils.limit((Iterator) new ObjectWritableIterator(configuration, new Path(Constants.getMemoryLocation(location, memoryKey))), totalLines);
         } catch (final IOException e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
     }
 
+    @Override
+    public String toString() {
+        return StringFactory.storageString(this.fs.toString());
+    }
+
     /////////
 
     public void copyToLocal(final String fromLocation, final String toLocation) {
@@ -173,7 +214,7 @@ public final class FileSystemStorage implements Storage {
         try {
             final FileSystem local = FileSystem.getLocal(new Configuration());
             final FSDataOutputStream outA = local.create(new Path(toLocation));
-            for (final Path path : HDFSTools.getAllFilePaths(fs, new Path(fromLocation), HiddenFileFilter.instance())) {
+            for (final Path path : FileSystemStorage.getAllFilePaths(fs, new Path(fromLocation), HiddenFileFilter.instance())) {
                 final FSDataInputStream inA = fs.open(path);
                 IOUtils.copyBytes(inA, outA, 8192);
                 inA.close();
@@ -183,4 +224,31 @@ public final class FileSystemStorage implements Storage {
             throw new IllegalStateException(e.getMessage(), e);
         }
     }
+
+    ////////////
+
+    private static boolean globDelete(final FileSystem fs, final String path, final boolean recursive) throws IOException {
+        boolean deleted = false;
+        for (final Path p : FileUtil.stat2Paths(fs.globStatus(new Path(path)))) {
+            fs.delete(p, recursive);
+            deleted = true;
+        }
+        return deleted;
+    }
+
+    private static List<Path> getAllFilePaths(final FileSystem fs, Path path, final PathFilter filter) throws IOException {
+        if (null == path) path = fs.getHomeDirectory();
+        if (path.toString().equals(FORWARD_SLASH)) path = new Path("");
+
+        final List<Path> paths = new ArrayList<Path>();
+        if (fs.isFile(path))
+            paths.add(path);
+        else {
+            for (final FileStatus status : fs.globStatus(new Path(path + FORWARD_ASTERISK), filter)) {
+                final Path next = status.getPath();
+                paths.addAll(getAllFilePaths(fs, next, filter));
+            }
+        }
+        return paths;
+    }
 }
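
The new headMemory() only supports SequenceFile-backed memory output and delegates to ObjectWritableIterator at <location>/<memoryKey>. A hedged sketch of reading a memory key back; the "output" directory and "clusterCount" key are illustrative.

    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
    import org.apache.tinkerpop.gremlin.process.computer.KeyValue;

    import java.util.Iterator;

    public class MemoryHeadSketch {
        public static void main(final String[] args) {
            final FileSystemStorage storage = FileSystemStorage.open();
            // reads output/clusterCount, capped at ten key/value pairs
            final Iterator<KeyValue<Object, Object>> memory =
                    storage.headMemory("output", "clusterCount", 10, SequenceFileInputFormat.class);
            while (memory.hasNext()) {
                final KeyValue<Object, Object> kv = memory.next();
                System.out.println(kv.getKey() + " -> " + kv.getValue());
            }
        }
    }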

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopEdgeIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopEdgeIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopEdgeIterator.java
new file mode 100644
index 0000000..8f5452f
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopEdgeIterator.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopEdge;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class HadoopEdgeIterator extends HadoopElementIterator<Edge> {
+
+    private Iterator<Edge> edgeIterator = Collections.emptyIterator();
+
+    public HadoopEdgeIterator(final HadoopGraph graph) throws IOException {
+        super(graph);
+    }
+
+    @Override
+    public Edge next() {
+        try {
+            while (true) {
+                if (this.edgeIterator.hasNext())
+                    return new HadoopEdge(this.edgeIterator.next(), this.graph);
+                if (this.readers.isEmpty())
+                    throw FastNoSuchElementException.instance();
+                if (this.readers.peek().nextKeyValue()) {
+                    this.edgeIterator = this.readers.peek().getCurrentValue().get().edges(Direction.OUT);
+                } else {
+                    this.readers.remove().close();
+                }
+            }
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        try {
+            while (true) {
+                if (this.edgeIterator.hasNext())
+                    return true;
+                if (this.readers.isEmpty())
+                    return false;
+                if (this.readers.peek().nextKeyValue()) {
+                    this.edgeIterator = this.readers.peek().getCurrentValue().get().edges(Direction.OUT);
+                } else {
+                    this.readers.remove().close();
+                }
+            }
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopElementIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopElementIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopElementIterator.java
new file mode 100644
index 0000000..9196ce3
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopElementIterator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public abstract class HadoopElementIterator<E extends Element> implements Iterator<E> {
+
+    protected final HadoopGraph graph;
+    protected final Queue<RecordReader<NullWritable, VertexWritable>> readers = new LinkedList<>();
+
+    public HadoopElementIterator(final HadoopGraph graph) {
+        try {
+            this.graph = graph;
+            final Configuration configuration = ConfUtil.makeHadoopConfiguration(this.graph.configuration());
+            final InputFormat<NullWritable, VertexWritable> inputFormat = this.graph.configuration().getGraphInputFormat().getConstructor().newInstance();
+            if (inputFormat instanceof FileInputFormat) {
+                final Storage storage = FileSystemStorage.open(configuration);
+
+                if (!this.graph.configuration().containsKey(Constants.GREMLIN_HADOOP_INPUT_LOCATION))
+                    return; // there is no input location and thus, no data (empty graph)
+                if (!Constants.getSearchGraphLocation(this.graph.configuration().getInputLocation(), storage).isPresent())
+                    return; // there is no data at the input location (empty graph)
+                configuration.set(Constants.MAPREDUCE_INPUT_FILEINPUTFORMAT_INPUTDIR, Constants.getSearchGraphLocation(this.graph.configuration().getInputLocation(), storage).get());
+            }
+            final List<InputSplit> splits = inputFormat.getSplits(new JobContextImpl(configuration, new JobID(UUID.randomUUID().toString(), 1)));
+            for (final InputSplit split : splits) {
+                this.readers.add(inputFormat.createRecordReader(split, new TaskAttemptContextImpl(configuration, new TaskAttemptID())));
+            }
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopVertexIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopVertexIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopVertexIterator.java
new file mode 100644
index 0000000..45b0cad
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopVertexIterator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopVertex;
+import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import java.io.IOException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class HadoopVertexIterator extends HadoopElementIterator<Vertex> {
+
+    private HadoopVertex nextVertex = null;
+
+    public HadoopVertexIterator(final HadoopGraph graph) throws IOException {
+        super(graph);
+    }
+
+    @Override
+    public Vertex next() {
+        try {
+            if (this.nextVertex != null) {
+                final Vertex temp = this.nextVertex;
+                this.nextVertex = null;
+                return temp;
+            } else {
+                while (!this.readers.isEmpty()) {
+                    if (this.readers.peek().nextKeyValue())
+                        return new HadoopVertex(this.readers.peek().getCurrentValue().get(), this.graph);
+                    else
+                        this.readers.remove().close();
+                }
+            }
+            throw FastNoSuchElementException.instance();
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        try {
+            if (null != this.nextVertex) return true;
+            else {
+                while (!this.readers.isEmpty()) {
+                    if (this.readers.peek().nextKeyValue()) {
+                        this.nextVertex = new HadoopVertex(this.readers.peek().getCurrentValue().get(), this.graph);
+                        return true;
+                    } else
+                        this.readers.remove().close();
+                }
+            }
+        } catch (final Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+        return false;
+    }
+}
\ No newline at end of file
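
For orientation, a minimal sketch of driving HadoopVertexIterator directly; in normal use HadoopGraph.vertices() presumably builds this iterator internally. The class name, property keys, input format, and file paths below are assumptions chosen for illustration, not values taken from this commit.

    import org.apache.commons.configuration.BaseConfiguration;
    import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
    import org.apache.tinkerpop.gremlin.hadoop.structure.io.HadoopVertexIterator;
    import org.apache.tinkerpop.gremlin.structure.Vertex;

    import java.util.Iterator;

    public class HadoopVertexIteratorSketch {
        public static void main(final String[] args) throws Exception {
            final BaseConfiguration configuration = new BaseConfiguration();
            configuration.setProperty("gremlin.graph", HadoopGraph.class.getName());
            // The keys and paths below are assumptions typical of a HadoopGraph properties file.
            configuration.setProperty("gremlin.hadoop.graphInputFormat",
                    "org.apache.tinkerpop.gremlin.hadoop.structure.io.graphson.GraphSONInputFormat");
            configuration.setProperty("gremlin.hadoop.inputLocation", "data/tinkerpop-modern.json");
            configuration.setProperty("gremlin.hadoop.outputLocation", "output");
            final HadoopGraph graph = HadoopGraph.open(configuration);
            // The iterator drains one RecordReader per input split, closing each reader as it is exhausted.
            final Iterator<Vertex> vertices = new HadoopVertexIterator(graph);
            while (vertices.hasNext())
                System.out.println(vertices.next().id());
        }
    }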

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HiddenFileFilter.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HiddenFileFilter.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HiddenFileFilter.java
new file mode 100644
index 0000000..8b01571
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HiddenFileFilter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class HiddenFileFilter implements PathFilter {
+
+    private static final HiddenFileFilter INSTANCE = new HiddenFileFilter();
+
+    private HiddenFileFilter() {
+
+    }
+
+    @Override
+    public boolean accept(final Path path) {
+        final String name = path.getName();
+        return !name.startsWith("_") && !name.startsWith(".");
+    }
+
+    public static HiddenFileFilter instance() {
+        return INSTANCE;
+    }
+}
\ No newline at end of file
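
A minimal sketch of the filter in use with the standard Hadoop FileSystem API, so that bookkeeping files such as _SUCCESS and .crc side files never reach the iterators in this package. The class name and the "output/~g" directory are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.tinkerpop.gremlin.hadoop.structure.io.HiddenFileFilter;

    public class ListVisibleFiles {
        public static void main(final String[] args) throws Exception {
            // List only data files under a (hypothetical) graph directory, skipping names that start with "_" or ".".
            final FileSystem fs = FileSystem.get(new Configuration());
            for (final FileStatus status : fs.listStatus(new Path("output/~g"), HiddenFileFilter.instance())) {
                System.out.println(status.getPath());
            }
        }
    }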

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
index 60bf930..676ca07 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableIterator.java
@@ -19,11 +19,10 @@
 package org.apache.tinkerpop.gremlin.hadoop.structure.io;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HDFSTools;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HiddenFileFilter;
 import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
 import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
 
@@ -43,9 +42,8 @@ public final class ObjectWritableIterator implements Iterator<KeyValue> {
     private final Queue<SequenceFile.Reader> readers = new LinkedList<>();
 
     public ObjectWritableIterator(final Configuration configuration, final Path path) throws IOException {
-        final FileSystem fs = FileSystem.get(configuration);
-        for (final Path path2 : HDFSTools.getAllFilePaths(fs, path, HiddenFileFilter.instance())) {
-            this.readers.add(new SequenceFile.Reader(configuration, SequenceFile.Reader.file(path2)));
+        for (final FileStatus status : FileSystem.get(configuration).listStatus(path, HiddenFileFilter.instance())) {
+            this.readers.add(new SequenceFile.Reader(configuration, SequenceFile.Reader.file(status.getPath())));
         }
     }
 
@@ -62,7 +60,7 @@ public final class ObjectWritableIterator implements Iterator<KeyValue> {
                         this.available = true;
                         return true;
                     } else
-                        this.readers.remove();
+                        this.readers.remove().close();
                 }
             }
         } catch (final IOException e) {
@@ -83,7 +81,7 @@ public final class ObjectWritableIterator implements Iterator<KeyValue> {
                     if (this.readers.peek().next(this.key, this.value)) {
                         return new KeyValue<>(this.key.get(), this.value.get());
                     } else
-                        this.readers.remove();
+                        this.readers.remove().close();
                 }
             }
         } catch (final IOException e) {
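
A minimal usage sketch of the iterator after this change, assuming a SequenceFile directory of ObjectWritable key/value pairs at the hypothetical path "output/clusterCount"; the class name is likewise hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
    import org.apache.tinkerpop.gremlin.process.computer.KeyValue;

    import java.util.Iterator;

    public class MemoryHead {
        public static void main(final String[] args) throws Exception {
            // Stream the key/value pairs of a MapReduce memory key written as SequenceFiles (hypothetical path).
            final Iterator<KeyValue> pairs = new ObjectWritableIterator(new Configuration(), new Path("output/clusterCount"));
            while (pairs.hasNext()) {
                final KeyValue pair = pairs.next();
                System.out.println(pair.getKey() + " -> " + pair.getValue());
            }
        }
    }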

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/TextIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/TextIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/TextIterator.java
new file mode 100644
index 0000000..ecb6ede
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/TextIterator.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TextIterator implements Iterator<String> {
+
+    private String line;
+    private boolean available = false;
+    private final Queue<BufferedReader> readers = new LinkedList<>();
+
+    public TextIterator(final Configuration configuration, final Path path) throws IOException {
+        final FileSystem fs = FileSystem.get(configuration);
+        for (final FileStatus status : fs.listStatus(path, HiddenFileFilter.instance())) {
+            this.readers.add(new BufferedReader(new InputStreamReader(fs.open(status.getPath()))));
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        try {
+            if (this.available) {
+                return true;
+            } else {
+                while (true) {
+                    if (this.readers.isEmpty())
+                        return false;
+                    if ((this.line = this.readers.peek().readLine()) != null) {
+                        this.available = true;
+                        return true;
+                    } else
+                        this.readers.remove().close();
+                }
+            }
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public String next() {
+        try {
+            if (this.available) {
+                this.available = false;
+                return this.line;
+            } else {
+                while (true) {
+                    if (this.readers.isEmpty())
+                        throw FastNoSuchElementException.instance();
+                    if ((this.line = this.readers.peek().readLine()) != null) {
+                        return this.line;
+                    } else
+                        this.readers.remove().close();
+                }
+            }
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}
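
A minimal usage sketch: the iterator streams lines across every non-hidden file under a directory and closes each reader once it is exhausted. The class name and the "output/~g" path are hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.tinkerpop.gremlin.hadoop.structure.io.TextIterator;

    import java.util.Iterator;

    public class TextHead {
        public static void main(final String[] args) throws Exception {
            // Print the lines of every visible part file under a (hypothetical) text-output directory.
            final Iterator<String> lines = new TextIterator(new Configuration(), new Path("output/~g"));
            while (lines.hasNext())
                System.out.println(lines.next());
        }
    }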

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
index daaffb8..d3e1fd0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HiddenFileFilter;
 import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 
@@ -42,9 +41,8 @@ public final class VertexWritableIterator implements Iterator<Vertex> {
     private final Queue<SequenceFile.Reader> readers = new LinkedList<>();
 
     public VertexWritableIterator(final Configuration configuration, final Path path) throws IOException {
-        final FileSystem fs = FileSystem.get(configuration);
-        for (final FileStatus status : fs.listStatus(path, HiddenFileFilter.instance())) {
-            this.readers.add(new SequenceFile.Reader(fs, status.getPath(), configuration));
+        for (final FileStatus status : FileSystem.get(configuration).listStatus(path, HiddenFileFilter.instance())) {
+            this.readers.add(new SequenceFile.Reader(configuration, SequenceFile.Reader.file(status.getPath())));
         }
     }
 
@@ -61,7 +59,7 @@ public final class VertexWritableIterator implements Iterator<Vertex> {
                         this.available = true;
                         return true;
                     } else
-                        this.readers.remove();
+                        this.readers.remove().close();
                 }
             }
         } catch (final IOException e) {
@@ -82,7 +80,7 @@ public final class VertexWritableIterator implements Iterator<Vertex> {
                     if (this.readers.peek().next(this.value)) {
                         return this.value.get();
                     } else
-                        this.readers.remove();
+                        this.readers.remove().close();
                 }
             }
         } catch (final IOException e) {
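
A minimal usage sketch mirroring the iterators above, assuming vertices were written as SequenceFiles of VertexWritable under the hypothetical path "output/~g"; the class name is hypothetical as well.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritableIterator;
    import org.apache.tinkerpop.gremlin.structure.Vertex;

    import java.util.Iterator;

    public class GraphHead {
        public static void main(final String[] args) throws Exception {
            // Read vertices back from the (hypothetical) persisted graph directory.
            final Iterator<Vertex> vertices = new VertexWritableIterator(new Configuration(), new Path("output/~g"));
            while (vertices.hasNext())
                System.out.println(vertices.next());
        }
    }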

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/FileSystemStorageCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/FileSystemStorageCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/FileSystemStorageCheck.java
new file mode 100644
index 0000000..a8c5307
--- /dev/null
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/FileSystemStorageCheck.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.groovy.plugin;
+
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.tinkerpop.gremlin.AbstractGremlinTest;
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.hadoop.Constants;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
+import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
+import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.ClusterCountMapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class FileSystemStorageCheck extends AbstractGremlinTest {
+
+    @Test
+    @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
+    public void shouldPersistGraphAndMemory() throws Exception {
+        final ComputerResult result = graph.compute(graphComputerClass.get()).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get();
+        /////
+        final Storage storage = FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration()));
+        // TEST GRAPH PERSISTENCE
+        assertTrue(storage.exists(Constants.getGraphLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))));
+        assertEquals(6, result.graph().traversal().V().count().next().longValue());
+        assertEquals(0, result.graph().traversal().E().count().next().longValue());
+        assertEquals(6, result.graph().traversal().V().values("name").count().next().longValue());
+        assertEquals(6, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().longValue());
+        assertEquals(2, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).dedup().count().next().longValue());
+        /////
+        // TEST MEMORY PERSISTENCE
+        assertEquals(2, (int) result.memory().get("clusterCount"));
+        assertTrue(storage.exists(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount")));
+        assertEquals(1, IteratorUtils.count(storage.headMemory(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount", SequenceFileInputFormat.class)));
+        assertEquals(2, storage.headMemory(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount", SequenceFileInputFormat.class).next().getValue());
+    }
+}
\ No newline at end of file
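
For orientation, a standalone sketch of the Storage calls this test exercises. The class name, the "output" location, and the "clusterCount" key are assumptions, and the default Hadoop Configuration resolves to whatever file system the classpath provides.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
    import org.apache.tinkerpop.gremlin.hadoop.Constants;
    import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
    import org.apache.tinkerpop.gremlin.structure.io.Storage;

    public class StorageSketch {
        public static void main(final String[] args) throws Exception {
            final Storage storage = FileSystemStorage.open(new Configuration());
            final String output = "output"; // hypothetical gremlin.hadoop.outputLocation
            // Graphs follow the dir/~g convention; each memory key gets its own location under the same directory.
            if (storage.exists(Constants.getGraphLocation(output)))
                System.out.println("graph persisted at " + Constants.getGraphLocation(output));
            if (storage.exists(Constants.getMemoryLocation(output, "clusterCount")))
                storage.headMemory(output, "clusterCount", SequenceFileInputFormat.class)
                        .forEachRemaining(System.out::println);
        }
    }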

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java
deleted file mode 100644
index d47ce43..0000000
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/GraphMemoryHDFSCheck.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.hadoop.groovy.plugin;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.tinkerpop.gremlin.AbstractGremlinTest;
-import org.apache.tinkerpop.gremlin.LoadGraphWith;
-import org.apache.tinkerpop.gremlin.hadoop.Constants;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
-import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
-import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
-import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.ClusterCountMapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
-import org.apache.tinkerpop.gremlin.structure.io.Storage;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class GraphMemoryHDFSCheck extends AbstractGremlinTest {
-
-    @Test
-    @LoadGraphWith(LoadGraphWith.GraphData.MODERN)
-    public void shouldPersistGraphAndMemory() throws Exception {
-        final ComputerResult result = graph.compute(graphComputerClass.get()).program(PeerPressureVertexProgram.build().create(graph)).mapReduce(ClusterCountMapReduce.build().memoryKey("clusterCount").create()).submit().get();
-        /////
-        final Storage storage = new FileSystemStorage(FileSystem.get(ConfUtil.makeHadoopConfiguration(graph.configuration())));
-        // TEST GRAPH PERSISTENCE
-        assertTrue(storage.exists(Constants.getGraphLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))));
-        assertEquals(6, result.graph().traversal().V().count().next().longValue());
-        assertEquals(0, result.graph().traversal().E().count().next().longValue());
-        assertEquals(6, result.graph().traversal().V().values("name").count().next().longValue());
-        assertEquals(6, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).count().next().longValue());
-        assertEquals(2, result.graph().traversal().V().values(PeerPressureVertexProgram.CLUSTER).dedup().count().next().longValue());
-        /////
-        // TEST MEMORY PERSISTENCE
-        assertEquals(2, (int) result.memory().get("clusterCount"));
-        assertTrue(storage.exists(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount")));
- //       System.out.println(IteratorUtils.list(storage.head(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount"))));
-//        assertEquals(1, IteratorUtils.count(storage.head(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount"))));
-        assertEquals(2, storage.head(Constants.getMemoryLocation(graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), "clusterCount")).next());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
index 4108e35..b558169 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
@@ -137,7 +137,7 @@ public class HadoopGremlinPluginCheck extends AbstractGremlinTest {
         AbstractGremlinProcessTest.checkResults(Arrays.asList("ripple", "lop"), traversal);
         assertTrue((Boolean) this.console.eval("hdfs.exists('target/test-output/m')"));
         assertTrue((Boolean) this.console.eval("hdfs.exists('target/test-output/" + TraverserMapReduce.TRAVERSERS + "')"));
-        final List<KeyValue<Integer, Collection<String>>> mList = IteratorUtils.asList(this.console.eval("hdfs.head('target/test-output/m',ObjectWritable)"));
+        final List<KeyValue<Integer, Collection<String>>> mList = IteratorUtils.asList(this.console.eval("hdfs.headMemory('target/test-output','m',SequenceFileInputFormat)"));
         assertEquals(4, mList.size());
         mList.forEach(keyValue -> {
             if (keyValue.getKey().equals(29))
@@ -151,7 +151,7 @@ public class HadoopGremlinPluginCheck extends AbstractGremlinTest {
             else
                 throw new IllegalStateException("The provided key/value is unknown: " + keyValue);
         });
-        final List<KeyValue<MapReduce.NullObject, Traverser<String>>> traversersList = IteratorUtils.asList(this.console.eval("hdfs.head('target/test-output/" + TraverserMapReduce.TRAVERSERS + "',ObjectWritable)"));
+        final List<KeyValue<MapReduce.NullObject, Traverser<String>>> traversersList = IteratorUtils.asList(this.console.eval("hdfs.headMemory('target/test-output/'," + "'" + TraverserMapReduce.TRAVERSERS + "',SequenceFileInputFormat)"));
         assertEquals(2, traversersList.size());
         traversersList.forEach(keyValue -> {
             assertEquals(MapReduce.NullObject.instance(), keyValue.getKey());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
index 7dc8143..16d654f 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopPluginSuite.java
@@ -29,6 +29,6 @@ import org.junit.runners.model.RunnerBuilder;
  */
 public class HadoopPluginSuite extends AbstractGremlinSuite {
     public HadoopPluginSuite(final Class<?> klass, final RunnerBuilder builder) throws InitializationError {
-        super(klass, builder, new Class<?>[]{HadoopGremlinPluginCheck.class, GraphMemoryHDFSCheck.class}, new Class<?>[]{HadoopGremlinPluginCheck.class, GraphMemoryHDFSCheck.class}, true, TraversalEngine.Type.COMPUTER);
+        super(klass, builder, new Class<?>[]{HadoopGremlinPluginCheck.class, FileSystemStorageCheck.class}, new Class<?>[]{HadoopGremlinPluginCheck.class, FileSystemStorageCheck.class}, true, TraversalEngine.Type.COMPUTER);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
index fcb234c..a7e333c 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/groovy/plugin/SparkGremlinPlugin.java
@@ -42,7 +42,6 @@ public final class SparkGremlinPlugin extends AbstractGremlinPlugin {
     protected static final Set<String> IMPORTS = new HashSet<String>() {{
         add(IMPORT_SPACE + SparkGraphComputer.class.getPackage().getName() + DOT_STAR);
         add(IMPORT_SPACE + Spark.class.getPackage().getName() + DOT_STAR);
-        add(IMPORT_SPACE + SparkContextStorage.class.getPackage().getName() + DOT_STAR);
     }};
 
     @Override
@@ -54,7 +53,7 @@ public final class SparkGremlinPlugin extends AbstractGremlinPlugin {
     public void afterPluginTo(final PluginAcceptor pluginAcceptor) throws PluginInitializationException, IllegalEnvironmentException {
         pluginAcceptor.addImports(IMPORTS);
         try {
-            pluginAcceptor.eval("spark = SparkContextStorage.open()");
+            pluginAcceptor.addBinding("spark", SparkContextStorage.open());
         } catch (final Exception e) {
             throw new PluginInitializationException(e.getMessage(), e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 03b3016..f96fd15 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -239,6 +239,10 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                 // unpersist the graphRDD if it will no longer be used
                 if (!PersistedOutputRDD.class.equals(hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_OUTPUT_RDD, null)) || this.persist.equals(GraphComputer.Persist.NOTHING)) {
                     graphRDD.unpersist();
+                    if (apacheConfiguration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)) {
+                        Spark.removeRDD(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
+                        Spark.removeRDD(Constants.getGraphLocation(apacheConfiguration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)));
+                    }
                 }
                 // update runtime and return the newly computed graph
                 finalMemory.setRuntime(System.currentTimeMillis() - startTime);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b4d8e960/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
index 5cb9edf..b78caa9 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/PersistedOutputRDD.java
@@ -46,6 +46,7 @@ public final class PersistedOutputRDD implements OutputRDD {
             throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
         final String graphRDDName = Constants.getGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
         Spark.removeRDD(graphRDDName);  // this might be bad cause it unpersists the job RDD
+        Constants.getSearchGraphLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), SparkContextStorage.open(configuration)).ifPresent(Spark::removeRDD);  // this might be bad cause it unpersists the job RDD
         if (!configuration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, true))
             graphRDD.mapValues(vertex -> {
                 vertex.get().dropEdges();
@@ -62,7 +63,7 @@ public final class PersistedOutputRDD implements OutputRDD {
             LOGGER.warn("The SparkContext should be persisted in order for the RDD to persist across jobs. To do so, set " + Constants.GREMLIN_SPARK_PERSIST_CONTEXT + " to true");
         if (!configuration.containsKey(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))
             throw new IllegalArgumentException("There is no provided " + Constants.GREMLIN_HADOOP_OUTPUT_LOCATION + " to write the persisted RDD to");
-        final String sideEffectRDDName = Constants.getMemoryLocation(configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey);
+        final String sideEffectRDDName = configuration.getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + memoryKey;
         Spark.removeRDD(sideEffectRDDName);
         memoryRDD.setName(sideEffectRDDName).cache();
         return IteratorUtils.map(memoryRDD.toLocalIterator(), tuple -> new KeyValue<>(tuple._1(), tuple._2()));