You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/27 22:07:13 UTC

svn commit: r808589 [3/3] - in /incubator/cassandra/trunk: interface/ interface/gen-java/org/apache/cassandra/service/ src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/ test/system/

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java Thu Aug 27 20:07:12 2009
@@ -36,10 +36,12 @@
     private AtomicBoolean done_ = new AtomicBoolean(false);
     private Lock lock_ = new ReentrantLock();
     private Condition condition_;
+    private long startTime_;
 
     public AsyncResult()
     {        
         condition_ = lock_.newCondition();
+        startTime_ = System.currentTimeMillis();
     }    
     
     public byte[] get()
@@ -77,8 +79,12 @@
             try
             {
                 if ( !done_.get() )
-                {                    
-                    bVal = condition_.await(timeout, tu);
+                {
+                    long overall_timeout = System.currentTimeMillis() - startTime_ + timeout;
+                    if(overall_timeout > 0)
+                        bVal = condition_.await(overall_timeout, TimeUnit.MILLISECONDS);
+                    else
+                        bVal = false;
                 }
             }
             catch ( InterruptedException ex )
@@ -97,17 +103,7 @@
         }
         return result_;
     }
-    
-    public List<byte[]> multiget()
-    {
-        throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
-    }
-    
-    public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
-    {
-        throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
-    }
-    
+      
     public void result(Message response)
     {        
         try

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java Thu Aug 27 20:07:12 2009
@@ -45,22 +45,7 @@
      * @return the result wrapped in an Object[]
     */
     public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
-    
-    /**
-     * Returns the result for all tasks that was submitted.
-     * @return the list of results wrapped in an Object[]
-    */
-    public List<byte[]> multiget();
-    
-    /**
-     * Same operation as the above get() but allows the calling
-     * thread to specify a timeout.
-     * @param timeout the maximum time to wait
-     * @param tu the time unit of the timeout argument
-     * @return the result wrapped in an Object[]
-    */
-    public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException;
-    
+        
     /**
      * Store the result obtained for the submitted task.
      * @param result the response message

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java Thu Aug 27 20:07:12 2009
@@ -119,19 +119,6 @@
      * array is sent to the ith element in the <code>to</code> array.This method assumes
      * there is a one-one mapping between the <code>messages</code> array and
      * the <code>to</code> array. Otherwise an  IllegalArgumentException will be thrown.
-     * This method also informs the MessagingService to wait for at least
-     * <code>howManyResults</code> responses to determine success of failure.
-     * @param messages messages to be sent.
-     * @param to endpoints to which the message needs to be sent
-     * @return an reference to IAsyncResult
-     */
-    public IAsyncResult sendRR(Message[] messages, EndPoint[] to);
-    
-    /**
-     * Send a message to a given endpoint. The ith element in the <code>messages</code>
-     * array is sent to the ith element in the <code>to</code> array.This method assumes
-     * there is a one-one mapping between the <code>messages</code> array and
-     * the <code>to</code> array. Otherwise an  IllegalArgumentException will be thrown.
      * The idea is that multi-groups of messages are grouped as one logical message
      * whose results are harnessed via the <i>IAsyncResult</i>
      * @param messages groups of grouped messages.

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu Aug 27 20:07:12 2009
@@ -379,25 +379,6 @@
         return groupId;
     } 
     
-    public IAsyncResult sendRR(Message[] messages, EndPoint[] to)
-    {
-        if ( messages.length != to.length )
-        {
-            throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
-        }
-        
-        IAsyncResult iar = new MultiAsyncResult(messages.length);
-        String groupId = GuidGenerator.guid();
-        taskCompletionMap_.put(groupId, iar);
-        for ( int i = 0; i < messages.length; ++i )
-        {
-            messages[i].setMessageId(groupId);
-            sendOneWay(messages[i], to[i]);
-        }
-        
-        return iar;
-    }
-    
     public String sendRR(Message[][] messages, EndPoint[][] to, IAsyncCallback cb)
     {
         if ( messages.length != to.length )

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Thu Aug 27 20:07:12 2009
@@ -36,7 +36,6 @@
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.thrift.TException;
 
@@ -73,10 +72,17 @@
 		storageService.start();
 	}
 
-    protected ColumnFamily readColumnFamily(ReadCommand command, int consistency_level) throws InvalidRequestException
+    protected Map<String, ColumnFamily> readColumnFamily(List<ReadCommand> commands, int consistency_level) throws InvalidRequestException
     {
-        String cfName = command.getColumnFamilyName();
-        ThriftValidation.validateKey(command.key);
+        // TODO - Support multiple column families per row, right now row only contains 1 column family
+        String cfName = commands.get(0).getColumnFamilyName();
+
+        Map<String, ColumnFamily> columnFamilyKeyMap = new HashMap<String,ColumnFamily>();
+
+        for (ReadCommand command: commands)
+        {
+            ThriftValidation.validateKey(command.key);
+        }
 
         if (consistency_level == ConsistencyLevel.ZERO)
         {
@@ -87,10 +93,10 @@
             throw new InvalidRequestException("Consistency level all is not yet supported on read operations");
         }
 
-        Row row;
+        List<Row> rows;
         try
         {
-            row = StorageProxy.readProtocol(command, consistency_level);
+            rows = StorageProxy.readProtocol(commands, consistency_level);
         }
         catch (IOException e)
         {
@@ -101,11 +107,11 @@
             throw new RuntimeException(e);
         }
 
-        if (row == null)
+        for (Row row: rows)
         {
-            return null;
+            columnFamilyKeyMap.put(row.key(), row.getColumnFamily(cfName));
         }
-        return row.getColumnFamily(cfName);
+        return columnFamilyKeyMap;
 	}
 
     public List<Column> thriftifySubColumns(Collection<IColumn> columns)
@@ -169,50 +175,78 @@
         return thriftSuperColumns;
     }
 
-    private List<ColumnOrSuperColumn> getSlice(ReadCommand command, int consistency_level) throws InvalidRequestException
+    private Map<String, List<ColumnOrSuperColumn>> getSlice(List<ReadCommand> commands, int consistency_level) throws InvalidRequestException
     {
-        ColumnFamily cfamily = readColumnFamily(command, consistency_level);
-        boolean reverseOrder = command instanceof SliceFromReadCommand && ((SliceFromReadCommand)command).reversed;
-
-        if (cfamily == null || cfamily.getColumnsMap().size() == 0)
-        {
-            return EMPTY_COLUMNS;
-        }
-        if (command.queryPath.superColumnName != null)
+        Map<String, ColumnFamily> cfamilies = readColumnFamily(commands, consistency_level);
+        Map<String, List<ColumnOrSuperColumn>> columnFamiliesMap = new HashMap<String, List<ColumnOrSuperColumn>>();
+        for (ReadCommand command: commands)
         {
-            IColumn column = cfamily.getColumnsMap().values().iterator().next();
-            Collection<IColumn> subcolumns = column.getSubColumns();
-            if (subcolumns == null || subcolumns.isEmpty())
+            ColumnFamily cfamily = cfamilies.get(command.key);
+            boolean reverseOrder = command instanceof SliceFromReadCommand && ((SliceFromReadCommand)command).reversed;
+
+            if (cfamily == null || cfamily.getColumnsMap().size() == 0)
             {
-                return EMPTY_COLUMNS;
+                columnFamiliesMap.put(command.key, EMPTY_COLUMNS);
+                continue;
             }
-            return thriftifyColumns(subcolumns, reverseOrder);
-        }
-        if (cfamily.isSuper())
-        {
-            return thriftifySuperColumns(cfamily.getSortedColumns(), reverseOrder);
+            if (command.queryPath.superColumnName != null)
+            {
+                IColumn column = cfamily.getColumnsMap().values().iterator().next();
+                Collection<IColumn> subcolumns = column.getSubColumns();
+                if (subcolumns == null || subcolumns.isEmpty())
+                {
+                    columnFamiliesMap.put(command.key, EMPTY_COLUMNS);
+                    continue;
+                }
+                columnFamiliesMap.put(command.key, thriftifyColumns(subcolumns, reverseOrder));
+                continue;
+            }
+            if (cfamily.isSuper())
+                columnFamiliesMap.put(command.key, thriftifySuperColumns(cfamily.getSortedColumns(), reverseOrder));
+            else
+                columnFamiliesMap.put(command.key, thriftifyColumns(cfamily.getSortedColumns(), reverseOrder));
         }
-        return thriftifyColumns(cfamily.getSortedColumns(), reverseOrder);
+
+        return columnFamiliesMap;
     }
 
     public List<ColumnOrSuperColumn> get_slice(String keyspace, String key, ColumnParent column_parent, SlicePredicate predicate, int consistency_level)
     throws InvalidRequestException, NotFoundException
     {
         if (logger.isDebugEnabled())
-            logger.debug("get_slice_from");
+            logger.debug("get_slice");
+        return multigetSliceInternal(keyspace, Arrays.asList(key), column_parent, predicate, consistency_level).get(key);
+    }
+    
+    public Map<String, List<ColumnOrSuperColumn>> multiget_slice(String keyspace, List<String> keys, ColumnParent column_parent, SlicePredicate predicate, int consistency_level)
+    throws InvalidRequestException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("multiget_slice");
+        return multigetSliceInternal(keyspace, keys, column_parent, predicate, consistency_level);
+    }
+
+    private Map<String, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, List<String> keys, ColumnParent column_parent, SlicePredicate predicate, int consistency_level)
+    throws InvalidRequestException
+    {
         ThriftValidation.validateColumnParent(keyspace, column_parent);
+        List<ReadCommand> commands = new ArrayList<ReadCommand>();
+        SliceRange range = predicate.slice_range;
 
         if (predicate.column_names != null)
         {
+            for (String key: keys)
+                commands.add(new SliceByNamesReadCommand(keyspace, key, column_parent, predicate.column_names));
             ThriftValidation.validateColumns(keyspace, column_parent, predicate.column_names);
-            return getSlice(new SliceByNamesReadCommand(keyspace, key, column_parent, predicate.column_names), consistency_level);
         }
         else
         {
-            SliceRange range = predicate.slice_range;
+            for (String key: keys)
+                commands.add(new SliceFromReadCommand(keyspace, key, column_parent, range.start, range.finish, range.reversed, range.count));
             ThriftValidation.validateRange(keyspace, column_parent, range);
-            return getSlice(new SliceFromReadCommand(keyspace, key, column_parent, range.start, range.finish, range.reversed, range.count), consistency_level);
         }
+
+        return getSlice(commands, consistency_level);
     }
 
     public ColumnOrSuperColumn get(String table, String key, ColumnPath column_path, int consistency_level)
@@ -220,50 +254,127 @@
     {
         if (logger.isDebugEnabled())
             logger.debug("get");
-        ThriftValidation.validateColumnPath(table, column_path);
-
-        QueryPath path = new QueryPath(column_path.column_family, column_path.super_column);
-        List<byte[]> nameAsList = Arrays.asList(column_path.column == null ? column_path.super_column : column_path.column);
-        ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(table, key, path, nameAsList), consistency_level);
-        if (cfamily == null)
+        ColumnOrSuperColumn column = multiget(table, Arrays.asList(key), column_path, consistency_level).get(key);
+        if (!column.isSetColumn() && !column.isSetSuper_column())
         {
             throw new NotFoundException();
         }
-        Collection<IColumn> columns = null;
-        if (column_path.super_column != null && column_path.column != null)
+        return column;
+    }
+
+    /** no values will be mapped to keys with no data */
+    private Map<String, Collection<IColumn>> multigetColumns(List<ReadCommand> commands, int consistency_level)
+    throws InvalidRequestException
+    {
+        Map<String, ColumnFamily> cfamilies = readColumnFamily(commands, consistency_level);
+        Map<String, Collection<IColumn>> columnFamiliesMap = new HashMap<String, Collection<IColumn>>();
+
+        for (ReadCommand command: commands)
         {
-            IColumn column = cfamily.getColumn(column_path.super_column);
-            if (column != null)
+            ColumnFamily cfamily = cfamilies.get(command.key);
+            if (cfamily == null)
+                continue;
+
+            Collection<IColumn> columns = null;
+            if (command.queryPath.superColumnName != null)
             {
-                columns = column.getSubColumns();
+                IColumn column = cfamily.getColumn(command.queryPath.superColumnName);
+                if (column != null)
+                {
+                    columns = column.getSubColumns();
+                }
+            }
+            else
+            {
+                columns = cfamily.getSortedColumns();
+            }
+
+            if (columns != null && columns.size() != 0)
+            {
+                columnFamiliesMap.put(command.key, columns);
             }
         }
-        else
-        {
-            columns = cfamily.getSortedColumns();
-        }
-        if (columns == null || columns.size() == 0)
+        return columnFamiliesMap;
+    }
+
+    /** always returns a ColumnOrSuperColumn for each key, even if there is no data for it */
+    public Map<String, ColumnOrSuperColumn> multiget(String table, List<String> keys, ColumnPath column_path, int consistency_level)
+    throws InvalidRequestException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("multiget");
+        return multigetInternal(table, keys, column_path, consistency_level);
+    }
+
+    private Map<String, ColumnOrSuperColumn> multigetInternal(String table, List<String> keys, ColumnPath column_path, int consistency_level)
+    throws InvalidRequestException
+    {
+        ThriftValidation.validateColumnPath(table, column_path);
+
+        QueryPath path = new QueryPath(column_path.column_family, column_path.super_column);
+        List<byte[]> nameAsList = Arrays.asList(column_path.column == null ? column_path.super_column : column_path.column);
+        List<ReadCommand> commands = new ArrayList<ReadCommand>();
+        for (String key: keys)
         {
-            throw new NotFoundException();
+            commands.add(new SliceByNamesReadCommand(table, key, path, nameAsList));
         }
 
-        assert columns.size() == 1;
-        IColumn column = columns.iterator().next();
-        if (column.isMarkedForDelete())
+        Map<String, ColumnOrSuperColumn> columnFamiliesMap = new HashMap<String, ColumnOrSuperColumn>();
+        Map<String, Collection<IColumn>> columnsMap = multigetColumns(commands, consistency_level);
+
+        for (ReadCommand command: commands)
         {
-            throw new NotFoundException();
+            ColumnOrSuperColumn columnorsupercolumn;
+
+            Collection<IColumn> columns = columnsMap.get(command.key);
+            if (columns == null)
+            {
+               columnorsupercolumn = new ColumnOrSuperColumn();
+            }
+            else
+            {
+                assert columns.size() == 1;
+                IColumn column = columns.iterator().next();
+
+
+                if (column.isMarkedForDelete())
+                {
+                    columnorsupercolumn = new ColumnOrSuperColumn();
+                }
+                else
+                {
+                    columnorsupercolumn = column instanceof org.apache.cassandra.db.Column
+                                          ? new ColumnOrSuperColumn(new Column(column.name(), column.value(), column.timestamp()), null)
+                                          : new ColumnOrSuperColumn(null, new SuperColumn(column.name(), thriftifySubColumns(column.getSubColumns())));
+                }
+
+            }
+            columnFamiliesMap.put(command.key, columnorsupercolumn);
         }
 
-        return column instanceof org.apache.cassandra.db.Column
-               ? new ColumnOrSuperColumn(new Column(column.name(), column.value(), column.timestamp()), null)
-               : new ColumnOrSuperColumn(null, new SuperColumn(column.name(), thriftifySubColumns(column.getSubColumns())));
+        return columnFamiliesMap;
     }
 
     public int get_count(String table, String key, ColumnParent column_parent, int consistency_level)
     throws InvalidRequestException
     {
         if (logger.isDebugEnabled())
-            logger.debug("get_column_count");
+            logger.debug("get_count");
+        return multigetCountInternal(table, Arrays.asList(key), column_parent, consistency_level).get(key);
+    }
+
+    public Map<String, Integer> multiget_count(String table, List<String> keys, ColumnParent column_parent, int consistency_level)
+    throws InvalidRequestException
+    {
+        if (logger.isDebugEnabled())
+            logger.debug("multiget_count");
+        return multigetCountInternal(table, keys, column_parent, consistency_level);
+
+    }
+
+    private Map<String, Integer> multigetCountInternal(String table, List<String> keys, ColumnParent column_parent, int consistency_level)
+    throws InvalidRequestException
+    {
         // validateColumnParent assumes we require simple columns; g_c_c is the only
         // one of the columnParent-taking apis that can also work at the SC level.
         // so we roll a one-off validator here.
@@ -273,31 +384,29 @@
             throw new InvalidRequestException("columnfamily alone is required for standard CF " + column_parent.column_family);
         }
 
