You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/22 22:07:28 UTC

svn commit: r1174360 - in /cassandra/branches/cassandra-1.0.0: src/java/org/apache/cassandra/db/compaction/ src/java/org/apache/cassandra/db/filter/ src/java/org/apache/cassandra/io/sstable/ src/java/org/apache/cassandra/service/ src/java/org/apache/cassandra/utils/ test/unit/org/apache/cassandra/utils/

Author: jbellis
Date: Thu Sep 22 20:07:28 2011
New Revision: 1174360

URL: http://svn.apache.org/viewvc?rev=1174360&view=rev
Log:
optimize single-source case for MergeIterator
patch by jbellis; tested by brandonwilliams for CASSANDRA-3234

Modified:
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
    cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java
    cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java?rev=1174360&r1=1174359&r2=1174360&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java Thu Sep 22 20:07:28 2011
@@ -89,10 +89,10 @@ public class ParallelCompactionIterable 
 
     private static class Unwrapper extends AbstractIterator<AbstractCompactedRow> implements CloseableIterator<AbstractCompactedRow>
     {
-        private final MergeIterator<RowContainer, CompactedRowContainer> reducer;
+        private final CloseableIterator<CompactedRowContainer> reducer;
         private final CompactionController controller;
 
-        public Unwrapper(MergeIterator<RowContainer, CompactedRowContainer> reducer, CompactionController controller)
+        public Unwrapper(CloseableIterator<CompactedRowContainer> reducer, CompactionController controller)
         {
             this.reducer = reducer;
             this.controller = controller;

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=1174360&r1=1174359&r2=1174360&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java Thu Sep 22 20:07:28 2011
@@ -93,7 +93,7 @@ public class QueryFilter
         Comparator<IColumn> fcomp = topLevelFilter.getColumnComparator(comparator);
         // define a 'reduced' iterator that merges columns w/ the same name, which
         // greatly simplifies computing liveColumns in the presence of tombstones.
-        Iterator<IColumn> reduced = MergeIterator.get(toCollate, fcomp, new MergeIterator.Reducer<IColumn, IColumn>()
+        MergeIterator.Reducer<IColumn, IColumn> reducer = new MergeIterator.Reducer<IColumn, IColumn>()
         {
             ColumnFamily curCF = returnCF.cloneMeShallow();
 
@@ -111,7 +111,7 @@ public class QueryFilter
                     // consumers make of the result (for instance CFS.getColumnFamily() call removeDeleted() on the
                     // result which removes column; which shouldn't be done on the original super column).
                     assert current instanceof SuperColumn;
-                    curCF.addColumn(((SuperColumn)current).cloneMe());
+                    curCF.addColumn(((SuperColumn) current).cloneMe());
                 }
                 else
                 {
@@ -129,16 +129,17 @@ public class QueryFilter
                     // time of the cf, if that is greater.
                     long deletedAt = c.getMarkedForDeleteAt();
                     if (returnCF.getMarkedForDeleteAt() > deletedAt)
-                        ((SuperColumn)c).delete(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt());
+                        ((SuperColumn) c).delete(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt());
 
-                    c = filter.filterSuperColumn((SuperColumn)c, gcBefore);
-                    ((SuperColumn)c).delete(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be
+                    c = filter.filterSuperColumn((SuperColumn) c, gcBefore);
+                    ((SuperColumn) c).delete(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be
                 }
-                curCF.clear();           
+                curCF.clear();
 
                 return c;
             }
-        });
+        };
+        Iterator<IColumn> reduced = MergeIterator.get(toCollate, fcomp, reducer);
 
         topLevelFilter.collectReducedColumns(returnCF, reduced, gcBefore);
     }

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java?rev=1174360&r1=1174359&r2=1174360&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java Thu Sep 22 20:07:28 2011
@@ -24,16 +24,15 @@ package org.apache.cassandra.io.sstable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Iterator;
 
 import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.IMergeIterator;
 import org.apache.cassandra.utils.MergeIterator;
 
 public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
 {
-    private final MergeIterator<DecoratedKey,DecoratedKey> mi;
+    private final IMergeIterator<DecoratedKey,DecoratedKey> mi;
 
     public ReducingKeyIterator(Collection<SSTableReader> sstables)
     {

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1174360&r1=1174359&r2=1174360&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Thu Sep 22 20:07:28 2011
@@ -86,7 +86,8 @@ public class RangeSliceResponseResolver 
             iters.add(new RowIterator(reply.rows.iterator(), response.getFrom()));
         }
         // for each row, compute the combination of all different versions seen, and repair incomplete versions
-        MergeIterator<Pair<Row,InetAddress>, Row> iter = MergeIterator.get(iters, pairComparator, new Reducer());
+        // TODO do we need to call close?
+        CloseableIterator<Row> iter = MergeIterator.get(iters, pairComparator, new Reducer());
 
         List<Row> resolvedRows = new ArrayList<Row>(n);
         while (iter.hasNext())

Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java?rev=1174360&r1=1174359&r2=1174360&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java Thu Sep 22 20:07:28 2011
@@ -20,50 +20,30 @@ package org.apache.cassandra.utils;
 
 import java.io.IOException;
 import java.io.IOError;
-import java.util.ArrayDeque;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
+import java.util.*;
 
 import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Ordering;
 
 /** Merges sorted input iterators which individually contain unique items. */
-public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implements CloseableIterator<Out>
+public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implements IMergeIterator<In, Out>
 {
-    public final Comparator<In> comp;
+    protected final Reducer<In,Out> reducer;
     protected final List<? extends CloseableIterator<In>> iterators;
-    // a queue for return: all candidates must be open and have at least one item
-    protected final PriorityQueue<Candidate<In>> queue;
 
-    protected MergeIterator(List<? extends CloseableIterator<In>> iters, Comparator<In> comp)
+    protected MergeIterator(List<? extends CloseableIterator<In>> iters, Reducer<In, Out> reducer)
     {
         this.iterators = iters;
-        this.comp = comp;
-        this.queue = new PriorityQueue<Candidate<In>>(Math.max(1, iters.size()));
-        for (CloseableIterator<In> iter : iters)
-        {
-            Candidate<In> candidate = new Candidate<In>(iter, comp);
-            if (!candidate.advance())
-                // was empty
-                continue;
-            this.queue.add(candidate);
-        }
-    }
-
-    public static <E> MergeIterator<E,E> get(List<? extends CloseableIterator<E>> iters)
-    {
-        return get(iters, (Comparator<E>)Ordering.natural());
-    }
-
-    public static <E> MergeIterator<E,E> get(List<? extends CloseableIterator<E>> iters, Comparator<E> comp)
-    {
-        return new OneToOne<E>(iters, comp);
+        this.reducer = reducer;
     }
 
-    public static <In,Out> MergeIterator<In,Out> get(List<? extends CloseableIterator<In>> iters, Comparator<In> comp, Reducer<In,Out> reducer)
+    public static <In, Out> IMergeIterator<In, Out> get(final List<? extends CloseableIterator<In>> sources,
+                                                    Comparator<In> comparator,
+                                                    final Reducer<In, Out> reducer)
     {
-        return new ManyToOne<In,Out>(iters, comp, reducer);
+        assert !sources.isEmpty();
+        if (sources.size() == 1)
+            return new OneToOne<In, Out>(sources, reducer);
+        return new ManyToOne<In, Out>(sources, comparator, reducer);
     }
 
     public Iterable<? extends CloseableIterator<In>> iterators()
@@ -71,23 +51,6 @@ public abstract class MergeIterator<In,O
         return iterators;
     }
 
-    /**
-     * Consumes sorted items from the queue: should only remove items from the queue,
-     * not add them.
-     */
-    protected abstract Out consume();
-
-    /**
-     * Returns consumed items to the queue.
-     */
-    protected abstract void advance();
-
-    protected final Out computeNext()
-    {
-        advance();
-        return consume();
-    }
-
     public void close()
     {
         for (CloseableIterator<In> iterator : this.iterators)
@@ -103,47 +66,38 @@ public abstract class MergeIterator<In,O
         }
     }
 
