You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/04/20 22:05:26 UTC

svn commit: r766841 - in /incubator/cassandra/trunk/src/org/apache/cassandra: cql/common/ db/ service/

Author: jbellis
Date: Mon Apr 20 20:05:26 2009
New Revision: 766841

URL: http://svn.apache.org/viewvc?rev=766841&view=rev
Log:
refactor read path: first we make readColumnFamily accept a ReadCommand, and use that to clean out duplicate code in CassandraServer.  Then we clean up the duplicate versions of the read methods in StorageService by making them ReadCommand-based, too.  [not touching multiget code for now.]
patch by jbellis; reviewed by Eric Evans for #88

Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
    incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
    incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java?rev=766841&r1=766840&r2=766841&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java Mon Apr 20 20:05:26 2009
@@ -28,6 +28,7 @@
 import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
@@ -99,8 +100,8 @@
         try
         {
             String key = (String)(rowKey_.get());
-            row = StorageProxy.readProtocol(cfMetaData_.tableName, key, columnFamily_column,
-                                            offset_, limit_, StorageService.ConsistencyLevel.WEAK);
+            ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, key, columnFamily_column, offset_, limit_);
+            row = StorageProxy.readProtocol(readCommand, StorageService.ConsistencyLevel.WEAK);
         }
         catch (Exception e)
         {

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java?rev=766841&r1=766840&r2=766841&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java Mon Apr 20 20:05:26 2009
@@ -28,6 +28,7 @@
 import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
@@ -68,8 +69,8 @@
         try
         {
             String key = (String)(rowKey_.get());
-            row = StorageProxy.readProtocol(cfMetaData_.tableName, key, cfMetaData_.cfName,
-                                            offset_, limit_, StorageService.ConsistencyLevel.WEAK);
+            ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, key, cfMetaData_.cfName, offset_, limit_);
+            row = StorageProxy.readProtocol(readCommand, StorageService.ConsistencyLevel.WEAK);
         }
         catch (Exception e)
         {

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java?rev=766841&r1=766840&r2=766841&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java Mon Apr 20 20:05:26 2009
@@ -29,6 +29,7 @@
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.Row;
+import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.LogUtil;
@@ -84,8 +85,8 @@
         try
         {
             String key = (String)(rowKey_.get());
-            row = StorageProxy.readProtocol(cfMetaData_.tableName, key, columnFamily_column, -1,
-                                            Integer.MAX_VALUE, StorageService.ConsistencyLevel.WEAK);
+            ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, key, columnFamily_column, -1, Integer.MAX_VALUE);
+            row = StorageProxy.readProtocol(readCommand, StorageService.ConsistencyLevel.WEAK);
         }
         catch (Exception e)
         {

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java?rev=766841&r1=766840&r2=766841&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java Mon Apr 20 20:05:26 2009
@@ -126,6 +126,26 @@
         return new ReadCommand(table, key, columnFamilyColumn, start, count, sinceTimestamp, columnNames);
     }
 
+    public Row getRow(Table table) throws IOException, ColumnFamilyNotDefinedException
+    {
+        if (columnNames != EMPTY_COLUMNS)
+        {
+            return table.getRow(key, columnFamilyColumn, columnNames);
+        }
+
+        if (sinceTimestamp > 0)
+        {
+            return table.getRow(key, columnFamilyColumn, sinceTimestamp);
+        }
+
+        if (start > 0 || (count > 0 && count < Integer.MAX_VALUE))
+        {
+            return table.getRow(key, columnFamilyColumn, start, count);
+        }
+
+        return table.getRow(key, columnFamilyColumn);
+    }
+
     public String toString()
     {
         return "ReadMessage(" +

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=766841&r1=766840&r2=766841&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Mon Apr 20 20:05:26 2009
@@ -26,6 +26,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.log4j.Logger;
 
@@ -37,6 +38,9 @@
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.ReadCommand;
+import org.apache.cassandra.db.Table;
+import org.apache.cassandra.db.ColumnFamilyNotDefinedException;
 import org.apache.cassandra.utils.LogUtil;
 import org.apache.thrift.TException;
 
@@ -80,43 +84,26 @@
 		}
 	}
     