-        ColumnFamily cfamily;
-        cfamily = readColumnFamily(new SliceFromReadCommand(table, key, column_parent, ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, Integer.MAX_VALUE), consistency_level);
-        if (cfamily == null)
+        List<ReadCommand> commands = new ArrayList<ReadCommand>();
+        for (String key: keys)
         {
-            return 0;
+            commands.add(new SliceFromReadCommand(table, key, column_parent, ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, Integer.MAX_VALUE));
         }
-        Collection<IColumn> columns = null;
-        if (column_parent.super_column != null)
+
+        Map<String, Integer> columnFamiliesMap = new HashMap<String, Integer>();
+        Map<String, Collection<IColumn>> columnsMap = multigetColumns(commands, consistency_level);
+
+        for (ReadCommand command: commands)
         {
-            IColumn column = cfamily.getColumn(column_parent.super_column);
-            if (column != null)
+            Collection<IColumn> columns = columnsMap.get(command.key);
+            if(columns == null)
             {
-                columns = column.getSubColumns();
+               columnFamiliesMap.put(command.key, 0);
+            }
+            else
+            {
+               columnFamiliesMap.put(command.key, columns.size());
             }
         }
-        else
-        {
-            columns = cfamily.getSortedColumns();
-        }
-        if (columns == null || columns.size() == 0)
-        {
-            return 0;
-        }
-        return columns.size();
-	}
+        return columnFamiliesMap;
+    }
 
     public void insert(String table, String key, ColumnPath column_path, byte[] value, long timestamp, int consistency_level)
     throws InvalidRequestException, UnavailableException

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Thu Aug 27 20:07:12 2009
@@ -41,6 +41,7 @@
     private List<Message> responses_ = new ArrayList<Message>();
     private IResponseResolver<T> responseResolver_;
     private AtomicBoolean done_ = new AtomicBoolean(false);
