You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2009/03/27 06:42:00 UTC

svn commit: r759028 [1/2] - /incubator/cassandra/trunk/src/org/apache/cassandra/service/

Author: alakshman
Date: Fri Mar 27 05:41:59 2009
New Revision: 759028

URL: http://svn.apache.org/viewvc?rev=759028&view=rev
Log:
Basic implementation of multiget() functionality. Fix to how read-repair is done in the ConsistencyManager.

Removed:
    incubator/cassandra/trunk/src/org/apache/cassandra/service/RangeVerbHandler.java
Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/HttpRequestVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageServiceMBean.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/TokenUpdateVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/WriteResponseResolver.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Fri Mar 27 05:41:59 2009
@@ -18,29 +18,27 @@
 
 package org.apache.cassandra.service;
 
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
+import com.facebook.thrift.*;
+import com.facebook.thrift.server.*;
+import com.facebook.thrift.server.TThreadPoolServer.Options;
+import com.facebook.thrift.transport.*;
+import com.facebook.thrift.protocol.*;
+import com.facebook.fb303.FacebookBase;
+import com.facebook.fb303.fb_status;
+import java.io.*;
+import java.util.Arrays;
 import java.util.Collection;
-import java.util.Iterator;
+import java.util.HashMap;
 import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
-import java.util.Arrays;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-
-import com.facebook.fb303.FacebookBase;
-import com.facebook.fb303.fb_status;
-import com.facebook.thrift.TException;
-import com.facebook.thrift.protocol.TBinaryProtocol;
-import com.facebook.thrift.protocol.TProtocolFactory;
-import com.facebook.thrift.server.TThreadPoolServer;
-import com.facebook.thrift.server.TThreadPoolServer.Options;
-import com.facebook.thrift.transport.TServerSocket;
+import java.util.concurrent.TimeoutException;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.*;
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql.common.CqlResult;
@@ -50,21 +48,18 @@
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.RowMutationMessage;
+import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.io.DataInputBuffer;
-import org.apache.cassandra.io.DataOutputBuffer;
-import org.apache.cassandra.dht.OrderPreservingPartitioner;
-
+import org.apache.log4j.Logger;
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
  */
 
-public class CassandraServer extends FacebookBase implements
-		Cassandra.Iface
+public class CassandraServer extends FacebookBase implements Cassandra.Iface
 {
 
 	private static Logger logger_ = Logger.getLogger(CassandraServer.class);
@@ -81,7 +76,7 @@
 		storageService = StorageService.instance();
 	}
 
