You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/06/16 06:24:34 UTC
svn commit: r1136287 - in /cassandra/trunk: ./
src/java/org/apache/cassandra/db/
src/java/org/apache/cassandra/db/columniterator/
src/java/org/apache/cassandra/db/compaction/
src/java/org/apache/cassandra/db/filter/
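src/java/org/apache/cassandra/io/sstable/
src/java/org/apache/cassandra/service/
src/java/org/apache/cassandra/utils/
test/unit/org/apache/cassandra/utils/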
Author: jbellis
Date: Thu Jun 16 04:24:33 2011
New Revision: 1136287
URL: http://svn.apache.org/viewvc?rev=1136287&view=rev
Log:
replace CollatingIterator, ReducingIterator with MergeIterator
patch by stuhood; reviewed by jbellis for CASSANDRA-2062
Added:
cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java
Removed:
cassandra/trunk/src/java/org/apache/cassandra/db/RowIterator.java
cassandra/trunk/src/java/org/apache/cassandra/utils/ReducingIterator.java
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Jun 16 04:24:33 2011
@@ -3,6 +3,8 @@
* add commitlog_total_space_in_mb to prevent fragmented logs (CASSANDRA-2427)
* removed commitlog_rotation_threshold_in_mb configuration (CASSANDRA-2771)
* make AbstractBounds.normalize de-overlapp overlapping ranges (CASSANDRA-2641)
+ * replace CollatingIterator, ReducingIterator with MergeIterator
+ (CASSANDRA-2062)
0.8.1
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu Jun 16 04:24:33 2011
@@ -32,7 +32,6 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import com.google.common.collect.Iterables;
-import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1188,7 +1187,7 @@ public class ColumnFamilyStore implement
IColumnIterator ci = filter.getMemtableColumnIterator(cached, null, getComparator());
ColumnFamily cf = ci.getColumnFamily().cloneMeShallow();
- filter.collectCollatedColumns(cf, ci, gcBefore);
+ filter.collateColumns(cf, Collections.singletonList(ci), getComparator(), gcBefore);
// TODO this is necessary because when we collate supercolumns together, we don't check
// their subcolumns for relevance, so we need to do a second prune post facto here.
return cf.isSuper() ? removeDeleted(cf, gcBefore) : removeDeletedCF(cf, gcBefore);
@@ -1244,10 +1243,7 @@ public class ColumnFamilyStore implement
if (iterators.size() == 0)
return null;
- Comparator<IColumn> comparator = filter.filter.getColumnComparator(getComparator());
- Iterator collated = IteratorUtils.collatedIterator(comparator, iterators);
-
- filter.collectCollatedColumns(returnCF, collated, gcBefore);
+ filter.collateColumns(returnCF, iterators, getComparator(), gcBefore);
// Caller is responsible for final removeDeletedCF. This is important for cacheRow to work correctly:
return returnCF;
@@ -1298,7 +1294,7 @@ public class ColumnFamilyStore implement
// It is fine to aliases the View.sstables since it's an unmodifiable collection
Collection<SSTableReader> sstables = currentView.sstables;
- RowIterator iterator = RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter, getComparator(), this);
+ CloseableIterator<Row> iterator = RowIteratorFactory.getIterator(memtables, sstables, startWith, stopAt, filter, getComparator(), this);
List<Row> rows = new ArrayList<Row>();
try
@@ -1486,7 +1482,7 @@ public class ColumnFamilyStore implement
ColumnFamily expandedData = data;
data = expandedData.cloneMeShallow();
IColumnIterator iter = dataFilter.getMemtableColumnIterator(expandedData, dk, getComparator());
- new QueryFilter(dk, path, dataFilter).collectCollatedColumns(data, iter, gcBefore());
+ new QueryFilter(dk, path, dataFilter).collateColumns(data, Collections.singletonList(iter), getComparator(), gcBefore());
}
rows.add(new Row(dk, data));
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java Thu Jun 16 04:24:33 2011
@@ -25,15 +25,16 @@ import java.util.Map.Entry;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
+import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterators;
-import org.apache.commons.collections.IteratorUtils;
import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.utils.ReducingIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.MergeIterator;
public class RowIteratorFactory
{
@@ -60,7 +61,7 @@ public class RowIteratorFactory
* @param comparator
* @return A row iterator following all the given restrictions
*/
- public static RowIterator getIterator(final Collection<Memtable> memtables,
+ public static CloseableIterator<Row> getIterator(final Collection<Memtable> memtables,
final Collection<SSTableReader> sstables,
final DecoratedKey startWith,
final DecoratedKey stopAt,
@@ -70,7 +71,7 @@ public class RowIteratorFactory
)
{
// fetch data from current memtable, historical memtables, and SSTables in the correct order.
- final List<Iterator<IColumnIterator>> iterators = new ArrayList<Iterator<IColumnIterator>>();
+ final List<CloseableIterator<IColumnIterator>> iterators = new ArrayList<CloseableIterator<IColumnIterator>>();
// we iterate through memtables with a priority queue to avoid more sorting than necessary.
// this predicate throws out the rows before the start of our range.
Predicate<IColumnIterator> p = new Predicate<IColumnIterator>()
@@ -85,8 +86,7 @@ public class RowIteratorFactory
// memtables
for (Memtable memtable : memtables)
{
- iterators.add(Iterators.filter(Iterators.transform(memtable.getEntryIterator(startWith),
- new ConvertToColumnIterator(filter, comparator)), p));
+ iterators.add(new ConvertToColumnIterator(filter, comparator, p, memtable.getEntryIterator(startWith)));
}
// sstables
@@ -98,10 +98,9 @@ public class RowIteratorFactory
iterators.add(scanner);
}
- Iterator<IColumnIterator> collated = IteratorUtils.collatedIterator(COMPARE_BY_KEY, iterators);
-
+ final Memtable firstMemtable = memtables.iterator().next();
// reduce rows from all sources into a single row
- ReducingIterator<IColumnIterator, Row> reduced = new ReducingIterator<IColumnIterator, Row>(collated)
+ return MergeIterator.get(iterators, COMPARE_BY_KEY, new MergeIterator.Reducer<IColumnIterator, Row>()
{
private final int gcBefore = (int) (System.currentTimeMillis() / 1000) - cfs.metadata.getGcGraceSeconds();
private final List<IColumnIterator> colIters = new ArrayList<IColumnIterator>();
@@ -121,57 +120,61 @@ public class RowIteratorFactory
this.returnCF.delete(current.getColumnFamily());
}
- @Override
- protected boolean isEqual(IColumnIterator o1, IColumnIterator o2)
- {
- return COMPARE_BY_KEY.compare(o1, o2) == 0;
- }
-
protected Row getReduced()
{
- Comparator<IColumn> colComparator = filter.filter.getColumnComparator(comparator);
- Iterator<IColumn> colCollated = IteratorUtils.collatedIterator(colComparator, colIters);
// First check if this row is in the rowCache. If it is we can skip the rest
ColumnFamily cached = cfs.getRawCachedRow(key);
- if (cached != null)
+ if (cached == null)
+ // not cached: collate
+ filter.collateColumns(returnCF, colIters, comparator, gcBefore);
+ else
{
QueryFilter keyFilter = new QueryFilter(key, filter.path, filter.filter);
returnCF = cfs.filterColumnFamily(cached, keyFilter, gcBefore);
}
- else if (colCollated.hasNext())
- {
- filter.collectCollatedColumns(returnCF, colCollated, gcBefore);
- }
Row rv = new Row(key, returnCF);
colIters.clear();
key = null;
return rv;
}
- };
-
- return new RowIterator(reduced, iterators);
+ });
}
/**
* Get a ColumnIterator for a specific key in the memtable.
*/
- private static class ConvertToColumnIterator implements Function<Map.Entry<DecoratedKey, ColumnFamily>, IColumnIterator>
+ private static class ConvertToColumnIterator extends AbstractIterator<IColumnIterator> implements CloseableIterator<IColumnIterator>
{
- private QueryFilter filter;
- private AbstractType comparator;
+ private final QueryFilter filter;
+ private final AbstractType comparator;
+ private final Predicate<IColumnIterator> pred;
+ private final Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter;
- public ConvertToColumnIterator(QueryFilter filter, AbstractType comparator)
+ public ConvertToColumnIterator(QueryFilter filter, AbstractType comparator, Predicate<IColumnIterator> pred, Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter)
{
this.filter = filter;
this.comparator = comparator;
+ this.pred = pred;
+ this.iter = iter;
}
- public IColumnIterator apply(final Entry<DecoratedKey, ColumnFamily> entry)
+ public IColumnIterator computeNext()
{
- return filter.getMemtableColumnIterator(entry.getValue(), entry.getKey(), comparator);
+ while (iter.hasNext())
+ {
+ Map.Entry<DecoratedKey, ColumnFamily> entry = iter.next();
+ IColumnIterator ici = filter.getMemtableColumnIterator(entry.getValue(), entry.getKey(), comparator);
+ if (pred.apply(ici))
+ return ici;
+ }
+ return endOfData();
}
- }
+ public void close()
+ {
+ // pass
+ }
+ }
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/columniterator/IColumnIterator.java Thu Jun 16 04:24:33 2011
@@ -27,8 +27,9 @@ import java.util.Iterator;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.utils.CloseableIterator;
-public interface IColumnIterator extends Iterator<IColumn>
+public interface IColumnIterator extends CloseableIterator<IColumn>
{
/**
* @return An empty CF holding metadata for the row being iterated.
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java Thu Jun 16 04:24:33 2011
@@ -21,33 +21,35 @@ package org.apache.cassandra.db.compacti
*/
-import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
-import org.apache.cassandra.service.StorageService;
-import org.apache.commons.collections.iterators.CollatingIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.AbstractIterator;
+
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.columniterator.IColumnIterator;
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableScanner;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ReducingIterator;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.MergeIterator;
-public class CompactionIterator extends ReducingIterator<SSTableIdentityIterator, AbstractCompactedRow>
-implements Closeable, CompactionInfo.Holder
+public class CompactionIterator extends AbstractIterator<AbstractCompactedRow>
+implements CloseableIterator<AbstractCompactedRow>, CompactionInfo.Holder
{
private static Logger logger = LoggerFactory.getLogger(CompactionIterator.class);
public static final int FILE_BUFFER_SIZE = 1024 * 1024;
- protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
+ private final MergeIterator<IColumnIterator, AbstractCompactedRow> source;
protected final CompactionType type;
protected final CompactionController controller;
@@ -65,33 +67,26 @@ implements Closeable, CompactionInfo.Hol
public CompactionIterator(CompactionType type, Iterable<SSTableReader> sstables, CompactionController controller) throws IOException
{
- this(type, getCollatingIterator(sstables), controller);
+ this(type, getScanners(sstables), controller);
}
- @SuppressWarnings("unchecked")
- protected CompactionIterator(CompactionType type, Iterator iter, CompactionController controller)
+ protected CompactionIterator(CompactionType type, List<SSTableScanner> scanners, CompactionController controller)
{
- super(iter);
this.type = type;
this.controller = controller;
+ this.source = MergeIterator.get(scanners, ICOMP, new Reducer());
row = 0;
totalBytes = bytesRead = 0;
- for (SSTableScanner scanner : getScanners())
- {
+ for (SSTableScanner scanner : scanners)
totalBytes += scanner.getFileLength();
- }
}
- @SuppressWarnings("unchecked")
- protected static CollatingIterator getCollatingIterator(Iterable<SSTableReader> sstables) throws IOException
+ protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables) throws IOException
{
- // TODO CollatingIterator iter = FBUtilities.<SSTableIdentityIterator>getCollatingIterator();
- CollatingIterator iter = FBUtilities.getCollatingIterator();
+ ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>();
for (SSTableReader sstable : sstables)
- {
- iter.addIterator(sstable.getDirectScanner(FILE_BUFFER_SIZE));
- }
- return iter;
+ scanners.add(sstable.getDirectScanner(FILE_BUFFER_SIZE));
+ return scanners;
}
public CompactionInfo getCompactionInfo()
@@ -103,50 +98,12 @@ implements Closeable, CompactionInfo.Hol
totalBytes);
}
- @Override
- protected boolean isEqual(SSTableIdentityIterator o1, SSTableIdentityIterator o2)
- {
- return o1.getKey().equals(o2.getKey());
- }
-
- public void reduce(SSTableIdentityIterator current)
- {
- rows.add(current);
- }
- protected AbstractCompactedRow getReduced()
+ public AbstractCompactedRow computeNext()
{
- assert rows.size() > 0;
-
- try
- {
- AbstractCompactedRow compactedRow = controller.getCompactedRow(rows);
- if (compactedRow.isEmpty())
- {
- controller.invalidateCachedRow(compactedRow.key);
- return null;
- }
-
- // If the raw is cached, we call removeDeleted on it to have/ coherent query returns. However it would look
- // like some deleted columns lived longer than gc_grace + compaction. This can also free up big amount of
- // memory on long running instances
- controller.removeDeletedInCache(compactedRow.key);
-
- return compactedRow;
- }
- finally
- {
- rows.clear();
- if ((row++ % 1000) == 0)
- {
- bytesRead = 0;
- for (SSTableScanner scanner : getScanners())
- {
- bytesRead += scanner.getFilePointer();
- }
- throttle();
- }
- }
+ if (!source.hasNext())
+ return endOfData();
+ return source.next();
}
private void throttle()
@@ -187,16 +144,69 @@ implements Closeable, CompactionInfo.Hol
public void close() throws IOException
{
- FileUtils.close(getScanners());
+ source.close();
}
protected Iterable<SSTableScanner> getScanners()
{
- return ((CollatingIterator)source).getIterators();
+ return (Iterable<SSTableScanner>)(source.iterators());
}
public String toString()
{
return this.getCompactionInfo().toString();
}
+
+ protected class Reducer extends MergeIterator.Reducer<IColumnIterator, AbstractCompactedRow>
+ {
+ protected final List<SSTableIdentityIterator> rows = new ArrayList<SSTableIdentityIterator>();
+
+ public void reduce(IColumnIterator current)
+ {
+ rows.add((SSTableIdentityIterator)current);
+ }
+
+ protected AbstractCompactedRow getReduced()
+ {
+ assert rows.size() > 0;
+
+ try
+ {
+ AbstractCompactedRow compactedRow = controller.getCompactedRow(rows);
+ if (compactedRow.isEmpty())
+ {
+ controller.invalidateCachedRow(compactedRow.key);
+ return null;
+ }
+
+ // If the raw is cached, we call removeDeleted on it to have/ coherent query returns. However it would look
+ // like some deleted columns lived longer than gc_grace + compaction. This can also free up big amount of
+ // memory on long running instances
+ controller.removeDeletedInCache(compactedRow.key);
+
+ return compactedRow;
+ }
+ finally
+ {
+ rows.clear();
+ if ((row++ % 1000) == 0)
+ {
+ bytesRead = 0;
+ for (SSTableScanner scanner : getScanners())
+ {
+ bytesRead += scanner.getFilePointer();
+ }
+ throttle();
+ }
+ }
+ }
+ }
+
+ public final static Comparator<IColumnIterator> ICOMP = new Comparator<IColumnIterator>()
+ {
+ public int compare(IColumnIterator i1, IColumnIterator i2)
+ {
+ return i1.getKey().compareTo(i2.getKey());
+ }
+ };
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java Thu Jun 16 04:24:33 2011
@@ -31,7 +31,6 @@ import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.commons.collections.PredicateUtils;
-import org.apache.commons.collections.iterators.CollatingIterator;
import org.apache.commons.collections.iterators.FilterIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -934,18 +933,16 @@ public class CompactionManager implement
public ValidationCompactionIterator(ColumnFamilyStore cfs, Range range) throws IOException
{
super(CompactionType.VALIDATION,
- getCollatingIterator(cfs.getSSTables(), range),
+ getScanners(cfs.getSSTables(), range),
new CompactionController(cfs, cfs.getSSTables(), getDefaultGcBefore(cfs), true));
}
- protected static CollatingIterator getCollatingIterator(Iterable<SSTableReader> sstables, Range range) throws IOException
+ protected static List<SSTableScanner> getScanners(Iterable<SSTableReader> sstables, Range range) throws IOException
{
- CollatingIterator iter = FBUtilities.getCollatingIterator();
+ ArrayList<SSTableScanner> scanners = new ArrayList<SSTableScanner>();
for (SSTableReader sstable : sstables)
- {
- iter.addIterator(sstable.getDirectScanner(FILE_BUFFER_SIZE, range));
- }
- return iter;
+ scanners.add(sstable.getDirectScanner(FILE_BUFFER_SIZE, range));
+ return scanners;
}
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/compaction/LazilyCompactedRow.java Thu Jun 16 04:24:33 2011
@@ -29,7 +29,6 @@ import java.util.*;
import com.google.common.base.Predicates;
import com.google.common.collect.Iterators;
-import org.apache.commons.collections.iterators.CollatingIterator;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
@@ -40,7 +39,7 @@ import org.apache.cassandra.db.marshal.A
import org.apache.cassandra.io.sstable.SSTableIdentityIterator;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.util.IIterableColumns;
-import org.apache.cassandra.utils.ReducingIterator;
+import org.apache.cassandra.utils.MergeIterator;
/**
* LazilyCompactedRow only computes the row bloom filter and column index in memory
@@ -60,7 +59,7 @@ public class LazilyCompactedRow extends
private final boolean shouldPurge;
private final DataOutputBuffer headerBuffer;
private ColumnFamily emptyColumnFamily;
- private LazyColumnIterator iter;
+ private Reducer reducer;
private int columnCount;
private long columnSerializedSize;
@@ -84,10 +83,10 @@ public class LazilyCompactedRow extends
// initialize row header so isEmpty can be called
headerBuffer = new DataOutputBuffer();
ColumnIndexer.serialize(this, headerBuffer);
- // reach into iterator used by ColumnIndexer to get column count and size
- columnCount = iter.size;
- columnSerializedSize = iter.serializedSize;
- iter = null;
+ // reach into the reducer used during iteration to get column count and size
+ columnCount = reducer.size;
+ columnSerializedSize = reducer.serializedSize;
+ reducer = null;
}
public void write(DataOutput out) throws IOException
@@ -156,10 +155,9 @@ public class LazilyCompactedRow extends
public Iterator<IColumn> iterator()
{
for (SSTableIdentityIterator row : rows)
- {
row.reset();
- }
- iter = new LazyColumnIterator(new CollatingIterator(getComparator().columnComparator, rows));
+ reducer = new Reducer();
+ Iterator<IColumn> iter = MergeIterator.get(rows, getComparator().columnComparator, reducer);
return Iterators.filter(iter, Predicates.notNull());
}
@@ -168,23 +166,12 @@ public class LazilyCompactedRow extends
return columnCount;
}
- private class LazyColumnIterator extends ReducingIterator<IColumn, IColumn>
+ private class Reducer extends MergeIterator.Reducer<IColumn, IColumn>
{
ColumnFamily container = emptyColumnFamily.cloneMeShallow();
long serializedSize = 4; // int for column count
int size = 0;
- public LazyColumnIterator(Iterator<IColumn> source)
- {
- super(source);
- }
-
- @Override
- protected boolean isEqual(IColumn o1, IColumn o2)
- {
- return o1.name().equals(o2.name());
- }
-
public void reduce(IColumn current)
{
container.addColumn(current);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Thu Jun 16 04:24:33 2011
@@ -22,10 +22,7 @@ package org.apache.cassandra.db.filter;
import java.nio.ByteBuffer;
-import java.util.Comparator;
-import java.util.Iterator;
-import java.util.SortedSet;
-import java.util.TreeSet;
+import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +35,8 @@ import org.apache.cassandra.io.sstable.S
import org.apache.cassandra.io.util.FileDataInput;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
-import org.apache.cassandra.utils.ReducingIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.MergeIterator;
public class QueryFilter
{
@@ -88,11 +86,14 @@ public class QueryFilter
return superFilter.getSSTableColumnIterator(sstable, file, key);
}
- public void collectCollatedColumns(final ColumnFamily returnCF, Iterator<IColumn> collatedColumns, final int gcBefore)
+ // TODO move gcBefore into a field
+ public void collateColumns(final ColumnFamily returnCF, List<? extends CloseableIterator<IColumn>> toCollate, AbstractType comparator, final int gcBefore)
{
+ IFilter topLevelFilter = (superFilter == null ? filter : superFilter);
+ Comparator<IColumn> fcomp = topLevelFilter.getColumnComparator(comparator);
// define a 'reduced' iterator that merges columns w/ the same name, which
// greatly simplifies computing liveColumns in the presence of tombstones.
- ReducingIterator<IColumn, IColumn> reduced = new ReducingIterator<IColumn, IColumn>(collatedColumns)
+ Iterator<IColumn> reduced = MergeIterator.get(toCollate, fcomp, new MergeIterator.Reducer<IColumn, IColumn>()
{
ColumnFamily curCF = returnCF.cloneMeShallow();
@@ -137,9 +138,9 @@ public class QueryFilter
return c;
}
- };
+ });
- (superFilter == null ? filter : superFilter).collectReducedColumns(returnCF, reduced, gcBefore);
+ topLevelFilter.collectReducedColumns(returnCF, reduced, gcBefore);
}
public String getColumnFamilyName()
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/KeyIterator.java Thu Jun 16 04:24:33 2011
@@ -21,7 +21,6 @@ package org.apache.cassandra.io.sstable;
*/
-import java.io.Closeable;
import java.io.File;
import java.io.IOError;
import java.io.IOException;
@@ -33,8 +32,9 @@ import org.apache.cassandra.db.Decorated
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CloseableIterator;
-public class KeyIterator extends AbstractIterator<DecoratedKey> implements Iterator<DecoratedKey>, Closeable
+public class KeyIterator extends AbstractIterator<DecoratedKey> implements CloseableIterator<DecoratedKey>
{
private final BufferedRandomAccessFile in;
private final Descriptor desc;
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/ReducingKeyIterator.java Thu Jun 16 04:24:33 2011
@@ -21,31 +21,26 @@ package org.apache.cassandra.io.sstable;
*/
-import java.io.Closeable;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
-import org.apache.commons.collections.iterators.CollatingIterator;
-
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.ReducingIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.MergeIterator;
-public class ReducingKeyIterator implements Iterator<DecoratedKey>, Closeable
+public class ReducingKeyIterator implements CloseableIterator<DecoratedKey>
{
- private final CollatingIterator ci;
- private final ReducingIterator<DecoratedKey, DecoratedKey> iter;
+ private final MergeIterator<DecoratedKey,DecoratedKey> mi;
public ReducingKeyIterator(Collection<SSTableReader> sstables)
{
- ci = FBUtilities.getCollatingIterator();
+ ArrayList<KeyIterator> iters = new ArrayList<KeyIterator>();
for (SSTableReader sstable : sstables)
- {
- ci.addIterator(new KeyIterator(sstable.descriptor));
- }
-
- iter = new ReducingIterator<DecoratedKey, DecoratedKey>(ci)
+ iters.add(new KeyIterator(sstable.descriptor));
+ mi = MergeIterator.get(iters, DecoratedKey.comparator, new MergeIterator.Reducer<DecoratedKey,DecoratedKey>()
{
DecoratedKey reduced = null;
@@ -58,21 +53,21 @@ public class ReducingKeyIterator impleme
{
return reduced;
}
- };
+ });
}
public void close() throws IOException
{
- for (Object o : ci.getIterators())
+ for (Object o : mi.iterators())
{
- ((KeyIterator) o).close();
+ ((CloseableIterator)o).close();
}
}
public long getTotalBytes()
{
long m = 0;
- for (Object o : ci.getIterators())
+ for (Object o : mi.iterators())
{
m += ((KeyIterator) o).getTotalBytes();
}
@@ -82,7 +77,7 @@ public class ReducingKeyIterator impleme
public long getBytesRead()
{
long m = 0;
- for (Object o : ci.getIterators())
+ for (Object o : mi.iterators())
{
m += ((KeyIterator) o).getBytesRead();
}
@@ -96,12 +91,12 @@ public class ReducingKeyIterator impleme
public boolean hasNext()
{
- return iter.hasNext();
+ return mi.hasNext();
}
public DecoratedKey next()
{
- return iter.next();
+ return mi.next();
}
public void remove()
Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableScanner.java Thu Jun 16 04:24:33 2011
@@ -19,7 +19,6 @@
package org.apache.cassandra.io.sstable;
-import java.io.Closeable;
import java.io.File;
import java.io.IOError;
import java.io.IOException;
@@ -34,9 +33,9 @@ import org.apache.cassandra.db.columnite
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.io.util.BufferedRandomAccessFile;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.CloseableIterator;
-
-public class SSTableScanner implements Iterator<IColumnIterator>, Closeable
+public class SSTableScanner implements CloseableIterator<IColumnIterator>
{
private static Logger logger = LoggerFactory.getLogger(SSTableScanner.class);
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RangeSliceResponseResolver.java Thu Jun 16 04:24:33 2011
@@ -24,7 +24,6 @@ import java.util.*;
import java.util.concurrent.LinkedBlockingQueue;
import com.google.common.collect.AbstractIterator;
-import org.apache.commons.collections.iterators.CollatingIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,7 +33,8 @@ import org.apache.cassandra.db.RangeSlic
import org.apache.cassandra.db.Row;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.utils.Pair;
-import org.apache.cassandra.utils.ReducingIterator;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.MergeIterator;
/**
* Turns RangeSliceReply objects into row (string -> CF) maps, resolving
@@ -64,35 +64,27 @@ public class RangeSliceResponseResolver
// (this is not currently an issue since we don't do read repair for range queries.)
public Iterable<Row> resolve() throws IOException
{
- CollatingIterator collator = new CollatingIterator(new Comparator<Pair<Row,InetAddress>>()
- {
- public int compare(Pair<Row,InetAddress> o1, Pair<Row,InetAddress> o2)
- {
- return o1.left.key.compareTo(o2.left.key);
- }
- });
-
+ ArrayList<RowIterator> iters = new ArrayList<RowIterator>(responses.size());
int n = 0;
for (Message response : responses)
{
RangeSliceReply reply = RangeSliceReply.read(response.getMessageBody(), response.getVersion());
n = Math.max(n, reply.rows.size());
- collator.addIterator(new RowIterator(reply.rows.iterator(), response.getFrom()));
+ iters.add(new RowIterator(reply.rows.iterator(), response.getFrom()));
}
-
// for each row, compute the combination of all different versions seen, and repair incomplete versions
- return new ReducingIterator<Pair<Row,InetAddress>, Row>(collator)
+ MergeIterator<Pair<Row,InetAddress>, Row> iter = MergeIterator.get(iters, new Comparator<Pair<Row,InetAddress>>()
+ {
+ public int compare(Pair<Row,InetAddress> o1, Pair<Row,InetAddress> o2)
+ {
+ return o1.left.key.compareTo(o2.left.key);
+ }
+ }, new MergeIterator.Reducer<Pair<Row,InetAddress>, Row>()
{
List<ColumnFamily> versions = new ArrayList<ColumnFamily>(sources.size());
List<InetAddress> versionSources = new ArrayList<InetAddress>(sources.size());
DecoratedKey key;
- @Override
- protected boolean isEqual(Pair<Row, InetAddress> o1, Pair<Row, InetAddress> o2)
- {
- return o1.left.key.equals(o2.left.key);
- }
-
public void reduce(Pair<Row,InetAddress> current)
{
key = current.left.key;
@@ -122,7 +114,13 @@ public class RangeSliceResponseResolver
versionSources.clear();
return new Row(key, resolved);
}
- };
+ });
+
+ List<Row> resolvedRows = new ArrayList<Row>(n);
+ while (iter.hasNext())
+ resolvedRows.add(iter.next());
+
+ return resolvedRows;
}
public void preprocess(Message message)
@@ -135,7 +133,7 @@ public class RangeSliceResponseResolver
return !responses.isEmpty();
}
- private static class RowIterator extends AbstractIterator<Pair<Row,InetAddress>>
+ private static class RowIterator extends AbstractIterator<Pair<Row,InetAddress>> implements CloseableIterator<Pair<Row,InetAddress>>
{
private final Iterator<Row> iter;
private final InetAddress source;
@@ -150,6 +148,8 @@ public class RangeSliceResponseResolver
{
return iter.hasNext() ? new Pair<Row, InetAddress>(iter.next(), source) : endOfData();
}
+
+ public void close() {}
}
public Iterable<Message> getMessages()
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/RowRepairResolver.java Thu Jun 16 04:24:33 2011
@@ -26,8 +26,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import org.apache.commons.collections.iterators.CollatingIterator;
-
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.QueryFilter;
@@ -35,6 +33,7 @@ import org.apache.cassandra.db.filter.Qu
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.utils.*;
public class RowRepairResolver extends AbstractRowResolver
{
@@ -142,14 +141,14 @@ public class RowRepairResolver extends A
// this will handle removing columns and subcolumns that are supressed by a row or
// supercolumn tombstone.
QueryFilter filter = new QueryFilter(null, new QueryPath(resolved.metadata().cfName), new IdentityQueryFilter());
- CollatingIterator iter = new CollatingIterator(resolved.metadata().comparator.columnComparator);
+ List<CloseableIterator<IColumn>> iters = new ArrayList<CloseableIterator<IColumn>>();
for (ColumnFamily version : versions)
{
if (version == null)
continue;
- iter.addIterator(version.getColumnsMap().values().iterator());
+ iters.add(FBUtilities.closeableIterator(version.getColumnsMap().values().iterator()));
}
- filter.collectCollatedColumns(resolved, iter, Integer.MIN_VALUE);
+ filter.collateColumns(resolved, iters, resolved.metadata().comparator, Integer.MIN_VALUE);
return ColumnFamilyStore.removeDeleted(resolved, Integer.MIN_VALUE);
}
Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java?rev=1136287&r1=1136286&r2=1136287&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/utils/FBUtilities.java Thu Jun 16 04:24:33 2011
@@ -35,7 +35,7 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLong;
import com.google.common.base.Joiner;
-import org.apache.commons.collections.iterators.CollatingIterator;
+import com.google.common.collect.AbstractIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -331,32 +331,6 @@ public class FBUtilities
}
}
- /*
- TODO how to make this work w/ ReducingKeyIterator?
- public static <T extends Comparable<T>> CollatingIterator getCollatingIterator()
- {
- // CollatingIterator will happily NPE if you do not specify a comparator explicitly
- return new CollatingIterator(new Comparator<T>()
- {
- public int compare(T o1, T o2)
- {
- return o1.compareTo(o2);
- }
- });
- }
- */
- public static CollatingIterator getCollatingIterator()
- {
- // CollatingIterator will happily NPE if you do not specify a comparator explicitly
- return new CollatingIterator(new Comparator()
- {
- public int compare(Object o1, Object o2)
- {
- return ((Comparable) o1).compareTo(o2);
- }
- });
- }
-
public static void atomicSetMax(AtomicInteger atomic, int i)
{
while (true)
@@ -614,4 +588,27 @@ public class FBUtilities
return FBUtilities.construct(cache_provider, "row cache provider");
}
+ public static <T> CloseableIterator<T> closeableIterator(Iterator<T> iterator)
+ {
+ return new WrappedCloseableIterator<T>(iterator);
+ }
+
+ private static final class WrappedCloseableIterator<T>
+ extends AbstractIterator<T> implements CloseableIterator<T>
+ {
+ private final Iterator<T> source;
+ public WrappedCloseableIterator(Iterator<T> source)
+ {
+ this.source = source;
+ }
+
+ protected T computeNext()
+ {
+ if (!source.hasNext())
+ return endOfData();
+ return source.next();
+ }
+
+ public void close() {}
+ }
}
Added: cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java?rev=1136287&view=auto
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java (added)
+++ cassandra/trunk/test/unit/org/apache/cassandra/utils/MergeIteratorTest.java Thu Jun 16 04:24:33 2011
@@ -0,0 +1,118 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.cassandra.utils;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Ordering;
+
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for MergeIterator: merging several sorted CloseableIterators with and without a Reducer,
+ * and checking that closing the merged iterator closes every input exactly once.
+ */
+public class MergeIteratorTest
+{
+    // Inputs a..d (d is empty) and the expected merge outputs; rebuilt before every test
+    // because the iterators are single-use and record whether they were closed.
+    CLI<String> all = null, cat = null, a = null, b = null, c = null, d = null;
+
+    @Before
+    public void clear()
+    {
+        all = new CLI<String>("1", "2", "3", "3", "4", "5", "6", "7", "8", "8", "9");
+        cat = new CLI<String>("1", "2", "33", "4", "5", "6", "7", "88", "9");
+        a = new CLI<String>("1", "3", "5", "8");
+        b = new CLI<String>("2", "4", "6");
+        c = new CLI<String>("3", "7", "8", "9");
+        d = new CLI<String>();
+    }
+
+    /** Test that merging without a Reducer emits every input value in order, duplicates included. */
+    @Test
+    public void testOneToOne() throws Exception
+    {
+        MergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d),
+                                                             Ordering.<String>natural());
+        assert Iterators.elementsEqual(all, smi);
+        smi.close();
+        assert a.closed && b.closed && c.closed && d.closed;
+    }
+
+    /** Test that values comparing equal across inputs are reduced (here: concatenated) into one value. */
+    @Test
+    public void testManyToOne() throws Exception
+    {
+        MergeIterator.Reducer<String,String> reducer = new MergeIterator.Reducer<String,String>()
+        {
+            String concatted = "";
+            public void reduce(String value)
+            {
+                concatted += value;
+            }
+
+            public String getReduced()
+            {
+                // reset so the next group of equal values starts from an empty string
+                String tmp = concatted;
+                concatted = "";
+                return tmp;
+            }
+        };
+        MergeIterator<String,String> smi = MergeIterator.get(Arrays.asList(a, b, c, d),
+                                                             Ordering.<String>natural(),
+                                                             reducer);
+        assert Iterators.elementsEqual(cat, smi);
+        smi.close();
+        assert a.closed && b.closed && c.closed && d.closed;
+    }
+
+    /**
+     * Closeable list iterator: iterates over a fixed set of items and records whether close()
+     * was called. close() asserts it is invoked at most once.
+     */
+    public static class CLI<E> extends AbstractIterator<E> implements CloseableIterator<E>
+    {
+        Iterator<E> iter;
+        boolean closed = false;
+        public CLI(E... items)
+        {
+            this.iter = Arrays.asList(items).iterator();
+        }
+
+        protected E computeNext()
+        {
+            if (!iter.hasNext()) return endOfData();
+            return iter.next();
+        }
+
+        public void close()
+        {
+            assert !this.closed;
+            this.closed = true;
+        }
+    }
+}