+    private long startTime_;
 
     public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver) throws InvalidRequestException
     {
@@ -51,6 +52,7 @@
         condition_ = lock_.newCondition();
         responseCount_ = responseCount;
         responseResolver_ =  responseResolver;
+        startTime_ = System.currentTimeMillis();
     }
     
     public T get() throws TimeoutException, DigestMismatchException
@@ -62,8 +64,12 @@
             try
             {
             	if ( !done_.get() )
-                {            		
-            		bVal = condition_.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+                {
+                    long timeout = System.currentTimeMillis() - startTime_ + DatabaseDescriptor.getRpcTimeout();
+                    if(timeout > 0)
+                        bVal = condition_.await(timeout, TimeUnit.MILLISECONDS);
+                    else
+                        bVal = false;
                 }
             }
             catch ( InterruptedException ex )

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Aug 27 20:07:12 2009
@@ -238,171 +238,101 @@
         insertBlocking(rm, ConsistencyLevel.QUORUM);
     }
     
-    private static Map<String, Message> constructMessages(Map<String, ReadCommand> readMessages) throws IOException
-    {
-        Map<String, Message> messages = new HashMap<String, Message>();
-        Set<String> keys = readMessages.keySet();        
-        for ( String key : keys )
-        {
-            Message message = readMessages.get(key).makeReadMessage();
-            messages.put(key, message);
-        }        
-        return messages;
-    }
-    
-    private static IAsyncResult dispatchMessages(Map<String, EndPoint> endPoints, Map<String, Message> messages)
-    {
-        Set<String> keys = endPoints.keySet();
-        EndPoint[] eps = new EndPoint[keys.size()];
-        Message[] msgs  = new Message[keys.size()];
-        
-        int i = 0;
-        for ( String key : keys )
-        {
-            eps[i] = endPoints.get(key);
-            msgs[i] = messages.get(key);
-            ++i;
-        }
-        
-        IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(msgs, eps);
-        return iar;
-    }
-    
-    /**
-     * This is an implementation for the multiget version. 
-     * @param readMessages map of key --> ReadMessage to be sent
-     * @return map of key --> Row
-     * @throws IOException
-     * @throws TimeoutException
-     */
-    public static Map<String, Row> doReadProtocol(Map<String, ReadCommand> readMessages) throws IOException,TimeoutException
-    {
-        Map<String, Row> rows = new HashMap<String, Row>();
-        Set<String> keys = readMessages.keySet();
-        /* Find all the suitable endpoints for the keys */
-        Map<String, EndPoint> endPoints = StorageService.instance().findSuitableEndPoints(keys.toArray( new String[0] ));
-        /* Construct the messages to be sent out */
-        Map<String, Message> messages = constructMessages(readMessages);
-        /* Dispatch the messages to the respective endpoints */
-        IAsyncResult iar = dispatchMessages(endPoints, messages);        
-        List<byte[]> results = iar.multiget(2*DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-        
-        for ( byte[] body : results )
-        {
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length);
-            ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
-            Row row = response.row();
-            rows.put(row.key(), row);
-        }        
-        return rows;
-    }
 
     /**
      * Read the data from one replica.  If there is no reply, read the data from another.  In the event we get
      * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
-     * @param command the read to perform
+     * @param commands a set of commands to perform reads
      * @return the row associated with command.key
      * @throws Exception
      */