-	public CassandraServer()
+	public CassandraServer() throws Throwable
 	{
 		super("CassandraServer");
 		// Create the instance of the storage service
@@ -93,7 +88,7 @@
 	 * specified port.
 	 */
 	public void start() throws Throwable
-    {
+	{
 		LogUtil.init();
 		//LogUtil.setLogLevel("com.facebook", "DEBUG");
 		// Start the storage service
@@ -125,7 +120,7 @@
 			{
 				throw new CassandraException("No row exists for key " + key);			
 			}
-			Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+			Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
 			if (cfMap == null || cfMap.size() == 0)
 			{				
 				logger_	.info("ERROR ColumnFamily " + columnFamily + " map is missing.....: " + "   key:" + key );
@@ -167,7 +162,7 @@
 	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
 			}
 
-			Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+			Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
 			if (cfMap == null || cfMap.size() == 0)
 			{
 				logger_	.info("ERROR ColumnFamily " + columnFamily_column + " map is missing.....: " + "   key:" + key);
@@ -282,7 +277,7 @@
 	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
 			}
 
-			Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+			Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
 			if (cfMap == null || cfMap.size() == 0)
 			{
 				logger_	.info("ERROR ColumnFamily " + columnFamily_column + " map is missing.....: " + "   key:" + key);
@@ -350,7 +345,7 @@
 	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
 			}
 			
-			Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+			Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
 			if (cfMap == null || cfMap.size() == 0)
 			{
 				logger_	.info("ERROR ColumnFamily map is missing.....: "
@@ -422,7 +417,7 @@
 	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
 			}
 
-			Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+			Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
 			if (cfMap == null || cfMap.size() == 0)
 			{
 				logger_	.info("ERROR ColumnFamily map is missing.....: "
@@ -486,34 +481,132 @@
     
     public boolean batch_insert_blocking(batch_mutation_t batchMutation)
     {
-        logger_.debug("batch_insert_blocking");
-        RowMutation rm = RowMutation.getRowMutation(batchMutation);
-        return StorageProxy.insertBlocking(rm);
-    }
+		// 1. Get the N nodes from storage service where the data needs to be
+		// replicated
+		// 2. Construct a message for read\write
+		// 3. SendRR ( to all the nodes above )
+		// 4. Wait for a response from atleast X nodes where X <= N
+		// 5. return success
+    	boolean result = false;
+		try
+		{
+			logger_.warn(" batch_insert_blocking");
+			validateTable(batchMutation.table);
+			IResponseResolver<Boolean> writeResponseResolver = new WriteResponseResolver();
+			QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(
+					DatabaseDescriptor.getReplicationFactor(),
+					writeResponseResolver);
+			EndPoint[] endpoints = storageService.getNStorageEndPoint(batchMutation.key);
+			// TODO: throw a thrift exception if we do not have N nodes
+
+			logger_.debug(" Creating the row mutation");
+			RowMutation rm = new RowMutation(batchMutation.table,
+					batchMutation.key.trim());
+			Set keys = batchMutation.cfmap.keySet();
+			Iterator keyIter = keys.iterator();
+			while (keyIter.hasNext())
+			{
+				Object key = keyIter.next(); // Get the next key.
+				List<column_t> list = batchMutation.cfmap.get(key);
+				for (column_t columnData : list)
+				{
+					rm.add(key.toString() + ":" + columnData.columnName,
+							columnData.value.getBytes(), columnData.timestamp);
 
+				}
+			}            
+            
+			RowMutationMessage rmMsg = new RowMutationMessage(rm);           
+			Message message = new Message(StorageService.getLocalStorageEndPoint(), 
+                    StorageService.mutationStage_,
+					StorageService.mutationVerbHandler_, 
+                    new Object[]{ rmMsg }
+            );
+			MessagingService.getMessagingInstance().sendRR(message, endpoints,
+					quorumResponseHandler);
+			logger_.debug(" Calling quorum response handler's get");
+			result = quorumResponseHandler.get(); 
+                       
+			// TODO: if the result is false that means the writes to all the
+			// servers failed hence we need to throw an exception or return an
+			// error back to the client so that it can take appropriate action.
+		}
+		catch (Exception e)
+		{
+			logger_.info( LogUtil.throwableToString(e) );
+		}
+		return result;
+    	
+    }
 	public void batch_insert(batch_mutation_t batchMutation)
 	{
-        logger_.debug("batch_insert");
-        RowMutation rm = RowMutation.getRowMutation(batchMutation);
-        StorageProxy.insert(rm);
-	}
+		// 1. Get the N nodes from storage service where the data needs to be
+		// replicated
+		// 2. Construct a message for read\write
+		// 3. SendRR ( to all the nodes above )
+		// 4. Wait for a response from atleast X nodes where X <= N
+		// 5. return success
 
-    public void remove(String tablename, String key, String columnFamily_column)
-	{
-		throw new UnsupportedOperationException("Remove is coming soon");
+		try
+		{
+			logger_.debug(" batch_insert");
+			logger_.debug(" Creating the row mutation");
+			validateTable(batchMutation.table);
+			RowMutation rm = new RowMutation(batchMutation.table,
+					batchMutation.key.trim());
+			if(batchMutation.cfmap != null)
+			{
+				Set keys = batchMutation.cfmap.keySet();
+				Iterator keyIter = keys.iterator();
+				while (keyIter.hasNext())
+				{
+					Object key = keyIter.next(); // Get the next key.
+					List<column_t> list = batchMutation.cfmap.get(key);
+					for (column_t columnData : list)
+					{
+						rm.add(key.toString() + ":" + columnData.columnName,
+								columnData.value.getBytes(), columnData.timestamp);
+	
+					}
+				}
+			}
+			if(batchMutation.cfmapdel != null)
+			{
+				Set keys = batchMutation.cfmapdel.keySet();
+				Iterator keyIter = keys.iterator();
+				while (keyIter.hasNext())
+				{
+					Object key = keyIter.next(); // Get the next key.
+					List<column_t> list = batchMutation.cfmapdel.get(key);
+					for (column_t columnData : list)
+					{
+						rm.delete(key.toString() + ":" + columnData.columnName);
+					}
+				}            
+			}
+			StorageProxy.insert(rm);
+		}
+		catch (Exception e)
+		{
+			logger_.info( LogUtil.throwableToString(e) );
+		}
+		return;
 	}
 
-    public boolean remove(String tablename, String key, String columnFamily_column, long timestamp, int block_for)
+    public void remove(String tablename, String key, String columnFamily_column)
 	{
-        logger_.debug("remove");
-        RowMutation rm = new RowMutation(tablename, key.trim());
-        rm.delete(columnFamily_column, timestamp);
-        if (block_for > 0) {
-            return StorageProxy.insertBlocking(rm);
-        } else {
+		try
+		{
+			validateTable(tablename);
+			RowMutation rm = new RowMutation(tablename, key.trim());
+			rm.delete(columnFamily_column);
             StorageProxy.insert(rm);
-            return true;
-        }
+		}
+		catch (Exception e)
+		{
+			logger_.debug( LogUtil.throwableToString(e) );
+		}
+		return;
 	}
 
     public List<superColumn_t> get_slice_super_by_names(String tablename, String key, String columnFamily, List<String> superColumnNames) throws CassandraException, TException
@@ -590,7 +683,7 @@
 	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
 			}
 
-			Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+			Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
 			if (cfMap == null || cfMap.size() == 0)
 			{
 				logger_	.info("ERROR ColumnFamily map is missing.....: "
@@ -665,7 +758,7 @@
 	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
 			}
 
-			Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
+			Map<String, ColumnFamily> cfMap = row.getColumnFamilies();
 			if (cfMap == null || cfMap.size() == 0)
 			{
 				logger_	.info("ERROR ColumnFamily map is missing.....: "
@@ -721,16 +814,110 @@
     
     public boolean batch_insert_superColumn_blocking(batch_mutation_super_t batchMutationSuper)
     {
-        logger_.debug("batch_insert_SuperColumn_blocking");
-        RowMutation rm = RowMutation.getRowMutation(batchMutationSuper);
-        return StorageProxy.insertBlocking(rm);
+    	boolean result = false;
+		try
+		{
+			logger_.warn(" batch_insert_SuperColumn_blocking");
+			logger_.debug(" Creating the row mutation");
+			validateTable(batchMutationSuper.table);
+			RowMutation rm = new RowMutation(batchMutationSuper.table,
+					batchMutationSuper.key.trim());
+			Set keys = batchMutationSuper.cfmap.keySet();
+			Iterator keyIter = keys.iterator();
+			while (keyIter.hasNext())
+			{
+				Object key = keyIter.next(); // Get the next key.
+				List<superColumn_t> list = batchMutationSuper.cfmap.get(key);
+				for (superColumn_t superColumnData : list)
+				{
+					if(superColumnData.columns.size() != 0 )
+					{
+						for (column_t columnData : superColumnData.columns)
+						{
+							rm.add(key.toString() + ":" + superColumnData.name  +":" + columnData.columnName,
+									columnData.value.getBytes(), columnData.timestamp);
+						}
+					}
+					else
+					{
+						rm.add(key.toString() + ":" + superColumnData.name, new byte[0], 0);
+					}
+				}
+			}            
+            StorageProxy.insert(rm);
+		}
+		catch (Exception e)
+		{
+			logger_.info( LogUtil.throwableToString(e) );
+		}
+		return result;
+    	
     }
-
     public void batch_insert_superColumn(batch_mutation_super_t batchMutationSuper)
     {
-        logger_.debug("batch_insert_SuperColumn");
-        RowMutation rm = RowMutation.getRowMutation(batchMutationSuper);
-        StorageProxy.insert(rm);
+		try
+		{
+			logger_.debug(" batch_insert");
+			logger_.debug(" Creating the row mutation");
+			validateTable(batchMutationSuper.table);
+			RowMutation rm = new RowMutation(batchMutationSuper.table,
+					batchMutationSuper.key.trim());
+			if(batchMutationSuper.cfmap != null)
+			{
+				Set keys = batchMutationSuper.cfmap.keySet();
+				Iterator keyIter = keys.iterator();
+				while (keyIter.hasNext())
+				{
+					Object key = keyIter.next(); // Get the next key.
+					List<superColumn_t> list = batchMutationSuper.cfmap.get(key);
+					for (superColumn_t superColumnData : list)
+					{
+						if(superColumnData.columns.size() != 0 )
+						{
+							for (column_t columnData : superColumnData.columns)
+							{
+								rm.add(key.toString() + ":" + superColumnData.name  +":" + columnData.columnName,
+										columnData.value.getBytes(), columnData.timestamp);
+							}
+						}
+						else
+						{
+							rm.add(key.toString() + ":" + superColumnData.name, new byte[0], 0);
+						}
+					}
+				} 
+			}
+			if(batchMutationSuper.cfmapdel != null)
+			{
+				Set keys = batchMutationSuper.cfmapdel.keySet();
+				Iterator keyIter = keys.iterator();
+				while (keyIter.hasNext())
+				{
+					Object key = keyIter.next(); // Get the next key.
+					List<superColumn_t> list = batchMutationSuper.cfmapdel.get(key);
+					for (superColumn_t superColumnData : list)
+					{
+						if(superColumnData.columns.size() != 0 )
+						{
+							for (column_t columnData : superColumnData.columns)
+							{
+								rm.delete(key.toString() + ":" + superColumnData.name  +":" + columnData.columnName);
+							}
+						}
+						else
+						{
+							rm.delete(key.toString() + ":" + superColumnData.name);
+						}
+					}
+				} 
+			}
+            StorageProxy.insert(rm);
+		}
+		catch (Exception e)
+		{
+			logger_.info( LogUtil.throwableToString(e) );
+		}
+		return;
     }
 
     public String getStringProperty(String propertyName) throws TException
@@ -817,76 +1004,7 @@
         }
         return result;
     }
-
-    public List<String> get_range(String tablename, final String startkey) throws CassandraException
-    {
-        if (!(StorageService.getPartitioner() instanceof OrderPreservingPartitioner)) {
-            throw new CassandraException("range queries may only be performed against an order-preserving partitioner");
-        }
-
-        logger_.debug("get_range");
-
-        // send request
-        Message message;
-        DataOutputBuffer dob = new DataOutputBuffer();
-        try
-        {
-            dob.writeUTF(startkey);
-        }
-        catch (IOException e)
-        {
-            logger_.error("unable to write startkey", e);
-            throw new RuntimeException(e);
-        }
-        byte[] messageBody = Arrays.copyOf(dob.getData(), dob.getLength());
-        message = new Message(StorageService.getLocalStorageEndPoint(),
-                              StorageService.readStage_,
-                              StorageService.rangeVerbHandler_,
-                              messageBody);
-        EndPoint endPoint;
-        try
-        {
-            endPoint = StorageService.instance().findSuitableEndPoint(startkey);
-        }
-        catch (Exception e)
-        {
-            throw new CassandraException("Unable to find endpoint for " + startkey);
-        }
-        IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
-
-        // read response
-        // TODO send more requests if we need to span multiple nodes (or can we just let client worry about that,
-        // since they have to handle multiple requests anyway?)
-        byte[] responseBody;
-        try
-        {
-            responseBody = (byte[]) iar.get(2 * DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS)[0];
-        }
-        catch (TimeoutException e)
-        {
-            throw new RuntimeException(e);
-        }
-        DataInputBuffer bufIn = new DataInputBuffer();
-        bufIn.reset(responseBody, responseBody.length);
-
-        // turn into List
-        List<String> keys = new ArrayList<String>();
-        while (bufIn.getPosition() < responseBody.length)
-        {
-            try
-            {
-                keys.add(bufIn.readUTF());
-            }
-            catch (IOException e)
-            {
-                logger_.error("bad utf", e);
-                throw new RuntimeException(e);
-            }
-        }
-
-        return keys;
-    }
-
+    
     /*
      * This method is used to ensure that all keys
      * prior to the specified key, as dtermined by
@@ -924,15 +1042,6 @@
 	public static void main(String[] args) throws Throwable
 	{
 		int port = 9160;		
-
-        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler()
-        {
-            public void uncaughtException(Thread t, Throwable e)
-            {
-                logger_.error("Fatal exception in thread " + t, e);
-            }
-        });
-
 		try
 		{
 			CassandraServer peerStorageServer = new CassandraServer();

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java Fri Mar 27 05:41:59 2009
@@ -58,6 +58,11 @@
 			if ( responses_.size() == ConsistencyManager.this.replicas_.size() )
 				handleDigestResponses();
 		}
+        
+        public void attachContext(Object o)
+        {
+            throw new UnsupportedOperationException("This operation is not currently supported.");
+        }
 		
 		private void handleDigestResponses()
 		{
@@ -91,7 +96,8 @@
             replicas_.add(StorageService.getLocalStorageEndPoint());
 			IAsyncCallback responseHandler = new DataRepairHandler(ConsistencyManager.this.replicas_.size(), readResponseResolver);	
 			String table = DatabaseDescriptor.getTables().get(0);
-			ReadMessage readMessage = new ReadMessage(table, row_.key(), columnFamily_);
+            ReadMessage readMessage = constructReadMessage(false);
+			// ReadMessage readMessage = new ReadMessage(table, row_.key(), columnFamily_);
             Message message = ReadMessage.makeReadMessage(readMessage);
 			MessagingService.getMessagingInstance().sendRR(message, replicas_.toArray( new EndPoint[0] ), responseHandler);			
 		}
@@ -116,10 +122,14 @@
 			if ( responses_.size() == majority_ )
 			{
 				String messageId = message.getMessageId();
-				readRepairTable_.put(messageId, messageId, this);
-				// handleResponses();
+				readRepairTable_.put(messageId, messageId, this);				
 			}
 		}
+        
+        public void attachContext(Object o)
+        {
+            throw new UnsupportedOperationException("This operation is not currently supported.");
+        }
 		
 		public void callMe(String key, String value)
 		{
@@ -176,30 +186,8 @@
 
 	public void run()
 	{
-		logger_.debug(" Run the consistency checks for " + columnFamily_);
-		String table = DatabaseDescriptor.getTables().get(0);
-		ReadMessage readMessageDigestOnly = null;
-		if(columnNames_.size() == 0)
-		{
-			if( start_ >= 0 && count_ < Integer.MAX_VALUE)
-			{
-				readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_, start_, count_);
-			}
-			else if(sinceTimestamp_ > 0)
-			{
-				readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_, sinceTimestamp_);
-			}
-			else
-			{
-				readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_);
-			}
-		}
-		else
-		{
-			readMessageDigestOnly = new ReadMessage(table, row_.key(), columnFamily_, columnNames_);
-			
-		}
-		readMessageDigestOnly.setIsDigestQuery(true);
+		logger_.debug(" Run the consistency checks for " + columnFamily_);		
+        ReadMessage readMessageDigestOnly = constructReadMessage(true);
 		try
 		{
 			Message messageDigestOnly = ReadMessage.makeReadMessage(readMessageDigestOnly);
@@ -211,4 +199,33 @@
 			logger_.info(LogUtil.throwableToString(ex));
 		}
 	}
+    
+    private ReadMessage constructReadMessage(boolean isDigestQuery)
+    {
+        ReadMessage readMessage = null;
+        String table = DatabaseDescriptor.getTables().get(0);
+        
+        if(columnNames_.size() == 0)
+        {
+            if( start_ >= 0 && count_ < Integer.MAX_VALUE)
+            {
+                readMessage = new ReadMessage(table, row_.key(), columnFamily_, start_, count_);
+            }
+            else if(sinceTimestamp_ > 0)
+            {
+                readMessage = new ReadMessage(table, row_.key(), columnFamily_, sinceTimestamp_);
+            }
+            else
+            {
+                readMessage = new ReadMessage(table, row_.key(), columnFamily_);
+            }
+        }
+        else
+        {
+            readMessage = new ReadMessage(table, row_.key(), columnFamily_, columnNames_);
+            
+        }
+        readMessage.setIsDigestQuery(isDigestQuery);
+        return readMessage;
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/HttpRequestVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/HttpRequestVerbHandler.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/HttpRequestVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/HttpRequestVerbHandler.java Fri Mar 27 05:41:59 2009
@@ -23,18 +23,15 @@
 import java.lang.management.MemoryMXBean;
 import java.lang.management.MemoryUsage;
 import java.lang.management.RuntimeMXBean;
+import java.math.BigInteger;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.log4j.Logger;
+import java.util.*;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.CalloutDeployMessage;
 import org.apache.cassandra.db.CalloutManager;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.dht.Range;
@@ -51,6 +48,8 @@
 import org.apache.cassandra.net.http.HttpWriteResponse;
 import org.apache.cassandra.procedures.GroovyScriptRunner;
 import org.apache.cassandra.utils.LogUtil;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.net.*;
 
 /*
  * This class handles the incoming HTTP request after

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/QuorumResponseHandler.java Fri Mar 27 05:41:59 2009
@@ -26,12 +26,13 @@
 import java.util.concurrent.TimeoutException;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.WriteResponseMessage;
 import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.log4j.Logger;
-
+import org.apache.cassandra.utils.*;
 /**
  * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
  */
@@ -122,4 +123,9 @@
             lock_.unlock();
         }
     }
+    
+    public void attachContext(Object o)
+    {
+        throw new UnsupportedOperationException("This operation is not supported in this version of the callback handler");
+    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ReadResponseResolver.java Fri Mar 27 05:41:59 2009
@@ -37,8 +37,8 @@
 import org.apache.log4j.Logger;
 
 
-/*
- * This class is used by all read functions and is called by the Qorum 
+/**
+ * This class is used by all read functions and is called by the Quorum 
  * when atleast a few of the servers ( few is specified in Quorum)
  * have sent the response . The resolve fn then schedules read repair 
  * and resolution of read data from the various servers.
@@ -46,7 +46,6 @@
  */
 public class ReadResponseResolver implements IResponseResolver<Row>
 {
-
 	private static Logger logger_ = Logger.getLogger(WriteResponseResolver.class);
 
 	/*
@@ -101,15 +100,16 @@
                 logger_.info(LogUtil.throwableToString(ex));
             }
 		}
-		// If there was a digest query compare it withh all teh data digests 
-		// If there is a mismatch then thwrow an exception so that read repair can happen.
+		// If there was a digest query compare it with all the data digests 
+		// If there is a mismatch then throw an exception so that read repair can happen.
 		if(isDigestQuery)
 		{
 			for(Row row: rowList)
 			{
 				if( !Arrays.equals(row.digest(), digest) )
 				{
-					throw new DigestMismatchException("The Digest does not match");
+                    /* Wrap the key as the context in this exception */
+					throw new DigestMismatchException(row.key());
 				}
 			}
 		}
@@ -141,13 +141,13 @@
 				continue;
 			// create the row mutation message based on the diff and schedule a read repair 
 			RowMutation rowMutation = new RowMutation(table, key);            			
-	    	Map<String, ColumnFamily> columnFamilies = diffRow.getColumnFamilyMap();
+	    	Map<String, ColumnFamily> columnFamilies = diffRow.getColumnFamilies();
 	        Set<String> cfNames = columnFamilies.keySet();
 	        
 	        for ( String cfName : cfNames )
 	        {
 	            ColumnFamily cf = columnFamilies.get(cfName);
-	            rowMutation.add(cf);
+	            rowMutation.add(cfName, cf);
 	        }
             RowMutationMessage rowMutationMessage = new RowMutationMessage(rowMutation);
 	        // schedule the read repair

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageLoadBalancer.java Fri Mar 27 05:41:59 2009
@@ -19,30 +19,33 @@
 package org.apache.cassandra.service;
 
 import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
+import java.math.BigInteger;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.log4j.Logger;
-
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.SingleThreadedStage;
 import org.apache.cassandra.concurrent.StageManager;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
-import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.dht.LeaveJoinProtocolImpl;
+import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndPointState;
 import org.apache.cassandra.gms.Gossiper;
 import org.apache.cassandra.gms.IEndPointStateChangeSubscriber;
+import org.apache.cassandra.io.SSTable;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IVerbHandler;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.log4j.Logger;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.utils.*;
 
 /*
  * The load balancing algorithm here is an implementation of
@@ -161,6 +164,7 @@
             if ( isMoveable_.get() )
             {
                 MoveMessage moveMessage = (MoveMessage)message.getMessageBody()[0];
+                BigInteger targetToken = moveMessage.getTargetToken();
                 /* Start the leave operation and join the ring at the position specified */
                 isMoveable_.set(false);
             }
@@ -392,18 +396,18 @@
 
 class MoveMessage implements Serializable
 {
-    private Token targetToken_;
+    private BigInteger targetToken_;
 
     private MoveMessage()
     {
     }
 
-    MoveMessage(Token targetToken)
+    MoveMessage(BigInteger targetToken)
     {
         targetToken_ = targetToken;
     }
 
-    Token getTargetToken()
+    BigInteger getTargetToken()
     {
         return targetToken_;
     }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java?rev=759028&r1=759027&r2=759028&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java Fri Mar 27 05:41:59 2009
@@ -26,9 +26,6 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-
-import org.apache.commons.lang.StringUtils;
-
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ReadMessage;
 import org.apache.cassandra.db.ReadResponseMessage;
@@ -39,6 +36,7 @@
 import org.apache.cassandra.db.TouchMessage;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.EndPoint;
+import org.apache.cassandra.net.IAsyncCallback;
 import org.apache.cassandra.net.IAsyncResult;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
@@ -48,35 +46,35 @@
 
 public class StorageProxy
 {
-    private static Logger logger_ = Logger.getLogger(StorageProxy.class);
+    private static Logger logger_ = Logger.getLogger(StorageProxy.class);    
     
     /**
      * This method is responsible for creating Message to be
      * sent over the wire to N replicas where some of the replicas
      * may be hints.
      */
-    private static Map<EndPoint, Message> createWriteMessages(RowMutation rm, Map<EndPoint, EndPoint> endpointMap) throws IOException
+    private static Map<EndPoint, Message> createWriteMessages(RowMutationMessage rmMessage, Map<EndPoint, EndPoint> endpointMap) throws IOException
     {
-		Map<EndPoint, Message> messageMap = new HashMap<EndPoint, Message>();
-		Message message = rm.makeRowMutationMessage();
-
-		for (Map.Entry<EndPoint, EndPoint> entry : endpointMap.entrySet())
-		{
-            EndPoint target = entry.getKey();
-            EndPoint hint = entry.getValue();
+        Map<EndPoint, Message> messageMap = new HashMap<EndPoint, Message>();
+        Message message = RowMutationMessage.makeRowMutationMessage(rmMessage);
+        
+        Set<EndPoint> targets = endpointMap.keySet();
+        for( EndPoint target : targets )
+        {
+            EndPoint hint = endpointMap.get(target);
             if ( !target.equals(hint) )
-			{
-				Message hintedMessage = rm.makeRowMutationMessage();
-				hintedMessage.addHeader(RowMutation.HINT, EndPoint.toBytes(hint) );
-				logger_.debug("Sending the hint of " + target.getHost() + " to " + hint.getHost());
-				messageMap.put(target, hintedMessage);
-			}
-			else
-			{
-				messageMap.put(target, message);
-			}
-		}
-		return messageMap;
+            {
+                Message hintedMessage = RowMutationMessage.makeRowMutationMessage(rmMessage);
+                hintedMessage.addHeader(RowMutationMessage.hint_, EndPoint.toBytes(hint) );
+                logger_.debug("Sending the hint of " + target.getHost() + " to " + hint.getHost());
+                messageMap.put(target, hintedMessage);
+            }
+            else
+            {
+                messageMap.put(target, message);
+            }
+        }
+        return messageMap;
     }
     
     /**
@@ -84,96 +82,124 @@
      * across all replicas. This method will take care
      * of the possibility of a replica being down and hint
      * the data across to some other replica. 
-     * @param rm the mutation to be applied across the replicas
+     * @param RowMutation the mutation to be applied 
+     *                    across the replicas
     */
     public static void insert(RowMutation rm)
-	{
+    {
         /*
          * Get the N nodes from storage service where the data needs to be
          * replicated
          * Construct a message for write
          * Send them asynchronously to the replicas.
         */
-        assert rm.key() != null;
-
-		try
-		{
-			Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
-			// TODO: throw a thrift exception if we do not have N nodes
-			Map<EndPoint, Message> messageMap = createWriteMessages(rm, endpointMap);
-            logger_.debug("insert writing to [" + StringUtils.join(messageMap.keySet(), ", ") + "]");
-			for (Map.Entry<EndPoint, Message> entry : messageMap.entrySet())
-			{
-				MessagingService.getMessagingInstance().sendOneWay(entry.getValue(), entry.getKey());
-			}
-		}
+        try
+        {
+            logger_.debug(" insert");
+            Map<EndPoint, EndPoint> endpointMap = StorageService.instance().getNStorageEndPointMap(rm.key());
+            // TODO: throw a thrift exception if we do not have N nodes
+            RowMutationMessage rmMsg = new RowMutationMessage(rm); 
+            /* Create the write messages to be sent */
+            Map<EndPoint, Message> messageMap = createWriteMessages(rmMsg, endpointMap);
+            Set<EndPoint> endpoints = messageMap.keySet();
+            for(EndPoint endpoint : endpoints)
+            {
+                MessagingService.getMessagingInstance().sendOneWay(messageMap.get(endpoint), endpoint);
+            }
+        }
         catch (Exception e)
         {
-            logger_.error( LogUtil.throwableToString(e) );
+            logger_.info( LogUtil.throwableToString(e) );
         }
         return;
     }
 
-    public static boolean insertBlocking(RowMutation rm)
+    
+    private static Map<String, Message> constructMessages(Map<String, ReadMessage> readMessages) throws IOException
     {
-        assert rm.key() != null;
-
-        try
+        Map<String, Message> messages = new HashMap<String, Message>();
+        Set<String> keys = readMessages.keySet();        
+        for ( String key : keys )
         {
-            Message message = rm.makeRowMutationMessage();
-
-            IResponseResolver<Boolean> writeResponseResolver = new WriteResponseResolver();
-            QuorumResponseHandler<Boolean> quorumResponseHandler = new QuorumResponseHandler<Boolean>(
-                    DatabaseDescriptor.getReplicationFactor(),
-                    writeResponseResolver);
-            EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(rm.key());
-            logger_.debug("insertBlocking writing to [" + StringUtils.join(endpoints, ", ") + "]");
-            // TODO: throw a thrift exception if we do not have N nodes
-
-            MessagingService.getMessagingInstance().sendRR(message, endpoints, quorumResponseHandler);
-            return quorumResponseHandler.get();
-
-            // TODO: if the result is false that means the writes to all the
-            // servers failed hence we need to throw an exception or return an
-            // error back to the client so that it can take appropriate action.
-        }
-        catch (Exception e)
-        {
-            logger_.error( LogUtil.throwableToString(e) );
-            return false;
-        }
+            Message message = ReadMessage.makeReadMessage( readMessages.get(key) );
+            messages.put(key, message);
+        }        
+        return messages;
     }
     
-    public static Row doReadProtocol(String key, ReadMessage readMessage) throws IOException,TimeoutException
+    private static IAsyncResult dispatchMessages(Map<String, EndPoint> endPoints, Map<String, Message> messages)
     {
-        EndPoint endPoint = null;
-        try
+        Set<String> keys = endPoints.keySet();
+        EndPoint[] eps = new EndPoint[keys.size()];
+        Message[] msgs  = new Message[keys.size()];
+        
+        int i = 0;
+        for ( String key : keys )
         {
-            endPoint = StorageService.instance().findSuitableEndPoint(key);
+            eps[i] = endPoints.get(key);
+            msgs[i] = messages.get(key);
+            ++i;
         }
-        catch( Throwable ex)
+        
+        IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(msgs, eps);
+        return iar;
+    }
+    
+    /**
+     * This is an implementation for the multiget version. 
+     * @param readMessages map of key --> ReadMessage to be sent
+     * @return map of key --> Row
+     * @throws IOException
+     * @throws TimeoutException
+     */
+    public static Map<String, Row> doReadProtocol(Map<String, ReadMessage> readMessages) throws IOException,TimeoutException
+    {
+        Map<String, Row> rows = new HashMap<String, Row>();
+        Set<String> keys = readMessages.keySet();
+        /* Find all the suitable endpoints for the keys */
+        Map<String, EndPoint> endPoints = StorageService.instance().findSuitableEndPoints(keys.toArray( new String[0] ));
+        /* Construct the messages to be sent out */
+        Map<String, Message> messages = constructMessages(readMessages);
+        /* Dispatch the messages to the respective endpoints */
+        IAsyncResult iar = dispatchMessages(endPoints, messages);        
+        List<Object[]> results = iar.multiget(2*DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+        
+        for ( Object[] result : results )
         {
-            ex.printStackTrace();
-        }
+            byte[] body = (byte[])result[0];
+            DataInputBuffer bufIn = new DataInputBuffer();
+            bufIn.reset(body, body.length);
+            ReadResponseMessage responseMessage = ReadResponseMessage.serializer().deserialize(bufIn);
+            Row row = responseMessage.row();
+            rows.put(row.key(), row);
+        }        
+        return rows;
+    }
+    
+    public static Row doReadProtocol(String key, ReadMessage readMessage) throws IOException,TimeoutException
+    {
+        Row row = null;
+        EndPoint endPoint = StorageService.instance().findSuitableEndPoint(key);        
         if(endPoint != null)
         {
             Message message = ReadMessage.makeReadMessage(readMessage);
+            message.addHeader(ReadMessage.doRepair_, ReadMessage.doRepair_.getBytes());
             IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
             Object[] result = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
             byte[] body = (byte[])result[0];
             DataInputBuffer bufIn = new DataInputBuffer();
             bufIn.reset(body, body.length);
             ReadResponseMessage responseMessage = ReadResponseMessage.serializer().deserialize(bufIn);
-            return responseMessage.row();
+            row = responseMessage.row();            
         }
         else
         {
             logger_.warn(" Alert : Unable to find a suitable end point for the key : " + key );
         }
-        return null;
+        return row;
     }
 
-    static void touch_local (String tablename, String key, boolean fData ) throws IOException
+    static void touch_local(String tablename, String key, boolean fData ) throws IOException
     {
 		Table table = Table.open( tablename );
 		table.touch(key, fData);
@@ -237,10 +263,8 @@
 	            weakTouchProtocol(tablename, key, fData);
 	            break;
         }
-    }
+    }  
         
-    
-    
     public static Row readProtocol(String tablename, String key, String columnFamily, List<String> columnNames, StorageService.ConsistencyLevel consistencyLevel) throws Exception
     {
         Row row = null;
@@ -277,9 +301,7 @@
                 break;
             }
         }
