You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/16 06:24:34 UTC

svn commit: r1136287 - in /cassandra/trunk: ./ src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/columniterator/ src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/db/filter/ src/java/org/apache/cassandra/io/sst...

Author: jbellis
Date: Thu Jun 16 04:24:33 2011
New Revision: 1136287

URL: http://svn.apache.org/viewvc?rev=1136287&view=rev
Log:
replace CollatingIterator, ReducingIterator with MergeIterator
patch by stuhood; reviewed by jbellis for CASSANDRA-2062

Added:
    cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java
Removed:
    cassandra/trunk/src/java/org/apache/cassandra/db/RowIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/ReducingIterator.java
Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
    cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
    cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
    cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Jun 16 04:24:33 2011
@@ -3,6 +3,8 @@
  * add commitlog_total_space_in_mb to prevent fragmented logs (CASSANDRA-2427)
  * removed commitlog_rotation_threshold_in_mb configuration (CASSANDRA-2771)
  * make AbstractBounds.normalize de-overlapp overlapping ranges (CASSANDRA-2641)
+ * replace CollatingIterator, ReducingIterator with MergeIterator 
+   (CASSANDRA-2062)
 
 
 0.8.1

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Jun 16 04:24:33 2011
@@ -32,7 +32,6 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.collect.Iterables;
-import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.lang.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1188,7 +1187,7 @@ public class ColumnFamilyStore implement
 
         IColumnIterator ci = filter.getMemtableColumnIterator(cached, null, getComparator());
         ColumnFamily cf = ci.getColumnFamily().cloneMeShallow();
-        filter.collectCollatedColumns(cf, ci, gcBefore);
+        filter.collateColumns(cf, Collections.singletonList(ci), getComparator(), gcBefore);
         // TODO this is necessary because when we collate supercolumns together, we don't check
         // their subcolumns for relevance, so we need to do a second prune post facto here.
         return cf.isSuper() ? removeDeleted(cf, gcBefore) : removeDeletedCF(cf, gcBefore);
@@ -1244,10 +1243,7 @@ public class ColumnFamilyStore implement
             if (iterators.size() == 0)
                 return null;
 
-            Comparator<IColumn> comparator = filter.filter.getColumnComparator(getComparator());
-            Iterator collated = IteratorUtils.collatedIterator(comparator, iterators);
-
-            filter.collectCollatedColumns(returnCF, collated, gcBefore);
+            filter.collateColumns(returnCF, iterators, getComparator(), gcBefore);
 
             // Caller is responsible for final removeDeletedCF.  This is important for cacheRow to work correctly:
             return returnCF;
@@ -1298,7 +1294,7 @@ public class ColumnFamilyStore implement
         // It is fine to aliases the View.sstables since it's an unmodifiable collection
         Collection<SSTableReader> sstables = currentView.sstables;
 
-        RowIterator iterator = RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter, getComparator(), this);
+        CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter, getComparator(), this);
         List<Row> rows = new ArrayList<Row>();
 
         try
@@ -1486,7 +1482,7 @@ public class ColumnFamilyStore implement
                         ColumnFamily expandedData = data;
                         data = expandedData.cloneMeShallow();
                         IColumnIterator iter = dataFilter.getMemtableColumnIterator(expandedData, dk, getComparator());
-                        new QueryFilter(dk, path, dataFilter).collectCollatedColumns(data, iter, gcBefore());
+                        new QueryFilter(dk, path, dataFilter).collateColumns(data, Collections.singletonList(iter), getComparator(), gcBefore());
                     }
 
                     rows.add(new Row(dk, data));

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java Thu Jun 16 04:24:33 2011
@@ -25,15 +25,16 @@ import java.util.Map.Entry;
 
 import com.google.common.base.Function;
 import com.google.common.base.Predicate;
