You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/04/25 17:46:20 UTC

svn commit: r768553 - in /incubator/cassandra/trunk: src/org/apache/cassandra/cql/common/ src/org/apache/cassandra/db/ src/org/apache/cassandra/service/ src/org/apache/cassandra/test/ test/org/apache/cassandra/db/

Author: jbellis
Date: Sat Apr 25 15:46:19 2009
New Revision: 768553

URL: http://svn.apache.org/viewvc?rev=768553&view=rev
Log:
split ReadCommand into separate classes for each type of command.  patch by Jun Rao; reviewed by jbellis

Added:
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java
Modified:
    incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
    incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
    incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java
    incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
    incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
    incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
    incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java
    incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java Sat Apr 25 15:46:19 2009
@@ -27,6 +27,7 @@
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.SliceReadCommand;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Row;
@@ -100,7 +101,7 @@
         try
         {
             String key = (String)(rowKey_.get());
-            ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, key, columnFamily_column, offset_, limit_);
+            ReadCommand readCommand = new SliceReadCommand(cfMetaData_.tableName, key, columnFamily_column, offset_, limit_);
             row = StorageProxy.readProtocol(readCommand, StorageService.ConsistencyLevel.WEAK);
         }
         catch (Exception e)

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java Sat Apr 25 15:46:19 2009
@@ -27,6 +27,7 @@
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.SliceReadCommand;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Row;
@@ -69,7 +70,7 @@
         try
         {
             String key = (String)(rowKey_.get());
-            ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, key, cfMetaData_.cfName, offset_, limit_);
+            ReadCommand readCommand = new SliceReadCommand(cfMetaData_.tableName, key, cfMetaData_.cfName, offset_, limit_);
             row = StorageProxy.readProtocol(readCommand, StorageService.ConsistencyLevel.WEAK);
         }
         catch (Exception e)

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java Sat Apr 25 15:46:19 2009
@@ -27,6 +27,7 @@
 import org.apache.cassandra.config.CFMetaData;
 import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnReadCommand;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.ReadCommand;
