You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/08 15:20:32 UTC

[07/17] incubator-tinkerpop git commit: migrated GiraphGraphComputer over to the new Storage model via FileSystemStorage for HDFS.

migrated GiraphGraphComputer over to the new Storage model via FileSystemStorage for HDFS.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/74b9c8ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/74b9c8ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/74b9c8ec

Branch: refs/heads/master
Commit: 74b9c8ecfe787ead99d79c127fd85a4fccd926ec
Parents: 3fff8f5
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 9 18:27:29 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Dec 9 18:27:29 2015 -0700

----------------------------------------------------------------------
 .../process/computer/GiraphGraphComputer.java   | 36 +++++++++-----------
 .../process/computer/util/MapReduceHelper.java  |  8 ++---
 .../hadoop/structure/io/FileSystemStorage.java  |  4 ++-
 .../hadoop/structure/io/InputOutputHelper.java  |  2 +-
 .../groovy/plugin/HadoopGremlinPluginCheck.java | 26 ++------------
 .../process/computer/SparkGraphComputer.java    |  1 +
 .../gremlin/spark/structure/Spark.java          |  2 ++
 .../spark/structure/io/SparkContextStorage.java |  2 +-
 8 files changed, 31 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/74b9c8ec/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
----------------------------------------------------------------------
diff --git a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
index 646b707..7e3de5e 100644
--- a/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
+++ b/giraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/giraph/process/computer/GiraphGraphComputer.java
@@ -41,6 +41,7 @@ import org.apache.tinkerpop.gremlin.hadoop.process.computer.AbstractHadoopGraphC
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.ComputerSubmissionHelper;
 import org.apache.tinkerpop.gremlin.hadoop.process.computer.util.MapReduceHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.InputOutputHelper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator;
@@ -52,6 +53,7 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
 import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
 import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
+import org.apache.tinkerpop.gremlin.structure.io.Storage;
 import org.apache.tinkerpop.gremlin.util.Gremlin;
 
 import java.io.File;
@@ -126,7 +128,6 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
             try {
                 final FileSystem fs = FileSystem.get(this.giraphConfiguration);
                 this.loadJars(fs);
-                fs.delete(new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)), true);
                 ToolRunner.run(this, new String[]{});
             } catch (final Exception e) {
                 //e.printStackTrace();
@@ -140,6 +141,8 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
 
     @Override
     public int run(final String[] args) {
+        final Storage storage = FileSystemStorage.open(this.giraphConfiguration);
+        storage.rmr(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION));
         this.giraphConfiguration.setBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, this.persist.equals(Persist.EDGES));
         try {
             // it is possible to run graph computer without a vertex program (and thus, only map reduce jobs if they exist)
@@ -170,13 +173,10 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
                 }
                 // handle input paths (if any)
                 if (FileInputFormat.class.isAssignableFrom(this.giraphConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class))) {
-                    final Path inputPath = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
-                    if (!FileSystem.get(this.giraphConfiguration).exists(inputPath))  // TODO: what about when the input is not a file input?
-                        throw new IllegalArgumentException("The provided input path does not exist: " + inputPath);
-                    FileInputFormat.setInputPaths(job.getInternalJob(), inputPath);
+                    FileInputFormat.setInputPaths(job.getInternalJob(), Constants.getSearchGraphLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION), storage).get());
                 }
                 // handle output paths
-                final Path outputPath = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_G);
+                final Path outputPath = new Path(Constants.getGraphLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)));
                 FileOutputFormat.setOutputPath(job.getInternalJob(), outputPath);
                 job.getInternalJob().setJarByClass(GiraphGraphComputer.class);
                 this.logger.info(Constants.GREMLIN_HADOOP_GIRAPH_JOB_PREFIX + this.vertexProgram);
@@ -184,17 +184,18 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
                 if (!job.run(true))
                     throw new IllegalStateException("The GiraphGraphComputer job failed -- aborting all subsequent MapReduce jobs");  // how do I get the exception that occured?
                 // add vertex program memory values to the return memory
-                for (final String key : this.vertexProgram.getMemoryComputeKeys()) {
-                    final Path path = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + key);
-                    final ObjectWritableIterator iterator = new ObjectWritableIterator(this.giraphConfiguration, path);
-                    if (iterator.hasNext()) {
-                        this.memory.set(key, iterator.next().getValue());
+                for (final String memoryKey : this.vertexProgram.getMemoryComputeKeys()) {
+                    if (storage.exists(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey))) {
+                        final ObjectWritableIterator iterator = new ObjectWritableIterator(this.giraphConfiguration, new Path(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey)));
+                        if (iterator.hasNext()) {
+                            this.memory.set(memoryKey, iterator.next().getValue());
+                        }
+                        storage.rmr(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), memoryKey));
                     }