-	protected ColumnFamily readColumnFamily(String tablename, String key, String columnFamily, List<String> columNames) throws CassandraException, TException
-	{
-    	ColumnFamily cfamily = null;
-		try
-		{
-			validateTable(tablename);
-	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily);
-	        // check for  values 
-	        if( values.length < 1 )
-	        {
-	        	throw new CassandraException("Column Family " + columnFamily + " is invalid.");	        	
-	        }
-	        Row row = StorageProxy.readProtocol(tablename, key, columnFamily, columNames, StorageService.ConsistencyLevel.WEAK);
-	        if (row == null)
-			{
-				throw new CassandraException("No row exists for key " + key);			
-			}
-			Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
-			if (cfMap == null || cfMap.size() == 0)
-			{				
-				logger_	.info("ERROR ColumnFamily " + columnFamily + " map is missing.....: " + "   key:" + key );
-				throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
-			}
-			cfamily = cfMap.get(values[0]);
-			if (cfamily == null)
-			{
-				logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: " + "   key:" + key + "  ColumnFamily:" + values[0]);
-				throw new CassandraException("Either the key " + key + " is not present or the column family " + values[0] +  " is not present.");
-			}
-		}
-		catch (Throwable ex)
-		{
-			String exception = LogUtil.throwableToString(ex);
-			logger_.info( exception );
-			throw new CassandraException(exception);
-		}
-		return cfamily;
+	protected ColumnFamily readColumnFamily(ReadCommand command) throws CassandraException, TException, IOException, ColumnFamilyNotDefinedException, TimeoutException
+    {
+        validateTable(command.table);
+        String[] values = RowMutation.getColumnAndColumnFamily(command.columnFamilyColumn);
+        if( values.length < 1 )
+        {
+            throw new CassandraException("Empty column Family is invalid.");
+        }
+        Table table = Table.open(command.table);
+        if (!table.getColumnFamilies().contains(values[0]))
+        {
+            throw new CassandraException("Column Family " + values[0] + " is invalid.");
+        }
+
+        Row row = StorageProxy.readProtocol(command, StorageService.ConsistencyLevel.WEAK);
+        if (row == null)
+        {
+            return null;
+        }
+        return row.getColumnFamily(values[0]);
 	}
 
     public List<column_t> thriftifyColumns(Collection<IColumn> columns)
