You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/04/15 19:48:11 UTC

[49/50] incubator-tinkerpop git commit: A little cleanup of the Iterators for HadoopLoader.

A little cleanup of the Iterators for HadoopLoader.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/d09ffa89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/d09ffa89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/d09ffa89

Branch: refs/heads/variables
Commit: d09ffa895a5f22a26d03a540003ab05e5f82d684
Parents: 9f7ce82
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Apr 15 10:19:47 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Apr 15 10:19:47 2015 -0600

----------------------------------------------------------------------
 .../gremlin/util/iterator/IteratorUtils.java    |  3 +-
 .../util/iterator/IteratorUtilsTest.java        | 24 +++++
 .../hadoop/groovy/plugin/HadoopLoader.groovy    | 30 ++++---
 .../structure/io/VertexWritableIterator.java    | 95 ++++++++++++++++++++
 4 files changed, 138 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d09ffa89/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
index c0facae..769e76a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtils.java
@@ -86,8 +86,7 @@ public final class IteratorUtils {
 
             @Override
             public S next() {
-                this.count++;
-                if (this.count >= limit)
+                if (this.count++ >= limit)
                     throw FastNoSuchElementException.instance();
                 return iterator.next();
             }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d09ffa89/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtilsTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtilsTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtilsTest.java
index e060fc9..6f3640f 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtilsTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/util/iterator/IteratorUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.util.iterator;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -448,6 +449,29 @@ public class IteratorUtilsTest {
         assertEquals(new Integer(16), IteratorUtils.reduce(iterable, 10, (accumulator, val) -> accumulator + Integer.parseInt(val)));
     }
 
+    @Test
+    public void shouldLimitIterator() {
+        List<String> list = Arrays.asList("a","b","c","d","e");
+        Iterator<String> itty = IteratorUtils.limit(list.iterator(),3);
+        assertTrue(itty.hasNext());
+        assertEquals("a", itty.next());
+        assertTrue(itty.hasNext());
+        assertEquals("b", itty.next());
+        assertTrue(itty.hasNext());
+        assertEquals("c", itty.next());
+        assertFalse(itty.hasNext());
+
+        list = Arrays.asList("a","b","c");
+        itty = IteratorUtils.limit(list.iterator(),4);
+        assertTrue(itty.hasNext());
+        assertEquals("a", itty.next());
+        assertTrue(itty.hasNext());
+        assertEquals("b", itty.next());
+        assertTrue(itty.hasNext());
+        assertEquals("c", itty.next());
+        assertFalse(itty.hasNext());
+    }
+
     public <S> void assertIterator(final Iterator<S> itty, final int size) {
         for (int ix = 0; ix < size; ix++) {
             assertEquals("test" + (ix + 1), itty.next());

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d09ffa89/hadoop-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopLoader.groovy
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopLoader.groovy b/hadoop-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopLoader.groovy
index ecbe9d3..962fc0d 100644
--- a/hadoop-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopLoader.groovy
+++ b/hadoop-gremlin/src/main/groovy/org/apache/tinkerpop/gremlin/hadoop/groovy/plugin/HadoopLoader.groovy
@@ -27,6 +27,8 @@ import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HiddenFileFilter
 import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.TextIterator
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritableIterator
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritableIterator
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils
 
 /**
@@ -100,28 +102,21 @@ class HadoopLoader {
         }
 
         FileSystem.metaClass.head = { final String path, final long totalLines ->
-            return ((FileSystem) delegate).head(path, Long.MAX_VALUE, Text.class);
+            return headMaker((FileSystem) delegate, path, totalLines, Text.class);
         }
 
         FileSystem.metaClass.head = { final String path ->
-            return ((FileSystem) delegate).head(path, Long.MAX_VALUE, Text.class);
+            return headMaker((FileSystem) delegate, path, Long.MAX_VALUE, Text.class);
         }
 
         FileSystem.metaClass.head = {
             final String path, final Class<org.apache.hadoop.io.Writable> writableClass ->
-                return ((FileSystem) delegate).head(path, Long.MAX_VALUE, writableClass);
+                return headMaker((FileSystem) delegate, path, Long.MAX_VALUE, writableClass);
         }
 
         FileSystem.metaClass.head = {
-            final String path, final long totalKeyValues, final Class<org.apache.hadoop.io.Writable> writableClass ->
-                // if(writableClass.equals(org.apache.giraph.graph.Vertex.class)) {
-                /// return StreamFactory.stream(new GiraphVertexIterator(((FileSystem) delegate).getConf(), new Path(path))).limit(totalKeyValues).iterator();
-                // } else
-                if (writableClass.equals(ObjectWritable.class)) {
-                    return IteratorUtils.limit(new ObjectWritableIterator(((FileSystem) delegate).getConf(), new Path(path)), totalKeyValues);
-                } else {
-                    return IteratorUtils.limit(new TextIterator(((FileSystem) delegate).getConf(), new Path(path)), totalKeyValues);
-                }
+            final String path, final long totalLines, final Class<org.apache.hadoop.io.Writable> writableClass ->
+                return headMaker((FileSystem) delegate, path, totalLines, writableClass);
         }
 
         /*FileSystem.metaClass.unzip = { final String from, final String to, final boolean deleteZip ->
@@ -129,4 +124,15 @@ class HadoopLoader {
         }*/
 
     }
+
+    private static Iterator headMaker(
+            final FileSystem fs,
+            final String path, final long totalLines, final Class<org.apache.hadoop.io.Writable> writableClass) {
+        if (writableClass.equals(ObjectWritable.class))
+            return IteratorUtils.limit(new ObjectWritableIterator(fs.getConf(), new Path(path)), totalLines.intValue());
+        else if (writableClass.equals(VertexWritable.class))
+            return IteratorUtils.limit(new VertexWritableIterator(fs.getConf(), new Path(path)), totalLines.intValue());
+        else
+            return IteratorUtils.limit(new TextIterator(fs.getConf(), new Path(path)), totalLines.intValue());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/d09ffa89/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
new file mode 100644
index 0000000..6933ae4
--- /dev/null
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/VertexWritableIterator.java
@@ -0,0 +1,95 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  * http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing,
+ *  * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  * KIND, either express or implied.  See the License for the
+ *  * specific language governing permissions and limitations
+ *  * under the License.
+ *
+ */
+
+package org.apache.tinkerpop.gremlin.hadoop.structure.io;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.tinkerpop.gremlin.hadoop.structure.hdfs.HiddenFileFilter;
+import org.apache.tinkerpop.gremlin.process.traversal.FastNoSuchElementException;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class VertexWritableIterator implements Iterator<Vertex> {
+
+    private final VertexWritable value = new VertexWritable();
+    private boolean available = false;
+    private final Queue<SequenceFile.Reader> readers = new LinkedList<>();
+
+    public VertexWritableIterator(final Configuration configuration, final Path path) throws IOException {
+        final FileSystem fs = FileSystem.get(configuration);
+        for (final FileStatus status : fs.listStatus(path, HiddenFileFilter.instance())) {
+            this.readers.add(new SequenceFile.Reader(fs, status.getPath(), configuration));
+        }
+    }
+
+    @Override
+    public boolean hasNext() {
+        try {
+            if (this.available) {
+                return true;
+            } else {
+                while (true) {
+                    if (this.readers.isEmpty())
+                        return false;
+                    if (this.readers.peek().next(this.value)) {
+                        this.available = true;
+                        return true;
+                    } else
+                        this.readers.remove();
+                }
+            }
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public Vertex next() {
+        try {
+            if (this.available) {
+                this.available = false;
+                return this.value.get();
+            } else {
+                while (true) {
+                    if (this.readers.isEmpty())
+                        throw FastNoSuchElementException.instance();
+                    if (this.readers.peek().next(this.value)) {
+                        return this.value.get();
+                    } else
+                        this.readers.remove();
+                }
+            }
+        } catch (final IOException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}
\ No newline at end of file