-    private static Row weakReadRemote(ReadCommand command) throws IOException
+    private static List<Row> weakReadRemote(List<ReadCommand> commands) throws IOException
     {
-        EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key);
-        assert endPoint != null;
-        Message message = command.makeReadMessage();
         if (logger.isDebugEnabled())
-            logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint);
-        message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
-        IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
-        byte[] body;
-        try
+            logger.debug("weakreadlocal reading " + StringUtils.join(commands, ", "));
+
+        List<Row> rows = new ArrayList<Row>();
+        List<IAsyncResult> iars = new ArrayList<IAsyncResult>();
+        int commandIndex = 0;
+
+        for (ReadCommand command: commands)
         {
-            body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+            EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key);
+            assert endPoint != null;
+            Message message = command.makeReadMessage();
+
+            if (logger.isDebugEnabled())
+                logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint);
+            message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
+            iars.add(MessagingService.getMessagingInstance().sendRR(message, endPoint));
         }
-        catch (TimeoutException e)
+
+        for (IAsyncResult iar: iars)
         {
-            throw new RuntimeException("error reading key " + command.key, e);
-            // TODO retry to a different endpoint?
+            byte[] body;
+            try
+            {
+                body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+            }
+            catch (TimeoutException e)
+            {
+                throw new RuntimeException("error reading key " + commands.get(commandIndex).key, e);
+                // TODO retry to a different endpoint?
+            }
+            DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(body, body.length);
+            ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
+            if (response.row() != null)
+                rows.add(response.row());
+            commandIndex++;
         }
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(body, body.length);
-        ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
-        return response.row();
+        return rows;
     }
 
     /**
      * Performs the actual reading of a row out of the StorageService, fetching
      * a specific set of column names from a given column family.
      */