-        return row;
-        
-        
+        return row;                
     }
         
     public static Row readProtocol(String tablename, String key, String columnFamily, int start, int count, StorageService.ConsistencyLevel consistencyLevel) throws Exception
@@ -321,6 +343,26 @@
         return row;
     }
     
+    public static Map<String, Row> readProtocol(String tablename, String[] keys, String columnFamily, int start, int count, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+    {
+        Map<String, Row> rows = new HashMap<String, Row>();        
+        switch ( consistencyLevel )
+        {
+            case WEAK:
+                rows = weakReadProtocol(tablename, keys, columnFamily, start, count);
+                break;
+                
+            case STRONG:
+                rows = strongReadProtocol(tablename, keys, columnFamily, start, count);
+                break;
+                
+            default:
+                rows = weakReadProtocol(tablename, keys, columnFamily, start, count);
+                break;
+        }
+        return rows;
+    }
+    
     public static Row readProtocol(String tablename, String key, String columnFamily, long sinceTimestamp, StorageService.ConsistencyLevel consistencyLevel) throws Exception
     {
         Row row = null;
@@ -374,18 +416,21 @@
         return row;
     }
     
-    /*
-     * This function executes the read protocol.
-        // 1. Get the N nodes from storage service where the data needs to be
-        // replicated
-        // 2. Construct a message for read\write
-         * 3. Set one of teh messages to get teh data and teh rest to get teh digest
-        // 4. SendRR ( to all the nodes above )
-        // 5. Wait for a response from atleast X nodes where X <= N and teh data node
-         * 6. If the digest matches return teh data.
-         * 7. else carry out read repair by getting data from all the nodes.
-        // 5. return success
-     * 
+    /**
+      * This function executes the read protocol.
+      * 1. Get the N nodes from storage service where the data needs to be replicated
+      * 2. Construct a message for read\write
+      * 3. Set one of teh messages to get teh data and teh rest to get teh digest
+      * 4. SendRR ( to all the nodes above )
+      * 5. Wait for a response from atleast X nodes where X <= N and teh data node
+      * 6. If the digest matches return teh data.
+      * 7. else carry out read repair by getting data from all the nodes.
+      * @param tablename the name of the table
+      * @param key the row key identifier
+      * @param columnFamily the column in Cassandra format
+      * @start the start position
+      * @count the number of columns we are interested in
+      * @throws IOException, TimeoutException
      */
     public static Row strongReadProtocol(String tablename, String key, String columnFamily, int start, int count) throws IOException, TimeoutException
     {       
@@ -416,6 +461,49 @@
         return row;
     }
     
+    /**
+     * This is a multiget version of the above method.
+     * @param tablename
+     * @param keys
+     * @param columnFamily
+     * @param start
+     * @param count
+     * @return
+     * @throws IOException
+     * @throws TimeoutException
+     */
+    public static Map<String, Row> strongReadProtocol(String tablename, String[] keys, String columnFamily, int start, int count) throws IOException, TimeoutException
+    {       
+        Map<String, Row> rows = new HashMap<String, Row>();
+        long startTime = System.currentTimeMillis();        
+        // TODO: throw a thrift exception if we do not have N nodes
+        Map<String, ReadMessage[]> readMessages = new HashMap<String, ReadMessage[]>();        
+        for (String key : keys )
+        {
+            ReadMessage[] readMessage = new ReadMessage[2];
+            if( start >= 0 && count < Integer.MAX_VALUE)
+            {
+                readMessage[0] = new ReadMessage(tablename, key, columnFamily, start, count);
+            }
+            else
+            {
+                readMessage[0] = new ReadMessage(tablename, key, columnFamily);
+            }            
+            if( start >= 0 && count < Integer.MAX_VALUE)
+            {
+                readMessage[1] = new ReadMessage(tablename, key, columnFamily, start, count);
+            }
+            else
+            {
+                readMessage[1] = new ReadMessage(tablename, key, columnFamily);
+            }
+            readMessage[1].setIsDigestQuery(true);
+        }        
+        rows = doStrongReadProtocol(readMessages);         
+        logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+        return rows;
+    }
+    
     public static Row strongReadProtocol(String tablename, String key, String columnFamily, long sinceTimestamp) throws IOException, TimeoutException
     {       
         long startTime = System.currentTimeMillis();        
@@ -431,8 +519,8 @@
         return row;
     }
 
-    /*
-     * This method performs the actual read from the replicas.
+    /**
+     *  This method performs the read from the replicas.
      *  param @ key - key for which the data is required.
      *  param @ readMessage - the read message to get the actual data
      *  param @ readMessageDigest - the read message to get the digest.
@@ -454,11 +542,14 @@
         EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
         Message messages[] = new Message[endpointList.size() + 1];
         
-        // first message is the data Point 
+        /* 
+         * First message is sent to the node that will actually get
+         * the data for us. The other two replicas are only sent a 
+         * digest query.
+        */
         endPoints[0] = dataPoint;
-        messages[0] = message;
-        
-        for(int i=1; i < endPoints.length ; i++)
+        messages[0] = message;        
+        for (int i=1; i < endPoints.length ; i++)
         {
             endPoints[i] = endpointList.get(i-1);
             messages[i] = messageDigestOnly;
@@ -466,8 +557,7 @@
         
         try
         {
-            MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);
-            
+            MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);            
             long startTime2 = System.currentTimeMillis();
             row = quorumResponseHandler.get();
             logger_.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2)
