You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/10 17:30:12 UTC

svn commit: r802827 - in /incubator/cassandra/trunk: src/java/org/apache/cassandra/db/ src/java/org/apache/cassandra/db/filter/ test/system/

Author: jbellis
Date: Mon Aug 10 15:30:12 2009
New Revision: 802827

URL: http://svn.apache.org/viewvc?rev=802827&view=rev
Log:
Use the existing collectReducedColumns API so that subcolumn slices conform to filter semantics as expected.
patch by jbellis; reviewed by Evan Weaver for CASSANDRA-356

Added:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/IdentityQueryFilter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
    incubator/cassandra/trunk/test/system/test_server.py

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java?rev=802827&r1=802826&r2=802827&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamily.java Mon Aug 10 15:30:12 2009
@@ -42,10 +42,8 @@
 import org.apache.cassandra.db.marshal.MarshalException;
 import org.apache.cassandra.db.marshal.LongType;
 
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-public final class ColumnFamily
+
+public final class ColumnFamily implements IColumnContainer
 {
     /* The column serializer for this Column Family. Create based on config. */
     private static ColumnFamilySerializer serializer_ = new ColumnFamilySerializer();

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=802827&r1=802826&r2=802827&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Aug 10 15:30:12 2009
@@ -45,6 +45,7 @@
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.collections.IteratorUtils;
 import org.apache.commons.collections.Predicate;
+import org.apache.commons.collections.iterators.ReverseListIterator;
 
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
@@ -1379,14 +1380,15 @@
         {
             QueryFilter nameFilter = new NamesQueryFilter(filter.key, new QueryPath(columnFamily_), filter.path.superColumnName);
             ColumnFamily cf = getColumnFamily(nameFilter);
-            if (cf != null)
-            {
-                for (IColumn column : cf.getSortedColumns())
-                {
-                    filter.filterSuperColumn((SuperColumn)column, gcBefore);
-                }
-            }
-            return removeDeleted(cf, gcBefore);
+            if (cf == null)
+                return cf;
+
+            assert cf.getSortedColumns().size() == 1;
+            SuperColumn sc = (SuperColumn)cf.getSortedColumns().iterator().next();
+            SuperColumn scFiltered = filter.filterSuperColumn(sc, gcBefore);
+            ColumnFamily cfFiltered = cf.cloneMeShallow();
+            cfFiltered.addColumn(scFiltered);
+            return cfFiltered;
         }
 
         // we are querying top-level columns, do a merging fetch with indexes.

Added: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java?rev=802827&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java (added)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IColumnContainer.java Mon Aug 10 15:30:12 2009
@@ -0,0 +1,10 @@
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.marshal.AbstractType;
+
+public interface IColumnContainer
+{
+    public void addColumn(IColumn column);
+
+    public AbstractType getComparator();
+}

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=802827&r1=802826&r2=802827&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Mon Aug 10 15:30:12 2009
@@ -281,7 +281,7 @@
         int index;
         if (filter.start.length == 0 && !filter.isAscending)
         {
-            /* assuming the we scan from the largest column in descending order*/
+            /* scan from the largest column in descending order */
             index = 0;
         }
         else

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java?rev=802827&r1=802826&r2=802827&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/SuperColumn.java Mon Aug 10 15:30:12 2009
@@ -33,11 +33,8 @@
 import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.db.marshal.MarshalException;
 
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
 
-public final class SuperColumn implements IColumn
+public final class SuperColumn implements IColumn, IColumnContainer
 {
 	private static Logger logger_ = Logger.getLogger(SuperColumn.class);
 
@@ -47,7 +44,6 @@
     }
 
     private byte[] name_;
-    // TODO make subcolumn comparator configurable
     private ConcurrentSkipListMap<byte[], IColumn> columns_;
     private int localDeletionTime = Integer.MIN_VALUE;
 	private long markedForDeleteAt = Long.MIN_VALUE;

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/IdentityQueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/IdentityQueryFilter.java?rev=802827&r1=802826&r2=802827&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/IdentityQueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/IdentityQueryFilter.java Mon Aug 10 15:30:12 2009
@@ -14,8 +14,9 @@
         super(key, path, ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, Integer.MAX_VALUE);
     }
 
-    public void filterSuperColumn(SuperColumn superColumn, int gcBefore)
+    public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore)
     {
         // no filtering done, deliberately
+        return superColumn;
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java?rev=802827&r1=802826&r2=802827&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/NamesQueryFilter.java Mon Aug 10 15:30:12 2009
@@ -5,10 +5,7 @@
 
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.utils.ReducingIterator;
-import org.apache.cassandra.db.Memtable;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.IColumn;
-import org.apache.cassandra.db.SuperColumn;
+import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 
 public class NamesQueryFilter extends QueryFilter
@@ -50,7 +47,7 @@
         return new SSTableNamesIterator(sstable.getFilename(), key, getColumnFamilyName(), columns);
     }
 
-    public void filterSuperColumn(SuperColumn superColumn, int gcBefore)
+    public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore)
     {
         for (IColumn column : superColumn.getSubColumns())
         {
@@ -59,15 +56,16 @@
                 superColumn.remove(column.name());
             }
         }
+        return superColumn;
     }
 
