You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by jw...@apache.org on 2012/08/29 01:45:11 UTC
git commit: CRUNCH-53: Support autoclosing iterators for
text/seq/avro reader factories. Contributed by Shawn Smith.
Updated Branches:
refs/heads/master ca2b9c06a -> 58383d3d0
CRUNCH-53: Support autoclosing iterators for text/seq/avro reader factories. Contributed by Shawn Smith.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/58383d3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/58383d3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/58383d3d
Branch: refs/heads/master
Commit: 58383d3d0e8346c17366beaea4c4543eec0b9c19
Parents: ca2b9c0
Author: Josh Wills <jw...@cloudera.com>
Authored: Tue Aug 28 16:44:42 2012 -0700
Committer: Josh Wills <jw...@cloudera.com>
Committed: Tue Aug 28 16:44:42 2012 -0700
----------------------------------------------------------------------
.../crunch/io/avro/AvroFileReaderFactory.java | 5 +-
.../apache/crunch/io/impl/AutoClosingIterator.java | 62 +++++++++++++++
.../apache/crunch/io/seq/SeqFileReaderFactory.java | 5 +-
.../crunch/io/seq/SeqFileTableReaderFactory.java | 5 +-
.../crunch/io/text/TextFileReaderFactory.java | 7 +-
5 files changed, 75 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
index 982f6db..d1940cc 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.MapFn;
import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.AutoClosingIterator;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
@@ -73,7 +74,7 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
try {
FsInput fsi = new FsInput(path, fs.getConf());
final DataFileReader<T> reader = new DataFileReader<T>(fsi, recordReader);
- return new UnmodifiableIterator<T>() {
+ return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
@Override
public boolean hasNext() {
return reader.hasNext();
@@ -83,7 +84,7 @@ public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
public T next() {
return mapFn.map(reader.next());
}
- };
+ });
} catch (IOException e) {
LOG.info("Could not read avro file at path: " + path, e);
return Iterators.emptyIterator();
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java b/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
new file mode 100644
index 0000000..d58f290
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+import com.google.common.collect.UnmodifiableIterator;
+import com.google.common.io.Closeables;
+
+/**
+ * Closes the wrapped {@code Closeable} once {@link #hasNext()} returns false, after which it keeps returning false.
+ * Clients that abort early (exception, short circuit, etc.) should call {@link #close()} to release the resource.
+ */
+public class AutoClosingIterator<T> extends UnmodifiableIterator<T> implements Iterator<T>, Closeable {
+  private final Iterator<T> iter;
+  private final Closeable closeable;
+  private boolean closed;
+
+  public AutoClosingIterator(Closeable closeable, Iterator<T> iter) {
+    this.closeable = closeable;
+    this.iter = iter;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (closed || !iter.hasNext()) { // never re-read the delegate once its resource has been closed
+      Closeables.closeQuietly(this);
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public T next() {
+    return iter.next();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (!closed) {
+      closed = true; // set first so a close() that throws is not retried on every hasNext()
+      Closeables.close(closeable, false); // null-tolerant; rethrows any IOException to the caller
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
index 47163e1..050c0fc 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileReaderFactory.java
@@ -24,6 +24,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.MapFn;
import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.AutoClosingIterator;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -57,7 +58,7 @@ public class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
mapFn.initialize();
try {
final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- return new UnmodifiableIterator<T>() {
+ return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
boolean nextChecked = false;
boolean hasNext = false;
@@ -84,7 +85,7 @@ public class SeqFileReaderFactory<T> implements FileReaderFactory<T> {
nextChecked = false;
return mapFn.map(value);
}
- };
+ });
} catch (IOException e) {
LOG.info("Could not read seqfile at path: " + path, e);
return Iterators.emptyIterator();
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
index 038142a..7c34a75 100644
--- a/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/seq/SeqFileTableReaderFactory.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.crunch.MapFn;
import org.apache.crunch.Pair;
import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.AutoClosingIterator;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
@@ -64,7 +65,7 @@ public class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K
valueMapFn.initialize();
try {
final SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
- return new UnmodifiableIterator<Pair<K, V>>() {
+ return new AutoClosingIterator<Pair<K, V>>(reader, new UnmodifiableIterator<Pair<K, V>>() {
boolean nextChecked = false;
boolean hasNext = false;
@@ -91,7 +92,7 @@ public class SeqFileTableReaderFactory<K, V> implements FileReaderFactory<Pair<K
nextChecked = false;
return Pair.of(keyMapFn.map(key), valueMapFn.map(value));
}
- };
+ });
} catch (IOException e) {
LOG.info("Could not read seqfile at path: " + path, e);
return Iterators.emptyIterator();
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/58383d3d/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
index d22b233..5a512fc 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/TextFileReaderFactory.java
@@ -28,6 +28,7 @@ import org.apache.crunch.MapFn;
import org.apache.crunch.fn.CompositeMapFn;
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.AutoClosingIterator;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -65,7 +66,7 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
mapFn.setConfigurationForTest(conf);
mapFn.initialize();
- FSDataInputStream is = null;
+ FSDataInputStream is;
try {
is = fs.open(path);
} catch (IOException e) {
@@ -75,7 +76,7 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
final BufferedReader reader = new BufferedReader(new InputStreamReader(is));
final MapFn<String, T> iterMapFn = mapFn;
- return new UnmodifiableIterator<T>() {
+ return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
private String nextLine;
@Override
@@ -92,6 +93,6 @@ public class TextFileReaderFactory<T> implements FileReaderFactory<T> {
public T next() {
return iterMapFn.map(nextLine);
}
- };
+ });
}
}