@@ -139,27 +126,8 @@
         long startTime = System.currentTimeMillis();
 		try
 		{
-			validateTable(tablename);
-	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
-	        // check for  values 
-	        if( values.length < 1 )
-	        {
-	        	throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");	        	
-	        }
-	        Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, timeStamp, StorageService.ConsistencyLevel.WEAK);
-			if (row == null)
-			{
-				logger_.info("ERROR No row for this key .....: " + key);
-	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
-			}
-
-			Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
-			if (cfMap == null || cfMap.size() == 0)
-			{
-				logger_	.info("ERROR ColumnFamily " + columnFamily_column + " map is missing.....: " + "   key:" + key);
-				throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
-			}
-			ColumnFamily cfamily = cfMap.get(values[0]);
+			ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, timeStamp));
+            String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
 			if (cfamily == null)
 			{
 				logger_.info("ERROR ColumnFamily " + columnFamily_column + " is missing.....: "+"   key:" + key	+ "  ColumnFamily:" + values[0]);
@@ -204,7 +172,7 @@
 		try
 		{
 			validateTable(tablename);
-			ColumnFamily cfamily = readColumnFamily(tablename, key, columnFamily, columnNames);
+			ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily, columnNames));
 			if (cfamily == null)
 			{
 				logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: "
@@ -241,27 +209,8 @@
         long startTime = System.currentTimeMillis();
 		try
 		{
-			validateTable(tablename);
 	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
-	        // check for  values 
-	        if( values.length < 1 )
-	        {
-	        	throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");	        	
-	        }
-	        Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, start, count, StorageService.ConsistencyLevel.WEAK);
-			if (row == null)
-			{
-				logger_.info("ERROR No row for this key .....: " + key);
-	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
-			}
-
-			Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
-			if (cfMap == null || cfMap.size() == 0)
-			{
-				logger_	.info("ERROR ColumnFamily " + columnFamily_column + " map is missing.....: " + "   key:" + key);
-				throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
-			}
-			ColumnFamily cfamily = cfMap.get(values[0]);
+			ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, start, count));
 			if (cfamily == null)
 			{
 				logger_.info("ERROR ColumnFamily " + columnFamily_column + " is missing.....: "	+ "   key:" + key + "  ColumnFamily:" + values[0]);
@@ -303,29 +252,8 @@
     {
 		try
 		{
-			validateTable(tablename);
 	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
-	        // check for  values 
-	        if( values.length < 2 )
-	        {
-	        	throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");	        	
-	        }
-	        Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE, StorageService.ConsistencyLevel.WEAK);
-			if (row == null)
-			{
-				logger_.info("ERROR No row for this key .....: " + key);
-	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
-			}
-			
-			Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
-			if (cfMap == null || cfMap.size() == 0)
-			{
-				logger_	.info("ERROR ColumnFamily map is missing.....: "
-							   + "   key:" + key
-								);
-				throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
-			}
-			ColumnFamily cfamily = cfMap.get(values[0]);
+			ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
 			if (cfamily == null)
 			{
 				logger_.info("ERROR ColumnFamily  is missing.....: "
@@ -375,29 +303,8 @@
     	int count = -1;
 		try
 		{
-			validateTable(tablename);
 	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
-	        // check for  values 
-	        if( values.length < 1 )
-	        {
-	        	throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");	        	
-	        }
-	        Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE, StorageService.ConsistencyLevel.WEAK);
-			if (row == null)
-			{
-				logger_.info("ERROR No row for this key .....: " + key);
-	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
-			}
-
-			Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
-			if (cfMap == null || cfMap.size() == 0)
-			{
-				logger_	.info("ERROR ColumnFamily map is missing.....: "
-							   + "   key:" + key
-								);
-				throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
-			}
-			ColumnFamily cfamily = cfMap.get(values[0]);
+			ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
 			if (cfamily == null)
 			{
 				logger_.info("ERROR ColumnFamily  is missing.....: "
@@ -485,7 +392,7 @@
 		try
 		{
 			validateTable(tablename);
-			ColumnFamily cfamily = readColumnFamily(tablename, key, columnFamily, superColumnNames);
+			ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily, superColumnNames));
 			if (cfamily == null)
 			{
 				logger_.info("ERROR ColumnFamily " + columnFamily + " is missing.....: "+"   key:" + key
@@ -533,29 +440,8 @@
     {
 		try
 		{
-			validateTable(tablename);
 	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_superColumnName);
-	        // check for  values 
-	        if( values.length < 1 )
-	        {
-	        	throw new CassandraException("Column Family " + columnFamily_superColumnName + " is invalid.");	        	
-	        }
-	        Row row = StorageProxy.readProtocol(tablename, key, columnFamily_superColumnName, start, count, StorageService.ConsistencyLevel.WEAK);
-			if (row == null)
-			{
-				logger_.info("ERROR No row for this key .....: " + key);
-	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
-			}
-
-			Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
-			if (cfMap == null || cfMap.size() == 0)
-			{
-				logger_	.info("ERROR ColumnFamily map is missing.....: "
-							   + "   key:" + key
-								);
-				throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
-			}
-			ColumnFamily cfamily = cfMap.get(values[0]);
+			ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_superColumnName, start, count));
 			if (cfamily == null)
 			{
 				logger_.info("ERROR ColumnFamily  is missing.....: "
@@ -584,33 +470,10 @@
     
     public superColumn_t get_superColumn(String tablename, String key, String columnFamily_column) throws CassandraException
     {
-    	superColumn_t ret = null;
 		try
 		{
-			validateTable(tablename);
 	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
-	        // check for  values 
-	        if( values.length < 2 )
-	        {
-	        	throw new CassandraException("Column Family " + columnFamily_column + " is invalid.");	        	
-	        }
-
-	        Row row = StorageProxy.readProtocol(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE, StorageService.ConsistencyLevel.WEAK);
-			if (row == null)
-			{
-				logger_.info("ERROR No row for this key .....: " + key);
-	        	throw new CassandraException("ERROR No row for this key .....: " + key);	        	
-			}
-
-			Map<String, ColumnFamily> cfMap = row.getColumnFamilyMap();
-			if (cfMap == null || cfMap.size() == 0)
-			{
-				logger_	.info("ERROR ColumnFamily map is missing.....: "
-							   + "   key:" + key
-								);
-				throw new CassandraException("Either the key " + key + " is not present or the columns requested are not present.");
-			}
-			ColumnFamily cfamily = cfMap.get(values[0]);
+			ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
 			if (cfamily == null)
 			{
 				logger_.info("ERROR ColumnFamily  is missing.....: "

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java?rev=766841&r1=766840&r2=766841&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java Mon Apr 20 20:05:26 2009
@@ -155,29 +155,30 @@
 	private long sinceTimestamp_;
 	private List<String> columnNames_ = new ArrayList<String>();	
 	
-	ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, List<String> columns)
+    public ConsistencyManager(Row row_, List<EndPoint> replicas_, String columnFamily_, int start_, int count_, long sinceTimestamp_, List<String> columnNames_)
+    {
+        this.row_ = row_;
+        this.replicas_ = replicas_;
+        this.columnFamily_ = columnFamily_;
+        this.start_ = start_;
+        this.count_ = count_;
+        this.sinceTimestamp_ = sinceTimestamp_;
+        this.columnNames_ = columnNames_;
+    }
+
+    ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, List<String> columns)
 	{
-		row_ = row;
-		replicas_ = replicas;
-		columnFamily_ = columnFamily;
-		columnNames_ = columns;
+        this(row, replicas, columnFamily, 0, 0, 0, columns);
 	}
-	
+
 	ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, int start, int count)
 	{
-		row_ = row;
-		replicas_ = replicas;
-		columnFamily_ = columnFamily;
-		start_ = start;
-		count_ = count;
+        this(row, replicas, columnFamily, start, count, 0, null);
 	}
-	
+
 	ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, long sinceTimestamp)
 	{
-		row_ = row;
-		replicas_ = replicas;
-		columnFamily_ = columnFamily;
-		sinceTimestamp_ = sinceTimestamp;
+        this(row, replicas, columnFamily, 0, 0, sinceTimestamp, null);
 	}
 
 	public void run()

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java?rev=766841&r1=766840&r2=766841&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java Mon Apr 20 20:05:26 2009
@@ -36,6 +36,7 @@
 import org.apache.cassandra.db.RowMutation;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.TouchMessage;
+import org.apache.cassandra.db.ColumnFamilyNotDefinedException;
 import org.apache.cassandra.io.DataInputBuffer;
 import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.net.IAsyncResult;
@@ -203,28 +204,37 @@
         }        
         return rows;
     }
-    
-    public static Row doReadProtocol(String key, ReadCommand readCommand) throws IOException,TimeoutException
+
+    /**
+     * Read the data from one replica.  If there is no reply, read the data from another.  In the event we get
+     * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
+     * @param command the read to perform
+     * @return the row associated with command.key
+     * @throws Exception
+     */
+    private static Row weakReadRemote(ReadCommand command) throws IOException
     {
-        Row row = null;
-        EndPoint endPoint = StorageService.instance().findSuitableEndPoint(key);        
-        if(endPoint != null)
+        EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key);
+        assert endPoint != null;
+        logger_.debug("weakreadremote reading " + command + " from " + endPoint);
+        Message message = command.makeReadMessage();
+        message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
+        IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
+        byte[] body;
+        try
         {
-            Message message = readCommand.makeReadMessage();
-            message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
-            IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
             Object[] result = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-            byte[] body = (byte[])result[0];
-            DataInputBuffer bufIn = new DataInputBuffer();
-            bufIn.reset(body, body.length);
-            ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
-            row = response.row();
+            body = (byte[])result[0];
         }
-        else
+        catch (TimeoutException e)
         {
-            logger_.warn(" Alert : Unable to find a suitable end point for the key : " + key );
+            throw new RuntimeException(e);
+            // TODO retry to a different endpoint
         }
-        return row;
+        DataInputBuffer bufIn = new DataInputBuffer();
+        bufIn.reset(body, body.length);
+        ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
+        return response.row();
     }
 
     static void touch_local(String tablename, String key, boolean fData ) throws IOException
@@ -292,85 +302,41 @@
 	            break;
         }
     }  
-        
-    public static Row readProtocol(String tablename, String key, String columnFamily, List<String> columnNames, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+
+    /**
+     * Performs the actual reading of a row out of the StorageService, fetching
+     * a specific set of column names from a given column family.
+     */
+    public static Row readProtocol(ReadCommand command, StorageService.ConsistencyLevel consistencyLevel)
+    throws IOException, ColumnFamilyNotDefinedException, TimeoutException
     {
+        assert command.key != null;
+        long startTime = System.currentTimeMillis();
         Row row = null;
-        boolean foundLocal = false;
-        EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(key);
-        for(EndPoint endPoint: endpoints)
-        {
-            if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
-            {
-                foundLocal = true;
-                break;
-            }
-        }   
-        if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
-        {
-            ReadCommand readCommand = null;
-            readCommand = new ReadCommand(tablename, key, columnFamily, columnNames);
-            return doReadProtocol(key, readCommand);
-        }
-        else
+        EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(command.key);
+
+        if (consistencyLevel == StorageService.ConsistencyLevel.WEAK)
         {
-            switch ( consistencyLevel )
+            boolean foundLocal = Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint());
+            if (foundLocal)
             {
-            case WEAK:
-                row = weakReadProtocol(tablename, key, columnFamily, columnNames);
-                break;
-                
-            case STRONG:
-                row = strongReadProtocol(tablename, key, columnFamily, columnNames);
-                break;
-                
-            default:
-                row = weakReadProtocol(tablename, key, columnFamily, columnNames);
-                break;
+                row = weakReadLocal(command);
             }
-        }
-        return row;                
-    }
-        
-    public static Row readProtocol(String tablename, String key, String columnFamily, int start, int count, StorageService.ConsistencyLevel consistencyLevel) throws Exception
-    {
-        Row row = null;
-        boolean foundLocal = false;
-        EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(key);
-        for(EndPoint endPoint: endpoints)
-        {
-            if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
+            else
             {
-                foundLocal = true;
-                break;
+                row = weakReadRemote(command);
             }
-        }   
-        if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
-        {
-            ReadCommand readCommand = null;
-            readCommand = new ReadCommand(tablename, key, columnFamily, start, count);
-            return doReadProtocol(key, readCommand);
         }
         else
         {
-            switch ( consistencyLevel )
-            {
-            case WEAK:
-                row = weakReadProtocol(tablename, key, columnFamily, start, count);
-                break;
-                
-            case STRONG:
-                row = strongReadProtocol(tablename, key, columnFamily, start, count);
-                break;
-                
-            default:
-                row = weakReadProtocol(tablename, key, columnFamily, start, count);
-                break;
-            }
+            assert consistencyLevel == StorageService.ConsistencyLevel.STRONG;
+            row = strongRead(command);
         }
+
+        logger_.debug("Finished reading " + row + " in " + (System.currentTimeMillis() - startTime) + " ms.");
         return row;
     }
-    
+
     public static Map<String, Row> readProtocol(String tablename, String[] keys, String columnFamily, int start, int count, StorageService.ConsistencyLevel consistencyLevel) throws Exception
     {
         Map<String, Row> rows = new HashMap<String, Row>();        
@@ -390,105 +356,7 @@
         }
         return rows;
     }
-    
-    public static Row readProtocol(String tablename, String key, String columnFamily, long sinceTimestamp, StorageService.ConsistencyLevel consistencyLevel) throws Exception
-    {
-        Row row = null;
-        boolean foundLocal = false;
-        EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(key);
-        for(EndPoint endPoint: endpoints)
-        {
-            if(endPoint.equals(StorageService.getLocalStorageEndPoint()))
-            {
-                foundLocal = true;
-                break;
-            }
-        }   
-        if(!foundLocal && consistencyLevel == StorageService.ConsistencyLevel.WEAK)
-        {
-            ReadCommand readCommand = null;
-            readCommand = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
-            return doReadProtocol(key, readCommand);
-        }
-        else
-        {
-            switch ( consistencyLevel )
-            {
-            case WEAK:
-                row = weakReadProtocol(tablename, key, columnFamily, sinceTimestamp);
-                break;
-                
-            case STRONG:
-                row = strongReadProtocol(tablename, key, columnFamily, sinceTimestamp);
-                break;
-                
-            default:
-                row = weakReadProtocol(tablename, key, columnFamily, sinceTimestamp);
-                break;
-            }
-        }
-        return row;
-    }
 
-    public static Row strongReadProtocol(String tablename, String key, String columnFamily, List<String> columns) throws Exception
-    {       
-        long startTime = System.currentTimeMillis();        
-        // TODO: throw a thrift exception if we do not have N nodes
-        ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, columns);
-        
-        ReadCommand readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily, columns);
-        readCommandDigestOnly.setDigestQuery(true);
-        
-        Row row = StorageProxy.doStrongReadProtocol(key, readCommand, readCommandDigestOnly);
-        logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");     
-        return row;
-    }
-    
-    /**
-      * This function executes the read protocol.
-      * 1. Get the N nodes from storage service where the data needs to be replicated
-      * 2. Construct a message for read\write
-      * 3. Set one of teh messages to get teh data and teh rest to get teh digest
-      * 4. SendRR ( to all the nodes above )
-      * 5. Wait for a response from atleast X nodes where X <= N and teh data node
-      * 6. If the digest matches return teh data.
-      * 7. else carry out read repair by getting data from all the nodes.
-      * @param tablename the name of the table
-      * @param key the row key identifier
-      * @param columnFamily the column in Cassandra format
-      * @start the start position
-      * @count the number of columns we are interested in
-      * @throws IOException, TimeoutException
-     */
-    public static Row strongReadProtocol(String tablename, String key, String columnFamily, int start, int count) throws IOException, TimeoutException
-    {       
-        long startTime = System.currentTimeMillis();        
-        // TODO: throw a thrift exception if we do not have N nodes
-        ReadCommand readCommand = null;
-        ReadCommand readCommandDigestOnly = null;
-        if( start >= 0 && count < Integer.MAX_VALUE)
-        {
-            readCommand = new ReadCommand(tablename, key, columnFamily, start, count);
-        }
-        else
-        {
-            readCommand = new ReadCommand(tablename, key, columnFamily);
-        }
-        Message message = readCommand.makeReadMessage();
-        if( start >= 0 && count < Integer.MAX_VALUE)
-        {
-            readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily, start, count);
-        }
-        else
-        {
-            readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily);
-        }
-        readCommandDigestOnly.setDigestQuery(true);
-        Row row = doStrongReadProtocol(key, readCommand, readCommandDigestOnly);
-        logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
-        return row;
-    }
-    
     /**
      * This is a multiget version of the above method.
      * @param tablename
@@ -531,101 +399,91 @@
         logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
         return rows;
     }
-    
-    public static Row strongReadProtocol(String tablename, String key, String columnFamily, long sinceTimestamp) throws IOException, TimeoutException
-    {       
-        long startTime = System.currentTimeMillis();        
-        // TODO: throw a thrift exception if we do not have N nodes
-        ReadCommand readCommand = null;
-        ReadCommand readCommandDigestOnly = null;
-        readCommand = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
-        Message message = readCommand.makeReadMessage();
-        readCommandDigestOnly = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
-        readCommandDigestOnly.setDigestQuery(true);
-        Row row = doStrongReadProtocol(key, readCommand, readCommandDigestOnly);
-        logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
-        return row;
-    }
 
-    /**
-     *  This method performs the read from the replicas.
-     *  param @ key - key for which the data is required.
-     *  param @ readMessage - the read message to get the actual data
-     *  param @ readMessageDigest - the read message to get the digest.
-    */
-    private static Row doStrongReadProtocol(String key, ReadCommand readCommand, ReadCommand readCommandDigest) throws IOException, TimeoutException
+    /*
+     * This function executes the read protocol.
+        // 1. Get the N nodes from storage service where the data needs to be
+        // replicated
+        // 2. Construct a message for read\write
+         * 3. Set one of teh messages to get teh data and teh rest to get teh digest
+        // 4. SendRR ( to all the nodes above )
+        // 5. Wait for a response from atleast X nodes where X <= N and teh data node
+         * 6. If the digest matches return teh data.
+         * 7. else carry out read repair by getting data from all the nodes.
+        // 5. return success
+     */
+    private static Row strongRead(ReadCommand command) throws IOException, TimeoutException
     {
+        // TODO: throw a thrift exception if we do not have N nodes
+
+        ReadCommand readMessageDigestOnly = command.copy();
+        readMessageDigestOnly.setDigestQuery(true);
+
         Row row = null;
-        Message message = readCommand.makeReadMessage();
-        Message messageDigestOnly = readCommandDigest.makeReadMessage();
-        
+        Message message = command.makeReadMessage();
+        Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
+
         IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
         QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
                 DatabaseDescriptor.getReplicationFactor(),
                 readResponseResolver);