+import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
-import org.apache.commons.collections.IteratorUtils;
 
 import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.utils.ReducingIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.MergeIterator;
 
 public class RowIteratorFactory
 {
@@ -60,7 +61,7 @@ public class RowIteratorFactory
      * @param comparator
      * @return A row iterator following all the given restrictions
      */
-    public static RowIterator getIterator(final Collection<Memtable> memtables,
+    public static CloseableIterator<Row> getIterator(final Collection<Memtable> memtables,
                                           final Collection<SSTableReader> sstables,
                                           final DecoratedKey startWith,
                                           final DecoratedKey stopAt,
@@ -70,7 +71,7 @@ public class RowIteratorFactory
     )
     {
         // fetch data from current memtable, historical memtables, and SSTables in the correct order.
-        final List<Iterator<IColumnIterator>> iterators = new ArrayList<Iterator<IColumnIterator>>();
+        final List<CloseableIterator<IColumnIterator>> iterators = new ArrayList<CloseableIterator<IColumnIterator>>();
         // we iterate through memtables with a priority queue to avoid more sorting than necessary.
         // this predicate throws out the rows before the start of our range.
         Predicate<IColumnIterator> p = new Predicate<IColumnIterator>()
@@ -85,8 +86,7 @@ public class RowIteratorFactory
         // memtables
         for (Memtable memtable : memtables)
         {
-            iterators.add(Iterators.filter(Iterators.transform(memtable.getEntryIterator(startWith),
-                                                               new ConvertToColumnIterator(filter, comparator)), p));
+            iterators.add(new ConvertToColumnIterator(filter, comparator, p, memtable.getEntryIterator(startWith)));
         }
 
         // sstables
@@ -98,10 +98,9 @@ public class RowIteratorFactory
             iterators.add(scanner);
         }
 
-        Iterator<IColumnIterator> collated = IteratorUtils.collatedIterator(COMPARE_BY_KEY, iterators);
-
+        final Memtable firstMemtable = memtables.iterator().next();
         // reduce rows from all sources into a single row
-        ReducingIterator<IColumnIterator, Row> reduced = new ReducingIterator<IColumnIterator, Row>(collated)
+        return MergeIterator.get(iterators, COMPARE_BY_KEY, new MergeIterator.Reducer<IColumnIterator, Row>()
         {
             private final int gcBefore = (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
             private final List<IColumnIterator> colIters = new ArrayList<IColumnIterator>();
@@ -121,57 +120,61 @@ public class RowIteratorFactory
                 this.returnCF.delete(current.getColumnFamily());
             }
 
-            @Override
-            protected boolean isEqual(IColumnIterator o1, IColumnIterator o2)
-            {
-                return COMPARE_BY_KEY.compare(o1, o2) == 0;
-            }
-
             protected Row getReduced()
             {
-                Comparator<IColumn> colComparator = filter.filter.getColumnComparator(comparator);
-                Iterator<IColumn> colCollated = IteratorUtils.collatedIterator(colComparator, colIters);
 
                 // First check if this row is in the rowCache. If it is we can skip the rest
                 ColumnFamily cached = cfs.getRawCachedRow(key);
-                if (cached != null)
+                if (cached == null)
+                    // not cached: collate
+                    filter.collateColumns(returnCF, colIters, comparator, gcBefore);
+                else
                 {
                     QueryFilter keyFilter = new QueryFilter(key, filter.path, filter.filter);
                     returnCF = cfs.filterColumnFamily(cached, keyFilter, gcBefore);
                 }
-                else if (colCollated.hasNext())
-                {
-                    filter.collectCollatedColumns(returnCF, colCollated, gcBefore);
-                }
 
                 Row rv = new Row(key, returnCF);
                 colIters.clear();
                 key = null;
                 return rv;
             }
-        };
-
-        return new RowIterator(reduced, iterators);
+        });
     }
 
     /**
      * Get a ColumnIterator for a specific key in the memtable.
      */
-    private static class ConvertToColumnIterator implements Function<Map.Entry<DecoratedKey, ColumnFamily>, IColumnIterator>
+    private static class ConvertToColumnIterator extends AbstractIterator<IColumnIterator> implements CloseableIterator<IColumnIterator>
     {
-        private QueryFilter filter;
-        private AbstractType comparator;
+        private final QueryFilter filter;
+        private final AbstractType comparator;
+        private final Predicate<IColumnIterator> pred;
+        private final Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter;
 
-        public ConvertToColumnIterator(QueryFilter filter, AbstractType comparator)
+        public ConvertToColumnIterator(QueryFilter filter, AbstractType comparator, Predicate<IColumnIterator> pred, Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter)
         {
             this.filter = filter;
             this.comparator = comparator;
+            this.pred = pred;
+            this.iter = iter;
         }
 
-        public IColumnIterator apply(final Entry<DecoratedKey, ColumnFamily> entry)
+        public IColumnIterator computeNext()
         {
-            return filter.getMemtableColumnIterator(entry.getValue(), entry.getKey(), comparator);
+            while (iter.hasNext())
+            {
+                Map.Entry<DecoratedKey, ColumnFamily> entry = iter.next();
+                IColumnIterator ici = filter.getMemtableColumnIterator(entry.getValue(), entry.getKey(), comparator);
+                if (pred.apply(ici))
+                    return ici;
+            }
+            return endOfData();
         }
-    }
 