@@ -490,8 +580,7 @@
 	            readMessage.setIsDigestQuery(false);
 	            logger_.info("DigestMismatchException: " + key);            
 	            Message messageRepair = ReadMessage.makeReadMessage(readMessage);
-	            MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints,
-	                    quorumResponseHandlerRepair);
+	            MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints, quorumResponseHandlerRepair);
 	            try
 	            {
 	                row = quorumResponseHandlerRepair.get();
@@ -509,6 +598,111 @@
         return row;
     }
     
+    private static Map<String, Message[]> constructReplicaMessages(Map<String, ReadMessage[]> readMessages) throws IOException
+    {
+        Map<String, Message[]> messages = new HashMap<String, Message[]>();
+        Set<String> keys = readMessages.keySet();
+        
+        for ( String key : keys )
+        {
+            Message[] msg = new Message[DatabaseDescriptor.getReplicationFactor()];
+            ReadMessage[] readMessage = readMessages.get(key);
+            msg[0] = ReadMessage.makeReadMessage( readMessage[0] );            
+            for ( int i = 1; i < msg.length; ++i )
+            {
+                msg[i] = ReadMessage.makeReadMessage( readMessage[1] );
+            }
+        }        
+        return messages;
+    }
+    
+    private static MultiQuorumResponseHandler dispatchMessages(Map<String, ReadMessage[]> readMessages, Map<String, Message[]> messages) throws IOException
+    {
+        Set<String> keys = messages.keySet();
+        /* This maps the keys to the original data read messages */
+        Map<String, ReadMessage> readMessage = new HashMap<String, ReadMessage>();
+        /* This maps the keys to their respective endpoints/replicas */
+        Map<String, EndPoint[]> endpoints = new HashMap<String, EndPoint[]>();
+        /* Groups the messages that need to be sent to the individual keys */
+        Message[][] msgList = new Message[messages.size()][DatabaseDescriptor.getReplicationFactor()];
+        /* Respects the above grouping and provides the endpoints for the above messages */
+        EndPoint[][] epList = new EndPoint[messages.size()][DatabaseDescriptor.getReplicationFactor()];
+        
+        int i = 0;
+        for ( String key : keys )
+        {
+            /* This is the primary */
+            EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(key);
+            List<EndPoint> replicas = new ArrayList<EndPoint>( StorageService.instance().getNLiveStorageEndPoint(key) );
+            replicas.remove(dataPoint);
+            /* Get the messages to be sent index 0 is the data messages and index 1 is the digest message */
+            Message[] message = messages.get(key);           
+            msgList[i][0] = message[0];
+            int N = DatabaseDescriptor.getReplicationFactor();
+            for ( int j = 1; j < N; ++j )
+            {
+                msgList[i][j] = message[1];
+            }
+            /* Get the endpoints to which the above messages need to be sent */
+            epList[i][0] = dataPoint;
+            for ( int j = 1; i < N; ++i )
+            {                
+                epList[i][j] = replicas.get(j - 1);
+            } 
+            /* Data ReadMessage associated with this key */
+            readMessage.put( key, readMessages.get(key)[0] );
+            /* EndPoints for this specific key */
+            endpoints.put(key, epList[i]);
+            ++i;
+        }
+                
+        /* Handles the read semantics for this entire set of keys */
+        MultiQuorumResponseHandler quorumResponseHandlers = new MultiQuorumResponseHandler(readMessage, endpoints);
+        MessagingService.getMessagingInstance().sendRR(msgList, epList, quorumResponseHandlers);
+        return quorumResponseHandlers;
+    }
+    
+    /**
+    *  This method performs the read from the replicas for a bunch of keys.
+    *  @param readMessages map of key --> readMessage[] of two entries where 
+    *         the first entry is the readMessage for the data and the second
+    *         is the entry for the digest 
+    *  @return map containing key ---> Row
+    *  @throws IOException, TimeoutException
+   */
+    private static Map<String, Row> doStrongReadProtocol(Map<String, ReadMessage[]> readMessages) throws IOException
+    {        
+        Map<String, Row> rows = new HashMap<String, Row>();
+        /* Construct the messages to be sent to the replicas */
+        Map<String, Message[]> replicaMessages = constructReplicaMessages(readMessages);
+        /* Dispatch the messages to the different replicas */
+        MultiQuorumResponseHandler cb = dispatchMessages(readMessages, replicaMessages);
+        try
+        {
+            Row[] rows2 = cb.get();
+            for ( Row row : rows2 )
+            {
+                rows.put(row.key(), row);
+            }
+        }
+        catch ( TimeoutException ex )
+        {
+            logger_.info("Operation timed out waiting for responses ...");
+            logger_.info(LogUtil.throwableToString(ex));
+        }
+        return rows;
+    }
+    
+    /**
+     * This version is used to retrieve the row associated with
+     * the specified key
+     * @param tablename name of the table that needs to be queried
+     * @param keys keys whose values we are interested in 
+     * @param columnFamily name of the "column" we are interested in
+     * @param columns the columns we are interested in
+     * @return the interested row
+     * @throws Exception
+     */
     public static Row weakReadProtocol(String tablename, String key, String columnFamily, List<String> columns) throws Exception
     {       
         long startTime = System.currentTimeMillis();
@@ -530,11 +724,56 @@
         return row;
     }
     
