You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/04/25 17:46:20 UTC
svn commit: r768553 - in /incubator/cassandra/trunk:
src/org/apache/cassandra/cql/common/ src/org/apache/cassandra/db/
src/org/apache/cassandra/service/ src/org/apache/cassandra/test/
test/org/apache/cassandra/db/
Author: jbellis
Date: Sat Apr 25 15:46:19 2009
New Revision: 768553
URL: http://svn.apache.org/viewvc?rev=768553&view=rev
Log:
split ReadCommand into separate classes for each type of command. patch by Jun Rao; reviewed by jbellis
Added:
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java
Modified:
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java
incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java
incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/ColumnRangeQueryRSD.java Sat Apr 25 15:46:19 2009
@@ -27,6 +27,7 @@
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.SliceReadCommand;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.Row;
@@ -100,7 +101,7 @@
try
{
String key = (String)(rowKey_.get());
- ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, key, columnFamily_column, offset_, limit_);
+ ReadCommand readCommand = new SliceReadCommand(cfMetaData_.tableName, key, columnFamily_column, offset_, limit_);
row = StorageProxy.readProtocol(readCommand, StorageService.ConsistencyLevel.WEAK);
}
catch (Exception e)
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/SuperColumnRangeQueryRSD.java Sat Apr 25 15:46:19 2009
@@ -27,6 +27,7 @@
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.SliceReadCommand;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.Row;
@@ -69,7 +70,7 @@
try
{
String key = (String)(rowKey_.get());
- ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, key, cfMetaData_.cfName, offset_, limit_);
+ ReadCommand readCommand = new SliceReadCommand(cfMetaData_.tableName, key, cfMetaData_.cfName, offset_, limit_);
row = StorageProxy.readProtocol(readCommand, StorageService.ConsistencyLevel.WEAK);
}
catch (Exception e)
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/cql/common/UniqueKeyQueryRSD.java Sat Apr 25 15:46:19 2009
@@ -27,6 +27,7 @@
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql.execution.RuntimeErrorMsg;
import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnReadCommand;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.ReadCommand;
@@ -85,7 +86,7 @@
try
{
String key = (String)(rowKey_.get());
- ReadCommand readCommand = new ReadCommand(cfMetaData_.tableName, key, columnFamily_column, -1, Integer.MAX_VALUE);
+ ReadCommand readCommand = new ColumnReadCommand(cfMetaData_.tableName, key, columnFamily_column);
row = StorageProxy.readProtocol(readCommand, StorageService.ConsistencyLevel.WEAK);
}
catch (Exception e)
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java?rev=768553&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnReadCommand.java Sat Apr 25 15:46:19 2009
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class ColumnReadCommand extends ReadCommand
+{
+ public final String columnFamilyColumn;
+
+ public ColumnReadCommand(String table, String key, String columnFamilyColumn)
+ {
+ super(table, key, CMD_TYPE_GET_COLUMN);
+ this.columnFamilyColumn = columnFamilyColumn;
+ }
+
+ @Override
+ public String getColumnFamilyName()
+ {
+ String[] values = RowMutation.getColumnAndColumnFamily(columnFamilyColumn);
+ return values[0];
+ }
+
+ @Override
+ public ReadCommand copy()
+ {
+ ReadCommand readCommand= new ColumnReadCommand(table, key, columnFamilyColumn);
+ readCommand.setDigestQuery(isDigestQuery());
+ return readCommand;
+ }
+
+ @Override
+ public Row getRow(Table table) throws IOException
+ {
+ return table.getRow(key, columnFamilyColumn);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "GetColumnReadMessage(" +
+ "table='" + table + '\'' +
+ ", key='" + key + '\'' +
+ ", columnFamilyColumn='" + columnFamilyColumn + '\'' +
+ ')';
+ }
+}
+
+class ColumnReadCommandSerializer extends ReadCommandSerializer
+{
+ @Override
+ public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+ {
+ ColumnReadCommand realRM = (ColumnReadCommand)rm;
+ dos.writeBoolean(realRM.isDigestQuery());
+ dos.writeUTF(realRM.table);
+ dos.writeUTF(realRM.key);
+ dos.writeUTF(realRM.columnFamilyColumn);
+ }
+
+ @Override
+ public ReadCommand deserialize(DataInputStream dis) throws IOException
+ {
+ boolean isDigest = dis.readBoolean();
+ String table = dis.readUTF();
+ String key = dis.readUTF();
+ String columnFamily_column = dis.readUTF();
+ ColumnReadCommand rm = new ColumnReadCommand(table, key, columnFamily_column);
+ rm.setDigestQuery(isDigest);
+ return rm;
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java?rev=768553&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ColumnsSinceReadCommand.java Sat Apr 25 15:46:19 2009
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class ColumnsSinceReadCommand extends ReadCommand
+{
+ public final String columnFamily;
+ public final long sinceTimestamp;
+
+ public ColumnsSinceReadCommand(String table, String key, String columnFamily, long sinceTimestamp)
+ {
+ super(table, key, CMD_TYPE_GET_COLUMNS_SINCE);
+ this.columnFamily = columnFamily;
+ this.sinceTimestamp = sinceTimestamp;
+ }
+
+ @Override
+ public String getColumnFamilyName()
+ {
+ return columnFamily;
+ }
+
+ @Override
+ public ReadCommand copy()
+ {
+ ReadCommand readCommand= new ColumnsSinceReadCommand(table, key, columnFamily, sinceTimestamp);
+ readCommand.setDigestQuery(isDigestQuery());
+ return readCommand;
+ }
+
+ @Override
+ public Row getRow(Table table) throws IOException
+ {
+ return table.getRow(key, columnFamily, sinceTimestamp);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "GetColumnsSinceMessage(" +
+ "table='" + table + '\'' +
+ ", key='" + key + '\'' +
+ ", columnFamily='" + columnFamily + '\'' +
+ ", sinceTimestamp='" + sinceTimestamp + '\'' +
+ ')';
+ }
+
+}
+
+class ColumnsSinceReadCommandSerializer extends ReadCommandSerializer
+{
+ @Override
+ public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+ {
+ ColumnsSinceReadCommand realRM = (ColumnsSinceReadCommand)rm;
+ dos.writeBoolean(realRM.isDigestQuery());
+ dos.writeUTF(realRM.table);
+ dos.writeUTF(realRM.key);
+ dos.writeUTF(realRM.columnFamily);
+ dos.writeLong(realRM.sinceTimestamp);
+ }
+
+ @Override
+ public ReadCommand deserialize(DataInputStream dis) throws IOException
+ {
+ boolean isDigest = dis.readBoolean();
+ String table = dis.readUTF();
+ String key = dis.readUTF();
+ String columnFamily = dis.readUTF();
+ long sinceTimestamp = dis.readLong();
+
+ ColumnsSinceReadCommand rm = new ColumnsSinceReadCommand(table, key, columnFamily, sinceTimestamp);
+ rm.setDigestQuery(isDigest);
+ return rm;
+ }
+}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadCommand.java Sat Apr 25 15:46:19 2009
@@ -22,26 +22,24 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Arrays;
-import java.util.Collections;
-
-import org.apache.commons.lang.StringUtils;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.cassandra.io.ICompactSerializer;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.service.StorageService;
-/**
- * Author : Avinash Lakshman ( alakshman@facebook.com) & Prashant Malik ( pmalik@facebook.com )
- */
-
-public class ReadCommand
+public abstract class ReadCommand
{
public static final String DO_REPAIR = "READ-REPAIR";
-
+ public static final byte CMD_TYPE_GET_ROW=1;
+ public static final byte CMD_TYPE_GET_COLUMN=2;
+ public static final byte CMD_TYPE_GET_SLICE_BY_NAMES=3;
+ public static final byte CMD_TYPE_GET_COLUMNS_SINCE=4;
+ public static final byte CMD_TYPE_GET_SLICE=5;
+ public static final String EMPTY_CF = "";
+
private static ReadCommandSerializer serializer = new ReadCommandSerializer();
public static ReadCommandSerializer serializer()
@@ -49,8 +47,6 @@
return serializer;
}
- private static List<String> EMPTY_COLUMNS = Arrays.asList(new String[0]);
-
public Message makeReadMessage() throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -60,57 +56,17 @@
}
public final String table;
-
public final String key;
+ private boolean isDigestQuery = false;
+ protected final byte commandType;
- public final String columnFamilyColumn;
-
- public final int start;
-
- public final int count;
-
- public final long sinceTimestamp;
-
- public final List<String> columnNames;
-
- private boolean isDigestQuery = false;
-
- public ReadCommand(String table, String key, String columnFamilyColumn, int start, int count, long sinceTimestamp, List<String> columnNames)
+ protected ReadCommand(String table, String key, byte cmdType)
{
this.table = table;
this.key = key;
- this.columnFamilyColumn = columnFamilyColumn;
- this.start = start;
- this.count = count;
- this.sinceTimestamp = sinceTimestamp;
- this.columnNames = Collections.unmodifiableList(columnNames);
+ this.commandType = cmdType;
}
-
- public ReadCommand(String table, String key)
- {
- this(table, key, null, -1, -1, -1, EMPTY_COLUMNS);
- }
-
- public ReadCommand(String table, String key, String columnFamilyColumn)
- {
- this(table, key, columnFamilyColumn, -1, -1, -1, EMPTY_COLUMNS);
- }
-
- public ReadCommand(String table, String key, String columnFamilyColumn, List<String> columnNames)
- {
- this(table, key, columnFamilyColumn, -1, -1, -1, columnNames);
- }
-
- public ReadCommand(String table, String key, String columnFamilyColumn, int start, int count)
- {
- this(table, key, columnFamilyColumn, start, count, -1, EMPTY_COLUMNS);
- }
-
- public ReadCommand(String table, String key, String columnFamilyColumn, long sinceTimestamp)
- {
- this(table, key, columnFamilyColumn, -1, -1, sinceTimestamp, EMPTY_COLUMNS);
- }
-
+
public boolean isDigestQuery()
{
return isDigestQuery;
@@ -121,99 +77,37 @@
this.isDigestQuery = isDigestQuery;
}
- public ReadCommand copy()
- {
- return new ReadCommand(table, key, columnFamilyColumn, start, count, sinceTimestamp, columnNames);
- }
-
- public Row getRow(Table table) throws IOException
- {
- if (!columnNames.isEmpty())
- {
- return table.getRow(key, columnFamilyColumn, columnNames);
- }
-
- if (sinceTimestamp > 0)
- {
- return table.getRow(key, columnFamilyColumn, sinceTimestamp);
- }
-
- if (start > 0 || (count > 0 && count < Integer.MAX_VALUE))
- {
- return table.getRow(key, columnFamilyColumn, start, count);
- }
+ public abstract String getColumnFamilyName();
+
+ public abstract ReadCommand copy();
- return table.getRow(key, columnFamilyColumn);
- }
-
- public String toString()
- {
- return "ReadMessage(" +
- "table='" + table + '\'' +
- ", key='" + key + '\'' +
- ", columnFamilyColumn='" + columnFamilyColumn + '\'' +
- ", start=" + start +
- ", count=" + count +
- ", sinceTimestamp=" + sinceTimestamp +
- ", columns=[" + StringUtils.join(columnNames, ", ") + "]" +
- ')';
- }
+ public abstract Row getRow(Table table) throws IOException;
}
class ReadCommandSerializer implements ICompactSerializer<ReadCommand>
{
+ private static final Map<Byte, ReadCommandSerializer> CMD_SERIALIZER_MAP = new HashMap<Byte, ReadCommandSerializer>();
+ static
+ {
+ CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_ROW, new RowReadCommandSerializer());
+ CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_COLUMN, new ColumnReadCommandSerializer());
+ CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE_BY_NAMES, new SliceByNamesReadCommandSerializer());
+ CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_COLUMNS_SINCE, new ColumnsSinceReadCommandSerializer());
+ CMD_SERIALIZER_MAP.put(ReadCommand.CMD_TYPE_GET_SLICE, new SliceReadCommandSerializer());
+ }
+
+
public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
{
- dos.writeUTF(rm.table);
- dos.writeUTF(rm.key);
- dos.writeUTF(rm.columnFamilyColumn);
- dos.writeInt(rm.start);
- dos.writeInt(rm.count);
- dos.writeLong(rm.sinceTimestamp);
- dos.writeBoolean(rm.isDigestQuery());
- dos.writeInt(rm.columnNames.size());
- if (rm.columnNames.size() > 0)
- {
- for (String cName : rm.columnNames)
- {
- dos.writeInt(cName.getBytes().length);
- dos.write(cName.getBytes());
- }
- }
+ dos.writeByte(rm.commandType);
+ ReadCommandSerializer ser = CMD_SERIALIZER_MAP.get(rm.commandType);
+ ser.serialize(rm, dos);
}
public ReadCommand deserialize(DataInputStream dis) throws IOException
{
- String table = dis.readUTF();
- String key = dis.readUTF();
- String columnFamily_column = dis.readUTF();
- int start = dis.readInt();
- int count = dis.readInt();
- long sinceTimestamp = dis.readLong();
- boolean isDigest = dis.readBoolean();
-
- int size = dis.readInt();
- List<String> columns = new ArrayList<String>();
- for (int i = 0; i < size; ++i)
- {
- byte[] bytes = new byte[dis.readInt()];
- dis.readFully(bytes);
- columns.add(new String(bytes));
- }
- ReadCommand rm = null;
- if (columns.size() > 0)
- {
- rm = new ReadCommand(table, key, columnFamily_column, columns);
- }
- else if (sinceTimestamp > 0)
- {
- rm = new ReadCommand(table, key, columnFamily_column, sinceTimestamp);
- }
- else
- {
- rm = new ReadCommand(table, key, columnFamily_column, start, count);
- }
- rm.setDigestQuery(isDigest);
- return rm;
+ byte msgType = dis.readByte();
+ return CMD_SERIALIZER_MAP.get(msgType).deserialize(dis);
}
+
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/ReadVerbHandler.java Sat Apr 25 15:46:19 2009
@@ -118,28 +118,11 @@
private void doReadRepair(Row row, ReadCommand readCommand)
{
- if ( DatabaseDescriptor.getConsistencyCheck() )
- {
- List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readCommand.key);
- /* Remove the local storage endpoint from the list. */
- endpoints.remove( StorageService.getLocalStorageEndPoint() );
+ List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(readCommand.key);
+ /* Remove the local storage endpoint from the list. */
+ endpoints.remove( StorageService.getLocalStorageEndPoint() );
- if(readCommand.columnNames.size() == 0)
- {
- if( readCommand.start >= 0 && readCommand.count < Integer.MAX_VALUE)
- {
- StorageService.instance().doConsistencyCheck(row, endpoints, readCommand.columnFamilyColumn, readCommand.start, readCommand.count);
- }
-
- if( readCommand.sinceTimestamp > 0)
- {
- StorageService.instance().doConsistencyCheck(row, endpoints, readCommand.columnFamilyColumn, readCommand.sinceTimestamp);
- }
- }
- else
- {
- StorageService.instance().doConsistencyCheck(row, endpoints, readCommand.columnFamilyColumn, readCommand.columnNames);
- }
- }
+ if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+ StorageService.instance().doConsistencyCheck(row, endpoints, readCommand);
}
}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java?rev=768553&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/RowReadCommand.java Sat Apr 25 15:46:19 2009
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class RowReadCommand extends ReadCommand
+{
+ public RowReadCommand(String table, String key)
+ {
+ super(table, key, CMD_TYPE_GET_ROW);
+ }
+
+ @Override
+ public String getColumnFamilyName()
+ {
+ return null;
+ }
+
+ @Override
+ public ReadCommand copy()
+ {
+ ReadCommand readCommand= new RowReadCommand(table, key);
+ readCommand.setDigestQuery(isDigestQuery());
+ return readCommand;
+ }
+
+ @Override
+ public Row getRow(Table table) throws IOException
+ {
+ return table.get(key);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "GetColumnReadMessage(" +
+ "table='" + table + '\'' +
+ ", key='" + key + '\'' +
+ ')';
+ }
+
+}
+
+class RowReadCommandSerializer extends ReadCommandSerializer
+{
+ @Override
+ public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+ {
+ RowReadCommand realRM = (RowReadCommand)rm;
+ dos.writeBoolean(realRM.isDigestQuery());
+ dos.writeUTF(realRM.table);
+ dos.writeUTF(realRM.key);
+ }
+
+ @Override
+ public ReadCommand deserialize(DataInputStream dis) throws IOException
+ {
+ boolean isDigest = dis.readBoolean();
+ String table = dis.readUTF();
+ String key = dis.readUTF();
+ RowReadCommand rm = new RowReadCommand(table, key);
+ rm.setDigestQuery(isDigest);
+ return rm;
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java?rev=768553&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceByNamesReadCommand.java Sat Apr 25 15:46:19 2009
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.lang.StringUtils;
+
+public class SliceByNamesReadCommand extends ReadCommand
+{
+ public final String columnFamily;
+ public final List<String> columnNames;
+
+ public SliceByNamesReadCommand(String table, String key, String columnFamily, List<String> columnNames)
+ {
+ super(table, key, CMD_TYPE_GET_SLICE_BY_NAMES);
+ this.columnFamily = columnFamily;
+ this.columnNames = Collections.unmodifiableList(columnNames);
+ }
+
+ @Override
+ public String getColumnFamilyName()
+ {
+ return columnFamily;
+ }
+
+ @Override
+ public ReadCommand copy()
+ {
+ ReadCommand readCommand= new SliceByNamesReadCommand(table, key, columnFamily, columnNames);
+ readCommand.setDigestQuery(isDigestQuery());
+ return readCommand;
+ }
+
+ @Override
+ public Row getRow(Table table) throws IOException
+ {
+ return table.getRow(key, columnFamily, columnNames);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "GetSliceByNamesReadMessage(" +
+ "table='" + table + '\'' +
+ ", key='" + key + '\'' +
+ ", columnFamily='" + columnFamily + '\'' +
+ ", columns=[" + StringUtils.join(columnNames, ", ") + "]" +
+ ')';
+ }
+
+}
+
+class SliceByNamesReadCommandSerializer extends ReadCommandSerializer
+{
+ @Override
+ public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+ {
+ SliceByNamesReadCommand realRM = (SliceByNamesReadCommand)rm;
+ dos.writeBoolean(realRM.isDigestQuery());
+ dos.writeUTF(realRM.table);
+ dos.writeUTF(realRM.key);
+ dos.writeUTF(realRM.columnFamily);
+ dos.writeInt(realRM.columnNames.size());
+ if (realRM.columnNames.size() > 0)
+ {
+ for (String cName : realRM.columnNames)
+ {
+ dos.writeInt(cName.getBytes().length);
+ dos.write(cName.getBytes());
+ }
+ }
+ }
+
+ @Override
+ public ReadCommand deserialize(DataInputStream dis) throws IOException
+ {
+ boolean isDigest = dis.readBoolean();
+ String table = dis.readUTF();
+ String key = dis.readUTF();
+ String columnFamily = dis.readUTF();
+
+ int size = dis.readInt();
+ List<String> columns = new ArrayList<String>();
+ for (int i = 0; i < size; ++i)
+ {
+ byte[] bytes = new byte[dis.readInt()];
+ dis.readFully(bytes);
+ columns.add(new String(bytes));
+ }
+ SliceByNamesReadCommand rm = new SliceByNamesReadCommand(table, key, columnFamily, columns);
+ rm.setDigestQuery(isDigest);
+ return rm;
+ }
+}
Added: incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java?rev=768553&view=auto
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java (added)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/db/SliceReadCommand.java Sat Apr 25 15:46:19 2009
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+public class SliceReadCommand extends ReadCommand
+{
+ public final String columnFamily;
+ public final int start;
+ public final int count;
+
+ public SliceReadCommand(String table, String key, String columnFamily, int start, int count)
+ {
+ super(table, key, CMD_TYPE_GET_SLICE);
+ this.columnFamily = columnFamily;
+ this.start = start;
+ this.count = count;
+ }
+
+ @Override
+ public String getColumnFamilyName()
+ {
+ return columnFamily;
+ }
+
+ @Override
+ public ReadCommand copy()
+ {
+ ReadCommand readCommand= new SliceReadCommand(table, key, columnFamily, start, count);
+ readCommand.setDigestQuery(isDigestQuery());
+ return readCommand;
+ }
+
+ @Override
+ public Row getRow(Table table) throws IOException
+ {
+ return table.getRow(key, columnFamily, start, count);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "GetSliceReadMessage(" +
+ "table='" + table + '\'' +
+ ", key='" + key + '\'' +
+ ", columnFamily='" + columnFamily + '\'' +
+ ", start='" + start + '\'' +
+ ", count='" + count + '\'' +
+ ')';
+ }
+}
+
+class SliceReadCommandSerializer extends ReadCommandSerializer
+{
+ @Override
+ public void serialize(ReadCommand rm, DataOutputStream dos) throws IOException
+ {
+ SliceReadCommand realRM = (SliceReadCommand)rm;
+ dos.writeBoolean(realRM.isDigestQuery());
+ dos.writeUTF(realRM.table);
+ dos.writeUTF(realRM.key);
+ dos.writeUTF(realRM.columnFamily);
+ dos.writeInt(realRM.start);
+ dos.writeInt(realRM.count);
+ }
+
+ @Override
+ public ReadCommand deserialize(DataInputStream dis) throws IOException
+ {
+ boolean isDigest = dis.readBoolean();
+ String table = dis.readUTF();
+ String key = dis.readUTF();
+ String columnFamily = dis.readUTF();
+ int start = dis.readInt();
+ int count = dis.readInt();
+
+ SliceReadCommand rm = new SliceReadCommand(table, key, columnFamily, start, count);
+ rm.setDigestQuery(isDigest);
+ return rm;
+ }
+}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/CassandraServer.java Sat Apr 25 15:46:19 2009
@@ -36,6 +36,10 @@
import org.apache.cassandra.cql.common.CqlResult;
import org.apache.cassandra.cql.driver.CqlDriver;
import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.ColumnReadCommand;
+import org.apache.cassandra.db.ColumnsSinceReadCommand;
+import org.apache.cassandra.db.SliceByNamesReadCommand;
+import org.apache.cassandra.db.SliceReadCommand;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
@@ -102,12 +106,8 @@
protected ColumnFamily readColumnFamily(ReadCommand command) throws InvalidRequestException
{
- String[] values = RowMutation.getColumnAndColumnFamily(command.columnFamilyColumn);
- if( values.length < 1 )
- {
- throw new ColumnFamilyNotDefinedException("Empty column Family is invalid.");
- }
- validateCommand(command.key, command.table, values[0]);
+ String cfName = command.getColumnFamilyName();
+ validateCommand(command.key, command.table, cfName);
Row row;
try
@@ -127,7 +127,7 @@
{
return null;
}
- return row.getColumnFamily(values[0]);
+ return row.getColumnFamily(cfName);
}
public List<column_t> thriftifyColumns(Collection<IColumn> columns)
@@ -156,7 +156,7 @@
long startTime = System.currentTimeMillis();
try
{
- ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, timeStamp));
+ ColumnFamily cfamily = readColumnFamily(new ColumnsSinceReadCommand(tablename, key, columnFamily_column, timeStamp));
String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
if (cfamily == null)
{
@@ -188,7 +188,7 @@
long startTime = System.currentTimeMillis();
try
{
- ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily, columnNames));
+ ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(tablename, key, columnFamily, columnNames));
if (cfamily == null)
{
return EMPTY_COLUMNS;
@@ -209,7 +209,7 @@
try
{
String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
- ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, start, count));
+ ColumnFamily cfamily = readColumnFamily(new SliceReadCommand(tablename, key, columnFamily_column, start, count));
if (cfamily == null)
{
return EMPTY_COLUMNS;
@@ -241,7 +241,7 @@
{
throw new InvalidRequestException("get_column requires both parts of columnfamily:column");
}
- ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
+ ColumnFamily cfamily = readColumnFamily(new ColumnReadCommand(tablename, key, columnFamily_column));
if (cfamily == null)
{
throw new NotFoundException();
@@ -277,7 +277,7 @@
public int get_column_count(String tablename, String key, String columnFamily_column) throws InvalidRequestException
{
String[] values = RowMutation.getColumnAndColumnFamily(columnFamily_column);
- ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
+ ColumnFamily cfamily = readColumnFamily(new SliceReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
if (cfamily == null)
{
return 0;
@@ -367,7 +367,7 @@
long startTime = System.currentTimeMillis();
try
{
- ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily, superColumnNames));
+ ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(tablename, key, columnFamily, superColumnNames));
if (cfamily == null)
{
return EMPTY_SUPERCOLUMNS;
@@ -405,7 +405,7 @@
public List<superColumn_t> get_slice_super(String tablename, String key, String columnFamily_superColumnName, int start, int count) throws InvalidRequestException
{
- ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_superColumnName, start, count));
+ ColumnFamily cfamily = readColumnFamily(new SliceReadCommand(tablename, key, columnFamily_superColumnName, start, count));
if (cfamily == null)
{
return EMPTY_SUPERCOLUMNS;
@@ -416,7 +416,7 @@
public superColumn_t get_superColumn(String tablename, String key, String columnFamily_column) throws InvalidRequestException, NotFoundException
{
- ColumnFamily cfamily = readColumnFamily(new ReadCommand(tablename, key, columnFamily_column, -1, Integer.MAX_VALUE));
+ ColumnFamily cfamily = readColumnFamily(new ColumnReadCommand(tablename, key, columnFamily_column));
if (cfamily == null)
{
throw new NotFoundException();
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/ConsistencyManager.java Sat Apr 25 15:46:19 2009
@@ -149,41 +149,17 @@
private static ICachetable<String, String> readRepairTable_ = new Cachetable<String, String>(scheduledTimeMillis_);
private Row row_;
protected List<EndPoint> replicas_;
- private String columnFamily_;
- private int start_;
- private int count_;
- private long sinceTimestamp_;
- private List<String> columnNames_ = new ArrayList<String>();
- public ConsistencyManager(Row row_, List<EndPoint> replicas_, String columnFamily_, int start_, int count_, long sinceTimestamp_, List<String> columnNames_)
+ private ReadCommand readCommand_;
+
+ public ConsistencyManager(Row row_, List<EndPoint> replicas_, ReadCommand readCommand)
{
- this.row_ = row_;
- this.replicas_ = replicas_;
- this.columnFamily_ = columnFamily_;
- this.start_ = start_;
- this.count_ = count_;
- this.sinceTimestamp_ = sinceTimestamp_;
- this.columnNames_ = columnNames_;
+ this.readCommand_ = readCommand;
}
- ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, List<String> columns)
- {
- this(row, replicas, columnFamily, 0, 0, 0, columns);
- }
-
- ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, int start, int count)
- {
- this(row, replicas, columnFamily, start, count, 0, null);
- }
-
- ConsistencyManager(Row row, List<EndPoint> replicas, String columnFamily, long sinceTimestamp)
- {
- this(row, replicas, columnFamily, 0, 0, sinceTimestamp, null);
- }
-
public void run()
{
- logger_.debug(" Run the consistency checks for " + columnFamily_);
+ logger_.debug(" Run the consistency checks for " + readCommand_.getColumnFamilyName());
ReadCommand readCommandDigestOnly = constructReadMessage(true);
try
{
@@ -199,29 +175,7 @@
private ReadCommand constructReadMessage(boolean isDigestQuery)
{
- ReadCommand readCommand = null;
- String table = DatabaseDescriptor.getTables().get(0);
-
- if(columnNames_.size() == 0)
- {
- if( start_ >= 0 && count_ < Integer.MAX_VALUE)
- {
- readCommand = new ReadCommand(table, row_.key(), columnFamily_, start_, count_);
- }
- else if(sinceTimestamp_ > 0)
- {
- readCommand = new ReadCommand(table, row_.key(), columnFamily_, sinceTimestamp_);
- }
- else
- {
- readCommand = new ReadCommand(table, row_.key(), columnFamily_);
- }
- }
- else
- {
- readCommand = new ReadCommand(table, row_.key(), columnFamily_, columnNames_);
-
- }
+ ReadCommand readCommand = readCommand_.copy();
readCommand.setDigestQuery(isDigestQuery);
return readCommand;
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageProxy.java Sat Apr 25 15:46:19 2009
@@ -334,21 +334,21 @@
return row;
}
- public static Map<String, Row> readProtocol(String tablename, String[] keys, String columnFamily, int start, int count, StorageService.ConsistencyLevel consistencyLevel) throws Exception
+ public static Map<String, Row> readProtocol(String[] keys, ReadCommand readCommand, StorageService.ConsistencyLevel consistencyLevel) throws Exception
{
Map<String, Row> rows = new HashMap<String, Row>();
switch ( consistencyLevel )
{
case WEAK:
- rows = weakReadProtocol(tablename, keys, columnFamily, start, count);
+ rows = weakReadProtocol(keys, readCommand);
break;
case STRONG:
- rows = strongReadProtocol(tablename, keys, columnFamily, start, count);
+ rows = strongReadProtocol(keys, readCommand);
break;
default:
- rows = weakReadProtocol(tablename, keys, columnFamily, start, count);
+ rows = weakReadProtocol(keys, readCommand);
break;
}
return rows;
@@ -365,7 +365,7 @@
* @throws IOException
* @throws TimeoutException
*/
- public static Map<String, Row> strongReadProtocol(String tablename, String[] keys, String columnFamily, int start, int count) throws IOException, TimeoutException
+ public static Map<String, Row> strongReadProtocol(String[] keys, ReadCommand readCommand) throws IOException, TimeoutException
{
Map<String, Row> rows = new HashMap<String, Row>();
long startTime = System.currentTimeMillis();
@@ -374,23 +374,10 @@
for (String key : keys )
{
ReadCommand[] readParameters = new ReadCommand[2];
- if( start >= 0 && count < Integer.MAX_VALUE)
- {
- readParameters[0] = new ReadCommand(tablename, key, columnFamily, start, count);
- }
- else
- {
- readParameters[0] = new ReadCommand(tablename, key, columnFamily);
- }
- if( start >= 0 && count < Integer.MAX_VALUE)
- {
- readParameters[1] = new ReadCommand(tablename, key, columnFamily, start, count);
- }
- else
- {
- readParameters[1] = new ReadCommand(tablename, key, columnFamily);
- }
+ readParameters[0] = readCommand.copy();
+ readParameters[1] = readCommand.copy();
readParameters[1].setDigestQuery(true);
+ readMessages.put(key, readParameters);
}
rows = doStrongReadProtocol(readMessages);
logger_.debug("readProtocol: " + (System.currentTimeMillis() - startTime) + " ms.");
@@ -586,15 +573,15 @@
* @return a mapping of key --> Row
* @throws Exception
*/
- public static Map<String, Row> weakReadProtocol(String tablename, String[] keys, String columnFamily, List<String> columns) throws Exception
+ public static Map<String, Row> weakReadProtocol(String[] keys, ReadCommand readCommand) throws Exception
{
Row row = null;
long startTime = System.currentTimeMillis();
Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
for ( String key : keys )
{
- ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, columns);
- readMessages.put(key, readCommand);
+ ReadCommand readCmd = readCommand.copy();
+ readMessages.put(key, readCmd);
}
/* Performs the multiget in parallel */
Map<String, Row> rows = doReadProtocol(readMessages);
@@ -608,7 +595,7 @@
/* Remove the local storage endpoint from the list. */
endpoints.remove( StorageService.getLocalStorageEndPoint() );
if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
- StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, columns);
+ StorageService.instance().doConsistencyCheck(row, endpoints, readMessages.get(key));
}
return rows;
}
@@ -638,81 +625,4 @@
StorageService.instance().doConsistencyCheck(row, endpoints, command);
return row;
}
-
- /**
- * This version is used when results for multiple keys needs to be
- * retrieved.
- *
- * @param tablename name of the table that needs to be queried
- * @param keys keys whose values we are interested in
- * @param columnFamily name of the "column" we are interested in
- * @param start start index
- * @param count the number of columns we are interested in
- * @return a mapping of key --> Row
- * @throws Exception
- */
- public static Map<String, Row> weakReadProtocol(String tablename, String[] keys, String columnFamily, int start, int count) throws Exception
- {
- Row row = null;
- long startTime = System.currentTimeMillis();
- Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
- for ( String key : keys )
- {
- ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, start, count);
- readMessages.put(key, readCommand);
- }
- /* Performs the multiget in parallel */
- Map<String, Row> rows = doReadProtocol(readMessages);
- /*
- * Do the consistency checks for the keys that are being queried
- * in the background.
- */
- for ( String key : keys )
- {
- List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
- /* Remove the local storage endpoint from the list. */
- endpoints.remove( StorageService.getLocalStorageEndPoint() );
- if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
- StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, start, count);
- }
- return rows;
- }
-
- /**
- * This version is used when results for multiple keys needs to be
- * retrieved.
- *
- * @param tablename name of the table that needs to be queried
- * @param keys keys whose values we are interested in
- * @param columnFamily name of the "column" we are interested in
- * @param sinceTimestamp this is lower bound of the timestamp
- * @return a mapping of key --> Row
- * @throws Exception
- */
- public static Map<String, Row> weakReadProtocol(String tablename, String[] keys, String columnFamily, long sinceTimestamp) throws Exception
- {
- Row row = null;
- long startTime = System.currentTimeMillis();
- Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
- for ( String key : keys )
- {
- ReadCommand readCommand = new ReadCommand(tablename, key, columnFamily, sinceTimestamp);
- readMessages.put(key, readCommand);
- }
- /* Performs the multiget in parallel */
- Map<String, Row> rows = doReadProtocol(readMessages);
- /*
- * Do the consistency checks for the keys that are being queried
- * in the background.
- */
- for ( String key : keys )
- {
- List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
- /* Remove the local storage endpoint from the list. */
- endpoints.remove( StorageService.getLocalStorageEndPoint() );
- if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
- StorageService.instance().doConsistencyCheck(row, endpoints, columnFamily, sinceTimestamp);
- }
- return rows;
- }
}
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/service/StorageService.java Sat Apr 25 15:46:19 2009
@@ -567,32 +567,10 @@
*/
public void doConsistencyCheck(Row row, List<EndPoint> endpoints, ReadCommand message)
{
- Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, message.columnFamilyColumn,
- message.start, message.count, message.sinceTimestamp, message.columnNames);
+ Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, message);
consistencyManager_.submit(consistencySentinel);
}
- @Deprecated
- public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String columnFamily, int start, int count)
- {
- Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, columnFamily, start, count);
- consistencyManager_.submit(consistencySentinel);
- }
-
- @Deprecated
- public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String columnFamily, long sinceTimestamp)
- {
- Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, columnFamily, sinceTimestamp);
- consistencyManager_.submit(consistencySentinel);
- }
-
- @Deprecated
- public void doConsistencyCheck(Row row, List<EndPoint> endpoints, String columnFamily, List<String> columns)
- {
- Runnable consistencySentinel = new ConsistencyManager(row.cloneMe(), endpoints, columnFamily, columns);
- consistencyManager_.submit(consistencySentinel);
- }
-
/*
* This method displays all the ranges and the replicas
* that are responsible for the individual ranges. The
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/DataImporter.java Sat Apr 25 15:46:19 2009
@@ -42,6 +42,7 @@
import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.RowReadCommand;
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.ReadResponse;
@@ -880,7 +881,7 @@
key = user + ":1";
}
- ReadCommand readCommand = new ReadCommand(tablename_, key);
+ ReadCommand readCommand = new RowReadCommand(tablename_, key);
Message message = new Message(from_, StorageService.readStage_,
StorageService.readVerbHandler_,
new Object[] {readCommand});
Modified: incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java (original)
+++ incubator/cassandra/trunk/src/org/apache/cassandra/test/StressTest.java Sat Apr 25 15:46:19 2009
@@ -29,6 +29,7 @@
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ThreadFactoryImpl;
+import org.apache.cassandra.db.ColumnReadCommand;
import org.apache.cassandra.db.ReadCommand;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.RowMutation;
@@ -180,7 +181,7 @@
String stringKey = new Integer(key).toString();
stringKey = stringKey + keyFix_ ;
int j = random.nextInt(columns) + 1;
- ReadCommand rm = new ReadCommand(tablename_, stringKey, columnFamilyColumn_ + ":" + columnFix_ + j);
+ ReadCommand rm = new ColumnReadCommand(tablename_, stringKey, columnFamilyColumn_ + ":" + columnFix_ + j);
readLoad(rm);
if ( requestsPerSecond_ > 1000)
Thread.sleep(0, 1000000000/requestsPerSecond_);
@@ -250,7 +251,7 @@
stringKey = stringKey + keyFix_ ;
int i = random.nextInt(superColumns) + 1;
int j = random.nextInt(columns) + 1;
- ReadCommand rm = new ReadCommand(tablename_, stringKey, columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
+ ReadCommand rm = new ColumnReadCommand(tablename_, stringKey, columnFamilySuperColumn_ + ":" + superColumnFix_ + i + ":" + columnFix_ + j);
readLoad(rm);
}
}
Modified: incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java?rev=768553&r1=768552&r2=768553&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java (original)
+++ incubator/cassandra/trunk/test/org/apache/cassandra/db/ReadMessageTest.java Sat Apr 25 15:46:19 2009
@@ -18,10 +18,27 @@
ArrayList<String> colList = new ArrayList<String>();
colList.add("col1");
colList.add("col2");
+
+ ReadCommand rm, rm2;
+
+ rm = new SliceByNamesReadCommand("Table1", "row1", "foo", colList);
+ rm2 = serializeAndDeserializeReadMessage(rm);
+ assert rm2.toString().equals(rm.toString());
+
+ rm = new ColumnReadCommand("Table1", "row1", "foo:col1");
+ rm2 = serializeAndDeserializeReadMessage(rm);
+ assert rm2.toString().equals(rm.toString());
- ReadCommand rm = new ReadCommand("Table1", "row1", "foo", colList);
- ReadCommand rm2 = serializeAndDeserializeReadMessage(rm);
+ rm = new RowReadCommand("Table1", "row1");
+ rm2 = serializeAndDeserializeReadMessage(rm);
+ assert rm2.toString().equals(rm.toString());
+
+ rm = new ColumnsSinceReadCommand("Table1", "row1", "foo", 1);
+ rm2 = serializeAndDeserializeReadMessage(rm);
+ assert rm2.toString().equals(rm.toString());
+ rm = new SliceReadCommand("Table1", "row1", "foo", 1, 2);
+ rm2 = serializeAndDeserializeReadMessage(rm);
assert rm2.toString().equals(rm.toString());
}
@@ -56,7 +73,7 @@
rm.add("Standard1:Column1", "abcd".getBytes(), 0);
rm.apply();
- ReadCommand command = new ReadCommand("Table1", "key1", "Standard1:Column1", -1, Integer.MAX_VALUE);
+ ReadCommand command = new ColumnReadCommand("Table1", "key1", "Standard1:Column1");
Row row = command.getRow(table);
ColumnFamily cf = row.getColumnFamily("Standard1");
IColumn col = cf.getColumn("Column1");