+        public void close()
+        {
+            // nothing to release: the wrapped iter is a plain Iterator and exposes no close()
+        }
+    }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java Thu Jun 16 04:24:33 2011
@@ -27,8 +27,9 @@ import java.util.Iterator;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.utils.CloseableIterator;
 
-public interface IColumnIterator extends Iterator<IColumn>
+public interface IColumnIterator extends CloseableIterator<IColumn>
 {
     /**
      * @return An empty CF holding metadata for the row being iterated.

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java Thu Jun 16 04:24:33 2011
@@ -21,33 +21,35 @@ package org.apache.cassandra.db.compacti
  */
 
 
-import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.cassandra.service.StorageService;
-import org.apache.commons.collections.iterators.CollatingIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.AbstractIterator;
+
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ReducingIterator;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.MergeIterator;
 
-public class CompactionIterator extends ReducingIterator<SSTableIdentityIterator, AbstractCompactedRow>
-implements Closeable, CompactionInfo.Holder
+public class CompactionIterator extends AbstractIterator<AbstractCompactedRow>
+implements CloseableIterator<AbstractCompactedRow>, CompactionInfo.Holder
 {
     private static Logger logger = LoggerFactory.getLogger(CompactionIterator.class);
 
     public static final int FILE_BUFFER_SIZE = 1024 * 1024;
 
-    protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
+    private final MergeIterator<IColumnIterator, AbstractCompactedRow> source;
     protected final CompactionType type;
     protected final CompactionController controller;
 
@@ -65,33 +67,26 @@ implements Closeable, CompactionInfo.Hol
 
     public CompactionIterator(CompactionType type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
     {
-        this(type, getCollatingIterator(sstables), controller);
+        this(type, getScanners(sstables), controller);
     }
 
-    @SuppressWarnings("unchecked")
-    protected CompactionIterator(CompactionType type, Iterator iter, CompactionController controller)
+    protected CompactionIterator(CompactionType type, List<SSTableScanner> scanners, CompactionController controller)
     {
-        super(iter);
         this.type = type;
         this.controller = controller;
+        this.source = MergeIterator.get(scanners, ICOMP, new Reducer());
         row = 0;
         totalBytes = bytesRead = 0;
-        for (SSTableScanner scanner : getScanners())
-        {
+        for (SSTableScanner scanner : scanners)
             totalBytes += scanner.getFileLength();
-        }
     }
 
-    @SuppressWarnings("unchecked")
-    protected static CollatingIterator getCollatingIterator(Iterable<SSTableReader> sstables) throws IOException
+    protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException
     {
-        // TODO CollatingIterator iter = FBUtilities.<SSTableIdentityIterator>getCollatingIterator();
-        CollatingIterator iter = FBUtilities.getCollatingIterator();
+        ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>();
         for (SSTableReader sstable : sstables)
-        {
-            iter.addIterator(sstable.getDirectScanner(FILE_BUFFER_SIZE));
-        }
-        return iter;
+            scanners.add(sstable.getDirectScanner(FILE_BUFFER_SIZE));
+        return scanners;
     }
 
     public CompactionInfo getCompactionInfo()
@@ -103,50 +98,12 @@ implements Closeable, CompactionInfo.Hol
                                   totalBytes);
     }
 
-    @Override
-    protected boolean isEqual(SSTableIdentityIterator o1, SSTableIdentityIterator o2)
-    {
-        return o1.getKey().equals(o2.getKey());
-    }
-
-    public void reduce(SSTableIdentityIterator current)
-    {
-        rows.add(current);
-    }
 
-    protected AbstractCompactedRow getReduced()
+    public AbstractCompactedRow computeNext()
     {
-        assert rows.size() > 0;
-
-        try
-        {
-            AbstractCompactedRow compactedRow = controller.getCompactedRow(rows);
-            if (compactedRow.isEmpty())
-            {
-                controller.invalidateCachedRow(compactedRow.key);
-                return null;
-            }
-
-            // If the raw is cached, we call removeDeleted on it to have/ coherent query returns. However it would look
-            // like some deleted columns lived longer than gc_grace + compaction. This can also free up big amount of
-            // memory on long running instances
-            controller.removeDeletedInCache(compactedRow.key);
-
-            return compactedRow;
-        }
-        finally
-        {
-            rows.clear();
-            if ((row++ % 1000) == 0)
-            {
-                bytesRead = 0;
-                for (SSTableScanner scanner : getScanners())
-                {
-                    bytesRead += scanner.getFilePointer();
-                }
-                throttle();
-            }
-        }
+        if (!source.hasNext())
+            return endOfData();
+        return source.next();
     }
 
     private void throttle()
@@ -187,16 +144,69 @@ implements Closeable, CompactionInfo.Hol
 
     public void close() throws IOException
     {
-        FileUtils.close(getScanners());
+        source.close();
     }
 
     protected Iterable<SSTableScanner> getScanners()
     {
-        return ((CollatingIterator)source).getIterators();
+        return (Iterable<SSTableScanner>)(source.iterators());
     }
 
     public String toString()
     {
         return this.getCompactionInfo().toString();
     }
+    
+    protected class Reducer extends MergeIterator.Reducer<IColumnIterator, AbstractCompactedRow>
+    {
+        protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
+
+        public void reduce(IColumnIterator current)
+        {
+            rows.add((SSTableIdentityIterator)current);
+        }
+
+        protected AbstractCompactedRow getReduced()
+        {
+            assert rows.size() > 0;
+
+            try
+            {
+                AbstractCompactedRow compactedRow = controller.getCompactedRow(rows);
+                if (compactedRow.isEmpty())
+                {
+                    controller.invalidateCachedRow(compactedRow.key);
+                    return null;
+                }
+
+                // If the row is cached, we call removeDeleted on it to have coherent query returns; otherwise it would
+                // look like some deleted columns lived longer than gc_grace + compaction. This can also free up a big
+                // amount of memory on long running instances
+                controller.removeDeletedInCache(compactedRow.key);
+
+                return compactedRow;
+            }
+            finally
+            {
+                rows.clear();
+                if ((row++ % 1000) == 0)
+                {
+                    bytesRead = 0;
+                    for (SSTableScanner scanner : getScanners())
+                    {
+                        bytesRead += scanner.getFilePointer();
+                    }
+                    throttle();
+                }
+            }
+        }
+    }
+
+    public final static Comparator<IColumnIterator> ICOMP = new Comparator<IColumnIterator>()
+    {
+        public int compare(IColumnIterator i1, IColumnIterator i2)
+        {
+            return i1.getKey().compareTo(i2.getKey());
+        }
+    };
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Jun 16 04:24:33 2011
@@ -31,7 +31,6 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import org.apache.commons.collections.PredicateUtils;
-import org.apache.commons.collections.iterators.CollatingIterator;
 import org.apache.commons.collections.iterators.FilterIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -934,18 +933,16 @@ public class CompactionManager implement
         public ValidationCompactionIterator(ColumnFamilyStore cfs, Range range) throws IOException
         {
             super(CompactionType.VALIDATION,
-                  getCollatingIterator(cfs.getSSTables(), range),
+                  getScanners(cfs.getSSTables(), range),
                   new CompactionController(cfs, cfs.getSSTables(), getDefaultGcBefore(cfs), true));
         }
 
-        protected static CollatingIterator getCollatingIterator(Iterable<SSTableReader> sstables, Range range) throws IOException
+        protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables, Range range) throws IOException
         {
-            CollatingIterator iter = FBUtilities.getCollatingIterator();
+            ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>();
             for (SSTableReader sstable : sstables)
-            {
-                iter.addIterator(sstable.getDirectScanner(FILE_BUFFER_SIZE, range));
-            }
-            return iter;
+                scanners.add(sstable.getDirectScanner(FILE_BUFFER_SIZE, range));
+            return scanners;
         }
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java Thu Jun 16 04:24:33 2011
@@ -29,7 +29,6 @@ import java.util.*;
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterators;
-import org.apache.commons.collections.iterators.CollatingIterator;
 
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -40,7 +39,7 @@ import org.apache.cassandra.db.marshal.A
 import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.util.IIterableColumns;