-                    FileSystem.get(this.giraphConfiguration).delete(path, true);
                 }
-                final Path path = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_ITERATION);
+                final Path path = new Path(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), Constants.HIDDEN_ITERATION));
                 this.memory.setIteration((Integer) new ObjectWritableIterator(this.giraphConfiguration, path).next().getValue());
-                FileSystem.get(this.giraphConfiguration).delete(path, true);
+                storage.rmr(Constants.getMemoryLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), Constants.HIDDEN_ITERATION));
             }
             // do map reduce jobs
             this.giraphConfiguration.setBoolean(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT_HAS_EDGES, this.giraphConfiguration.getBoolean(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT_HAS_EDGES, true));
@@ -204,11 +205,8 @@ public final class GiraphGraphComputer extends AbstractHadoopGraphComputer imple
             }
 
             // if no persistence, delete the map reduce output
-            if (this.persist.equals(Persist.NOTHING)) {
-                final Path outputPath = new Path(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_G);
-                if (FileSystem.get(this.giraphConfiguration).exists(outputPath))      // TODO: what about when the output is not a file output?
-                    FileSystem.get(this.giraphConfiguration).delete(outputPath, true);
-            }
+            if (this.persist.equals(Persist.NOTHING))
+                storage.rmr(Constants.getGraphLocation(this.giraphConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)));
         } catch (final Exception e) {
             throw new IllegalStateException(e.getMessage(), e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/74b9c8ec/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
index 4c92abe..6e0cd9e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
@@ -65,7 +65,7 @@ public final class MapReduceHelper {
         mapReduce.storeState(apacheConfiguration);
         ConfUtil.mergeApacheIntoHadoopConfiguration(apacheConfiguration, newConfiguration);
         if (!mapReduce.doStage(MapReduce.Stage.MAP)) {
-            final Path memoryPath = new Path(configuration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + mapReduce.getMemoryKey());
+            final Path memoryPath = new Path(Constants.getMemoryLocation(configuration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), mapReduce.getMemoryKey()));
             mapReduce.addResultToMemory(memory, new ObjectWritableIterator(configuration, memoryPath));
         } else {
             final Optional<Comparator<?>> mapSort = mapReduce.getMapKeySort();
@@ -100,9 +100,9 @@ public final class MapReduceHelper {
             job.setOutputFormatClass(SequenceFileOutputFormat.class);
             // if there is no vertex program, then grab the graph from the input location
             final Path graphPath = vertexProgramExists ?
-                    new Path(newConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.HIDDEN_G) :
+                    new Path(Constants.getGraphLocation(newConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION))) :
                     new Path(newConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
-            Path memoryPath = new Path(newConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + (reduceSort.isPresent() ? mapReduce.getMemoryKey() + "-temp" : mapReduce.getMemoryKey()));
+            Path memoryPath = new Path(Constants.getMemoryLocation(newConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), (reduceSort.isPresent() ? mapReduce.getMemoryKey() + "-temp" : mapReduce.getMemoryKey())));
             if (FileSystem.get(newConfiguration).exists(memoryPath)) {
                 FileSystem.get(newConfiguration).delete(memoryPath, true);
             }
@@ -124,7 +124,7 @@ public final class MapReduceHelper {
                 reduceSortJob.setOutputFormatClass(SequenceFileOutputFormat.class);
                 reduceSortJob.setNumReduceTasks(1); // todo: is this necessary to ensure sorted order?
                 FileInputFormat.setInputPaths(reduceSortJob, memoryPath);
-                final Path sortedMemoryPath = new Path(newConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + mapReduce.getMemoryKey());
+                final Path sortedMemoryPath = new Path(Constants.getMemoryLocation(newConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), mapReduce.getMemoryKey()));
                 FileOutputFormat.setOutputPath(reduceSortJob, sortedMemoryPath);
                 reduceSortJob.waitForCompletion(true);
                 FileSystem.get(newConfiguration).delete(memoryPath, true); // delete the temporary memory path

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/74b9c8ec/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
index b34f7a3..330227e 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
@@ -154,7 +154,7 @@ public final class FileSystemStorage implements Storage {
     @Override
     public Iterator<String> head(final String location, final int totalLines) {
         try {
-            return IteratorUtils.limit((Iterator) new TextIterator(fs.getConf(), new Path(location)), totalLines);
+            return IteratorUtils.limit((Iterator) new TextIterator(this.fs.getConf(), new Path(location)), totalLines);
         } catch (final IOException e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
@@ -228,6 +228,8 @@ public final class FileSystemStorage implements Storage {
     ////////////
 
     private static boolean globDelete(final FileSystem fs, final String path, final boolean recursive) throws IOException {
+        if (!fs.exists(new Path(path)))
+            return false;
         boolean deleted = false;
         for (final Path p : FileUtil.stat2Paths(fs.globStatus(new Path(path)))) {
             fs.delete(p, recursive);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/74b9c8ec/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
index 04097c1..48c2ad4 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/InputOutputHelper.java
@@ -77,7 +77,7 @@ public final class InputOutputHelper {
         final BaseConfiguration newConfiguration = new BaseConfiguration();
         newConfiguration.copy(hadoopConfiguration);
         if (resultGraph.equals(GraphComputer.ResultGraph.NEW)) {
-            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, hadoopConfiguration.getOutputLocation() + "/" + Constants.HIDDEN_G);
+            newConfiguration.setProperty(Constants.GREMLIN_HADOOP_INPUT_LOCATION, hadoopConfiguration.getOutputLocation());
             if (hadoopConfiguration.containsKey(Constants.GREMLIN_HADOOP_GRAPH_OUTPUT_FORMAT))
                 newConfiguration.setProperty(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputOutputHelper.getInputFormat(hadoopConfiguration.getGraphOutputFormat()).getCanonicalName());
             newConfiguration.setProperty(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION, hadoopConfiguration.getOutputLocation() + "_");

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/74b9c8ec/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
index 4e4353d..95c19ed 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopGremlinPluginCheck.java
@@ -135,30 +135,8 @@ public class HadoopGremlinPluginCheck extends AbstractGremlinTest {
         this.remote.connect(Arrays.asList("graph", "g"));
         Traversal<Vertex, String> traversal = (Traversal<Vertex, String>) this.remote.submit(Arrays.asList("g.V().hasLabel('person').group('m').by('age').by('name').out('knows').out('created').values('name')"));
         AbstractGremlinProcessTest.checkResults(Arrays.asList("ripple", "lop"), traversal);
-        assertTrue((Boolean) this.console.eval("hdfs.exists('target/test-output/m')"));
-        assertTrue((Boolean) this.console.eval("hdfs.exists('target/test-output/" + TraverserMapReduce.TRAVERSERS + "')"));
-        final List<KeyValue<Integer, Collection<String>>> mList = IteratorUtils.asList(this.console.eval("hdfs.head('target/test-output','m',SequenceFileInputFormat)"));
-        assertEquals(4, mList.size());
-        mList.forEach(keyValue -> {
-            if (keyValue.getKey().equals(29))
-                assertTrue(keyValue.getValue().contains("marko"));
-            else if (keyValue.getKey().equals(35))
-                assertTrue(keyValue.getValue().contains("peter"));
-            else if (keyValue.getKey().equals(32))
-                assertTrue(keyValue.getValue().contains("josh"));
-            else if (keyValue.getKey().equals(27))
-                assertTrue(keyValue.getValue().contains("vadas"));
-            else
-                throw new IllegalStateException("The provided key/value is unknown: " + keyValue);
-        });
-        final List<KeyValue<MapReduce.NullObject, Traverser<String>>> traversersList = IteratorUtils.asList(this.console.eval("hdfs.head('target/test-output/'," + "'" + TraverserMapReduce.TRAVERSERS + "',SequenceFileInputFormat)"));
-        assertEquals(2, traversersList.size());
-        traversersList.forEach(keyValue -> {
-            assertEquals(MapReduce.NullObject.instance(), keyValue.getKey());
-            final String name = keyValue.getValue().get();
-            assertTrue(name.equals("ripple") || name.equals("lop"));
-        });
-        ////////////////
+        assertFalse((Boolean) this.console.eval("hdfs.exists('target/test-output/m')"));
+        assertFalse((Boolean) this.console.eval("hdfs.exists('target/test-output/" + TraverserMapReduce.TRAVERSERS + "')"));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/74b9c8ec/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 3f7efaa..ecd9573 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -24,6 +24,7 @@ import org.apache.commons.configuration.PropertiesConfiguration;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.spark.SparkConf;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/74b9c8ec/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
index 2761efb..0bf679b 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/Spark.java
@@ -34,6 +34,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 
 /**
+ * This is a static cache that prevents Spark from garbage collecting unreferenced RDDs.
+ *
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public class Spark {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/74b9c8ec/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
index 2ea0fc3..97a26f1 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorage.java
@@ -75,7 +75,7 @@ public final class SparkContextStorage implements Storage {
     @Override
     public List<String> ls(final String location) {
         final List<String> rdds = new ArrayList<>();
-        final String wildCardLocation = location.replace(".", "\\.").replace("*", ".*");
+        final String wildCardLocation = (location.endsWith("*") ? location : location + "*").replace(".", "\\.").replace("*", ".*");
         for (final RDD<?> rdd : Spark.getRDDs()) {
             if (rdd.name().matches(wildCardLocation))
                 rdds.add(rdd.name() + " [" + rdd.getStorageLevel().description() + "]");