You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/27 02:24:32 UTC

incubator-tinkerpop git commit: There was a bug around ls() and exists() behavior for 'directory'-based storage locations. I created a beautiful test case that ensures that both SparkContextStorage (spark()) and FileSystemStorage (hdfs()) behave the same

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master cebe3c917 -> c4bc256fb


There was a bug in the ls() and exists() behavior for 'directory'-based storage locations. I added a test case that ensures that both SparkContextStorage (spark()) and FileSystemStorage (hdfs()) behave the same for their respective RDDs and files. Ran the full integration tests; everything passed. CTR (commit-then-review).


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/c4bc256f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/c4bc256f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/c4bc256f

Branch: refs/heads/master
Commit: c4bc256fbd0bcb24194e95174f57d0e3a71de3ed
Parents: cebe3c9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 26 18:24:13 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 26 18:24:28 2016 -0700

----------------------------------------------------------------------
 .../hadoop/structure/io/FileSystemStorage.java  | 36 +++++++-------
 .../structure/io/HadoopElementIterator.java     | 16 +++++--
 .../structure/io/AbstractStorageCheck.java      | 50 ++++++++++++++++++++
 .../structure/io/FileSystemStorageCheck.java    | 16 +++++++
 .../structure/io/SparkContextStorageCheck.java  | 14 ++++++
 5 files changed, 111 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c4bc256f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
index ed112f7..7fa43cd 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorage.java
@@ -84,12 +84,16 @@ public final class FileSystemStorage implements Storage {
         s.append(status.getOwner()).append(SPACE);
         s.append(status.getGroup()).append(SPACE);
         s.append(status.getLen()).append(SPACE);
-        if (status.isDir())
+        if (status.isDirectory()) // directories get the D_SPACE marker before their name
             s.append(D_SPACE);
         s.append(status.getPath().getName());
         return s.toString();
     }
 