-import org.apache.cassandra.utils.ReducingIterator;
+import org.apache.cassandra.utils.MergeIterator;
 
 /**
  * LazilyCompactedRow only computes the row bloom filter and column index in memory
@@ -60,7 +59,7 @@ public class LazilyCompactedRow extends 
     private final boolean shouldPurge;
     private final DataOutputBuffer headerBuffer;
     private ColumnFamily emptyColumnFamily;
-    private LazyColumnIterator iter;
+    private Reducer reducer;
     private int columnCount;
     private long columnSerializedSize;
 
@@ -84,10 +83,10 @@ public class LazilyCompactedRow extends 
         // initialize row header so isEmpty can be called
         headerBuffer = new DataOutputBuffer();
         ColumnIndexer.serialize(this, headerBuffer);
-        // reach into iterator used by ColumnIndexer to get column count and size
-        columnCount = iter.size;
-        columnSerializedSize = iter.serializedSize;
-        iter = null;
+        // reach into the reducer used during iteration to get column count and size
+        columnCount = reducer.size;
+        columnSerializedSize = reducer.serializedSize;
+        reducer = null;
     }
 
     public void write(DataOutput out) throws IOException
@@ -156,10 +155,9 @@ public class LazilyCompactedRow extends 
     public Iterator<IColumn> iterator()
     {
         for (SSTableIdentityIterator row : rows)
-        {
             row.reset();
-        }
-        iter = new LazyColumnIterator(new CollatingIterator(getComparator().columnComparator, rows));
+        reducer = new Reducer();
+        Iterator<IColumn> iter = MergeIterator.get(rows, getComparator().columnComparator, reducer);
         return Iterators.filter(iter, Predicates.notNull());
     }
 
@@ -168,23 +166,12 @@ public class LazilyCompactedRow extends 
         return columnCount;
     }
 
-    private class LazyColumnIterator extends ReducingIterator<IColumn, IColumn>
+    private class Reducer extends MergeIterator.Reducer<IColumn, IColumn>
     {
         ColumnFamily container = emptyColumnFamily.cloneMeShallow();
         long serializedSize = 4; // int for column count
         int size = 0;
 
-        public LazyColumnIterator(Iterator<IColumn> source)
-        {
-            super(source);
-        }
-
-        @Override
-        protected boolean isEqual(IColumn o1, IColumn o2)
-        {
-            return o1.name().equals(o2.name());
-        }
-
         public void reduce(IColumn current)
         {
             container.addColumn(current);

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Thu Jun 16 04:24:33 2011
@@ -22,10 +22,7 @@ package org.apache.cassandra.db.filter;
 
 
 import java.nio.ByteBuffer;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,7 +35,8 @@ import org.apache.cassandra.io.sstable.S
 import org.apache.cassandra.io.util.FileDataInput;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
-import org.apache.cassandra.utils.ReducingIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.MergeIterator;
 
 public class QueryFilter
 {
@@ -88,11 +86,14 @@ public class QueryFilter
         return superFilter.getSSTableColumnIterator(sstable, file, key);
     }
 
-    public void collectCollatedColumns(final ColumnFamily returnCF, Iterator<IColumn> collatedColumns, final int gcBefore)
+    // TODO move gcBefore into a field
+    public void collateColumns(final ColumnFamily returnCF, List<? extends CloseableIterator<IColumn>> toCollate, AbstractType comparator, final int gcBefore)
     {
+        IFilter topLevelFilter = (superFilter == null ? filter : superFilter);
+        Comparator<IColumn> fcomp = topLevelFilter.getColumnComparator(comparator);
         // define a 'reduced' iterator that merges columns w/ the same name, which
         // greatly simplifies computing liveColumns in the presence of tombstones.
-        ReducingIterator<IColumn, IColumn> reduced = new ReducingIterator<IColumn, IColumn>(collatedColumns)
+        Iterator<IColumn> reduced = MergeIterator.get(toCollate, fcomp, new MergeIterator.Reducer<IColumn, IColumn>()
         {
             ColumnFamily curCF = returnCF.cloneMeShallow();
 
@@ -137,9 +138,9 @@ public class QueryFilter
 
                 return c;
             }
-        };
+        });
 
-        (superFilter == null ? filter : superFilter).collectReducedColumns(returnCF, reduced, gcBefore);
+        topLevelFilter.collectReducedColumns(returnCF, reduced, gcBefore);
     }
 
     public String getColumnFamilyName()

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java Thu Jun 16 04:24:33 2011
@@ -21,7 +21,6 @@ package org.apache.cassandra.io.sstable;
  */
 
 