-    /*
+    /**
+     * This version is used when results for multiple keys needs to be
+     * retrieved.
+     * 
+     * @param tablename name of the table that needs to be queried
+     * @param keys keys whose values we are interested in 
+     * @param columnFamily name of the "column" we are interested in
+     * @param columns the columns we are interested in
+     * @return a mapping of key --> Row
+     * @throws Exception
+     */
+    public static Map<String, Row> weakReadProtocol(String tablename, String[] keys, String columnFamily, List<String> columns) throws Exception
+    {
+        Row row = null;
+        long startTime = System.currentTimeMillis();
+        Map<String, ReadMessage> readMessages = new HashMap<String, ReadMessage>();
+        for ( String key : keys )
+        {
+            ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, columns);
+            readMessages.put(key, readMessage);
+        }
+        /* Performs the multiget in parallel */
+        Map<String, Row> rows = doReadProtocol(readMessages);
+        /*
+         * Do the consistency checks for the keys that are being queried
+         * in the background.
+        */
+        for ( String key : keys )
+        {
+            List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
+            /* Remove the local storage endpoint from the list. */ 
+            endpoints.remove( StorageService.getLocalStorageEndPoint() );
+            if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+                StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, columns);
+        }
+        return rows;         
+    }
+    
+    /**
      * This function executes the read protocol locally and should be used only if consistency is not a concern. 
      * Read the data from the local disk and return if the row is NOT NULL. If the data is NULL do the read from
      * one of the other replicas (in the same data center if possible) till we get the data. In the event we get
-     * the data we perform consistency checks and figure out if any repairs need to be done to the replicas. 
+     * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
+     * @param tablename name of the table that needs to be queried
+     * @param key key whose we are interested in 
+     * @param columnFamily name of the "column" we are interested in
+     * @param start start index
+     * @param count the number of columns we are interested in
+     * @return the row associated with this key
+     * @throws Exception 
      */
     public static Row weakReadProtocol(String tablename, String key, String columnFamily, int start, int count) throws Exception
     {
@@ -565,6 +804,55 @@
         return row;         
     }
     
