You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2010/06/17 23:57:50 UTC

svn commit: r955759 - in /cassandra/trunk: interface/ src/java/org/apache/cassandra/avro/ test/system/

Author: eevans
Date: Thu Jun 17 21:57:50 2010
New Revision: 955759

URL: http://svn.apache.org/viewvc?rev=955759&view=rev
Log:
avro multget_slice() + functional tests

Patch by eevans

Modified:
    cassandra/trunk/interface/cassandra.genavro
    cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
    cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
    cassandra/trunk/test/system/test_avro_server.py

Modified: cassandra/trunk/interface/cassandra.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=955759&r1=955758&r2=955759&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Thu Jun 17 21:57:50 2010
@@ -108,6 +108,11 @@ protocol Cassandra {
         map<array<Mutation>> mutations;
     }
 
+    record CoscsMapEntry {
+         bytes key;
+         array<ColumnOrSuperColumn> columns;
+    }
+
     enum ConsistencyLevel {
         ZERO, ONE, QUORUM, DCQUORUM, DCQUORUMSYNC, ALL
     }
@@ -147,6 +152,16 @@ protocol Cassandra {
     throws InvalidRequestException, UnavailableException, TimedOutException;
 
     /**
+     * Performs a get_slice for column_parent and predicate against the given
+     * set of keys in parallel.
+     */
+    array<CoscsMapEntry> multiget_slice(array<bytes> keys,
+                                        ColumnParent column_parent,
+                                        SlicePredicate predicate,
+                                        ConsistencyLevel consistency_level)
+    throws InvalidRequestException, UnavailableException, TimedOutException;
+
+    /**
      * Returns the number of columns matching a predicate for a particular
      * key, ColumnFamily, and optionally SuperColumn.
      */

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java?rev=955759&r1=955758&r2=955759&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java Thu Jun 17 21:57:50 2010
@@ -99,6 +99,14 @@ public class AvroRecordFactory
             cp.super_column = ByteBuffer.wrap(superColumn);
         return cp;
     }
+    
+    public static CoscsMapEntry newCoscsMapEntry(ByteBuffer key, GenericArray<ColumnOrSuperColumn> columns)
+    {
+        CoscsMapEntry entry = new CoscsMapEntry();
+        entry.key = key;
+        entry.columns = columns;
+        return entry;
+    }
 }
 
 class ErrorFactory

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=955759&r1=955758&r2=955759&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Thu Jun 17 21:57:50 2010
@@ -42,15 +42,8 @@ import static org.apache.cassandra.avro.
 /**
  * The Avro analogue to org.apache.cassandra.service.ThriftValidation
  */
-public class AvroValidation {
-    // FIXME: could use method in ThriftValidation
-    // FIXME: remove me
-    static void validateKey(String key) throws InvalidRequestException
-    {
-        if (key.isEmpty())
-            throw newInvalidRequestException("Key may not be empty");
-    }
-    
+public class AvroValidation
+{    
     static void validateKey(byte[] key) throws InvalidRequestException
     {
         if (key == null || key.length == 0)
@@ -62,6 +55,11 @@ public class AvroValidation {
                     " is longer than maximum of " + FBUtilities.MAX_UNSIGNED_SHORT);
     }
     
+    static void validateKey(ByteBuffer key) throws InvalidRequestException
+    {
+        validateKey(key.array());
+    }
+    
     // FIXME: could use method in ThriftValidation
     static void validateKeyspace(String keyspace) throws KeyspaceNotDefinedException
     {

Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=955759&r1=955758&r2=955759&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Thu Jun 17 21:57:50 2010
@@ -261,10 +261,13 @@ public class CassandraServer implements 
         if (logger.isDebugEnabled())
             logger.debug("get_slice");
         
-        return multigetSliceInternal(curKeyspace.get(), Arrays.asList(key.array()), columnParent, predicate, consistencyLevel).get(key);
+        GenericArray<ByteBuffer> keys = new GenericData.Array<ByteBuffer>(1, Schema.createArray(Schema.parse("{\"type\": \"bytes\"}")));
+        keys.add(key);
+        
+        return multigetSliceInternal(curKeyspace.get(), keys, columnParent, predicate, consistencyLevel).get(key);
     }
     
-    private Map<ByteBuffer, GenericArray<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, List<byte[]> keys,
+    private Map<ByteBuffer, GenericArray<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, GenericArray<ByteBuffer> keys,
             ColumnParent columnParent, SlicePredicate predicate, ConsistencyLevel consistencyLevel)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
@@ -277,7 +280,7 @@ public class CassandraServer implements 
         List<ReadCommand> commands = new ArrayList<ReadCommand>();
         if (predicate.column_names != null)
         {
-            for (byte[] key : keys)
+            for (ByteBuffer key : keys)
             {
                 AvroValidation.validateKey(key);
                 
@@ -286,16 +289,16 @@ public class CassandraServer implements 
                 for (ByteBuffer name : predicate.column_names)
                     column_names.add(name.array());
                 
-                commands.add(new SliceByNamesReadCommand(keyspace, key, queryPath, column_names));
+                commands.add(new SliceByNamesReadCommand(keyspace, key.array(), queryPath, column_names));
             }
         }
         else
         {
             SliceRange range = predicate.slice_range;
-            for (byte[] key : keys)
+            for (ByteBuffer key : keys)
             {
                 AvroValidation.validateKey(key);
-                commands.add(new SliceFromReadCommand(keyspace, key, queryPath, range.start.array(), range.finish.array(), range.reversed, range.count));
+                commands.add(new SliceFromReadCommand(keyspace, key.array(), queryPath, range.start.array(), range.finish.array(), range.reversed, range.count));
             }
         }
         
@@ -330,6 +333,29 @@ public class CassandraServer implements 
     }
 
     @Override