-        EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(key);
-        List<EndPoint> endpointList = new ArrayList<EndPoint>( Arrays.asList( StorageService.instance().getNStorageEndPoint(key) ) );
-        /* Remove the local storage endpoint from the list. */ 
-        endpointList.remove( dataPoint );
+        EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
+        List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getNStorageEndPoint(command.key)));
+        /* Remove the local storage endpoint from the list. */
+        endpointList.remove(dataPoint);
         EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
         Message messages[] = new Message[endpointList.size() + 1];
-        
-        /* 
+
+        /*
          * First message is sent to the node that will actually get
-         * the data for us. The other two replicas are only sent a 
+         * the data for us. The other two replicas are only sent a
          * digest query.
         */
         endPoints[0] = dataPoint;
-        messages[0] = message;        
-        for (int i=1; i < endPoints.length ; i++)
+        messages[0] = message;
+        for (int i = 1; i < endPoints.length; i++)
         {
-            endPoints[i] = endpointList.get(i-1);
+            endPoints[i] = endpointList.get(i - 1);
             messages[i] = messageDigestOnly;
         }
-        
+        logger_.debug("strongread reading " + command + " from " + StringUtils.join(endPoints, ", "));
+
         try
         {
-            MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);            
+            MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);
+
             long startTime2 = System.currentTimeMillis();
             row = quorumResponseHandler.get();