+    /**
+     * This version is used when results for multiple keys needs to be
+     * retrieved.
+     * 
+     * @param tablename name of the table that needs to be queried
+     * @param keys keys whose values we are interested in 
+     * @param columnFamily name of the "column" we are interested in
+     * @param start start index
+     * @param count the number of columns we are interested in
+     * @return a mapping of key --> Row
+     * @throws Exception
+     */
+    public static Map<String, Row> weakReadProtocol(String tablename, String[] keys, String columnFamily, int start, int count) throws Exception
+    {
+        Row row = null;
+        long startTime = System.currentTimeMillis();
+        Map<String, ReadMessage> readMessages = new HashMap<String, ReadMessage>();
+        for ( String key : keys )
+        {
+            ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, start, count);
+            readMessages.put(key, readMessage);
+        }
+        /* Performs the multiget in parallel */
+        Map<String, Row> rows = doReadProtocol(readMessages);
+        /*
+         * Do the consistency checks for the keys that are being queried
+         * in the background.
+        */
+        for ( String key : keys )
+        {
+            List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
+            /* Remove the local storage endpoint from the list. */ 
+            endpoints.remove( StorageService.getLocalStorageEndPoint() );
+            if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+                StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, start, count);
+        }
+        return rows;         
+    }
+    
+    /**
+     * This version is used when retrieving a single key.
+     * 
+     * @param tablename name of the table that needs to be queried
+     * @param key key whose we are interested in 
+     * @param columnFamily name of the "column" we are interested in
+     * @param sinceTimestamp this is lower bound of the timestamp
+     * @return the row associated with this key
+     * @throws Exception
+     */
     public static Row weakReadProtocol(String tablename, String key, String columnFamily, long sinceTimestamp) throws Exception
     {
         Row row = null;
@@ -585,5 +873,42 @@
         	StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, sinceTimestamp);
         return row;         
     }