@@ -85,7 +86,7 @@
         try
         {
             String key = (String)(rowKey_.get());
-            ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, key, columnFamily_column, -1, Integer.MAX_VALUE);
+            ReadCommand readCommand = new ColumnReadCommand(cfMetaData_.tableName, key, columnFamily_column);
             row = StorageProxy.readProtocol(readCommand, StorageService.ConsistencyLevel.WEAK);
         }
         catch (Exception e)

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java?rev=768553&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java Sat Apr 25 15:46:19 2009
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class ColumnReadCommand extends ReadCommand
+{
+    public final String columnFamilyColumn;
+
+    public ColumnReadCommand(String table, String key, String columnFamilyColumn)
+    {
+        super(table, key, CMD_TYPE_GET_COLUMN);
+        this.columnFamilyColumn = columnFamilyColumn;
+    }
+
+    @Override
+    public String getColumnFamilyName()
+    {
+        String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
+        return values[0];
+    }
+
+    @Override
+    public ReadCommand copy()
+    {
+        ReadCommand readCommand= new ColumnReadCommand(table, key, columnFamilyColumn);
+        readCommand.setDigestQuery(isDigestQuery());
+        return readCommand;
+    }
+
+    @Override
+    public Row getRow(Table table) throws IOException    
+    {
+        return table.getRow(key, columnFamilyColumn);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "GetColumnReadMessage(" +
+               "table='" + table + '\'' +
+               ", key='" + key + '\'' +
+               ", columnFamilyColumn='" + columnFamilyColumn + '\'' +
+               ')';
+    }
+}
+
+class ColumnReadCommandSerializer extends ReadCommandSerializer
+{
+    @Override
+    public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+    { 
+        ColumnReadCommand realRM = (ColumnReadCommand)rm;
+        dos.writeBoolean(realRM.isDigestQuery());
+        dos.writeUTF(realRM.table);
+        dos.writeUTF(realRM.key);
+        dos.writeUTF(realRM.columnFamilyColumn);
+    }
+
+    @Override
+    public ReadCommand deserialize(DataInputStream dis) throws IOException
+    {
+        boolean isDigest = dis.readBoolean();
+        String table = dis.readUTF();
+        String key = dis.readUTF();
+        String columnFamily_column = dis.readUTF();
+        ColumnReadCommand rm = new ColumnReadCommand(table, key, columnFamily_column);
+        rm.setDigestQuery(isDigest);
+        return rm;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java?rev=768553&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java Sat Apr 25 15:46:19 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class ColumnsSinceReadCommand extends ReadCommand
+{
+    public final String columnFamily;
+    public final long sinceTimestamp;
+
+    public ColumnsSinceReadCommand(String table, String key, String columnFamily, long sinceTimestamp)
+    {
+        super(table, key, CMD_TYPE_GET_COLUMNS_SINCE);
+        this.columnFamily = columnFamily;
+        this.sinceTimestamp = sinceTimestamp;
+    }
+
+    @Override
+    public String getColumnFamilyName()
+    {
+        return columnFamily;
+    }
+
+    @Override
+    public ReadCommand copy()
+    {
+        ReadCommand readCommand= new ColumnsSinceReadCommand(table, key, columnFamily, sinceTimestamp);
+        readCommand.setDigestQuery(isDigestQuery());
+        return readCommand;
+    }
+
+    @Override
+    public Row getRow(Table table) throws IOException
+    {        
+        return table.getRow(key, columnFamily, sinceTimestamp);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "GetColumnsSinceMessage(" +
+               "table='" + table + '\'' +
+               ", key='" + key + '\'' +
+               ", columnFamily='" + columnFamily + '\'' +
+               ", sinceTimestamp='" + sinceTimestamp + '\'' +
+               ')';
+    }
+
+}
+
+class ColumnsSinceReadCommandSerializer extends ReadCommandSerializer
+{
+    @Override
+    public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+    {
+        ColumnsSinceReadCommand realRM = (ColumnsSinceReadCommand)rm;
+        dos.writeBoolean(realRM.isDigestQuery());
+        dos.writeUTF(realRM.table);
+        dos.writeUTF(realRM.key);
+        dos.writeUTF(realRM.columnFamily);
+        dos.writeLong(realRM.sinceTimestamp);
+    }
+
+    @Override
+    public ReadCommand deserialize(DataInputStream dis) throws IOException
+    {
+        boolean isDigest = dis.readBoolean();
+        String table = dis.readUTF();
+        String key = dis.readUTF();
+        String columnFamily = dis.readUTF();
+        long sinceTimestamp = dis.readLong();
+
+        ColumnsSinceReadCommand rm = new ColumnsSinceReadCommand(table, key, columnFamily, sinceTimestamp);
+        rm.setDigestQuery(isDigest);
+        return rm;
+    }
+}

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java Sat Apr 25 15:46:19 2009
@@ -22,26 +22,24 @@
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Arrays;
-import java.util.Collections;
-
-import org.apache.commons.lang.StringUtils;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.cassandra.io.ICompactSerializer;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.service.StorageService;
 
 
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class ReadCommand
+public abstract class ReadCommand
 {
     public static final String DO_REPAIR = "READ-REPAIR";
-
+    public static final byte CMD_TYPE_GET_ROW=1;
+    public static final byte CMD_TYPE_GET_COLUMN=2;
+    public static final byte CMD_TYPE_GET_SLICE_BY_NAMES=3;
+    public static final byte CMD_TYPE_GET_COLUMNS_SINCE=4;
+    public static final byte CMD_TYPE_GET_SLICE=5;
+    public static final String EMPTY_CF = "";
+    
     private static ReadCommandSerializer serializer = new ReadCommandSerializer();
 
     public static ReadCommandSerializer serializer()
@@ -49,8 +47,6 @@
         return serializer;
     }
 
-    private static List<String> EMPTY_COLUMNS = Arrays.asList(new String[0]);
-
     public Message makeReadMessage() throws IOException
     {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -60,57 +56,17 @@
     }
 
     public final String table;
-
     public final String key;
+    private boolean isDigestQuery = false;    
+    protected final byte commandType;
 
-    public final String columnFamilyColumn;
-
-    public final int start;
-
-    public final int count;
-
-    public final long sinceTimestamp;
-
-    public final List<String> columnNames;
-
-    private boolean isDigestQuery = false;
-
-    public ReadCommand(String table, String key, String columnFamilyColumn, int start, int count, long sinceTimestamp, List<String> columnNames)
+    protected ReadCommand(String table, String key, byte cmdType)
     {
         this.table = table;
         this.key = key;
-        this.columnFamilyColumn = columnFamilyColumn;
-        this.start = start;
-        this.count = count;
-        this.sinceTimestamp = sinceTimestamp;
-        this.columnNames = Collections.unmodifiableList(columnNames);
+        this.commandType = cmdType;
     }
-
-    public ReadCommand(String table, String key)
-    {
-        this(table, key, null, -1, -1, -1, EMPTY_COLUMNS);
-    }
-
-    public ReadCommand(String table, String key, String columnFamilyColumn)
-    {
-        this(table, key, columnFamilyColumn, -1, -1, -1, EMPTY_COLUMNS);
-    }
-
-    public ReadCommand(String table, String key, String columnFamilyColumn, List<String> columnNames)
-    {
-        this(table, key, columnFamilyColumn, -1, -1, -1, columnNames);
-    }
-
-    public ReadCommand(String table, String key, String columnFamilyColumn, int start, int count)
-    {
-        this(table, key, columnFamilyColumn, start, count, -1, EMPTY_COLUMNS);
-    }
-
-    public ReadCommand(String table, String key, String columnFamilyColumn, long sinceTimestamp)
-    {
-        this(table, key, columnFamilyColumn, -1, -1, sinceTimestamp, EMPTY_COLUMNS);
-    }
-
+    
     public boolean isDigestQuery()
     {
         return isDigestQuery;
@@ -121,99 +77,37 @@
         this.isDigestQuery = isDigestQuery;
     }
 
-    public ReadCommand copy()
-    {
-        return new ReadCommand(table, key, columnFamilyColumn, start, count, sinceTimestamp, columnNames);
-    }
-
-    public Row getRow(Table table) throws IOException
-    {
-        if (!columnNames.isEmpty())
-        {
-            return table.getRow(key, columnFamilyColumn, columnNames);
-        }
-
-        if (sinceTimestamp > 0)
-        {
-            return table.getRow(key, columnFamilyColumn, sinceTimestamp);
-        }
-
-        if (start > 0 || (count > 0 && count < Integer.MAX_VALUE))
-        {
-            return table.getRow(key, columnFamilyColumn, start, count);
-        }
+    public abstract String getColumnFamilyName();
+    
+    public abstract ReadCommand copy();
 
-        return table.getRow(key, columnFamilyColumn);
-    }
-
-    public String toString()
-    {
-        return "ReadMessage(" +
-               "table='" + table + '\'' +
-               ", key='" + key + '\'' +
-               ", columnFamilyColumn='" + columnFamilyColumn + '\'' +
-               ", start=" + start +
-               ", count=" + count +
-               ", sinceTimestamp=" + sinceTimestamp +
-               ", columns=[" + StringUtils.join(columnNames, ", ") + "]" +
-               ')';
-    }
+    public abstract Row getRow(Table table) throws IOException;
 }
 
 class ReadCommandSerializer implements ICompactSerializer<ReadCommand>
 {
+    private static final Map<Byte, ReadCommandSerializer> CMD_SERIALIZER_MAP = new HashMap<Byte, ReadCommandSerializer>(); 
+    static 
+    {
+        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_ROW, new RowReadCommandSerializer());
+        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_COLUMN, new ColumnReadCommandSerializer());
+        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE_BY_NAMES, new SliceByNamesReadCommandSerializer());
+        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_COLUMNS_SINCE, new ColumnsSinceReadCommandSerializer());
+        CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE, new SliceReadCommandSerializer());
+    }
+
+
     public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
     {
-        dos.writeUTF(rm.table);
-        dos.writeUTF(rm.key);
-        dos.writeUTF(rm.columnFamilyColumn);
-        dos.writeInt(rm.start);
-        dos.writeInt(rm.count);
-        dos.writeLong(rm.sinceTimestamp);
-        dos.writeBoolean(rm.isDigestQuery());
-        dos.writeInt(rm.columnNames.size());
-        if (rm.columnNames.size() > 0)
-        {
-            for (String cName : rm.columnNames)
-            {
-                dos.writeInt(cName.getBytes().length);
-                dos.write(cName.getBytes());
-            }
-        }
+        dos.writeByte(rm.commandType);
+        ReadCommandSerializer ser = CMD_SERIALIZER_MAP.get(rm.commandType);
+        ser.serialize(rm, dos);
     }
 
     public ReadCommand deserialize(DataInputStream dis) throws IOException
     {
-        String table = dis.readUTF();
-        String key = dis.readUTF();
-        String columnFamily_column = dis.readUTF();
-        int start = dis.readInt();
-        int count = dis.readInt();
-        long sinceTimestamp = dis.readLong();
-        boolean isDigest = dis.readBoolean();
-
-        int size = dis.readInt();
-        List<String> columns = new ArrayList<String>();
-        for (int i = 0; i < size; ++i)
-        {
-            byte[] bytes = new byte[dis.readInt()];
-            dis.readFully(bytes);
-            columns.add(new String(bytes));
-        }
-        ReadCommand rm = null;
-        if (columns.size() > 0)
-        {
-            rm = new ReadCommand(table, key, columnFamily_column, columns);
-        }
-        else if (sinceTimestamp > 0)
-        {
-            rm = new ReadCommand(table, key, columnFamily_column, sinceTimestamp);
-        }
-        else
-        {
-            rm = new ReadCommand(table, key, columnFamily_column, start, count);
-        }
-        rm.setDigestQuery(isDigest);
-        return rm;
+        byte msgType = dis.readByte();
+        return CMD_SERIALIZER_MAP.get(msgType).deserialize(dis);
     }