+    public GenericArray<CoscsMapEntry> multiget_slice(GenericArray<ByteBuffer> keys, ColumnParent columnParent,
+            SlicePredicate predicate, ConsistencyLevel consistencyLevel)
+    throws AvroRemoteException, InvalidRequestException, UnavailableException, TimedOutException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("multiget_slice");
+
+        // FIXME: This is bad; Shaving yaks to get to the right return type.
+        Map<ByteBuffer, GenericArray<ColumnOrSuperColumn>> results = multigetSliceInternal(curKeyspace.get(),
+                                                                                           keys,
+                                                                                           columnParent,
+                                                                                           predicate,
+                                                                                           consistencyLevel);
+        Schema sch = Schema.createArray(CoscsMapEntry.SCHEMA$);
+        GenericArray<CoscsMapEntry> avroResults = new GenericData.Array<CoscsMapEntry>(results.size(), sch);
+
+        for (Map.Entry<ByteBuffer, GenericArray<ColumnOrSuperColumn>> entry : results.entrySet())
+            avroResults.add(newCoscsMapEntry(entry.getKey(), entry.getValue()));
+        
+        return avroResults;
+    }
+
+    @Override
     public Void insert(ByteBuffer key, ColumnParent parent, Column column, ConsistencyLevel consistencyLevel)
     throws AvroRemoteException, InvalidRequestException, UnavailableException, TimedOutException
     {

Modified: cassandra/trunk/test/system/test_avro_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_avro_server.py?rev=955759&r1=955758&r2=955759&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_avro_server.py (original)
+++ cassandra/trunk/test/system/test_avro_server.py Thu Jun 17 21:57:50 2010
@@ -237,6 +237,52 @@ class TestRpcOperations(AvroTester):
         assert_columns_match(coscs[0]['column'], columns[2])
         assert_columns_match(coscs[3]['column'], columns[5])
 
+    def test_multiget_slice_simple(self):
+        "performing a slice of simple columns, multiple keys"
+        self.__set_keyspace('Keyspace1')
+
+        columns = list(); mutation_params = dict()
+
+        for i in range(12):
+            columns.append(new_column(i))
+
+        # key1, first 6 columns
+        mutations_one = list()
+        for column in columns[:6]:
+            mutation = {'column_or_supercolumn': {'column': column}}
+            mutations_one.append(mutation)
+
+        map_entry = {'key': 'key1', 'mutations': {'Standard1': mutations_one}}
+        mutation_params['mutation_map'] = [map_entry]
+
+        # key2, last 6 columns
+        mutations_two = list()
+        for column in columns[6:]:
+            mutation = {'column_or_supercolumn': {'column': column}}
+            mutations_two.append(mutation)
+
+        map_entry = {'key': 'key2', 'mutations': {'Standard1': mutations_two}}
+        mutation_params['mutation_map'].append(map_entry)
+
+        mutation_params['consistency_level'] = 'ONE'
+
+        self.client.request('batch_mutate', mutation_params)
+
+        # Slice all 6 columns on both keys
+        slice_params= dict()
+        slice_params['keys'] = ['key1', 'key2']
+        slice_params['column_parent'] = {'column_family': 'Standard1'}
+        sr = {'start': '', 'finish': '', 'reversed': False, 'count': 1000}
+        slice_params['predicate'] = {'slice_range': sr}
+        slice_params['consistency_level'] = 'ONE'
+
+        coscs_map = self.client.request('multiget_slice', slice_params)
+        for entry in coscs_map:
+            assert(entry['key'] in ['key1', 'key2']), \
+                    "expected one of [key1, key2]; got %s" % entry['key']
+            assert(len(entry['columns']) == 6), \
+                    "expected 6 results, got %d" % len(entry['columns'])
+
     def test_get_count(self):
         "counting columns"
         self.__set_keyspace('Keyspace1')