You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/08/29 01:45:11 UTC

git commit: CRUNCH-53: Support autoclosing iterators for text/seq/avro reader factories. Contributed by Shawn Smith.

Updated Branches:
  refs/heads/master ca2b9c06a -> 58383d3d0


CRUNCH-53: Support autoclosing iterators for text/seq/avro reader factories. Contributed by Shawn Smith.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/58383d3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/58383d3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/58383d3d

Branch: refs/heads/master
Commit: 58383d3d0e8346c17366beaea4c4543eec0b9c19
Parents: ca2b9c0
Author: Josh Wills <jw...@cloudera.com>
Authored: Tue Aug 28 16:44:42 2012 -0700
Committer: Josh Wills <jw...@cloudera.com>
Committed: Tue Aug 28 16:44:42 2012 -0700

----------------------------------------------------------------------
 .../crunch/io/avro/AvroFileReaderFactory.java      |    5 +-
 .../apache/crunch/io/impl/AutoClosingIterator.java |   62 +++++++++++++++
 .../apache/crunch/io/seq/SeqFileReaderFactory.java |    5 +-
 .../crunch/io/seq/SeqFileTableReaderFactory.java   |    5 +-
 .../crunch/io/text/TextFileReaderFactory.java      |    7 +-
 5 files changed, 75 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
index 982f6db..d1940cc 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.AutoClosingIterator;
 import org.apache.crunch.types.avro.AvroType;
 import org.apache.crunch.types.avro.Avros;
 import org.apache.hadoop.conf.Configuration;
@@ -73,7 +74,7 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
     try {
       FsInput fsi = new FsInput(path, fs.getConf());
       final DataFileReader<T> reader = new DataFileReader<T>(fsi, recordReader);
-      return new UnmodifiableIterator<T>() {
+      return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
         @Override
         public boolean hasNext() {
           return reader.hasNext();
@@ -83,7 +84,7 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
         public T next() {
           return mapFn.map(reader.next());
         }
-      };
+      });
     } catch (IOException e) {
       LOG.info("Could not read avro file at path: " + path, e);
       return Iterators.emptyIterator();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java b/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
new file mode 100644
index 0000000..d58f290
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import com.google.common.collect.UnmodifiableIterator;
+import com.google.common.io.Closeables;
+
+/**
+ * Closes the wrapped {@code Closeable} when {@link #hasNext()} returns false.  As long a client loops through to
+ * completion (doesn't abort early due to an exception, short circuit, etc.) resources will be closed automatically.
+ */
+public class AutoClosingIterator<T> extends UnmodifiableIterator<T> implements Iterator<T>, Closeable {
+  private final Iterator<T> iter;
+  private Closeable closeable;
+
+  public AutoClosingIterator(Closeable closeable, Iterator<T> iter) {
+    this.closeable = closeable;
+    this.iter = iter;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!iter.hasNext()) {
+      Closeables.closeQuietly(this);
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  @Override
+  public T next() {
+    return iter.next();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closeable != null) {
+      closeable.close();
+      closeable = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
index 47163e1..050c0fc 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.AutoClosingIterator;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -57,7 +58,7 @@ public class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
     mapFn.initialize();
     try {
       final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-      return new UnmodifiableIterator<T>() {
+      return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
         boolean nextChecked = false;
         boolean hasNext = false;
 
@@ -84,7 +85,7 @@ public class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
           nextChecked = false;
           return mapFn.map(value);
         }
-      };
+      });
     } catch (IOException e) {
       LOG.info("Could not read seqfile at path: " + path, e);
       return Iterators.emptyIterator();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
index 038142a..7c34a75 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.Pair;
 import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.AutoClosingIterator;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
@@ -64,7 +65,7 @@ public class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K
     valueMapFn.initialize();
     try {
       final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
-      return new UnmodifiableIterator<Pair<K, V>>() {
+      return new AutoClosingIterator<Pair<K, V>>(reader, new UnmodifiableIterator<Pair<K, V>>() {
         boolean nextChecked = false;
         boolean hasNext = false;
 
@@ -91,7 +92,7 @@ public class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K
           nextChecked = false;
           return Pair.of(keyMapFn.map(key), valueMapFn.map(value));
         }
-      };
+      });
     } catch (IOException e) {
       LOG.info("Could not read seqfile at path: " + path, e);
       return Iterators.emptyIterator();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
index d22b233..5a512fc 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
@@ -28,6 +28,7 @@ import org.apache.crunch.MapFn;
 import org.apache.crunch.fn.CompositeMapFn;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.AutoClosingIterator;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -65,7 +66,7 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
     mapFn.setConfigurationForTest(conf);
     mapFn.initialize();
 
-    FSDataInputStream is = null;
+    FSDataInputStream is;
     try {
       is = fs.open(path);
     } catch (IOException e) {
@@ -75,7 +76,7 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
 
     final BufferedReader reader = new BufferedReader(new InputStreamReader(is));
     final MapFn<String, T> iterMapFn = mapFn;
-    return new UnmodifiableIterator<T>() {
+    return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
       private String nextLine;
 
       @Override
@@ -92,6 +93,6 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
       public T next() {
         return iterMapFn.map(nextLine);
       }
-    };
+    });
   }
 }