You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/09/22 22:07:28 UTC
svn commit: r1174360 - in /cassandra/branches/cassandra-1.0.0:
src/java/org/apache/cassandra/db/compaction/
src/java/org/apache/cassandra/db/filter/
src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/utils/
test/unit/org/apache/cassandra/utils/
Author: jbellis
Date: Thu Sep 22 20:07:28 2011
New Revision: 1174360
URL: http://svn.apache.org/viewvc?rev=1174360&view=rev
Log:
optimize single-source case for MergeIterator
patch by jbellis; tested by brandonwilliams for CASSANDRA-3234
Modified:
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java
cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java
Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java?rev=1174360&r1=1174359&r2=1174360&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/compaction/ParallelCompactionIterable.java Thu Sep 22 20:07:28 2011
@@ -89,10 +89,10 @@ public class ParallelCompactionIterable
private static class Unwrapper extends AbstractIterator<AbstractCompactedRow> implements CloseableIterator<AbstractCompactedRow>
{
- private final MergeIterator<RowContainer, CompactedRowContainer> reducer;
+ private final CloseableIterator<CompactedRowContainer> reducer;
private final CompactionController controller;
- public Unwrapper(MergeIterator<RowContainer, CompactedRowContainer> reducer, CompactionController controller)
+ public Unwrapper(CloseableIterator<CompactedRowContainer> reducer, CompactionController controller)
{
this.reducer = reducer;
this.controller = controller;
Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=1174360&r1=1174359&r2=1174360&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/db/filter/QueryFilter.java Thu Sep 22 20:07:28 2011
@@ -93,7 +93,7 @@ public class QueryFilter
Comparator<IColumn> fcomp = topLevelFilter.getColumnComparator(comparator);
// define a 'reduced' iterator that merges columns w/ the same name, which
// greatly simplifies computing liveColumns in the presence of tombstones.
- Iterator<IColumn> reduced = MergeIterator.get(toCollate, fcomp, new MergeIterator.Reducer<IColumn, IColumn>()
+ MergeIterator.Reducer<IColumn, IColumn> reducer = new MergeIterator.Reducer<IColumn, IColumn>()
{
ColumnFamily curCF = returnCF.cloneMeShallow();
@@ -111,7 +111,7 @@ public class QueryFilter
// consumers make of the result (for instance CFS.getColumnFamily() call removeDeleted() on the
// result which removes column; which shouldn't be done on the original super column).
assert current instanceof SuperColumn;
- curCF.addColumn(((SuperColumn)current).cloneMe());
+ curCF.addColumn(((SuperColumn) current).cloneMe());
}
else
{
@@ -129,16 +129,17 @@ public class QueryFilter
// time of the cf, if that is greater.
long deletedAt = c.getMarkedForDeleteAt();
if (returnCF.getMarkedForDeleteAt() > deletedAt)
- ((SuperColumn)c).delete(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt());
+ ((SuperColumn) c).delete(c.getLocalDeletionTime(), returnCF.getMarkedForDeleteAt());
- c = filter.filterSuperColumn((SuperColumn)c, gcBefore);
- ((SuperColumn)c).delete(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be
+ c = filter.filterSuperColumn((SuperColumn) c, gcBefore);
+ ((SuperColumn) c).delete(c.getLocalDeletionTime(), deletedAt); // reset sc tombstone time to what it should be
}
- curCF.clear();
+ curCF.clear();
return c;
}
- });
+ };
+ Iterator<IColumn> reduced = MergeIterator.get(toCollate, fcomp, reducer);
topLevelFilter.collectReducedColumns(returnCF, reduced, gcBefore);
}
Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java?rev=1174360&r1=1174359&r2=1174360&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java Thu Sep 22 20:07:28 2011
@@ -24,16 +24,15 @@ package org.apache.cassandra.io.sstable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.IMergeIterator;
import org.apache.cassandra.utils.MergeIterator;
public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
{
- private final MergeIterator<DecoratedKey,DecoratedKey> mi;
+ private final IMergeIterator<DecoratedKey,DecoratedKey> mi;
public ReducingKeyIterator(Collection<SSTableReader> sstables)
{
Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1174360&r1=1174359&r2=1174360&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Thu Sep 22 20:07:28 2011
@@ -86,7 +86,8 @@ public class RangeSliceResponseResolver
iters.add(new RowIterator(reply.rows.iterator(), response.getFrom()));
}
// for each row, compute the combination of all different versions seen, and repair incomplete versions
- MergeIterator<Pair<Row,InetAddress>, Row> iter = MergeIterator.get(iters, pairComparator, new Reducer());
+ // TODO do we need to call close?
+ CloseableIterator<Row> iter = MergeIterator.get(iters, pairComparator, new Reducer());
List<Row> resolvedRows = new ArrayList<Row>(n);
while (iter.hasNext())
Modified: cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java?rev=1174360&r1=1174359&r2=1174360&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java (original)
+++ cassandra/branches/cassandra-1.0.0/src/java/org/apache/cassandra/utils/MergeIterator.java Thu Sep 22 20:07:28 2011
@@ -20,50 +20,30 @@ package org.apache.cassandra.utils;
import java.io.IOException;
import java.io.IOError;
-import java.util.ArrayDeque;
-import java.util.Comparator;
-import java.util.List;
-import java.util.PriorityQueue;
+import java.util.*;
import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.Ordering;
/** Merges sorted input iterators which individually contain unique items. */
-public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implements CloseableIterator<Out>
+public abstract class MergeIterator<In,Out> extends AbstractIterator<Out> implements IMergeIterator<In, Out>
{
- public final Comparator<In> comp;
+ protected final Reducer<In,Out> reducer;
protected final List<? extends CloseableIterator<In>> iterators;
- // a queue for return: all candidates must be open and have at least one item
- protected final PriorityQueue<Candidate<In>> queue;
- protected MergeIterator(List<? extends CloseableIterator<In>> iters, Comparator<In> comp)
+ protected MergeIterator(List<? extends CloseableIterator<In>> iters, Reducer<In, Out> reducer)
{
this.iterators = iters;
- this.comp = comp;
- this.queue = new PriorityQueue<Candidate<In>>(Math.max(1, iters.size()));
- for (CloseableIterator<In> iter : iters)
- {
- Candidate<In> candidate = new Candidate<In>(iter, comp);
- if (!candidate.advance())
- // was empty
- continue;
- this.queue.add(candidate);
- }
- }
-
- public static <E> MergeIterator<E,E> get(List<? extends CloseableIterator<E>> iters)
- {
- return get(iters, (Comparator<E>)Ordering.natural());
- }
-
- public static <E> MergeIterator<E,E> get(List<? extends CloseableIterator<E>> iters, Comparator<E> comp)
- {
- return new OneToOne<E>(iters, comp);
+ this.reducer = reducer;
}
- public static <In,Out> MergeIterator<In,Out> get(List<? extends CloseableIterator<In>> iters, Comparator<In> comp, Reducer<In,Out> reducer)
+ public static <In, Out> IMergeIterator<In, Out> get(final List<? extends CloseableIterator<In>> sources,
+ Comparator<In> comparator,
+ final Reducer<In, Out> reducer)
{
- return new ManyToOne<In,Out>(iters, comp, reducer);
+ assert !sources.isEmpty();
+ if (sources.size() == 1)
+ return new OneToOne<In, Out>(sources, reducer);
+ return new ManyToOne<In, Out>(sources, comparator, reducer);
}
public Iterable<? extends CloseableIterator<In>> iterators()
@@ -71,23 +51,6 @@ public abstract class MergeIterator<In,O
return iterators;
}
- /**
- * Consumes sorted items from the queue: should only remove items from the queue,
- * not add them.
- */
- protected abstract Out consume();
-
- /**
- * Returns consumed items to the queue.
- */
- protected abstract void advance();
-
- protected final Out computeNext()
- {
- advance();
- return consume();
- }
-
public void close()
{
for (CloseableIterator<In> iterator : this.iterators)
@@ -103,47 +66,38 @@ public abstract class MergeIterator<In,O
}
}
- /** A MergeIterator that returns a single value for each one consumed. */
- private static final class OneToOne<E> extends MergeIterator<E,E>
- {
- // the last returned candidate, so that we can lazily call 'advance()'
- protected Candidate<E> candidate;
- public OneToOne(List<? extends CloseableIterator<E>> iters, Comparator<E> comp)
- {
- super(iters, comp);
- }
-
- protected final E consume()
- {
- candidate = queue.poll();
- if (candidate == null)
- return endOfData();
- return candidate.item;
- }
-
- protected final void advance()
- {
- if (candidate != null && candidate.advance())
- // has more items
- queue.add(candidate);
- }
- }
-
/** A MergeIterator that consumes multiple input values per output value. */
private static final class ManyToOne<In,Out> extends MergeIterator<In,Out>
{
- protected final Reducer<In,Out> reducer;
+ public final Comparator<In> comp;
+ // a queue for return: all candidates must be open and have at least one item
+ protected final PriorityQueue<Candidate<In>> queue;
// a stack of the last consumed candidates, so that we can lazily call 'advance()'
// TODO: if we had our own PriorityQueue implementation we could stash items
// at the end of its array, so we wouldn't need this storage
protected final ArrayDeque<Candidate<In>> candidates;
public ManyToOne(List<? extends CloseableIterator<In>> iters, Comparator<In> comp, Reducer<In,Out> reducer)
{
- super(iters, comp);
- this.reducer = reducer;
+ super(iters, reducer);
+ this.comp = comp;
+ this.queue = new PriorityQueue<Candidate<In>>(Math.max(1, iters.size()));
+ for (CloseableIterator<In> iter : iters)
+ {
+ Candidate<In> candidate = new Candidate<In>(iter, comp);
+ if (!candidate.advance())
+ // was empty
+ continue;
+ this.queue.add(candidate);
+ }
this.candidates = new ArrayDeque<Candidate<In>>(queue.size());
}
+ protected final Out computeNext()
+ {
+ advance();
+ return consume();
+ }
+
/** Consume values by sending them to the reducer while they are equal. */
protected final Out consume()
{
@@ -177,17 +131,13 @@ public abstract class MergeIterator<In,O
private final CloseableIterator<In> iter;
private final Comparator<In> comp;
private In item;
+
public Candidate(CloseableIterator<In> iter, Comparator<In> comp)
{
this.iter = iter;
this.comp = comp;
}
- public In item()
- {
- return item;
- }
-
/** @return True if our iterator had an item, and it is now available */
protected boolean advance()
{
@@ -221,4 +171,24 @@ public abstract class MergeIterator<In,O
*/
protected void onKeyChange() {}
}
+
+ private static class OneToOne<In, Out> extends MergeIterator<In, Out>
+ {
+ private final CloseableIterator<In> source;
+
+ public OneToOne(List<? extends CloseableIterator<In>> sources, Reducer<In, Out> reducer)
+ {
+ super(sources, reducer);
+ source = sources.get(0);
+ }
+
+ protected Out computeNext()
+ {
+ if (!source.hasNext())
+ return endOfData();
+ reducer.onKeyChange();
+ reducer.reduce(source.next());
+ return reducer.getReduced();
+ }
+ }
}
Modified: cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java?rev=1174360&r1=1174359&r2=1174360&view=diff
==============================================================================
--- cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java (original)
+++ cassandra/branches/cassandra-1.0.0/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java Thu Sep 22 20:07:28 2011
@@ -19,9 +19,7 @@
package org.apache.cassandra.utils;
import java.util.Arrays;
-import java.util.Comparator;
import java.util.Iterator;
-import java.util.List;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
@@ -45,16 +43,6 @@ public class MergeIteratorTest
d = new CLI();
}
- @Test
- public void testOneToOne() throws Exception
- {
- MergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d),
- Ordering.<String>natural());
- assert Iterators.elementsEqual(all, smi);
- smi.close();
- assert a.closed && b.closed && c.closed && d.closed;
- }
-
/** Test that duplicate values are concatted. */
@Test
public void testManyToOne() throws Exception
@@ -74,7 +62,7 @@ public class MergeIteratorTest
return tmp;
}
};
- MergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d),
+ IMergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d),
Ordering.<String>natural(),
reducer);
assert Iterators.elementsEqual(cat, smi);