-    public static Row readProtocol(ReadCommand command, int consistency_level)
+    public static List<Row> readProtocol(List<ReadCommand> commands, int consistency_level)
     throws IOException, TimeoutException, InvalidRequestException
     {
         long startTime = System.currentTimeMillis();
 
-        Row row;
-        EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(command.key);
+        List<Row> rows = new ArrayList<Row>();
 
         if (consistency_level == ConsistencyLevel.ONE)
         {
-            boolean foundLocal = Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint());
-            //TODO: Throw InvalidRequest if we're in bootstrap mode?
-            if (foundLocal && !StorageService.instance().isBootstrapMode())
-            {
-                row = weakReadLocal(command);
-            }
-            else
+            List<ReadCommand> localCommands = new ArrayList<ReadCommand>();
+            List<ReadCommand> remoteCommands = new ArrayList<ReadCommand>();
+
+            for (ReadCommand command: commands)
             {
-                row = weakReadRemote(command);
+                EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(command.key);
+                boolean foundLocal = Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint());
+                //TODO: Throw InvalidRequest if we're in bootstrap mode?
+                if (foundLocal && !StorageService.instance().isBootstrapMode())
+                {
+                    localCommands.add(command);
+                }
+                else
+                {
+                    remoteCommands.add(command);
+                }
             }
+            if (localCommands.size() > 0)
+                rows.addAll(weakReadLocal(localCommands));
+
+            if (remoteCommands.size() > 0)
+                rows.addAll(weakReadRemote(remoteCommands));
         }
         else
         {
             assert consistency_level == ConsistencyLevel.QUORUM;
-            row = strongRead(command);
+            rows = strongRead(commands);
         }
 
         readStats.add(System.currentTimeMillis() - startTime);
 