-            logger_.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2)
-                    + " ms.");
-            if (row == null)
-            {
-                logger_.info("ERROR No row for this key .....: " + key);
-                // TODO: throw a thrift exception 
-                return row;
-            }
+            logger_.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms.");
         }
         catch (DigestMismatchException ex)
         {
             if ( DatabaseDescriptor.getConsistencyCheck())
             {
-	            IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
-	            QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
-	                    DatabaseDescriptor.getReplicationFactor(),
-	                    readResponseResolverRepair);
-	            readCommand.setDigestQuery(false);
-	            logger_.info("DigestMismatchException: " + key);            
-	            Message messageRepair = readCommand.makeReadMessage();
-	            MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints, quorumResponseHandlerRepair);
-	            try
-	            {
-	                row = quorumResponseHandlerRepair.get();
-	            }
-	            catch(DigestMismatchException dex)
-	            {
-	                logger_.warn(LogUtil.throwableToString(dex));
-	            }
-	            if (row == null)
-	            {
-	                logger_.info("ERROR No row for this key .....: " + key);                
-	            }
+                IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
+                QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
+                        DatabaseDescriptor.getReplicationFactor(),
+                        readResponseResolverRepair);
+                command.setDigestQuery(false);
+                logger_.info("DigestMismatchException: " + command.key);
+                Message messageRepair = command.makeReadMessage();
+                MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints,
+                                                               quorumResponseHandlerRepair);
+                try
+                {
+                    row = quorumResponseHandlerRepair.get();
+                }
+                catch (DigestMismatchException e)
+                {
+                    // TODO should this be a thrift exception?
+                    throw new RuntimeException(e);
+                }
             }
