You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2011/07/31 06:40:43 UTC

svn commit: r1152545 - in /cassandra/trunk: CHANGES.txt src/java/org/apache/cassandra/thrift/CassandraServer.java test/system/test_thrift_server.py

Author: jbellis
Date: Sun Jul 31 04:40:42 2011
New Revision: 1152545

URL: http://svn.apache.org/viewvc?rev=1152545&view=rev
Log:
add paging to get_count
patch by Byron Clark; reviewed by jbellis for CASSANDRA-2894

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
    cassandra/trunk/test/system/test_thrift_server.py

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1152545&r1=1152544&r2=1152545&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Sun Jul 31 04:40:42 2011
@@ -24,6 +24,7 @@
    (CASSANDRA-2953)
  * fix potential use of free'd native memory in SerializingCache 
    (CASSANDRA-1951)
+ * add paging to get_count (CASSANDRA-2894)
 
 
 0.8.3

Modified: cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java?rev=1152545&r1=1152544&r2=1152545&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/thrift/CassandraServer.java Sun Jul 31 04:40:42 2011
@@ -61,6 +61,8 @@ import org.apache.thrift.TException;
 public class CassandraServer implements Cassandra.Iface
 {
     private static Logger logger = LoggerFactory.getLogger(CassandraServer.class);
+    
+    private final static int COUNT_PAGE_SIZE = 1024;
 
     private final static List<ColumnOrSuperColumn> EMPTY_COLUMNS = Collections.emptyList();
     private final static List<Column> EMPTY_SUBCOLUMNS = Collections.emptyList();
@@ -404,8 +406,64 @@ public class CassandraServer implements 
         logger.debug("get_count");
 
         state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+        Table table = Table.open(state().getKeyspace());
+        ColumnFamilyStore cfs = table.getColumnFamilyStore(column_parent.column_family);
+
+        if (predicate.column_names != null)
+            return get_slice(key, column_parent, predicate, consistency_level).size();
+
+        int pageSize;
+        // request by page if this is a large row
+        if (cfs.getMeanColumns() > 0)
+        {
+            int averageColumnSize = (int) (cfs.getMeanRowSize() / cfs.getMeanColumns());
+            pageSize = Math.min(COUNT_PAGE_SIZE,
+                                DatabaseDescriptor.getInMemoryCompactionLimit() / averageColumnSize);
+            pageSize = Math.max(2, pageSize);
+            logger.debug("average row column size is {}; using pageSize of {}", averageColumnSize, pageSize);
+        }
+        else
+        {
+            pageSize = COUNT_PAGE_SIZE;
+        }
+
+        int totalCount = 0;
+        List<ColumnOrSuperColumn> columns;
+
+        if (predicate.slice_range == null)
+        {
+            predicate.slice_range = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                   ByteBufferUtil.EMPTY_BYTE_BUFFER,
+                                                   false,
+                                                   Integer.MAX_VALUE);
+        }
+        
+        int requestedCount = predicate.slice_range.count;
+        while (true)
+        {
+            predicate.slice_range.count = Math.min(pageSize, requestedCount);
+            columns = get_slice(key, column_parent, predicate, consistency_level);
+            if (columns.isEmpty())
+                break;
+
+            totalCount += columns.size();
+            requestedCount -= columns.size();
+            ColumnOrSuperColumn lastColumn = columns.get(columns.size() - 1);
+            ByteBuffer lastName = lastColumn.isSetSuper_column() ? lastColumn.super_column.name : lastColumn.column.name;
+            if ((requestedCount == 0) || ((columns.size() == 1) && (lastName.equals(predicate.slice_range.start))))
+            {
+                break;
+            }
+            else
+            {
+                predicate.slice_range.start = lastName;
+                // remove the count for the column that starts the next slice
+                totalCount--;
+                requestedCount++;
+            }
+        }
 
-        return get_slice(key, column_parent, predicate, consistency_level).size();
+        return totalCount;
     }
 
     public Map<ByteBuffer, Integer> multiget_count(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)

Modified: cassandra/trunk/test/system/test_thrift_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_thrift_server.py?rev=1152545&r1=1152544&r2=1152545&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_thrift_server.py (original)
+++ cassandra/trunk/test/system/test_thrift_server.py Sun Jul 31 04:40:42 2011
@@ -254,6 +254,25 @@ class TestMutations(ThriftTester):
         p = SlicePredicate(slice_range=SliceRange('c2', 'c4', False, 1000)) 
         assert client.get_count('key1', ColumnParent('Standard1'), p, ConsistencyLevel.ONE) == 3
 
+    def test_count_paging(self):
+        _set_keyspace('Keyspace1')
+        _insert_simple()
+
+        # Exercise paging
+        column_parent = ColumnParent('Standard1')
+        super_column_parent = ColumnParent('Super1', 'sc3')
+        # Paging for small columns starts at 1024 columns
+        columns_to_insert = [Column('c%d' % (i,), 'value%d' % (i,), 0) for i in xrange(3, 1026)]
+        cfmap = {'Standard1': [Mutation(ColumnOrSuperColumn(c)) for c in columns_to_insert]}
+        client.batch_mutate({'key1' : cfmap }, ConsistencyLevel.ONE)
+
+        p = SlicePredicate(slice_range=SliceRange('', '', False, 2000))
+        assert client.get_count('key1', column_parent, p, ConsistencyLevel.ONE) == 1025
+
+        # Ensure that the count limit isn't clobbered
+        p = SlicePredicate(slice_range=SliceRange('', '', False, 10))
+        assert client.get_count('key1', ColumnParent('Standard1'), p, ConsistencyLevel.ONE) == 10
+
     def test_insert_blocking(self):
         _set_keyspace('Keyspace1')
         _insert_simple()