-        return row;
-    }
-
-    public static Map<String, Row> readProtocol(String[] keys, ReadCommand readCommand, StorageService.ConsistencyLevel consistencyLevel) throws Exception
-    {
-        Map<String, Row> rows = new HashMap<String, Row>();        
-        switch ( consistencyLevel )
-        {
-            case WEAK:
-                rows = weakReadProtocol(keys, readCommand);
-                break;
-                
-            case STRONG:
-                rows = strongReadProtocol(keys, readCommand);
-                break;
-                
-            default:
-                rows = weakReadProtocol(keys, readCommand);
-                break;
-        }
-        return rows;
-    }
-
-    /**
-     * This is a multiget version of the above method.
-     */
-    public static Map<String, Row> strongReadProtocol(String[] keys, ReadCommand readCommand) throws IOException, TimeoutException
-    {       
-        Map<String, Row> rows;
-        // TODO: throw a thrift exception if we do not have N nodes
-        Map<String, ReadCommand[]> readMessages = new HashMap<String, ReadCommand[]>();
-        for (String key : keys )
-        {
-            ReadCommand[] readParameters = new ReadCommand[2];
-            readParameters[0] = readCommand.copy();
-            readParameters[1] = readCommand.copy();
-            readParameters[1].setDigestQuery(true);
-            readMessages.put(key, readParameters);
-        }        
-        rows = doStrongReadProtocol(readMessages);         
         return rows;
     }
 
@@ -418,80 +348,100 @@
          * 7. else carry out read repair by getting data from all the nodes.
         // 5. return success
      */
-    private static Row strongRead(ReadCommand command) throws IOException, TimeoutException, InvalidRequestException
+    private static List<Row> strongRead(List<ReadCommand> commands) throws IOException, TimeoutException, InvalidRequestException
     {
-        // TODO: throw a thrift exception if we do not have N nodes
-        assert !command.isDigestQuery();
-        ReadCommand readMessageDigestOnly = command.copy();
-        readMessageDigestOnly.setDigestQuery(true);
-
-        Row row = null;
-        Message message = command.makeReadMessage();
-        Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
-
-        IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
-        QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
-                DatabaseDescriptor.getQuorum(),
-                readResponseResolver);
-        EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
-        List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getNStorageEndPoint(command.key)));
-        /* Remove the local storage endpoint from the list. */
-        endpointList.remove(dataPoint);
-        EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
-        Message messages[] = new Message[endpointList.size() + 1];
-
-        /*
-         * First message is sent to the node that will actually get
-         * the data for us. The other two replicas are only sent a
-         * digest query.
-        */
-        endPoints[0] = dataPoint;
-        messages[0] = message;
-        if (logger.isDebugEnabled())
-            logger.debug("strongread reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
-        for (int i = 1; i < endPoints.length; i++)
-        {
-            EndPoint digestPoint = endpointList.get(i - 1);
-            endPoints[i] = digestPoint;
-            messages[i] = messageDigestOnly;
+        List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
+        List<EndPoint[]> commandEndPoints = new ArrayList<EndPoint[]>();
+        List<Row> rows = new ArrayList<Row>();
+
+        int commandIndex = 0;
+
+        for (ReadCommand command: commands)
+        {
+            // TODO: throw a thrift exception if we do not have N nodes
+            assert !command.isDigestQuery();
+            ReadCommand readMessageDigestOnly = command.copy();
+            readMessageDigestOnly.setDigestQuery(true);
+
+            Message message = command.makeReadMessage();
+            Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
+
+            IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
+            QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
+                    DatabaseDescriptor.getQuorum(),
+                    readResponseResolver);
+            EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
+            List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getNStorageEndPoint(command.key)));
+            /* Remove the local storage endpoint from the list. */
+            endpointList.remove(dataPoint);
+            EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
+            Message messages[] = new Message[endpointList.size() + 1];
+
+            /*
+             * First message is sent to the node that will actually get
+             * the data for us. The other two replicas are only sent a
+             * digest query.
+            */
+            endPoints[0] = dataPoint;
+            messages[0] = message;
             if (logger.isDebugEnabled())
-                logger.debug("strongread reading digest for " + command + " from " + messageDigestOnly.getMessageId() + "@" + digestPoint);
+                logger.debug("strongread reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
+            for (int i = 1; i < endPoints.length; i++)
+            {
+                EndPoint digestPoint = endpointList.get(i - 1);
+                endPoints[i] = digestPoint;
+                messages[i] = messageDigestOnly;
+                if (logger.isDebugEnabled())
+                    logger.debug("strongread reading digest for " + command + " from " + messageDigestOnly.getMessageId() + "@" + digestPoint);
+            }
+            MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);
+            quorumResponseHandlers.add(quorumResponseHandler);
+            commandEndPoints.add(endPoints);
         }
 
-        try
+        for (QuorumResponseHandler<Row> quorumResponseHandler: quorumResponseHandlers)
         {
-            MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);
+            Row row = null;
+            ReadCommand command = commands.get(commandIndex);
+            try
+            {
+                long startTime2 = System.currentTimeMillis();
+                row = quorumResponseHandler.get();
+                if (row != null)
+                    rows.add(row);
 
-            long startTime2 = System.currentTimeMillis();
-            row = quorumResponseHandler.get();
-            if (logger.isDebugEnabled())
-                logger.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms.");
-        }
-        catch (DigestMismatchException ex)
-        {
-            if ( DatabaseDescriptor.getConsistencyCheck())
+                if (logger.isDebugEnabled())
+                    logger.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms.");
+            }
+            catch (DigestMismatchException ex)
             {
-                IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
-                QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
-                        DatabaseDescriptor.getQuorum(),
-                        readResponseResolverRepair);
-                logger.info("DigestMismatchException: " + command.key);
-                Message messageRepair = command.makeReadMessage();
-                MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints,
-                                                               quorumResponseHandlerRepair);
-                try
-                {
-                    row = quorumResponseHandlerRepair.get();
-                }
-                catch (DigestMismatchException e)
+                if ( DatabaseDescriptor.getConsistencyCheck())
                 {
-                    // TODO should this be a thrift exception?
-                    throw new RuntimeException("digest mismatch reading key " + command.key, e);
+                    IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
+                    QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
+                            DatabaseDescriptor.getQuorum(),
+                            readResponseResolverRepair);
+                    logger.info("DigestMismatchException: " + command.key);
+                    Message messageRepair = command.makeReadMessage();
+                    MessagingService.getMessagingInstance().sendRR(messageRepair, commandEndPoints.get(commandIndex),
+                            quorumResponseHandlerRepair);
+                    try
+                    {
+                        row = quorumResponseHandlerRepair.get();
+                        if (row != null)
+                            rows.add(row);
+                    }
+                    catch (DigestMismatchException e)
+                    {
+                        // TODO should this be a thrift exception?
+                        throw new RuntimeException("digest mismatch reading key " + command.key, e);
+                    }
                 }
             }
