You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ee...@apache.org on 2010/06/17 23:57:50 UTC
svn commit: r955759 - in /cassandra/trunk: interface/
src/java/org/apache/cassandra/avro/ test/system/
Author: eevans
Date: Thu Jun 17 21:57:50 2010
New Revision: 955759
URL: http://svn.apache.org/viewvc?rev=955759&view=rev
Log:
avro multiget_slice() + functional tests
Patch by eevans
Modified:
cassandra/trunk/interface/cassandra.genavro
cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
cassandra/trunk/test/system/test_avro_server.py
Modified: cassandra/trunk/interface/cassandra.genavro
URL: http://svn.apache.org/viewvc/cassandra/trunk/interface/cassandra.genavro?rev=955759&r1=955758&r2=955759&view=diff
==============================================================================
--- cassandra/trunk/interface/cassandra.genavro (original)
+++ cassandra/trunk/interface/cassandra.genavro Thu Jun 17 21:57:50 2010
@@ -108,6 +108,11 @@ protocol Cassandra {
map<array<Mutation>> mutations;
}
+ record CoscsMapEntry {
+ bytes key;
+ array<ColumnOrSuperColumn> columns;
+ }
+
enum ConsistencyLevel {
ZERO, ONE, QUORUM, DCQUORUM, DCQUORUMSYNC, ALL
}
@@ -147,6 +152,16 @@ protocol Cassandra {
throws InvalidRequestException, UnavailableException, TimedOutException;
/**
+ * Performs a get_slice for column_parent and predicate against the given
+ * set of keys in parallel.
+ */
+ array<CoscsMapEntry> multiget_slice(array<bytes> keys,
+ ColumnParent column_parent,
+ SlicePredicate predicate,
+ ConsistencyLevel consistency_level)
+ throws InvalidRequestException, UnavailableException, TimedOutException;
+
+ /**
* Returns the number of columns matching a predicate for a particular
* key, ColumnFamily, and optionally SuperColumn.
*/
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java?rev=955759&r1=955758&r2=955759&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroRecordFactory.java Thu Jun 17 21:57:50 2010
@@ -99,6 +99,14 @@ public class AvroRecordFactory
cp.super_column = ByteBuffer.wrap(superColumn);
return cp;
}
+
+ public static CoscsMapEntry newCoscsMapEntry(ByteBuffer key, GenericArray<ColumnOrSuperColumn> columns)
+ {
+ CoscsMapEntry entry = new CoscsMapEntry();
+ entry.key = key;
+ entry.columns = columns;
+ return entry;
+ }
}
class ErrorFactory
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java?rev=955759&r1=955758&r2=955759&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/AvroValidation.java Thu Jun 17 21:57:50 2010
@@ -42,15 +42,8 @@ import static org.apache.cassandra.avro.
/**
* The Avro analogue to org.apache.cassandra.service.ThriftValidation
*/
-public class AvroValidation {
- // FIXME: could use method in ThriftValidation
- // FIXME: remove me
- static void validateKey(String key) throws InvalidRequestException
- {
- if (key.isEmpty())
- throw newInvalidRequestException("Key may not be empty");
- }
-
+public class AvroValidation
+{
static void validateKey(byte[] key) throws InvalidRequestException
{
if (key == null || key.length == 0)
@@ -62,6 +55,11 @@ public class AvroValidation {
" is longer than maximum of " + FBUtilities.MAX_UNSIGNED_SHORT);
}
+ static void validateKey(ByteBuffer key) throws InvalidRequestException
+ {
+ validateKey(key.array());
+ }
+
// FIXME: could use method in ThriftValidation
static void validateKeyspace(String keyspace) throws KeyspaceNotDefinedException
{
Modified: cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java?rev=955759&r1=955758&r2=955759&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/avro/CassandraServer.java Thu Jun 17 21:57:50 2010
@@ -261,10 +261,13 @@ public class CassandraServer implements
if (logger.isDebugEnabled())
logger.debug("get_slice");
- return multigetSliceInternal(curKeyspace.get(), Arrays.asList(key.array()), columnParent, predicate, consistencyLevel).get(key);
+ GenericArray<ByteBuffer> keys = new GenericData.Array<ByteBuffer>(1, Schema.createArray(Schema.parse("{\"type\": \"bytes\"}")));
+ keys.add(key);
+
+ return multigetSliceInternal(curKeyspace.get(), keys, columnParent, predicate, consistencyLevel).get(key);
}
- private Map<ByteBuffer, GenericArray<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, List<byte[]> keys,
+ private Map<ByteBuffer, GenericArray<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, GenericArray<ByteBuffer> keys,
ColumnParent columnParent, SlicePredicate predicate, ConsistencyLevel consistencyLevel)
throws InvalidRequestException, UnavailableException, TimedOutException
{
@@ -277,7 +280,7 @@ public class CassandraServer implements
List<ReadCommand> commands = new ArrayList<ReadCommand>();
if (predicate.column_names != null)
{
- for (byte[] key : keys)
+ for (ByteBuffer key : keys)
{
AvroValidation.validateKey(key);
@@ -286,16 +289,16 @@ public class CassandraServer implements
for (ByteBuffer name : predicate.column_names)
column_names.add(name.array());
- commands.add(new SliceByNamesReadCommand(keyspace, key, queryPath, column_names));
+ commands.add(new SliceByNamesReadCommand(keyspace, key.array(), queryPath, column_names));
}
}
else
{
SliceRange range = predicate.slice_range;
- for (byte[] key : keys)
+ for (ByteBuffer key : keys)
{
AvroValidation.validateKey(key);
- commands.add(new SliceFromReadCommand(keyspace, key, queryPath, range.start.array(), range.finish.array(), range.reversed, range.count));
+ commands.add(new SliceFromReadCommand(keyspace, key.array(), queryPath, range.start.array(), range.finish.array(), range.reversed, range.count));
}
}
@@ -330,6 +333,29 @@ public class CassandraServer implements
}
@Override
+ public GenericArray<CoscsMapEntry> multiget_slice(GenericArray<ByteBuffer> keys, ColumnParent columnParent,
+ SlicePredicate predicate, ConsistencyLevel consistencyLevel)
+ throws AvroRemoteException, InvalidRequestException, UnavailableException, TimedOutException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("multiget_slice");
+
+ // FIXME: This is bad; Shaving yaks to get to the right return type.
+ Map<ByteBuffer, GenericArray<ColumnOrSuperColumn>> results = multigetSliceInternal(curKeyspace.get(),
+ keys,
+ columnParent,
+ predicate,
+ consistencyLevel);
+ Schema sch = Schema.createArray(CoscsMapEntry.SCHEMA$);
+ GenericArray<CoscsMapEntry> avroResults = new GenericData.Array<CoscsMapEntry>(results.size(), sch);
+
+ for (Map.Entry<ByteBuffer, GenericArray<ColumnOrSuperColumn>> entry : results.entrySet())
+ avroResults.add(newCoscsMapEntry(entry.getKey(), entry.getValue()));
+
+ return avroResults;
+ }
+
+ @Override
public Void insert(ByteBuffer key, ColumnParent parent, Column column, ConsistencyLevel consistencyLevel)
throws AvroRemoteException, InvalidRequestException, UnavailableException, TimedOutException
{
Modified: cassandra/trunk/test/system/test_avro_server.py
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/system/test_avro_server.py?rev=955759&r1=955758&r2=955759&view=diff
==============================================================================
--- cassandra/trunk/test/system/test_avro_server.py (original)
+++ cassandra/trunk/test/system/test_avro_server.py Thu Jun 17 21:57:50 2010
@@ -237,6 +237,52 @@ class TestRpcOperations(AvroTester):
assert_columns_match(coscs[0]['column'], columns[2])
assert_columns_match(coscs[3]['column'], columns[5])
+ def test_multiget_slice_simple(self):
+ "performing a slice of simple columns, multiple keys"
+ self.__set_keyspace('Keyspace1')
+
+ columns = list(); mutation_params = dict()
+
+ for i in range(12):
+ columns.append(new_column(i))
+
+ # key1, first 6 columns
+ mutations_one = list()
+ for column in columns[:6]:
+ mutation = {'column_or_supercolumn': {'column': column}}
+ mutations_one.append(mutation)
+
+ map_entry = {'key': 'key1', 'mutations': {'Standard1': mutations_one}}
+ mutation_params['mutation_map'] = [map_entry]
+
+ # key2, last 6 columns
+ mutations_two = list()
+ for column in columns[6:]:
+ mutation = {'column_or_supercolumn': {'column': column}}
+ mutations_two.append(mutation)
+
+ map_entry = {'key': 'key2', 'mutations': {'Standard1': mutations_two}}
+ mutation_params['mutation_map'].append(map_entry)
+
+ mutation_params['consistency_level'] = 'ONE'
+
+ self.client.request('batch_mutate', mutation_params)
+
+ # Slice all 6 columns on both keys
+ slice_params= dict()
+ slice_params['keys'] = ['key1', 'key2']
+ slice_params['column_parent'] = {'column_family': 'Standard1'}
+ sr = {'start': '', 'finish': '', 'reversed': False, 'count': 1000}
+ slice_params['predicate'] = {'slice_range': sr}
+ slice_params['consistency_level'] = 'ONE'
+
+ coscs_map = self.client.request('multiget_slice', slice_params)
+ for entry in coscs_map:
+ assert(entry['key'] in ['key1', 'key2']), \
+ "expected one of [key1, key2]; got %s" % entry['key']
+ assert(len(entry['columns']) == 6), \
+ "expected 6 results, got %d" % len(entry['columns'])
+
def test_get_count(self):
"counting columns"
self.__set_keyspace('Keyspace1')