You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by sr...@apache.org on 2011/08/31 23:42:35 UTC

svn commit: r1163837 - in /mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile: SequenceFileDirIterator.java SequenceFileDirValueIterator.java

Author: srowen
Date: Wed Aug 31 21:42:35 2011
New Revision: 1163837

URL: http://svn.apache.org/viewvc?rev=1163837&view=rev
Log:
Made SequenceFile directory Iterables Closeable

Modified:
    mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
    mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java?rev=1163837&r1=1163836&r2=1163837&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirIterator.java Wed Aug 31 21:42:35 2011
@@ -17,20 +17,25 @@
 
 package org.apache.mahout.common.iterator.sequencefile;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ForwardingIterator;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.IOUtils;
 import org.apache.mahout.common.Pair;
 
 /**
@@ -39,9 +44,10 @@ import org.apache.mahout.common.Pair;
  * restricted with a {@link PathFilter}.
  */
 public final class SequenceFileDirIterator<K extends Writable,V extends Writable>
-    extends ForwardingIterator<Pair<K,V>> {
+    extends ForwardingIterator<Pair<K,V>> implements Closeable {
 
   private final Iterator<Pair<K,V>> delegate;
+  private final List<SequenceFileIterator<K,V>> iterators;
 
   public SequenceFileDirIterator(Path path,
                                  PathType pathType,
@@ -62,18 +68,27 @@ public final class SequenceFileDirIterat
       Arrays.sort(statuses, ordering);
     }
     Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
+
+    iterators = Lists.newArrayList();
+
     Iterator<Iterator<Pair<K,V>>> fsIterators =
         Iterators.transform(fileStatusIterator,
                             new Function<FileStatus, Iterator<Pair<K, V>>>() {
                               @Override
                               public Iterator<Pair<K, V>> apply(FileStatus from) {
                                 try {
-                                  return new SequenceFileIterator<K,V>(from.getPath(), reuseKeyValueInstances, conf);
+                                  SequenceFileIterator<K,V> iterator =
+                                      new SequenceFileIterator<K,V>(from.getPath(), reuseKeyValueInstances, conf);
+                                  iterators.add(iterator);
+                                  return iterator;
                                 } catch (IOException ioe) {
                                   throw new IllegalStateException(from.getPath().toString(), ioe);
                                 }
                               }
                             });
+
+    Collections.reverse(iterators); // close later in reverse order
+
     delegate = Iterators.concat(fsIterators);
   }
 
@@ -82,4 +97,10 @@ public final class SequenceFileDirIterat
     return delegate;
   }
 
+  @Override // satisfies java.io.Closeable, which this class now implements
+  public void close() throws IOException { // hands the tracked per-file iterators to IOUtils and forgets them
+    IOUtils.close(iterators); // presumably closes each listed iterator; the list only holds those apply() has created so far
+    iterators.clear(); // empty the list so a repeated close() has nothing left to pass on
+  } // NOTE(review): the constructor's Collections.reverse() appears to run before apply() adds anything (lazy transform?), so the reverse close order may not hold - confirm
+
 }

Modified: mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java
URL: http://svn.apache.org/viewvc/mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java?rev=1163837&r1=1163836&r2=1163837&view=diff
==============================================================================
--- mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java (original)
+++ mahout/trunk/core/src/main/java/org/apache/mahout/common/iterator/sequencefile/SequenceFileDirValueIterator.java Wed Aug 31 21:42:35 2011
@@ -17,29 +17,36 @@
 
 package org.apache.mahout.common.iterator.sequencefile;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.List;
 
 import com.google.common.base.Function;
 import com.google.common.collect.ForwardingIterator;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.io.Writable;
+import org.apache.mahout.common.IOUtils;
 
 /**
  * Like {@link SequenceFileValueIterator}, but iterates not just over one sequence file, but many. The input path
  * may be specified as a directory of files to read, or as a glob pattern. The set of files may be optionally
  * restricted with a {@link PathFilter}.
  */
-public final class SequenceFileDirValueIterator<V extends Writable> extends ForwardingIterator<V> {
+public final class SequenceFileDirValueIterator<V extends Writable>
+    extends ForwardingIterator<V> implements Closeable {
 
   private final Iterator<V> delegate;
+  private final List<SequenceFileValueIterator<V>> iterators;
 
   public SequenceFileDirValueIterator(Path path,
                                       PathType pathType,
@@ -58,18 +65,27 @@ public final class SequenceFileDirValueI
       Arrays.sort(statuses, ordering);
     }
     Iterator<FileStatus> fileStatusIterator = Iterators.forArray(statuses);
+
+    iterators = Lists.newArrayList();
+
     Iterator<Iterator<V>> fsIterators =
         Iterators.transform(fileStatusIterator,
                             new Function<FileStatus, Iterator<V>>() {
                               @Override
                               public Iterator<V> apply(FileStatus from) {
                                 try {
-                                  return new SequenceFileValueIterator<V>(from.getPath(), reuseKeyValueInstances, conf);
+                                  SequenceFileValueIterator<V> iterator =
+                                    new SequenceFileValueIterator<V>(from.getPath(), reuseKeyValueInstances, conf);
+                                  iterators.add(iterator);
+                                  return iterator;
                                 } catch (IOException ioe) {
                                   throw new IllegalStateException(from.getPath().toString(), ioe);
                                 }
                               }
                             });
+
+    Collections.reverse(iterators); // close later in reverse order
+
     delegate = Iterators.concat(fsIterators);
   }
 
@@ -78,4 +94,10 @@ public final class SequenceFileDirValueI
     return delegate;
   }
 
+  @Override // satisfies java.io.Closeable, which this class now implements
+  public void close() throws IOException { // hands the tracked per-file value iterators to IOUtils and forgets them
+    IOUtils.close(iterators); // presumably closes each listed iterator; the list only holds those apply() has created so far
+    iterators.clear(); // empty the list so a repeated close() has nothing left to pass on
+  } // NOTE(review): the constructor's Collections.reverse() appears to run before apply() adds anything (lazy transform?), so the reverse close order may not hold - confirm
+
 }