-
+    
+    /**
+     * This version is used when results for multiple keys needs to be
+     * retrieved.
+     * 
+     * @param tablename name of the table that needs to be queried
+     * @param keys keys whose values we are interested in 
+     * @param columnFamily name of the "column" we are interested in
+     * @param sinceTimestamp this is lower bound of the timestamp
+     * @return a mapping of key --> Row
+     * @throws Exception
+     */
+    public static Map<String, Row> weakReadProtocol(String tablename, String[] keys, String columnFamily, long sinceTimestamp) throws Exception
+    {
+        Row row = null;
+        long startTime = System.currentTimeMillis();
+        Map<String, ReadMessage> readMessages = new HashMap<String, ReadMessage>();
+        for ( String key : keys )
+        {
+            ReadMessage readMessage = new ReadMessage(tablename, key, columnFamily, sinceTimestamp);
+            readMessages.put(key, readMessage);
+        }
+        /* Performs the multiget in parallel */
+        Map<String, Row> rows = doReadProtocol(readMessages);
+        /*
+         * Do the consistency checks for the keys that are being queried
+         * in the background.
+        */
+        for ( String key : keys )
+        {
+            List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
+            /* Remove the local storage endpoint from the list. */ 
+            endpoints.remove( StorageService.getLocalStorageEndPoint() );
+            if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+                StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, sinceTimestamp);
+        }
+        return rows;         
+    }
 }