You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/07/10 19:48:50 UTC

svn commit: r793052 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra/db: ColumnFamilyStore.java QueryFilter.java SliceQueryFilter.java Table.java

Author: jbellis
Date: Fri Jul 10 17:48:49 2009
New Revision: 793052

URL: http://svn.apache.org/viewvc?rev=793052&view=rev
Log:
refactor out QueryFilter, SliceQueryFilter.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-287

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=793052&r1=793051&r2=793052&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri Jul 10 17:48:49 2009
@@ -1555,7 +1555,7 @@
      * get a list of columns starting from a given column, in a specified order
      * only the latest version of a column is returned
      */
-    public ColumnFamily getSliceFrom(String key, String cfName, String startColumn, String finishColumn, boolean isAscending, int offset, int count)
+    public ColumnFamily getColumnFamily(QueryFilter filter)
     throws IOException, ExecutionException, InterruptedException
     {
         sstableLock_.readLock().lock();
@@ -1569,7 +1569,7 @@
             memtableLock_.readLock().lock();
             try
             {
-                iter = memtable_.getColumnIterator(key, cfName, isAscending, startColumn);
+                iter = filter.getMemColumnIterator(memtable_);
                 returnCF = iter.getColumnFamily();
             }
             finally
@@ -1579,10 +1579,10 @@
             iterators.add(iter);
 
             /* add the memtables being flushed */
-            List<Memtable> memtables = getUnflushedMemtables(cfName);
+            List<Memtable> memtables = getUnflushedMemtables(filter.getColumnFamilyName());
             for (Memtable memtable:memtables)
             {
-                iter = memtable.getColumnIterator(key, cfName, isAscending, startColumn);
+                iter = filter.getMemColumnIterator(memtable);
                 returnCF.delete(iter.getColumnFamily());
                 iterators.add(iter);
             }
@@ -1591,67 +1591,24 @@
             List<SSTableReader> sstables = new ArrayList<SSTableReader>(ssTables_.values());
             for (SSTableReader sstable : sstables)
             {
-                iter = new SSTableColumnIterator(sstable.getFilename(), key, cfName, startColumn, isAscending);
+                iter = filter.getSSTableColumnIterator(sstable);
                 if (iter.hasNext())
                 {
                     returnCF.delete(iter.getColumnFamily());
                     iterators.add(iter);
                 }
-            }
-
-            // define a 'reduced' iterator that merges columns w/ the same name, which
-            // greatly simplifies computing liveColumns in the presence of tombstones.
-            Comparator<IColumn> comparator = new Comparator<IColumn>()
-            {
-                public int compare(IColumn c1, IColumn c2)
+                else
                 {
-                    return c1.name().compareTo(c2.name());
+                    iter.close();
                 }
-            };
-            if (!isAscending)
-                comparator = new ReverseComparator(comparator);
+            }
+
+            Comparator<IColumn> comparator = filter.getColumnComparator();
             Iterator collated = IteratorUtils.collatedIterator(comparator, iterators);
             if (!collated.hasNext())