-import java.io.Closeable;
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
@@ -33,8 +32,9 @@ import org.apache.cassandra.db.Decorated
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CloseableIterator;
 
-public class KeyIterator extends AbstractIterator<DecoratedKey> implements Iterator<DecoratedKey>, Closeable
+public class KeyIterator extends AbstractIterator<DecoratedKey> implements CloseableIterator<DecoratedKey>
 {
     private final BufferedRandomAccessFile in;
     private final Descriptor desc;

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java Thu Jun 16 04:24:33 2011
@@ -21,31 +21,26 @@ package org.apache.cassandra.io.sstable;
  */
 
 
-import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 
-import org.apache.commons.collections.iterators.CollatingIterator;
-
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ReducingIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.MergeIterator;
 
-public class ReducingKeyIterator implements Iterator<DecoratedKey>, Closeable
+public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
 {
-    private final CollatingIterator ci;
-    private final ReducingIterator<DecoratedKey, DecoratedKey> iter;
+    private final MergeIterator<DecoratedKey,DecoratedKey> mi;
 
     public ReducingKeyIterator(Collection<SSTableReader> sstables)
     {
-        ci = FBUtilities.getCollatingIterator();
+        ArrayList<KeyIterator> iters = new ArrayList<KeyIterator>();
         for (SSTableReader sstable : sstables)
-        {
-            ci.addIterator(new KeyIterator(sstable.descriptor));
-        }
-
-        iter = new ReducingIterator<DecoratedKey, DecoratedKey>(ci)
+            iters.add(new KeyIterator(sstable.descriptor));
+        mi = MergeIterator.get(iters, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey,DecoratedKey>()
         {
             DecoratedKey reduced = null;
 
@@ -58,21 +53,21 @@ public class ReducingKeyIterator impleme
             {
                 return reduced;
             }
-        };
+        });
     }
 
     public void close() throws IOException
     {
-        for (Object o : ci.getIterators())
+        for (Object o : mi.iterators())
         {
-            ((KeyIterator) o).close();
+            ((CloseableIterator)o).close();
         }
     }
 
     public long getTotalBytes()
     {
         long m = 0;
-        for (Object o : ci.getIterators())
+        for (Object o : mi.iterators())
         {
             m += ((KeyIterator) o).getTotalBytes();
         }
@@ -82,7 +77,7 @@ public class ReducingKeyIterator impleme
     public long getBytesRead()
     {
         long m = 0;
-        for (Object o : ci.getIterators())
+        for (Object o : mi.iterators())
         {
             m += ((KeyIterator) o).getBytesRead();
         }
@@ -96,12 +91,12 @@ public class ReducingKeyIterator impleme
 
     public boolean hasNext()
     {
-        return iter.hasNext();
+        return mi.hasNext();
     }
 
     public DecoratedKey next()
     {
-        return iter.next();
+        return mi.next();
     }
 
     public void remove()

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java Thu Jun 16 04:24:33 2011
@@ -19,7 +19,6 @@
 
 package org.apache.cassandra.io.sstable;
 
-import java.io.Closeable;
 import java.io.File;
 import java.io.IOError;
 import java.io.IOException;
@@ -34,9 +33,9 @@ import org.apache.cassandra.db.columnite
 import org.apache.cassandra.db.filter.QueryFilter;
 import org.apache.cassandra.io.util.BufferedRandomAccessFile;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CloseableIterator;
 
-
-public class SSTableScanner implements Iterator<IColumnIterator>, Closeable
+public class SSTableScanner implements CloseableIterator<IColumnIterator>
 {
     private static Logger logger = LoggerFactory.getLogger(SSTableScanner.class);
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Thu Jun 16 04:24:33 2011
@@ -24,7 +24,6 @@ import java.util.*;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import com.google.common.collect.AbstractIterator;
-import org.apache.commons.collections.iterators.CollatingIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,7 +33,8 @@ import org.apache.cassandra.db.RangeSlic
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.ReducingIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.MergeIterator;
 
 /**
  * Turns RangeSliceReply objects into row (string -> CF) maps, resolving
@@ -64,35 +64,27 @@ public class RangeSliceResponseResolver 
     // (this is not currently an issue since we don't do read repair for range queries.)
     public Iterable<Row> resolve() throws IOException
     {
-        CollatingIterator collator = new CollatingIterator(new Comparator<Pair<Row,InetAddress>>()
-        {
-            public int compare(Pair<Row,InetAddress> o1, Pair<Row,InetAddress> o2)
-            {
-                return o1.left.key.compareTo(o2.left.key);
-            }
-        });
-
+        ArrayList<RowIterator> iters = new ArrayList<RowIterator>(responses.size());
         int n = 0;
         for (Message response : responses)
         {
             RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody(), response.getVersion());
             n = Math.max(n, reply.rows.size());
-            collator.addIterator(new RowIterator(reply.rows.iterator(), response.getFrom()));
+            iters.add(new RowIterator(reply.rows.iterator(), response.getFrom()));
         }
-
         // for each row, compute the combination of all different versions seen, and repair incomplete versions
-        return new ReducingIterator<Pair<Row,InetAddress>, Row>(collator)
+        MergeIterator<Pair<Row,InetAddress>, Row> iter = MergeIterator.get(iters, new Comparator<Pair<Row,InetAddress>>()
+        {
+            public int compare(Pair<Row,InetAddress> o1, Pair<Row,InetAddress> o2)
+            {
+                return o1.left.key.compareTo(o2.left.key);
+            }
+        }, new MergeIterator.Reducer<Pair<Row,InetAddress>, Row>()
         {
             List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size());
             List<InetAddress> versionSources = new ArrayList<InetAddress>(sources.size());
             DecoratedKey key;
 
-            @Override
-            protected boolean isEqual(Pair<Row, InetAddress> o1, Pair<Row, InetAddress> o2)
-            {
-                return o1.left.key.equals(o2.left.key);
-            }
-
             public void reduce(Pair<Row,InetAddress> current)
             {
                 key = current.left.key;
@@ -122,7 +114,13 @@ public class RangeSliceResponseResolver 
                 versionSources.clear();
                 return new Row(key, resolved);
             }
-        };
+        });
+
+        List<Row> resolvedRows = new ArrayList<Row>(n);
+        while (iter.hasNext())
+            resolvedRows.add(iter.next());
+
+        return resolvedRows;
     }
 
     public void preprocess(Message message)
@@ -135,7 +133,7 @@ public class RangeSliceResponseResolver 
         return !responses.isEmpty();
     }
 
-    private static class RowIterator extends AbstractIterator<Pair<Row,InetAddress>>
+    private static class RowIterator extends AbstractIterator<Pair<Row,InetAddress>> implements CloseableIterator<Pair<Row,InetAddress>>
     {
         private final Iterator<Row> iter;
         private final InetAddress source;
@@ -150,6 +148,8 @@ public class RangeSliceResponseResolver 
         {
             return iter.hasNext() ? new Pair<Row, InetAddress>(iter.next(), source) : endOfData();
         }
+
+        public void close() {}
     }
 
     public Iterable<Message> getMessages()

Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java Thu Jun 16 04:24:33 2011
@@ -26,8 +26,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.collections.iterators.CollatingIterator;
-
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -35,6 +33,7 @@ import org.apache.cassandra.db.filter.Qu
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.*;
 
 public class RowRepairResolver extends AbstractRowResolver
 {
@@ -142,14 +141,14 @@ public class RowRepairResolver extends A
         // this will handle removing columns and subcolumns that are supressed by a row or
         // supercolumn tombstone.
         QueryFilter filter = new QueryFilter(null, new QueryPath(resolved.metadata().cfName), new IdentityQueryFilter());
-        CollatingIterator iter = new CollatingIterator(resolved.metadata().comparator.columnComparator);
+        List<CloseableIterator<IColumn>> iters = new ArrayList<CloseableIterator<IColumn>>();
         for (ColumnFamily version : versions)
         {
             if (version == null)
                 continue;
-            iter.addIterator(version.getColumnsMap().values().iterator());
+            iters.add(FBUtilities.closeableIterator(version.getColumnsMap().values().iterator()));
         }
-        filter.collectCollatedColumns(resolved, iter, Integer.MIN_VALUE);
+        filter.collateColumns(resolved, iters, resolved.metadata().comparator, Integer.MIN_VALUE);
         return ColumnFamilyStore.removeDeleted(resolved, Integer.MIN_VALUE);
     }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Jun 16 04:24:33 2011
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.base.Joiner;
-import org.apache.commons.collections.iterators.CollatingIterator;
+import com.google.common.collect.AbstractIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -331,32 +331,6 @@ public class FBUtilities
         }
     }
 
-    /*
-    TODO how to make this work w/ ReducingKeyIterator?
-    public static <T extends Comparable<T>> CollatingIterator getCollatingIterator()
-    {
-        // CollatingIterator will happily NPE if you do not specify a comparator explicitly
-        return new CollatingIterator(new Comparator<T>()
-        {
-            public int compare(T o1, T o2)
-            {
-                return o1.compareTo(o2);
-            }
-        });
-    }
-     */
-    public static CollatingIterator getCollatingIterator()
-    {
-        // CollatingIterator will happily NPE if you do not specify a comparator explicitly
-        return new CollatingIterator(new Comparator()
-        {
-            public int compare(Object o1, Object o2)
-            {
-                return ((Comparable) o1).compareTo(o2);
-            }
-        });
-    }
-
     public static void atomicSetMax(AtomicInteger atomic, int i)
     {
         while (true)
@@ -614,4 +588,27 @@ public class FBUtilities
         return FBUtilities.construct(cache_provider, "row cache provider");
     }
 
+    public static <T> CloseableIterator<T> closeableIterator(Iterator<T> iterator)
+    {
+        return new WrappedCloseableIterator<T>(iterator);
+    }
+
+    private static final class WrappedCloseableIterator<T>
+        extends AbstractIterator<T> implements CloseableIterator<T>
+    {
+        private final Iterator<T> source;
+        public WrappedCloseableIterator(Iterator<T> source)
+        {
+            this.source = source;
+        }
+
+        protected T computeNext()
+        {
+            if (!source.hasNext())
+                return endOfData();
+            return source.next();
+        }
+
+        public void close() {}
+    }
 }

Added: cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java?rev=1136287&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java Thu Jun 16 04:24:33 2011
@@ -0,0 +1,107 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class MergeIteratorTest
+{
+    CLI<String> all = null, cat = null, a = null, b = null, c = null, d = null;
+
+    @Before
+    public void clear()
+    {
+        all = new CLI("1", "2", "3", "3", "4", "5", "6", "7", "8", "8", "9");
+        cat = new CLI("1", "2", "33", "4", "5", "6", "7", "88", "9");
+        a = new CLI("1", "3", "5", "8");
+        b = new CLI("2", "4", "6");
+        c = new CLI("3", "7", "8", "9");
+        d = new CLI();
+    }
+
+    @Test
+    public void testOneToOne() throws Exception
+    {
+        MergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d),
+                                                             Ordering.<String>natural());
+        assert Iterators.elementsEqual(all, smi);
+        smi.close();
+        assert a.closed && b.closed && c.closed && d.closed;
+    }
+
+    /** Test that duplicate values from different source iterators are concatenated by the reducer. */
+    @Test
+    public void testManyToOne() throws Exception
+    {
+        MergeIterator.Reducer<String,String> reducer = new MergeIterator.Reducer<String,String>()
+        {
+            String concatted = "";
+            public void reduce(String value)
+            {
+                concatted += value;
+            }
+            
+            public String getReduced()
+            {
+                String tmp = concatted;
+                concatted = "";
+                return tmp;
+            }
+        };
+        MergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d),
+                                                             Ordering.<String>natural(),
+                                                             reducer);
+        assert Iterators.elementsEqual(cat, smi);
+        smi.close();
+        assert a.closed && b.closed && c.closed && d.closed;
+    }
+
+    // Closeable list iterator (CLI): iterates over the given items and records whether close() was called.
+    public static class CLI<E> extends AbstractIterator<E> implements CloseableIterator<E>
+    {
+        Iterator<E> iter;
+        boolean closed = false;
+        public CLI(E... items)
+        {
+            this.iter = Arrays.asList(items).iterator();
+        }
+        
+        protected E computeNext()
+        {
+            if (!iter.hasNext()) return endOfData();
+            return iter.next();
+        }
+        
+        public void close()
+        {
+            assert !this.closed;
+            this.closed = true;
+        }
+    }
+}