+            commandIndex++;
         }
 
-        return row;
+        return rows;
     }
 
     private static Map<String, Message[]> constructReplicaMessages(Map<String, ReadCommand[]> readMessages) throws IOException
@@ -557,69 +507,6 @@
         MessagingService.getMessagingInstance().sendRR(msgList, epList, quorumResponseHandlers);
         return quorumResponseHandlers;
     }
-    
-    /**
-    *  This method performs the read from the replicas for a bunch of keys.
-    *  @param readMessages map of key --> readMessage[] of two entries where 
-    *         the first entry is the readMessage for the data and the second
-    *         is the entry for the digest 
-    *  @return map containing key ---> Row
-    *  @throws IOException, TimeoutException
-   */
-    private static Map<String, Row> doStrongReadProtocol(Map<String, ReadCommand[]> readMessages) throws IOException
-    {        
-        Map<String, Row> rows = new HashMap<String, Row>();
-        /* Construct the messages to be sent to the replicas */
-        Map<String, Message[]> replicaMessages = constructReplicaMessages(readMessages);
-        /* Dispatch the messages to the different replicas */
-        MultiQuorumResponseHandler cb = dispatchMessagesMulti(readMessages, replicaMessages);
-        try
-        {
-            Row[] rows2 = cb.get();
-            for ( Row row : rows2 )
-            {
-                rows.put(row.key(), row);
-            }
-        }
-        catch (TimeoutException e)
-        {
-            throw new RuntimeException("timeout reading keys " + StringUtils.join(rows.keySet(), ", "), e);
-        }
-        return rows;
-    }
-
-    /**
-     * This version is used when results for multiple keys needs to be
-     * retrieved.
-     * 
-     * @return a mapping of key --> Row
-     * @throws Exception
-     */
-    public static Map<String, Row> weakReadProtocol(String[] keys, ReadCommand readCommand) throws Exception
-    {
-        Row row = null;
-        Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
-        for ( String key : keys )
-        {
-            ReadCommand readCmd = readCommand.copy();
-            readMessages.put(key, readCmd);
-        }
-        /* Performs the multiget in parallel */
-        Map<String, Row> rows = doReadProtocol(readMessages);
-        /*
-         * Do the consistency checks for the keys that are being queried
-         * in the background.
-        */
-        for ( String key : keys )
-        {
-            List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
-            /* Remove the local storage endpoint from the list. */
-            endpoints.remove( StorageService.getLocalStorageEndPoint() );
-            if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
-                StorageService.instance().doConsistencyCheck(row, endpoints, readMessages.get(key));
-        }
-        return rows;
-    }
 
     /*
     * This function executes the read protocol locally and should be used only if consistency is not a concern.
@@ -627,25 +514,33 @@
     * one of the other replicas (in the same data center if possible) till we get the data. In the event we get
     * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
     */
-    private static Row weakReadLocal(ReadCommand command) throws IOException
+    private static List<Row> weakReadLocal(List<ReadCommand> commands) throws IOException
     {
-        if (logger.isDebugEnabled())
-            logger.debug("weakreadlocal reading " + command);
-        List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(command.key);
-        /* Remove the local storage endpoint from the list. */
-        endpoints.remove(StorageService.getLocalStorageEndPoint());
-        // TODO: throw a thrift exception if we do not have N nodes
-
-        Table table = Table.open(command.table);
-        Row row = command.getRow(table);
-
-        /*
-           * Do the consistency checks in the background and return the
-           * non NULL row.
-           */
-        if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
-            StorageService.instance().doConsistencyCheck(row, endpoints, command);
-        return row;
+        List<Row> rows = new ArrayList<Row>();
+        for (ReadCommand command: commands)
+        {
+            List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(command.key);
+            /* Remove the local storage endpoint from the list. */
+            endpoints.remove(StorageService.getLocalStorageEndPoint());
+            // TODO: throw a thrift exception if we do not have N nodes
+
+            if (logger.isDebugEnabled())
+                logger.debug("weakreadlocal reading " + command);
+
+            Table table = Table.open(command.table);
+            Row row = command.getRow(table);
+            if (row != null)
+                rows.add(row);
+            /*
+            * Do the consistency checks in the background and return the
+            * non NULL row.
+            */
+            if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+                StorageService.instance().doConsistencyCheck(row, endpoints, command);
+
+        }
+
+        return rows;
     }
 
     static List<String> getKeyRange(RangeCommand rawCommand) throws IOException