+        
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java Sat Apr 25 15:46:19 2009
@@ -118,28 +118,11 @@
     
     private void doReadRepair(Row row, ReadCommand readCommand)
     {
-        if ( DatabaseDescriptor.getConsistencyCheck() )
-        {
-            List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readCommand.key);
-            /* Remove the local storage endpoint from the list. */ 
-            endpoints.remove( StorageService.getLocalStorageEndPoint() );
+        List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readCommand.key);
+        /* Remove the local storage endpoint from the list. */ 
+        endpoints.remove( StorageService.getLocalStorageEndPoint() );
             
-            if(readCommand.columnNames.size() == 0)
-            {
-                if( readCommand.start >= 0 && readCommand.count < Integer.MAX_VALUE)
-                {                
-                    StorageService.instance().doConsistencyCheck(row, endpoints, readCommand.columnFamilyColumn, readCommand.start, readCommand.count);
-                }
-                
-                if( readCommand.sinceTimestamp > 0)
-                {                    
-                    StorageService.instance().doConsistencyCheck(row, endpoints, readCommand.columnFamilyColumn, readCommand.sinceTimestamp);
-                }                
-            }
-            else
-            {
-                StorageService.instance().doConsistencyCheck(row, endpoints, readCommand.columnFamilyColumn, readCommand.columnNames);
-            }
-        }
+        if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+            StorageService.instance().doConsistencyCheck(row, endpoints, readCommand);
     }     
 }

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java?rev=768553&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java Sat Apr 25 15:46:19 2009
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Read command identified only by table name and row key. It is tagged with
+ * CMD_TYPE_GET_ROW and resolves its row through Table.get(key).
+ */
+public class RowReadCommand extends ReadCommand
+{
+    public RowReadCommand(String table, String key)
+    {
+        super(table, key, CMD_TYPE_GET_ROW);
+    }
+
+    /** Returns null: this command carries no column family name. */
+    @Override
+    public String getColumnFamilyName()
+    {
+        return null;
+    }
+
+    /** Returns a new RowReadCommand with the same table, key and digest-query flag. */
+    @Override
+    public ReadCommand copy()
+    {
+        ReadCommand readCommand= new RowReadCommand(table, key);
+        readCommand.setDigestQuery(isDigestQuery());
+        return readCommand;
+    }
+
+    /** Looks the row up with table.get(key). */
+    @Override
+    public Row getRow(Table table) throws IOException
+    {
+        return table.get(key);
+    }
+
+    /** Renders the command as GetRowReadMessage(table='...', key='...'). */
+    @Override
+    public String toString()
+    {
+        // Label is specific to this class so its output is not confused with ColumnReadCommand's.
+        return "GetRowReadMessage(" +
+               "table='" + table + '\'' +
+               ", key='" + key + '\'' +
+               ')';
+    }
+
+}
+
+/** Writes and reads the body of a RowReadCommand: digest flag, then table, then key. */
+class RowReadCommandSerializer extends ReadCommandSerializer
+{
+    @Override
+    public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+    {
+        RowReadCommand realRM = (RowReadCommand)rm;
+        dos.writeBoolean(realRM.isDigestQuery());
+        dos.writeUTF(realRM.table);
+        dos.writeUTF(realRM.key);
+    }
+
+    @Override
+    public ReadCommand deserialize(DataInputStream dis) throws IOException
+    {
+        boolean isDigest = dis.readBoolean();
+        String table = dis.readUTF();
+        String key = dis.readUTF();
+        RowReadCommand rm = new RowReadCommand(table, key);
+        rm.setDigestQuery(isDigest);
+        return rm;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java?rev=768553&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java Sat Apr 25 15:46:19 2009
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Read command that requests a named set of columns from one column family of a row.
+ * The supplied list is wrapped with Collections.unmodifiableList, not copied.
+ */
+public class SliceByNamesReadCommand extends ReadCommand
+{
+    public final String columnFamily;
+    public final List<String> columnNames;
+
+    public SliceByNamesReadCommand(String table, String key, String columnFamily, List<String> columnNames)
+    {
+        super(table, key, CMD_TYPE_GET_SLICE_BY_NAMES);
+        this.columnFamily = columnFamily;
+        this.columnNames = Collections.unmodifiableList(columnNames);
+    }
+
+    @Override
+    public String getColumnFamilyName()
+    {
+        return columnFamily;
+    }
+
+    @Override
+    public ReadCommand copy()
+    {
+        ReadCommand readCommand= new SliceByNamesReadCommand(table, key, columnFamily, columnNames);
+        readCommand.setDigestQuery(isDigestQuery());
+        return readCommand;
+    }
+
+    @Override
+    public Row getRow(Table table) throws IOException
+    {
+        return table.getRow(key, columnFamily, columnNames);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "GetSliceByNamesReadMessage(" +
+               "table='" + table + '\'' +
+               ", key='" + key + '\'' +
+               ", columnFamily='" + columnFamily + '\'' +
+               ", columns=[" + StringUtils.join(columnNames, ", ") + "]" +
+               ')';
+    }
+
+}
+
+/**
+ * Wire format: digest flag, table, key, column family, column count, then each
+ * column name as an int byte-length followed by its UTF-8 bytes.
+ */
+class SliceByNamesReadCommandSerializer extends ReadCommandSerializer
+{
+    // Explicit charset so column names round-trip identically whatever each JVM's default encoding is.
+    private static final String COLUMN_NAME_CHARSET = "UTF-8";
+
+    @Override
+    public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+    {
+        SliceByNamesReadCommand realRM = (SliceByNamesReadCommand)rm;
+        dos.writeBoolean(realRM.isDigestQuery());
+        dos.writeUTF(realRM.table);
+        dos.writeUTF(realRM.key);
+        dos.writeUTF(realRM.columnFamily);
+        dos.writeInt(realRM.columnNames.size());
+        if (realRM.columnNames.size() > 0)
+        {
+            for (String cName : realRM.columnNames)
+            {
+                byte[] nameBytes = cName.getBytes(COLUMN_NAME_CHARSET);
+                dos.writeInt(nameBytes.length);
+                dos.write(nameBytes);
+            }
+        }
+    }
+
+    @Override
+    public ReadCommand deserialize(DataInputStream dis) throws IOException
+    {
+        boolean isDigest = dis.readBoolean();
+        String table = dis.readUTF();
+        String key = dis.readUTF();
+        String columnFamily = dis.readUTF();
+
+        int size = dis.readInt();
+        List<String> columns = new ArrayList<String>();
+        for (int i = 0; i < size; ++i)
+        {
+            byte[] bytes = new byte[dis.readInt()];
+            dis.readFully(bytes);
+            columns.add(new String(bytes, COLUMN_NAME_CHARSET));
+        }
+        SliceByNamesReadCommand rm = new SliceByNamesReadCommand(table, key, columnFamily, columns);
+        rm.setDigestQuery(isDigest);
+        return rm;
+    }
+}

Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java?rev=768553&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java Sat Apr 25 15:46:19 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/** Read command for a positional slice of one row: the bounds {@code start} and {@code count} are handed to Table.getRow. */
+public class SliceReadCommand extends ReadCommand
+{
+    public final String columnFamily; // path exactly as passed by the caller: "family" or "family:column" style
+    public final int start;
+    public final int count;
+
+    public SliceReadCommand(String table, String key, String columnFamily, int start, int count)
+    {
+        super(table, key, CMD_TYPE_GET_SLICE);
+        this.columnFamily = columnFamily;
+        this.start = start;
+        this.count = count;
+    }
+
+    /** Returns just the family component of the path (not "family:column"): callers validate and look up the family by this name. */
+    @Override
+    public String getColumnFamilyName()
+    {
+        String[] parts = RowMutation.getColumnAndColumnFamily(columnFamily);
+        return parts.length > 0 ? parts[0] : columnFamily;
+    }
+
+    /** Returns a new command with the same table, key, path, bounds and digest flag. */
+    @Override
+    public ReadCommand copy()
+    {
+        ReadCommand readCommand = new SliceReadCommand(table, key, columnFamily, start, count);
+        readCommand.setDigestQuery(isDigestQuery());
+        return readCommand;
+    }
+
+    /** Delegates to {@code table.getRow} with the full path and the slice bounds. */
+    @Override
+    public Row getRow(Table table) throws IOException
+    {
+        return table.getRow(key, columnFamily, start, count);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "GetSliceReadMessage(" + "table='" + table + '\'' + ", key='" + key + '\'' +
+               ", columnFamily='" + columnFamily + '\'' + ", start='" + start + '\'' + ", count='" + count + '\'' + ')';
+    }
+}
+
+/** Wire (de)serializer for SliceReadCommand: digest flag, table, key, column family, start, count, in that order. */
+class SliceReadCommandSerializer extends ReadCommandSerializer
+{
+    /** Writes the fields of {@code rm}, which is cast to SliceReadCommand, to {@code dos}. */
+    @Override
+    public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+    {
+        SliceReadCommand slice = (SliceReadCommand) rm;
+        dos.writeBoolean(slice.isDigestQuery());
+        dos.writeUTF(slice.table);
+        dos.writeUTF(slice.key);
+        dos.writeUTF(slice.columnFamily);
+        dos.writeInt(slice.start);
+        dos.writeInt(slice.count);
+    }
+
+    /** Reads the fields back in written order (arguments evaluate left to right, so start precedes count) and rebuilds the command. */
+    @Override
+    public ReadCommand deserialize(DataInputStream dis) throws IOException
+    {
+        boolean digestOnly = dis.readBoolean();
+        String tableName = dis.readUTF();
+        String rowKey = dis.readUTF();
+        String familyPath = dis.readUTF();
+        SliceReadCommand slice = new SliceReadCommand(tableName, rowKey, familyPath, dis.readInt(), dis.readInt());
+        slice.setDigestQuery(digestOnly);
+        return slice;
+    }
+}

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Sat Apr 25 15:46:19 2009
@@ -36,6 +36,10 @@
 import org.apache.cassandra.cql.common.CqlResult;
 import org.apache.cassandra.cql.driver.CqlDriver;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnReadCommand;
+import org.apache.cassandra.db.ColumnsSinceReadCommand;
+import org.apache.cassandra.db.SliceByNamesReadCommand;
+import org.apache.cassandra.db.SliceReadCommand;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
@@ -102,12 +106,8 @@
     
 	protected ColumnFamily readColumnFamily(ReadCommand command) throws InvalidRequestException
     {
-        String[] values = RowMutation.getColumnAndColumnFamily(command.columnFamilyColumn);
-        if( values.length < 1 )
-        {
-            throw new ColumnFamilyNotDefinedException("Empty column Family is invalid.");
-        }
-        validateCommand(command.key, command.table, values[0]);
+        String cfName = command.getColumnFamilyName();
+        validateCommand(command.key, command.table, cfName);
 
         Row row;
         try
@@ -127,7 +127,7 @@
         {
             return null;
         }
-        return row.getColumnFamily(values[0]);
+        return row.getColumnFamily(cfName);
 	}
 
     public List<column_t> thriftifyColumns(Collection<IColumn> columns)
@@ -156,7 +156,7 @@
         long startTime = System.currentTimeMillis();
         try
         {
-            ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, timeStamp));
+            ColumnFamily cfamily = readColumnFamily(new ColumnsSinceReadCommand(tablename, key, columnFamily_column, timeStamp));
             String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
             if (cfamily == null)
             {
@@ -188,7 +188,7 @@
         long startTime = System.currentTimeMillis();
         try
         {
-            ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily, columnNames));
+            ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(tablename, key, columnFamily, columnNames));
             if (cfamily == null)
             {
                 return EMPTY_COLUMNS;
@@ -209,7 +209,7 @@
 		try
 		{
 	        String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
-            ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, start, count));
+            ColumnFamily cfamily = readColumnFamily(new SliceReadCommand(tablename, key, columnFamily_column, start, count));
             if (cfamily == null)
 			{
                 return EMPTY_COLUMNS;
@@ -241,7 +241,7 @@
         {
             throw new InvalidRequestException("get_column requires both parts of columnfamily:column");
         }
-        ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
+        ColumnFamily cfamily = readColumnFamily(new ColumnReadCommand(tablename, key, columnFamily_column));
         if (cfamily == null)
         {
             throw new NotFoundException();
@@ -277,7 +277,7 @@
     public int get_column_count(String tablename, String key, String columnFamily_column) throws InvalidRequestException
     {
         String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
-        ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
+        ColumnFamily cfamily = readColumnFamily(new SliceReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
         if (cfamily == null)
         {
             return 0;
@@ -367,7 +367,7 @@
         long startTime = System.currentTimeMillis();
 		try
 		{
-			ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily, superColumnNames));
+			ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(tablename, key, columnFamily, superColumnNames));
 			if (cfamily == null)
 			{
                 return EMPTY_SUPERCOLUMNS;
@@ -405,7 +405,7 @@
 
     public List<superColumn_t> get_slice_super(String tablename, String key, String columnFamily_superColumnName, int start, int count) throws InvalidRequestException
     {
-        ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_superColumnName, start, count));
+        ColumnFamily cfamily = readColumnFamily(new SliceReadCommand(tablename, key, columnFamily_superColumnName, start, count));
         if (cfamily == null)
         {
             return EMPTY_SUPERCOLUMNS;
@@ -416,7 +416,7 @@
     
     public superColumn_t get_superColumn(String tablename, String key, String columnFamily_column) throws InvalidRequestException, NotFoundException
     {
-        ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
+        ColumnFamily cfamily = readColumnFamily(new ColumnReadCommand(tablename, key, columnFamily_column));
         if (cfamily == null)
         {
             throw new NotFoundException();

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java Sat Apr 25 15:46:19 2009
@@ -149,41 +149,26 @@
 	private static ICachetable<String, String> readRepairTable_ = new Cachetable<String, String>(scheduledTimeMillis_);
 	private Row row_;
 	protected List<EndPoint> replicas_;
-	private String columnFamily_;
-	private int start_;
-	private int count_;
-	private long sinceTimestamp_;
-	private List<String> columnNames_ = new ArrayList<String>();	
 	
-    public ConsistencyManager(Row row_, List<EndPoint> replicas_, String columnFamily_, int start_, int count_, long sinceTimestamp_, List<String> columnNames_)
+	private ReadCommand readCommand_;
+	
+    /**
+     * Creates a consistency check task; all three arguments are stored, none may be dropped.
+     *
+     * @param row_ the row that was read (StorageService passes row.cloneMe())
+     * @param replicas_ the endpoints handed over by the caller for this check
+     * @param readCommand the original read; constructReadMessage copies it for each request
+     */
+    public ConsistencyManager(Row row_, List<EndPoint> replicas_, ReadCommand readCommand)
     {
         this.row_ = row_;
         this.replicas_ = replicas_;
-        this.columnFamily_ = columnFamily_;
-        this.start_ = start_;
-        this.count_ = count_;
-        this.sinceTimestamp_ = sinceTimestamp_;
-        this.columnNames_ = columnNames_;
+        this.readCommand_ = readCommand;
     }
 
-    ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, List<String> columns)
-	{
-        this(row, replicas, columnFamily, 0, 0, 0, columns);
-	}
-
-	ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, int start, int count)
-	{
-        this(row, replicas, columnFamily, start, count, 0, null);
-	}
-
-	ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, long sinceTimestamp)
-	{
-        this(row, replicas, columnFamily, 0, 0, sinceTimestamp, null);
-	}
-
 	public void run()
 	{
-		logger_.debug(" Run the consistency checks for " + columnFamily_);		
+		logger_.debug(" Run the consistency checks for " + readCommand_.getColumnFamilyName());		
         ReadCommand readCommandDigestOnly = constructReadMessage(true);
 		try
 		{
@@ -199,29 +175,7 @@
     
     private ReadCommand constructReadMessage(boolean isDigestQuery)
     {
-        ReadCommand readCommand = null;
-        String table = DatabaseDescriptor.getTables().get(0);
-        
-        if(columnNames_.size() == 0)
-        {
-            if( start_ >= 0 && count_ < Integer.MAX_VALUE)
-            {
-                readCommand = new ReadCommand(table, row_.key(), columnFamily_, start_, count_);
-            }
-            else if(sinceTimestamp_ > 0)
-            {
-                readCommand = new ReadCommand(table, row_.key(), columnFamily_, sinceTimestamp_);
-            }
-            else
-            {
-                readCommand = new ReadCommand(table, row_.key(), columnFamily_);
-            }
-        }
-        else
-        {
-            readCommand = new ReadCommand(table, row_.key(), columnFamily_, columnNames_);
-            
-        }
+        ReadCommand readCommand = readCommand_.copy();
         readCommand.setDigestQuery(isDigestQuery);
         return readCommand;
     }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java Sat Apr 25 15:46:19 2009
@@ -334,21 +334,21 @@
         return row;
     }
 
-    public static Map<String, Row> readProtocol(String tablename, String[] keys, String columnFamily, int start, int count, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+    public static Map<String, Row> readProtocol(String[] keys, ReadCommand readCommand, StorageService.ConsistencyLevel consistencyLevel) throws Exception
     {
         Map<String, Row> rows = new HashMap<String, Row>();        
         switch ( consistencyLevel )
         {
             case WEAK:
-                rows = weakReadProtocol(tablename, keys, columnFamily, start, count);
+                rows = weakReadProtocol(keys, readCommand);
                 break;
                 
             case STRONG:
-                rows = strongReadProtocol(tablename, keys, columnFamily, start, count);
+                rows = strongReadProtocol(keys, readCommand);
                 break;
                 
             default:
-                rows = weakReadProtocol(tablename, keys, columnFamily, start, count);
+                rows = weakReadProtocol(keys, readCommand);
                 break;
         }
         return rows;
@@ -365,7 +365,7 @@
      * @throws IOException
      * @throws TimeoutException
      */
-    public static Map<String, Row> strongReadProtocol(String tablename, String[] keys, String columnFamily, int start, int count) throws IOException, TimeoutException
+    public static Map<String, Row> strongReadProtocol(String[] keys, ReadCommand readCommand) throws IOException, TimeoutException
     {       
         Map<String, Row> rows = new HashMap<String, Row>();
         long startTime = System.currentTimeMillis();        
@@ -374,23 +374,10 @@
         for (String key : keys )
         {
             ReadCommand[] readParameters = new ReadCommand[2];
-            if( start >= 0 && count < Integer.MAX_VALUE)
-            {
-                readParameters[0] = new ReadCommand(tablename, key, columnFamily, start, count);
-            }
-            else
-            {
-                readParameters[0] = new ReadCommand(tablename, key, columnFamily);
-            }            
-            if( start >= 0 && count < Integer.MAX_VALUE)
-            {
-                readParameters[1] = new ReadCommand(tablename, key, columnFamily, start, count);
-            }
-            else
-            {
-                readParameters[1] = new ReadCommand(tablename, key, columnFamily);
-            }
+            readParameters[0] = readCommand.copy();
+            readParameters[1] = readCommand.copy();
             readParameters[1].setDigestQuery(true);
+            readMessages.put(key, readParameters);
         }        
         rows = doStrongReadProtocol(readMessages);         
         logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
@@ -586,15 +573,15 @@
      * @return a mapping of key --> Row
      * @throws Exception
      */
-    public static Map<String, Row> weakReadProtocol(String tablename, String[] keys, String columnFamily, List<String> columns) throws Exception
+    public static Map<String, Row> weakReadProtocol(String[] keys, ReadCommand readCommand) throws Exception
     {
         Row row = null;
         long startTime = System.currentTimeMillis();
         Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
         for ( String key : keys )
         {
-            ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, columns);
-            readMessages.put(key, readCommand);
+            ReadCommand readCmd = readCommand.copy();
+            readMessages.put(key, readCmd);
         }
         /* Performs the multiget in parallel */
         Map<String, Row> rows = doReadProtocol(readMessages);
@@ -608,7 +595,7 @@
             /* Remove the local storage endpoint from the list. */
             endpoints.remove( StorageService.getLocalStorageEndPoint() );
             if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
-                StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, columns);
+                StorageService.instance().doConsistencyCheck(row, endpoints, readMessages.get(key));
         }
         return rows;
     }
@@ -638,81 +625,4 @@
             StorageService.instance().doConsistencyCheck(row, endpoints, command);
         return row;
     }
-
-    /**
-     * This version is used when results for multiple keys needs to be
-     * retrieved.
-     * 
-     * @param tablename name of the table that needs to be queried
-     * @param keys keys whose values we are interested in 
-     * @param columnFamily name of the "column" we are interested in
-     * @param start start index
-     * @param count the number of columns we are interested in
-     * @return a mapping of key --> Row
-     * @throws Exception
-     */
-    public static Map<String, Row> weakReadProtocol(String tablename, String[] keys, String columnFamily, int start, int count) throws Exception
-    {
-        Row row = null;
-        long startTime = System.currentTimeMillis();
-        Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
-        for ( String key : keys )
-        {
-            ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, start, count);
-            readMessages.put(key, readCommand);
-        }
-        /* Performs the multiget in parallel */
-        Map<String, Row> rows = doReadProtocol(readMessages);
-        /*
-         * Do the consistency checks for the keys that are being queried
-         * in the background.
-        */
-        for ( String key : keys )
-        {
-            List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
-            /* Remove the local storage endpoint from the list. */ 
-            endpoints.remove( StorageService.getLocalStorageEndPoint() );
-            if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
-                StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, start, count);
-        }
-        return rows;         
-    }
-
-    /**
-     * This version is used when results for multiple keys needs to be
-     * retrieved.
-     * 
-     * @param tablename name of the table that needs to be queried
-     * @param keys keys whose values we are interested in 
-     * @param columnFamily name of the "column" we are interested in
-     * @param sinceTimestamp this is lower bound of the timestamp
-     * @return a mapping of key --> Row
-     * @throws Exception
-     */
-    public static Map<String, Row> weakReadProtocol(String tablename, String[] keys, String columnFamily, long sinceTimestamp) throws Exception
-    {
-        Row row = null;
-        long startTime = System.currentTimeMillis();
-        Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
-        for ( String key : keys )
-        {
-            ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
-            readMessages.put(key, readCommand);
-        }
-        /* Performs the multiget in parallel */
-        Map<String, Row> rows = doReadProtocol(readMessages);
-        /*
-         * Do the consistency checks for the keys that are being queried
-         * in the background.
-        */
-        for ( String key : keys )
-        {
-            List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
-            /* Remove the local storage endpoint from the list. */ 
-            endpoints.remove( StorageService.getLocalStorageEndPoint() );
-            if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
-                StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, sinceTimestamp);
-        }
-        return rows;         
-    }
 }

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java Sat Apr 25 15:46:19 2009
@@ -567,32 +567,10 @@
      */
     public void doConsistencyCheck(Row row, List<EndPoint> endpoints, ReadCommand message)
     {
-        Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, message.columnFamilyColumn,
-                                                              message.start, message.count, message.sinceTimestamp, message.columnNames);
+        Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, message);
         consistencyManager_.submit(consistencySentinel);
     }
 