-    /** A MergeIterator that returns a single value for each one consumed. */
-    private static final class OneToOne<E> extends MergeIterator<E,E>
-    {
-        // the last returned candidate, so that we can lazily call 'advance()'
-        protected Candidate<E> candidate;
-        public OneToOne(List<? extends CloseableIterator<E>> iters, Comparator<E> comp)
-        {
-            super(iters, comp);
-        }
-
-        protected final E consume()
-        {
-            candidate = queue.poll();
-            if (candidate == null)
-                return endOfData();
-            return candidate.item;
-        }
-
-        protected final void advance()
-        {
-            if (candidate != null && candidate.advance())
-                // has more items
-                queue.add(candidate);
-        }
-    }
-
     /** A MergeIterator that consumes multiple input values per output value. */
     private static final class ManyToOne<In,Out> extends MergeIterator<In,Out>
     {
-        protected final Reducer<In,Out> reducer;
+        public final Comparator<In> comp;
+        // a queue for return: all candidates must be open and have at least one item
+        protected final PriorityQueue<Candidate<In>> queue;
         // a stack of the last consumed candidates, so that we can lazily call 'advance()'
         // TODO: if we had our own PriorityQueue implementation we could stash items
         // at the end of its array, so we wouldn't need this storage
         protected final ArrayDeque<Candidate<In>> candidates;
         public ManyToOne(List<? extends CloseableIterator<In>> iters, Comparator<In> comp, Reducer<In,Out> reducer)
         {
-            super(iters, comp);
-            this.reducer = reducer;
+            super(iters, reducer);
+            this.comp = comp;
+            this.queue = new PriorityQueue<Candidate<In>>(Math.max(1, iters.size()));
+            for (CloseableIterator<In> iter : iters)
+            {
+                Candidate<In> candidate = new Candidate<In>(iter, comp);
+                if (!candidate.advance())
+                    // was empty
+                    continue;
+                this.queue.add(candidate);
+            }
             this.candidates = new ArrayDeque<Candidate<In>>(queue.size());
         }
 
+        protected final Out computeNext()
+        {
+            advance();
+            return consume();
+        }
+
         /** Consume values by sending them to the reducer while they are equal. */
         protected final Out consume()
         {
@@ -177,17 +131,13 @@ public abstract class MergeIterator<In,O
         private final CloseableIterator<In> iter;
         private final Comparator<In> comp;
         private In item;
+
         public Candidate(CloseableIterator<In> iter, Comparator<In> comp)
         {
             this.iter = iter;
             this.comp = comp;
         }
 
-        public In item()
-        {
-            return item;
-        }
-
         /** @return True if our iterator had an item, and it is now available */
         protected boolean advance()
         {
@@ -221,4 +171,24 @@ public abstract class MergeIterator<In,O
          */
         protected void onKeyChange() {}
     }
+
+    private static class OneToOne<In, Out> extends MergeIterator<In, Out>
+    {
+        private final CloseableIterator<In> source;
+
+        public OneToOne(List<? extends CloseableIterator<In>> sources, Reducer<In, Out> reducer)
+        {
+            super(sources, reducer);
+            source = sources.get(0);
+        }
+
+        protected Out computeNext()
+        {
+            if (!source.hasNext())
+                return endOfData();
+            reducer.onKeyChange();
+            reducer.reduce(source.next());
+            return reducer.getReduced();
+        }
+    }
 }

Modified: cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java?rev=1174360&r1=1174359&r2=1174360&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java (original)
+++ cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java Thu Sep 22 20:07:28 2011
@@ -19,9 +19,7 @@
 package org.apache.cassandra.utils;
 
 import java.util.Arrays;
-import java.util.Comparator;
 import java.util.Iterator;
-import java.util.List;
 
 import com.google.common.collect.AbstractIterator;
 import com.google.common.collect.Iterators;
@@ -45,16 +43,6 @@ public class MergeIteratorTest
         d = new CLI();
     }
 
-    @Test
-    public void testOneToOne() throws Exception
-    {
-        MergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d),
-                                                             Ordering.<String>natural());
-        assert Iterators.elementsEqual(all, smi);
-        smi.close();
-        assert a.closed && b.closed && c.closed && d.closed;
-    }
-
     /** Test that duplicate values are concatted. */
     @Test
     public void testManyToOne() throws Exception
@@ -74,7 +62,7 @@ public class MergeIteratorTest
                 return tmp;
             }
         };
-        MergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d),
+        IMergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d),
                                                              Ordering.<String>natural(),
                                                              reducer);
         assert Iterators.elementsEqual(cat, smi);