-                return ColumnFamily.create(table_, cfName);
-            ReducingIterator<IColumn> reduced = new ReducingIterator<IColumn>(collated)
-            {
-                ColumnFamily curCF = returnCF.cloneMeShallow();
-
-                protected Object getKey(IColumn o)
-                {
-                    return o == null ? null : o.name();
-                }
+                return ColumnFamily.create(table_, filter.getColumnFamilyName());
 
-                public void reduce(IColumn current)
-                {
-                    curCF.addColumn(current);
-                }
-
-                protected IColumn getReduced()
-                {
-                    IColumn c = curCF.getAllColumns().first();
-                    curCF.clear();
-                    return c;
-                }
-            };
-
-            // add unique columns to the CF container
-            int liveColumns = 0;
-            int limit = offset + count;
-            for (IColumn column : reduced)
-            {
-                if (liveColumns >= limit)
-                    break;
-                if (!finishColumn.isEmpty()
-                    && ((isAscending && column.name().compareTo(finishColumn) > 0))
-                        || (!isAscending && column.name().compareTo(finishColumn) < 0))
-                    break;
-                if (!column.isMarkedForDelete())
-                    liveColumns++;
-
-                if (liveColumns > offset)
-                    returnCF.addColumn(column);
-            }
+            filter.collectColumns(returnCF, collated);
 
             return removeDeleted(returnCF);
         }

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java?rev=793052&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/QueryFilter.java Fri Jul 10 17:48:49 2009
@@ -0,0 +1,89 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Collection;
+
+import org.apache.commons.collections.comparators.ReverseComparator;
+import org.apache.commons.collections.IteratorUtils;
+
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.utils.ReducingIterator;
+
+/**
+ * Base class for read filters handed to ColumnFamilyStore.getColumnFamily.
+ * A filter carries the row key and column family specifier, supplies the
+ * per-source column iterators (memtable and sstable), the comparator used
+ * to collate them, and the logic that decides which of the merged columns
+ * end up in the result.
+ */
+public abstract class QueryFilter
+{
+    /** row key this filter was constructed with */
+    public final String key;
+    /** column family specifier given at construction; see getColumnFamilyName() */
+    public final String columnFamilyColumn;
+
+    /**
+     * @param key row key to read
+     * @param columnFamilyColumn column family specifier; it is later parsed by
+     *        RowMutation.getColumnAndColumnFamily in getColumnFamilyName()
+     */
+    protected QueryFilter(String key, String columnFamilyColumn)
+    {
+        this.key = key;
+        this.columnFamilyColumn = columnFamilyColumn;
+    }
+
+    /**
+     * returns an iterator that returns columns from the given memtable
+     * matching the Filter criteria in sorted order.
+     */
+    public abstract ColumnIterator getMemColumnIterator(Memtable memtable);
+
+    /**
+     * returns an iterator that returns columns from the given SSTable
+     * matching the Filter criteria in sorted order.
+     */
+    public abstract ColumnIterator getSSTableColumnIterator(SSTableReader sstable) throws IOException;
+
+    /**
+     * collects columns from reducedColumns into returnCF.  Termination is determined
+     * by the filter code, which should have some limit on the number of columns
+     * to avoid running out of memory on large rows.
+     */
+    public abstract void collectColumns(ColumnFamily returnCF, ReducingIterator<IColumn> reducedColumns);
+
+    /**
+     * returns the comparator used to collate columns coming from the different
+     * sources.  The default orders columns by name, ascending; subclasses may
+     * override to change the direction.
+     */
+    protected Comparator<IColumn> getColumnComparator()
+    {
+        return new Comparator<IColumn>()
+        {
+            public int compare(IColumn c1, IColumn c2)
+            {
+                return c1.name().compareTo(c2.name());
+            }
+        };
+    }
+
+    /**
+     * wraps the collated (merged, possibly containing several versions of the
+     * same column) iterator in a ReducingIterator keyed on column name, then
+     * delegates to collectColumns(ColumnFamily, ReducingIterator).
+     *
+     * @param returnCF container that receives the selected columns; a shallow
+     *        clone of it is used as scratch space while merging
+     * @param collatedColumns columns from all sources, already ordered by
+     *        getColumnComparator()
+     */
+    public void collectColumns(final ColumnFamily returnCF, Iterator collatedColumns)
+    {
+        // define a 'reduced' iterator that merges columns w/ the same name, which
+        // greatly simplifies computing liveColumns in the presence of tombstones.
+        ReducingIterator<IColumn> reduced = new ReducingIterator<IColumn>(collatedColumns)
+        {
+            // scratch CF: same-named columns are added here and the single
+            // surviving entry is handed back by getReduced()
+            ColumnFamily curCF = returnCF.cloneMeShallow();
+
+            protected Object getKey(IColumn o)
+            {
+                return o == null ? null : o.name();
+            }
+
+            public void reduce(IColumn current)
+            {
+                curCF.addColumn(current);
+            }
+
+            protected IColumn getReduced()
+            {
+                IColumn c = curCF.getAllColumns().first();
+                // reset the scratch CF so the next group starts empty
+                curCF.clear();
+                return c;
+            }
+        };
+
+        collectColumns(returnCF, reduced);
+    }
+
+    /**
+     * returns the first element of RowMutation.getColumnAndColumnFamily(columnFamilyColumn),
+     * i.e. the column family name portion of the specifier.
+     */
+    public String getColumnFamilyName()
+    {
+        return RowMutation.getColumnAndColumnFamily(columnFamilyColumn)[0];
+    }
+}

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java?rev=793052&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SliceQueryFilter.java Fri Jul 10 17:48:49 2009
@@ -0,0 +1,64 @@
+package org.apache.cassandra.db;
+
+import java.io.IOException;
+import java.util.Comparator;
+
+import org.apache.commons.collections.comparators.ReverseComparator;
+
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.utils.ReducingIterator;
+
+/**
+ * QueryFilter that selects a slice of a row's columns: starting at
+ * {@code start}, walking in ascending or descending name order, stopping
+ * once a column passes {@code finish} (when finish is non-empty) or once
+ * {@code offset + count} live columns have been seen.  The first
+ * {@code offset} live columns are skipped.
+ */
+public class SliceQueryFilter extends QueryFilter
+{
+    /** slice bounds by column name; an empty finish means "no end bound" */
+    public final String start, finish;
+    /** true to walk columns in ascending name order, false for descending */
+    public final boolean isAscending;
+    /** number of live columns to skip, and maximum number of live columns to return */
+    public final int offset, count;
+
+    public SliceQueryFilter(String key, String columnFamilyColumn, String start, String finish, boolean ascending, int offset, int count)
+    {
+        super(key, columnFamilyColumn);
+        this.start = start;
+        this.finish = finish;
+        isAscending = ascending;
+        this.offset = offset;
+        this.count = count;
+    }
+
+    public ColumnIterator getMemColumnIterator(Memtable memtable)
+    {
+        return memtable.getColumnIterator(key, columnFamilyColumn, isAscending, start);
+    }
+
+    public ColumnIterator getSSTableColumnIterator(SSTableReader sstable) throws IOException
+    {
+        return new SSTableColumnIterator(sstable.getFilename(), key, columnFamilyColumn, start, isAscending);
+    }
+
+    /**
+     * name-order comparator from the base class, reversed for descending slices
+     * so that collation matches the direction the source iterators walk in.
+     */
+    @Override
+    protected Comparator<IColumn> getColumnComparator()
+    {
+        Comparator<IColumn> comparator = super.getColumnComparator();
+        return isAscending ? comparator : new ReverseComparator(comparator);
+    }
+
+    /**
+     * adds columns from reducedColumns to returnCF until either the finish
+     * bound is passed or offset + count live columns have been counted.
+     * Tombstones do not count toward the limit but are still added once the
+     * offset has been passed.
+     */
+    public void collectColumns(ColumnFamily returnCF, ReducingIterator<IColumn> reducedColumns)
+    {
+        int liveColumns = 0;
+        // computed as long: offset + count overflows int when count is close to
+        // Integer.MAX_VALUE, which would make the limit negative and return nothing
+        long limit = (long) offset + count;
+
+        for (IColumn column : reducedColumns)
+        {
+            if (liveColumns >= limit)
+                break;
+            // the finish bound applies in both directions, but only when one was given
+            if (!finish.isEmpty())
+            {
+                int cmp = column.name().compareTo(finish);
+                if (isAscending ? cmp > 0 : cmp < 0)
+                    break;
+            }
+            if (!column.isMarkedForDelete())
+                liveColumns++;
+
+            if (liveColumns > offset)
+                returnCF.addColumn(column);
+        }
+    }
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=793052&r1=793051&r2=793052&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Jul 10 17:48:49 2009
@@ -592,7 +592,8 @@
         long start1 = System.currentTimeMillis();
         try
         {
-            ColumnFamily columnFamily = cfStore.getSliceFrom(key, cfName, start, finish, isAscending, offset, count);
+            QueryFilter filter = new SliceQueryFilter(key, cfName, start, finish, isAscending, offset, count);
+            ColumnFamily columnFamily = cfStore.getColumnFamily(filter);
             if (columnFamily != null)
                 row.addColumnFamily(columnFamily);
             long timeTaken = System.currentTimeMillis() - start1;