You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2011/08/31 23:42:35 UTC
svn commit: r1163837 - in
/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile:
SequenceFileDirIterator.java SequenceFileDirValueIterator.java
Author: srowen
Date: Wed Aug 31 21:42:35 2011
New Revision: 1163837
URL: http://svn.apache.org/viewvc?rev=1163837&view=rev
Log:
Made SequenceFile directory Iterables Closeable
Modified:
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java?rev=1163837&r1=1163836&r2=1163837&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java Wed Aug 31 21:42:35 2011
@@ -17,20 +17,25 @@
package org.apache.mahout.common.iterator.sequencefile;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.List;
import com.google.common.base.Function;
import com.google.common.collect.ForwardingIterator;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.IOUtils;
import org.apache.mahout.common.Pair;
/**
@@ -39,9 +44,10 @@ import org.apache.mahout.common.Pair;
* restricted with a {@link PathFilter}.
*/
public final class SequenceFileDirIterator<K extends Writable,V extends Writable>
- extends ForwardingIterator<Pair<K,V>> {
+ extends ForwardingIterator<Pair<K,V>> implements Closeable {
private final Iterator<Pair<K,V>> delegate;
+ private final List<SequenceFileIterator<K,V>> iterators;
public SequenceFileDirIterator(Path path,
PathType pathType,
@@ -62,18 +68,27 @@ public final class SequenceFileDirIterat
Arrays.sort(statuses, ordering);
}
Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
+
+ iterators = Lists.newArrayList();
+
Iterator<Iterator<Pair<K,V>>> fsIterators =
Iterators.transform(fileStatusIterator,
new Function<FileStatus, Iterator<Pair<K, V>>>() {
@Override
public Iterator<Pair<K, V>> apply(FileStatus from) {
try {
- return new SequenceFileIterator<K,V>(from.getPath(), reuseKeyValueInstances, conf);
+ SequenceFileIterator<K,V> iterator =
+ new SequenceFileIterator<K,V>(from.getPath(), reuseKeyValueInstances, conf);
+ iterators.add(iterator);
+ return iterator;
} catch (IOException ioe) {
throw new IllegalStateException(from.getPath().toString(), ioe);
}
}
});
+
+ Collections.reverse(iterators); // close later in reverse order
+
delegate = Iterators.concat(fsIterators);
}
@@ -82,4 +97,10 @@ public final class SequenceFileDirIterat
return delegate;
}
+ @Override
+ public void close() throws IOException {
+ IOUtils.close(iterators);
+ iterators.clear();
+ }
+
}
Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java?rev=1163837&r1=1163836&r2=1163837&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java Wed Aug 31 21:42:35 2011
@@ -17,29 +17,36 @@
package org.apache.mahout.common.iterator.sequencefile;
+import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.List;
import com.google.common.base.Function;
import com.google.common.collect.ForwardingIterator;
import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.IOUtils;
/**
* Like {@link SequenceFileValueIterator}, but iterates not just over one sequence file, but many. The input path
* may be specified as a directory of files to read, or as a glob pattern. The set of files may be optionally
* restricted with a {@link PathFilter}.
*/
-public final class SequenceFileDirValueIterator<V extends Writable> extends ForwardingIterator<V> {
+public final class SequenceFileDirValueIterator<V extends Writable>
+ extends ForwardingIterator<V> implements Closeable {
private final Iterator<V> delegate;
+ private final List<SequenceFileValueIterator<V>> iterators;
public SequenceFileDirValueIterator(Path path,
PathType pathType,
@@ -58,18 +65,27 @@ public final class SequenceFileDirValueI
Arrays.sort(statuses, ordering);
}
Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
+
+ iterators = Lists.newArrayList();
+
Iterator<Iterator<V>> fsIterators =
Iterators.transform(fileStatusIterator,
new Function<FileStatus, Iterator<V>>() {
@Override
public Iterator<V> apply(FileStatus from) {
try {
- return new SequenceFileValueIterator<V>(from.getPath(), reuseKeyValueInstances, conf);
+ SequenceFileValueIterator<V> iterator =
+ new SequenceFileValueIterator<V>(from.getPath(), reuseKeyValueInstances, conf);
+ iterators.add(iterator);
+ return iterator;
} catch (IOException ioe) {
throw new IllegalStateException(from.getPath().toString(), ioe);
}
}
});
+
+ Collections.reverse(iterators); // close later in reverse order
+
delegate = Iterators.concat(fsIterators);
}
@@ -78,4 +94,10 @@ public final class SequenceFileDirValueI
return delegate;
}
+ @Override
+ public void close() throws IOException {
+ IOUtils.close(iterators);
+ iterators.clear();
+ }
+
}