-        }        
+        }
+
         return row;
     }
-    
+
     private static Map<String, Message[]> constructReplicaMessages(Map<String, ReadCommand[]> readMessages) throws IOException
     {
         Map<String, Message[]> messages = new HashMap<String, Message[]>();
@@ -720,38 +578,7 @@
         }
         return rows;
     }
-    
-    /**
-     * This version is used to retrieve the row associated with
-     * the specified key
-     * @param tablename name of the table that needs to be queried
-     * @param keys keys whose values we are interested in 
-     * @param columnFamily name of the "column" we are interested in
-     * @param columns the columns we are interested in
-     * @return the interested row
-     * @throws Exception
-     */
-    public static Row weakReadProtocol(String tablename, String key, String columnFamily, List<String> columns) throws Exception
-    {       
-        long startTime = System.currentTimeMillis();
-        List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
-        /* Remove the local storage endpoint from the list. */ 
-        endpoints.remove( StorageService.getLocalStorageEndPoint() );
-        // TODO: throw a thrift exception if we do not have N nodes
-        
-        Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
-        Row row = table.getRow(key, columnFamily, columns);
-        
-        logger_.debug("Local Read Protocol: " + (System.currentTimeMillis() - startTime) + " ms.");
-        /*
-         * Do the consistency checks in the background and return the
-         * non NULL row.
-         */
-        if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
-            StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, columns);
-        return row;
-    }
-    
+
     /**
      * This version is used when results for multiple keys needs to be
      * retrieved.
@@ -782,56 +609,40 @@
         for ( String key : keys )
         {
             List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
-            /* Remove the local storage endpoint from the list. */ 
+            /* Remove the local storage endpoint from the list. */
             endpoints.remove( StorageService.getLocalStorageEndPoint() );
             if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
                 StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, columns);
         }