+    private String tryHomeDirectory(final String location) { // maps the bare root "/" to this.fs's home directory; any other location is returned unchanged
+        return location.equals("/") ? this.fs.getHomeDirectory().toString() : location;
+    }
+
     @Override
     public List<String> ls() {
         return this.ls("/");
@@ -98,9 +102,9 @@ public final class FileSystemStorage implements Storage {
     @Override
     public List<String> ls(final String location) {
         try {
-            final String newLocation;
-            newLocation = location.equals("/") ? this.fs.getHomeDirectory().toString() : location;
-            return Stream.of(this.fs.globStatus(new Path(newLocation + "/*"))).map(FileSystemStorage::fileStatusString).collect(Collectors.toList());
+            return this.fs.isDirectory(new Path(tryHomeDirectory(location))) ? // directory -> list its children ("/*"); otherwise treat location as a name prefix ("*"). NOTE(review): tryHomeDirectory(location) runs twice per call; could be hoisted into a local
+                    Stream.of(this.fs.globStatus(new Path(tryHomeDirectory(location) + "/*"))).map(FileSystemStorage::fileStatusString).collect(Collectors.toList()) :
+                    Stream.of(this.fs.globStatus(new Path(tryHomeDirectory(location) + "*"))).map(FileSystemStorage::fileStatusString).collect(Collectors.toList()); // NOTE(review): prefix glob also matches siblings, e.g. "out/a" lists "out/abc" too — confirm intended
         } catch (final IOException e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
@@ -126,7 +130,7 @@ public final class FileSystemStorage implements Storage {
     @Override
     public boolean exists(final String location) {
         try {
-            return this.fs.exists(new Path(location));
+            return this.fs.globStatus(new Path(tryHomeDirectory(location) + "*")).length > 0; // NOTE(review): the appended "*" makes this a prefix match, so exists("a/b") is true when only "a/bc" exists — confirm intended
         } catch (final IOException e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
@@ -135,7 +139,15 @@ public final class FileSystemStorage implements Storage {
     @Override
     public boolean rm(final String location) {
         try {
-            return FileSystemStorage.globDelete(this.fs, location, true);
+            final FileStatus[] statuses = this.fs.globStatus(new Path(tryHomeDirectory(location) + "*")); // NOTE(review): prefix glob — rm("out/a") would also delete siblings such as "out/abc"; confirm intended
+            Stream.of(statuses).forEach(status -> {
+                try {
+                    this.fs.delete(status.getPath(), true); // recursive, so matched directories are removed with their contents
+                } catch (IOException e) {
+                    throw new IllegalStateException(e.getMessage(), e);
+                }
+            });
+            return statuses.length > 0; // true iff at least one path matched the glob
         } catch (final IOException e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
@@ -215,18 +227,6 @@ public final class FileSystemStorage implements Storage {
         }
     }
 
-    ////////////
-
-    private static boolean globDelete(final FileSystem fs, final String path, final boolean recursive) throws IOException {
-        if (!fs.exists(new Path(path)))
-            return false;
-        boolean deleted = false;
-        for (final Path p : FileUtil.stat2Paths(fs.globStatus(new Path(path)))) {
-            fs.delete(p, recursive);
-            deleted = true;
-        }
-        return deleted;
-    }
 
     private static List<Path> getAllFilePaths(final FileSystem fs, Path path, final PathFilter filter) throws IOException {
         if (null == path) path = fs.getHomeDirectory();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c4bc256f/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopElementIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopElementIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopElementIterator.java
index 9196ce3..ae371e0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopElementIterator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/HadoopElementIterator.java
@@ -30,12 +30,11 @@ import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.HadoopGraph;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.FileSystemStorage;
-import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.io.Storage;
 
+import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -45,7 +44,7 @@ import java.util.UUID;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public abstract class HadoopElementIterator<E extends Element> implements Iterator<E> {
+public abstract class HadoopElementIterator<E extends Element> implements Iterator<E>, AutoCloseable { // AutoCloseable so the RecordReaders queued in readers can be released via close()
 
     protected final HadoopGraph graph;
     protected final Queue<RecordReader<NullWritable, VertexWritable>> readers = new LinkedList<>();
@@ -72,4 +71,27 @@ public abstract class HadoopElementIterator<E extends Element> implements Iterat
             throw new IllegalStateException(e.getMessage(), e);
         }
     }
+
+    /**
+     * Closes every underlying {@link RecordReader}, attempting all of them even when an earlier
+     * one fails. The first {@link IOException} is rethrown wrapped in an {@link IllegalStateException};
+     * any later failures are attached to it as suppressed exceptions.
+     */
+    @Override
+    public void close() {
+        IOException failure = null;
+        for (final RecordReader<NullWritable, VertexWritable> reader : this.readers) {
+            try {
+                reader.close();
+            } catch (final IOException e) {
+                // record and keep going so one failing reader does not leak the others
+                if (null == failure)
+                    failure = e;
+                else
+                    failure.addSuppressed(e);
+            }
+        }
+        if (null != failure)
+            throw new IllegalStateException(failure.getMessage(), failure);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c4bc256f/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractStorageCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractStorageCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractStorageCheck.java
index 1a73093..492a501 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractStorageCheck.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/AbstractStorageCheck.java
@@ -83,6 +83,7 @@ public abstract class AbstractStorageCheck extends AbstractGremlinTest {
         assertTrue(storage.exists(Constants.getMemoryLocation(outputLocation, "clusterCount")));
         assertEquals(2, storage.ls(outputLocation).size());
         assertTrue(storage.rm(Constants.getGraphLocation(outputLocation)));
+        assertFalse(storage.rm(Constants.getGraphLocation(outputLocation))); // a second rm of an already-removed location must report that nothing was deleted
         assertEquals(1, storage.ls(outputLocation).size());
         assertTrue(storage.rm(Constants.getMemoryLocation(outputLocation, "clusterCount")));
         assertEquals(0, storage.ls(outputLocation).size());
@@ -142,4 +143,53 @@ public abstract class AbstractStorageCheck extends AbstractGremlinTest {
         assertEquals(4l, traversal.asAdmin().getSideEffects().<Long>get(Graph.Hidden.hide("reducing")).get().longValue());
 
     }
+
+    public void checkFileDirectoryDistinction(final Storage storage, final String directory1, final String directory2) throws Exception { // requires directory1 to hold file1-0..9.txt.bz and directory2 to hold file2-0..4.txt.bz; removes all of them as it runs
+        assertTrue(storage.exists(directory1));
+        assertTrue(storage.exists(directory2));
+        assertTrue(storage.exists(directory1 + "/f*"));
+        assertTrue(storage.exists(directory2 + "/f*"));
+        assertEquals(10, storage.ls(directory1).size());
+        assertEquals(10, storage.ls(directory1 + "/*").size());
+        assertEquals(5, storage.ls(directory2).size());
+        assertEquals(5, storage.ls(directory2 + "/*").size());
+        for (int i = 0; i < 10; i++) {
+            assertTrue(storage.exists(directory1 + "/file1-" + i + ".txt.bz"));
+            assertTrue(storage.exists(directory1 + "/file1-" + i + "*"));
+            assertTrue(storage.exists(directory1 + "/file1-" + i + ".txt*"));
+            assertTrue(storage.exists(directory1 + "/file1-" + i + ".*.bz"));
+            assertTrue(storage.exists(directory1 + "/file1-" + i + ".*.b*"));
+        }
+        assertFalse(storage.exists(directory1 + "/file1-10.txt.bz"));
+        for (int i = 0; i < 5; i++) {
+            assertTrue(storage.exists(directory2 + "/file2-" + i + ".txt.bz"));
+            assertTrue(storage.exists(directory2 + "/file2-" + i + "*"));
+            assertTrue(storage.exists(directory2 + "/file2-" + i + ".txt*"));
+            assertTrue(storage.exists(directory2 + "/file2-" + i + ".*.bz"));
+            assertTrue(storage.exists(directory2 + "/file2-" + i + ".*.b*"));
+        }
+        assertFalse(storage.exists(directory2 + "/file1-5.txt.bz")); // a file name that exists under directory1 must not be visible under directory2
+        assertTrue(storage.rm(directory1 + "/file1-0.txt.bz"));
+        assertFalse(storage.rm(directory1 + "/file1-0.txt.bz"));
+        assertEquals(9, storage.ls(directory1).size());
+        assertEquals(9, storage.ls(directory1 + "/*").size());
+        assertEquals(9, storage.ls(directory1 + "/file*").size());
+        assertEquals(9, storage.ls(directory1 + "/file1*").size());
+        assertEquals(0, storage.ls(directory1 + "/file2*").size());
+        assertEquals(5, storage.ls(directory2).size());
+        assertEquals(5, storage.ls(directory2 + "/*").size());
+        assertEquals(5, storage.ls(directory2 + "/file*").size());
+        assertEquals(5, storage.ls(directory2 + "/file2*").size());
+        assertEquals(0, storage.ls(directory2 + "/file1*").size());
+        assertTrue(storage.rm(directory1 + "/file1-*"));
+        assertFalse(storage.rm(directory1 + "/file1-*"));
+        assertEquals(0, storage.ls(directory1).size());
+        assertEquals(0, storage.ls(directory1 + "/*").size());
+        assertEquals(5, storage.ls(directory2).size());
+        assertEquals(5, storage.ls(directory2 + "/*").size());
+        assertTrue(storage.rm(directory2 + "/f*"));
+        assertFalse(storage.rm(directory2 + "/file*"));
+        assertEquals(0, storage.ls(directory2).size());
+        assertEquals(0, storage.ls(directory2 + "*").size()); // NOTE(review): unlike the "/*" listings above, this globs on the directory name itself ("directory2*") — confirm intended
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c4bc256f/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorageCheck.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorageCheck.java b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorageCheck.java
index 846582e..72d5efe 100644
--- a/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorageCheck.java
+++ b/hadoop-gremlin/src/test/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/FileSystemStorageCheck.java
@@ -78,6 +78,22 @@ public class FileSystemStorageCheck extends AbstractStorageCheck {
         super.checkResidualDataInStorage(storage, outputLocation);
     }
 
+    @Test
+    public void shouldSupportDirectoryFileDistinction() throws Exception {
+        final Storage storage = FileSystemStorage.open(ConfUtil.makeHadoopConfiguration(graph.configuration()));
+        final String directory1 = TestHelper.makeTestDataDirectory(FileSystemStorageCheck.class, "directory1");
+        final String directory2 = TestHelper.makeTestDataDirectory(FileSystemStorageCheck.class, "directory2");
+        for (int i = 0; i < 10; i++) {
+            new File(directory1 + "/" + "file1-" + i + ".txt.bz").createNewFile(); // empty placeholder file; createNewFile()'s boolean result is ignored
+        }
+        for (int i = 0; i < 5; i++) {
+            new File(directory2 + "/" + "file2-" + i + ".txt.bz").createNewFile();
+        }
+        super.checkFileDirectoryDistinction(storage, directory1, directory2);
+        deleteDirectory(directory1); // NOTE(review): skipped if the check above fails; consider try/finally so the test directories are always removed
+        deleteDirectory(directory2);
+    }
+
     private static void deleteDirectory(final String location) throws IOException {
         // TestHelper creates the directory and we need it not to exist
         assertTrue(new File(location).isDirectory());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/c4bc256f/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
index c5746b6..06deb7c 100644
--- a/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
+++ b/spark-gremlin/src/test/java/org/apache/tinkerpop/gremlin/spark/structure/io/SparkContextStorageCheck.java
@@ -19,6 +19,8 @@
 
 package org.apache.tinkerpop.gremlin.spark.structure.io;
 
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.storage.StorageLevel;
 import org.apache.tinkerpop.gremlin.LoadGraphWith;
 import org.apache.tinkerpop.gremlin.hadoop.Constants;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.AbstractStorageCheck;
@@ -71,4 +73,16 @@ public class SparkContextStorageCheck extends AbstractStorageCheck {
         final String outputLocation = graph.configuration().getString(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION);
         super.checkResidualDataInStorage(storage, outputLocation);
     }
+
+    @Test
+    public void shouldSupportDirectoryFileDistinction() throws Exception {
+        final Storage storage = SparkContextStorage.open("local[4]");
+        for (int i = 0; i < 10; i++) {
+            JavaSparkContext.fromSparkContext(Spark.getContext()).emptyRDD().setName("directory1/file1-" + i + ".txt.bz").persist(StorageLevel.DISK_ONLY()); // RDD names mimic "directory/file" paths so the shared storage check applies unchanged
+        }
+        for (int i = 0; i < 5; i++) {
+            JavaSparkContext.fromSparkContext(Spark.getContext()).emptyRDD().setName("directory2/file2-" + i + ".txt.bz").persist(StorageLevel.DISK_ONLY());
+        }
+        super.checkFileDirectoryDistinction(storage, "directory1", "directory2"); // the shared check removes these RDDs itself via storage.rm(...)
+    }
 }
\ No newline at end of file