-    @Deprecated
-    public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String columnFamily, int start, int count)
-	{
-		Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, columnFamily, start, count);
-		consistencyManager_.submit(consistencySentinel);
-	}
-
-    @Deprecated
-    public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String columnFamily, long sinceTimestamp)
-	{
-		Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, columnFamily, sinceTimestamp);
-		consistencyManager_.submit(consistencySentinel);
-	}
-
-    @Deprecated
-    public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String columnFamily, List<String> columns)
-    {
-    	Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, columnFamily, columns);
-		consistencyManager_.submit(consistencySentinel);
-    }
-
     /*
      * This method displays all the ranges and the replicas
      * that are responsible for the individual ranges. The

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java Sat Apr 25 15:46:19 2009
@@ -42,6 +42,7 @@
 import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
 import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RowReadCommand;
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.ReadResponse;
@@ -880,7 +881,7 @@
 				key = user + ":1";
 			}
 
-			ReadCommand readCommand = new ReadCommand(tablename_, key);
+			ReadCommand readCommand = new RowReadCommand(tablename_, key);
 			Message message = new Message(from_, StorageService.readStage_,
 					StorageService.readVerbHandler_,
 					new Object[] {readCommand});

Modified: incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java Sat Apr 25 15:46:19 2009
@@ -29,6 +29,7 @@
 
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.db.ColumnReadCommand;
 import org.apache.cassandra.db.ReadCommand;
 import org.apache.cassandra.db.Row;
 import org.apache.cassandra.db.RowMutation;
@@ -180,7 +181,7 @@
 	            String stringKey = new Integer(key).toString();
 	            stringKey = stringKey + keyFix_ ;
             	int j = random.nextInt(columns) + 1;
-	            ReadCommand rm = new ReadCommand(tablename_, stringKey, columnFamilyColumn_ + ":" + columnFix_ + j);
+	            ReadCommand rm = new ColumnReadCommand(tablename_, stringKey, columnFamilyColumn_ + ":" + columnFix_ + j);
 	            readLoad(rm);
 				if ( requestsPerSecond_ > 1000)
 					Thread.sleep(0, 1000000000/requestsPerSecond_);
@@ -250,7 +251,7 @@
 	            stringKey = stringKey + keyFix_ ;
             	int i = random.nextInt(superColumns) + 1;
             	int j = random.nextInt(columns) + 1;
-	            ReadCommand rm = new ReadCommand(tablename_, stringKey, columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
+	            ReadCommand rm = new ColumnReadCommand(tablename_, stringKey, columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
 	            readLoad(rm);
 			}
 		}

Modified: incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java Sat Apr 25 15:46:19 2009
@@ -18,10 +18,27 @@
         ArrayList<String> colList = new ArrayList<String>();
         colList.add("col1");
         colList.add("col2");
+        
+        ReadCommand rm, rm2;
+        
+        rm = new SliceByNamesReadCommand("Table1", "row1", "foo", colList);
+        rm2 = serializeAndDeserializeReadMessage(rm);
+        assert rm2.toString().equals(rm.toString());
+
+        rm = new ColumnReadCommand("Table1", "row1", "foo:col1");
+        rm2 = serializeAndDeserializeReadMessage(rm);
+        assert rm2.toString().equals(rm.toString());
 
-        ReadCommand rm = new ReadCommand("Table1", "row1", "foo", colList);
-        ReadCommand rm2 = serializeAndDeserializeReadMessage(rm);
+        rm = new RowReadCommand("Table1", "row1");
+        rm2 = serializeAndDeserializeReadMessage(rm);
+        assert rm2.toString().equals(rm.toString());
+
+        rm = new ColumnsSinceReadCommand("Table1", "row1", "foo", 1);
+        rm2 = serializeAndDeserializeReadMessage(rm);
+        assert rm2.toString().equals(rm.toString());
 
+        rm = new SliceReadCommand("Table1", "row1", "foo", 1, 2);
+        rm2 = serializeAndDeserializeReadMessage(rm);
         assert rm2.toString().equals(rm.toString());
     }
 
@@ -56,7 +73,7 @@
         rm.add("Standard1:Column1", "abcd".getBytes(), 0);
         rm.apply();
 
-        ReadCommand command = new ReadCommand("Table1", "key1", "Standard1:Column1", -1, Integer.MAX_VALUE);
+        ReadCommand command = new ColumnReadCommand("Table1", "key1", "Standard1:Column1");
         Row row = command.getRow(table);
         ColumnFamily cf = row.getColumnFamily("Standard1");
         IColumn col = cf.getColumn("Column1");