-        return rows;         
+        return rows;
     }
-    
-    /**
-     * This function executes the read protocol locally and should be used only if consistency is not a concern. 
-     * Read the data from the local disk and return if the row is NOT NULL. If the data is NULL do the read from
-     * one of the other replicas (in the same data center if possible) till we get the data. In the event we get
-     * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
-     * @param tablename name of the table that needs to be queried
-     * @param key key whose we are interested in 
-     * @param columnFamily name of the "column" we are interested in
-     * @param start start index
-     * @param count the number of columns we are interested in
-     * @return the row associated with this key
-     * @throws Exception 
-     */
-    public static Row weakReadProtocol(String tablename, String key, String columnFamily, int start, int count) throws Exception
+
+    /*
+    * This function executes the read protocol locally and should be used only if consistency is not a concern.
+    * Read the data from the local disk and return if the row is NOT NULL. If the data is NULL do the read from
+    * one of the other replicas (in the same data center if possible) till we get the data. In the event we get
+    * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
+    */
+    private static Row weakReadLocal(ReadCommand command) throws IOException, ColumnFamilyNotDefinedException
     {
-        Row row = null;
-        long startTime = System.currentTimeMillis();
-        List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
-        /* Remove the local storage endpoint from the list. */ 
-        endpoints.remove( StorageService.getLocalStorageEndPoint() );
+        logger_.debug("weakreadlocal for " + command);
+        List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(command.key);
+        /* Remove the local storage endpoint from the list. */
+        endpoints.remove(StorageService.getLocalStorageEndPoint());
         // TODO: throw a thrift exception if we do not have N nodes
-        
-        Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
-        if( start >= 0 && count < Integer.MAX_VALUE)
-        {
-            row = table.getRow(key, columnFamily, start, count);
-        }
-        else
-        {
-            row = table.getRow(key, columnFamily);
-        }
-        
-        logger_.debug("Local Read Protocol: " + (System.currentTimeMillis() - startTime) + " ms.");
+
+        Table table = Table.open(DatabaseDescriptor.getTables().get(0));
+        Row row = command.getRow(table);
+
         /*
-         * Do the consistency checks in the background and return the
-         * non NULL row.
-         */
-        if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
-        	StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, start, count);
-        return row;         
+           * Do the consistency checks in the background and return the
+           * non NULL row.
+           */
+        if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+            StorageService.instance().doConsistencyCheck(row, endpoints, command);
+        return row;
     }