-    public void collectReducedColumns(ColumnFamily returnCF, Iterator<IColumn> reducedColumns, int gcBefore)
+    public void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore)
     {
         while (reducedColumns.hasNext())
         {
             IColumn column = reducedColumns.next();
             if (!column.isMarkedForDelete() || column.getLocalDeletionTime() > gcBefore)
-                returnCF.addColumn(column);
+                container.addColumn(column);
         }
     }
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java?rev=802827&r1=802826&r2=802827&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/QueryFilter.java Mon Aug 10 15:30:12 2009
@@ -38,13 +38,13 @@
      * by the filter code, which should have some limit on the number of columns
      * to avoid running out of memory on large rows.
      */
-    public abstract void collectReducedColumns(ColumnFamily returnCF, Iterator<IColumn> reducedColumns, int gcBefore);
+    public abstract void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore);
 
     /**
      * subcolumns of a supercolumn are unindexed, so to pick out parts of those we operate in-memory.
-     * @param superColumn
+     * @param superColumn may be modified by filtering op.
      */
-    public abstract void filterSuperColumn(SuperColumn superColumn, int gcBefore);
+    public abstract SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore);
 
     public Comparator<IColumn> getColumnComparator(final AbstractType comparator)
     {

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java?rev=802827&r1=802826&r2=802827&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/filter/SliceQueryFilter.java Mon Aug 10 15:30:12 2009
@@ -2,13 +2,14 @@
 
 import java.io.IOException;
 import java.util.Comparator;
-import java.util.Arrays;
 import java.util.Iterator;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.commons.collections.comparators.ReverseComparator;
+import org.apache.commons.collections.iterators.ReverseListIterator;
 
 import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.utils.ReducingIterator;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.marshal.AbstractType;
 
@@ -37,28 +38,21 @@
         return new SSTableSliceIterator(sstable.getFilename(), key, comparator, start, isAscending);
     }
 
-    public void filterSuperColumn(SuperColumn superColumn, int gcBefore)
+    public SuperColumn filterSuperColumn(SuperColumn superColumn, int gcBefore)
     {
-        int liveColumns = 0;
-
-        for (IColumn column : superColumn.getSubColumns())
+        SuperColumn scFiltered = superColumn.cloneMeShallow();
+        Iterator<IColumn> subcolumns;
+        if (isAscending)
         {
-            final boolean outOfRange = isAscending
-                                     ? (start.length > 0 && superColumn.getComparator().compare(column.name(), start) < 0)
-                                        || (finish.length > 0 && superColumn.getComparator().compare(column.name(), finish) > 0)
-                                     : (start.length > 0 && superColumn.getComparator().compare(column.name(), start) > 0)
-                                        || (finish.length > 0 && superColumn.getComparator().compare(column.name(), finish) < 0);
-            if (outOfRange
-                || (column.isMarkedForDelete() && column.getLocalDeletionTime() <= gcBefore)
-                || liveColumns > count)
-            {
-                superColumn.remove(column.name());
-            }
-            else if (!column.isMarkedForDelete())
-            {
-                liveColumns++;
-            }
+            subcolumns = superColumn.getSubColumns().iterator();
+        }
+        else
+        {
+            List<IColumn> columnsAsList = new ArrayList<IColumn>(superColumn.getSubColumns());
+            subcolumns = new ReverseListIterator(columnsAsList);
         }
+        collectReducedColumns(scFiltered, subcolumns, gcBefore);
+        return scFiltered;
     }
 
     @Override
@@ -67,10 +61,10 @@
         return isAscending ? super.getColumnComparator(comparator) : new ReverseComparator(super.getColumnComparator(comparator));
     }
 
-    public void collectReducedColumns(ColumnFamily returnCF, Iterator<IColumn> reducedColumns, int gcBefore)
+    public void collectReducedColumns(IColumnContainer container, Iterator<IColumn> reducedColumns, int gcBefore)
     {
         int liveColumns = 0;
-        AbstractType comparator = returnCF.getComparator();
+        AbstractType comparator = container.getComparator();
 
         while (reducedColumns.hasNext())
         {
@@ -86,7 +80,7 @@
                 liveColumns++;
 
             if (!column.isMarkedForDelete() || column.getLocalDeletionTime() > gcBefore)
-                returnCF.addColumn(column);
+                container.addColumn(column);
         }
     }
 }

Modified: incubator/cassandra/trunk/test/system/test_server.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/test_server.py?rev=802827&r1=802826&r2=802827&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/test_server.py (original)
+++ incubator/cassandra/trunk/test/system/test_server.py Mon Aug 10 15:30:12 2009
@@ -167,6 +167,19 @@
         _insert_super()
         _verify_super()
 
+    def test_super_subcolumn_limit(self):
+        _insert_super()
+        p = SlicePredicate(slice_range=SliceRange('', '', True, 1))
+        column_parent = ColumnParent('Super1', 'sc2')
+        slice = [result.column
+                 for result in client.get_slice('Keyspace1', 'key1', column_parent, p, ConsistencyLevel.ONE)]
+        assert slice == [Column(_i64(5), 'value5', 0)], slice
+        p = SlicePredicate(slice_range=SliceRange('', '', False, 1))
+        slice = [result.column
+                 for result in client.get_slice('Keyspace1', 'key1', column_parent, p, ConsistencyLevel.ONE)]
+        assert slice == [Column(_i64(6), 'value6', 0)], slice
+        
+
     def test_batch_insert(self):
         _insert_batch(False)
         time.sleep(0.1)