Modified: incubator/cassandra/trunk/test/system/test_server.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/test_server.py?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/test_server.py (original)
+++ incubator/cassandra/trunk/test/system/test_server.py Thu Aug 27 20:07:12 2009
@@ -35,14 +35,22 @@
                                                    Column(_i64(6), 'value6', 0)])]
 
 def _insert_simple(block=True):
+   return _insert_multi(['key1'], block)
+
+def _insert_batch(block):
+   return _insert_multi_batch(['key1'], block)
+
+def _insert_multi(keys, block=True):
     if block:
         consistencyLevel = ConsistencyLevel.ONE
     else:
         consistencyLevel = ConsistencyLevel.ZERO
-    client.insert('Keyspace1', 'key1', ColumnPath('Standard1', column='c1'), 'value1', 0, consistencyLevel)
-    client.insert('Keyspace1', 'key1', ColumnPath('Standard1', column='c2'), 'value2', 0, consistencyLevel)
 
-def _insert_batch(block):
+    for key in keys:
+        client.insert('Keyspace1', key, ColumnPath('Standard1', column='c1'), 'value1', 0, consistencyLevel)
+        client.insert('Keyspace1', key, ColumnPath('Standard1', column='c2'), 'value2', 0, consistencyLevel)
+
+def _insert_multi_batch(keys, block):
     cfmap = {'Standard1': [ColumnOrSuperColumn(c) for c in _SIMPLE_COLUMNS],
              'Standard2': [ColumnOrSuperColumn(c) for c in _SIMPLE_COLUMNS]}
     if block:
@@ -50,12 +58,17 @@
     else:
         consistencyLevel = ConsistencyLevel.ZERO
 
-    client.batch_insert('Keyspace1', BatchMutation(key='key1', cfmap=cfmap), consistencyLevel)
+    for key in keys:
+        client.batch_insert('Keyspace1', BatchMutation(key=key, cfmap=cfmap), consistencyLevel)
 
 def _big_slice(keyspace, key, column_parent):
     p = SlicePredicate(slice_range=SliceRange('', '', False, 1000))
     return client.get_slice(keyspace, key, column_parent, p, ConsistencyLevel.ONE)
 
+def _big_multislice(keyspace, keys, column_parent):
+    p = SlicePredicate(slice_range=SliceRange('', '', False, 1000))
+    return client.multiget_slice(keyspace, keys, column_parent, p, ConsistencyLevel.ONE)
+
 def _verify_batch():
     _verify_simple()
     L = [result.column
@@ -492,3 +505,59 @@
         result = client.get_slice('Keyspace1', 'key1', ColumnParent('Super1', 'sc1'), p, ConsistencyLevel.ONE) 
         assert len(result) == 1
         assert result[0].column.name == _i64(4)
+
+    def test_multiget(self):
+        """Insert multiple keys and retrieve them using the multiget interface"""
+
+        """Generate a list of 10 keys and insert them"""
+        num_keys = 10
+        keys = ['key'+str(i) for i in range(1, num_keys+1)]
+        _insert_multi(keys)
+
+        """Retrieve all 10 keys"""
+        rows = client.multiget('Keyspace1', keys, ColumnPath('Standard1', column='c1'), ConsistencyLevel.ONE)
+        keys1 = rows.keys().sort()
+        keys2 = keys.sort()
+
+        """Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted"""
+        for key in keys:
+            assert rows.has_key(key) == True
+            assert rows[key] == ColumnOrSuperColumn(column=Column(timestamp=0, name='c1', value='value1'))
+
+    def test_multiget_slice(self):
+        """Insert multiple keys and retrieve them using the multiget_slice interface"""
+
+        """Generate a list of 10 keys and insert them"""
+        num_keys = 10
+        keys = ['key'+str(i) for i in range(1, num_keys+1)]
+        _insert_multi(keys)
+
+        """Retrieve all 10 key slices"""
+        rows = _big_multislice('Keyspace1', keys, ColumnParent('Standard1'))
+        keys1 = rows.keys().sort()
+        keys2 = keys.sort()
+
+        columns = [ColumnOrSuperColumn(c) for c in _SIMPLE_COLUMNS]
+        """Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted"""
+        for key in keys:
+            assert rows.has_key(key) == True
+            assert columns == rows[key]
+
+    def test_multiget_count(self):
+        """Insert multiple keys and retrieve them using the multiget_count interface"""
+
+        """Generate a list of 10 keys and insert them"""
+        num_keys = 10
+        keys = ['key'+str(i) for i in range(1, num_keys+1)]
+        _insert_multi(keys)
+
+        """Retrieve all 10 key slices"""
+        rows = client.multiget_count('Keyspace1', keys, ColumnParent('Standard1'), ConsistencyLevel.ONE)
+        keys1 = rows.keys().sort()
+        keys2 = keys.sort()
+
+        columns = [ColumnOrSuperColumn(c) for c in _SIMPLE_COLUMNS]
+        """Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted"""
+        for key in keys:
+            assert rows.has_key(key) == True
+            assert rows[key] == 2
\ No newline at end of file