-    
+
     /**
      * This version is used when results for multiple keys needs to be
      * retrieved.
@@ -870,38 +681,7 @@
         }
         return rows;         
     }
-    
-    /**
-     * This version is used when retrieving a single key.
-     * 
-     * @param tablename name of the table that needs to be queried
-     * @param key key whose we are interested in 
-     * @param columnFamily name of the "column" we are interested in
-     * @param sinceTimestamp this is lower bound of the timestamp
-     * @return the row associated with this key
-     * @throws Exception
-     */
-    public static Row weakReadProtocol(String tablename, String key, String columnFamily, long sinceTimestamp) throws Exception
-    {
-        Row row = null;
-        long startTime = System.currentTimeMillis();
-        List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
-        /* Remove the local storage endpoint from the list. */ 
-        endpoints.remove( StorageService.getLocalStorageEndPoint() );
-        // TODO: throw a thrift exception if we do not have N nodes
-        
-        Table table = Table.open( DatabaseDescriptor.getTables().get(0) );
-        row = table.getRow(key, columnFamily,sinceTimestamp);
-        logger_.debug("Local Read Protocol: " + (System.currentTimeMillis() - startTime) + " ms.");
-        /*
-         * Do the consistency checks in the background and return the
-         * non NULL row.
-         */
-        if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
-        	StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, sinceTimestamp);
-        return row;         
-    }
-    
+
     /**
      * This version is used when results for multiple keys needs to be
      * retrieved.

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=766841&r1=766840&r2=766841&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java Mon Apr 20 20:05:26 2009
@@ -565,18 +565,28 @@
      * sure that the N replicas are in sync. We do this in the
      * background when we do not care much about consistency.
      */
+    public void doConsistencyCheck(Row row, List<EndPoint> endpoints, ReadCommand message)
+    {
+        Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, message.columnFamilyColumn,
+                                                              message.start, message.count, message.sinceTimestamp, message.columnNames);
+        consistencyManager_.submit(consistencySentinel);
+    }
+
+    @Deprecated
     public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String columnFamily, int start, int count)
 	{
 		Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, columnFamily, start, count);
 		consistencyManager_.submit(consistencySentinel);
 	}
-    
+
+    @Deprecated
     public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String columnFamily, long sinceTimestamp)
 	{
 		Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, columnFamily, sinceTimestamp);
 		consistencyManager_.submit(consistencySentinel);
 	}
 
+    @Deprecated
     public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String columnFamily, List<String> columns)
     {
     	Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, columnFamily, columns);