You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/10 19:48:50 UTC
svn commit: r793052 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra/db:
ColumnFamilyStore.java QueryFilter.java SliceQueryFilter.java Table.java
Author: jbellis
Date: Fri Jul 10 17:48:49 2009
New Revision: 793052
URL: http://svn.apache.org/viewvc?rev=793052&view=rev
Log:
refactor out QueryFilter, SliceQueryFilter.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-287
Added:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=793052&r1=793051&r2=793052&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Jul 10 17:48:49 2009
@@ -1555,7 +1555,7 @@
* get a list of columns starting from a given column, in a specified order
* only the latest version of a column is returned
*/
- public ColumnFamily getSliceFrom(String key, String cfName, String startColumn, String finishColumn, boolean isAscending, int offset, int count)
+ public ColumnFamily getColumnFamily(QueryFilter filter)
throws IOException, ExecutionException, InterruptedException
{
sstableLock_.readLock().lock();
@@ -1569,7 +1569,7 @@
memtableLock_.readLock().lock();
try
{
- iter = memtable_.getColumnIterator(key, cfName, isAscending, startColumn);
+ iter = filter.getMemColumnIterator(memtable_);
returnCF = iter.getColumnFamily();
}
finally
@@ -1579,10 +1579,10 @@
iterators.add(iter);
/* add the memtables being flushed */
- List<Memtable> memtables = getUnflushedMemtables(cfName);
+ List<Memtable> memtables = getUnflushedMemtables(filter.getColumnFamilyName());
for (Memtable memtable:memtables)
{
- iter = memtable.getColumnIterator(key, cfName, isAscending, startColumn);
+ iter = filter.getMemColumnIterator(memtable);
returnCF.delete(iter.getColumnFamily());
iterators.add(iter);
}
@@ -1591,67 +1591,24 @@
List<SSTableReader> sstables = new ArrayList<SSTableReader>(ssTables_.values());
for (SSTableReader sstable : sstables)
{
- iter = new SSTableColumnIterator(sstable.getFilename(), key, cfName, startColumn, isAscending);
+ iter = filter.getSSTableColumnIterator(sstable);
if (iter.hasNext())
{
returnCF.delete(iter.getColumnFamily());
iterators.add(iter);
}
- }
-
- // define a 'reduced' iterator that merges columns w/ the same name, which
- // greatly simplifies computing liveColumns in the presence of tombstones.
- Comparator<IColumn> comparator = new Comparator<IColumn>()
- {
- public int compare(IColumn c1, IColumn c2)
+ else
{
- return c1.name().compareTo(c2.name());
+ iter.close();
}
- };
- if (!isAscending)
- comparator = new ReverseComparator(comparator);
+ }
+
+ Comparator<IColumn> comparator = filter.getColumnComparator();
Iterator collated = IteratorUtils.collatedIterator(comparator, iterators);
if (!collated.hasNext())
- return ColumnFamily.create(table_, cfName);
- ReducingIterator<IColumn> reduced = new ReducingIterator<IColumn>(collated)
- {
- ColumnFamily curCF = returnCF.cloneMeShallow();
-
- protected Object getKey(IColumn o)
- {
- return o == null ? null : o.name();
- }
+ return ColumnFamily.create(table_, filter.getColumnFamilyName());
- public void reduce(IColumn current)
- {
- curCF.addColumn(current);
- }
-
- protected IColumn getReduced()
- {
- IColumn c = curCF.getAllColumns().first();
- curCF.clear();
- return c;
- }
- };
-
- // add unique columns to the CF container
- int liveColumns = 0;
- int limit = offset + count;
- for (IColumn column : reduced)
- {
- if (liveColumns >= limit)
- break;
- if (!finishColumn.isEmpty()
- && ((isAscending && column.name().compareTo(finishColumn) > 0))
- || (!isAscending && column.name().compareTo(finishColumn) < 0))
- break;
- if (!column.isMarkedForDelete())
- liveColumns++;
-
- if (liveColumns > offset)
- returnCF.addColumn(column);
- }
+ filter.collectColumns(returnCF, collated);
return removeDeleted(returnCF);
}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java?rev=793052&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java Fri Jul 10 17:48:49 2009
@@ -0,0 +1,89 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Collection;
+
+import org.apache.commons.collections.comparators.ReverseComparator;
+import org.apache.commons.collections.IteratorUtils;
+
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.utils.ReducingIterator;
+
+public abstract class QueryFilter
+{
+ public final String key;
+ public final String columnFamilyColumn;
+
+ protected QueryFilter(String key, String columnFamilyColumn)
+ {
+ this.key = key;
+ this.columnFamilyColumn = columnFamilyColumn;
+ }
+
+ /**
+ * returns an iterator that returns columns from the given memtable
+ * matching the Filter criteria in sorted order.
+ */
+ public abstract ColumnIterator getMemColumnIterator(Memtable memtable);
+
+ /**
+ * returns an iterator that returns columns from the given SSTable
+ * matching the Filter criteria in sorted order.
+ */
+ public abstract ColumnIterator getSSTableColumnIterator(SSTableReader sstable) throws IOException;
+
+ /**
+ * collects columns from reducedColumns into returnCF. Termination is determined
+ * by the filter code, which should have some limit on the number of columns
+ * to avoid running out of memory on large rows.
+ */
+ public abstract void collectColumns(ColumnFamily returnCF, ReducingIterator<IColumn> reducedColumns);
+
+ protected Comparator<IColumn> getColumnComparator()
+ {
+ return new Comparator<IColumn>()
+ {
+ public int compare(IColumn c1, IColumn c2)
+ {
+ return c1.name().compareTo(c2.name());
+ }
+ };
+ }
+
+ public void collectColumns(final ColumnFamily returnCF, Iterator collatedColumns)
+ {
+ // define a 'reduced' iterator that merges columns w/ the same name, which
+ // greatly simplifies computing liveColumns in the presence of tombstones.
+ ReducingIterator<IColumn> reduced = new ReducingIterator<IColumn>(collatedColumns)
+ {
+ ColumnFamily curCF = returnCF.cloneMeShallow();
+
+ protected Object getKey(IColumn o)
+ {
+ return o == null ? null : o.name();
+ }
+
+ public void reduce(IColumn current)
+ {
+ curCF.addColumn(current);
+ }
+
+ protected IColumn getReduced()
+ {
+ IColumn c = curCF.getAllColumns().first();
+ curCF.clear();
+ return c;
+ }
+ };
+
+ collectColumns(returnCF, reduced);
+ }
+
+ public String getColumnFamilyName()
+ {
+ return RowMutation.getColumnAndColumnFamily(columnFamilyColumn)[0]);
+ }
+}
Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java?rev=793052&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java Fri Jul 10 17:48:49 2009
@@ -0,0 +1,64 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.commons.collections.comparators.ReverseComparator;
+
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.utils.ReducingIterator;
+
+public class SliceQueryFilter extends QueryFilter
+{
+ public final String start, finish;
+ public final boolean isAscending;
+ public final int offset, count;
+
+ public SliceQueryFilter(String key, String columnFamilyColumn, String start, String finish, boolean ascending, int offset, int count)
+ {
+ super(key, columnFamilyColumn);
+ this.start = start;
+ this.finish = finish;
+ isAscending = ascending;
+ this.offset = offset;
+ this.count = count;
+ }
+
+ public ColumnIterator getMemColumnIterator(Memtable memtable)
+ {
+ return memtable.getColumnIterator(key, columnFamilyColumn, isAscending, start);
+ }
+
+ public ColumnIterator getSSTableColumnIterator(SSTableReader sstable) throws IOException
+ {
+ return new SSTableColumnIterator(sstable.getFilename(), key, columnFamilyColumn, start, isAscending);
+ }
+
+ @Override
+ protected Comparator<IColumn> getColumnComparator()
+ {
+ Comparator<IColumn> comparator = super.getColumnComparator();
+ return isAscending ? comparator : new ReverseComparator(comparator);
+ }
+
+ public void collectColumns(ColumnFamily returnCF, ReducingIterator<IColumn> reducedColumns)
+ {
+ int liveColumns = 0;
+ int limit = offset + count;
+
+ for (IColumn column : reducedColumns)
+ {
+ if (liveColumns >= limit)
+ break;
+ if (!finish.isEmpty()
+ && ((isAscending && column.name().compareTo(finish) > 0))
+ || (!isAscending && column.name().compareTo(finish) < 0))
+ break;
+ if (!column.isMarkedForDelete())
+ liveColumns++;
+
+ if (liveColumns > offset)
+ returnCF.addColumn(column);
+ }
+ }
+}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=793052&r1=793051&r2=793052&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Jul 10 17:48:49 2009
@@ -592,7 +592,8 @@
long start1 = System.currentTimeMillis();
try
{
- ColumnFamily columnFamily = cfStore.getSliceFrom(key, cfName, start, finish, isAscending, offset, count);
+ QueryFilter filter = new SliceQueryFilter(key, cfName, start, finish, isAscending, offset, count);
+ ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
if (columnFamily != null)
row.addColumnFamily(columnFamily);
long timeTaken = System.currentTimeMillis() - start1;