You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/08/27 22:07:13 UTC
svn commit: r808589 [3/3] - in /incubator/cassandra/trunk: interface/
interface/gen-java/org/apache/cassandra/service/
src/java/org/apache/cassandra/net/ src/java/org/apache/cassandra/service/
test/system/
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/AsyncResult.java Thu Aug 27 20:07:12 2009
@@ -36,10 +36,12 @@
private AtomicBoolean done_ = new AtomicBoolean(false);
private Lock lock_ = new ReentrantLock();
private Condition condition_;
+ private long startTime_;
public AsyncResult()
{
condition_ = lock_.newCondition();
+ startTime_ = System.currentTimeMillis();
}
public byte[] get()
@@ -77,8 +79,12 @@
try
{
if ( !done_.get() )
- {
- bVal = condition_.await(timeout, tu);
+ {
+ long remaining_timeout = tu.toMillis(timeout) - (System.currentTimeMillis() - startTime_); // budget runs from construction: caller's timeout (in tu) minus time already elapsed
+ if(remaining_timeout > 0)
+ bVal = condition_.await(remaining_timeout, TimeUnit.MILLISECONDS);
+ else
+ bVal = false;
}
}
catch ( InterruptedException ex )
@@ -97,17 +103,7 @@
}
return result_;
}
-
- public List<byte[]> multiget()
- {
- throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
- }
-
- public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException
- {
- throw new UnsupportedOperationException("This operation is not supported in the AsyncResult abstraction.");
- }
-
+
public void result(Message response)
{
try
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IAsyncResult.java Thu Aug 27 20:07:12 2009
@@ -45,22 +45,7 @@
* @return the result wrapped in an Object[]
*/
public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
-
- /**
- * Returns the result for all tasks that was submitted.
- * @return the list of results wrapped in an Object[]
- */
- public List<byte[]> multiget();
-
- /**
- * Same operation as the above get() but allows the calling
- * thread to specify a timeout.
- * @param timeout the maximum time to wait
- * @param tu the time unit of the timeout argument
- * @return the result wrapped in an Object[]
- */
- public List<byte[]> multiget(long timeout, TimeUnit tu) throws TimeoutException;
-
+
/**
* Store the result obtained for the submitted task.
* @param result the response message
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/IMessagingService.java Thu Aug 27 20:07:12 2009
@@ -119,19 +119,6 @@
* array is sent to the ith element in the <code>to</code> array.This method assumes
* there is a one-one mapping between the <code>messages</code> array and
* the <code>to</code> array. Otherwise an IllegalArgumentException will be thrown.
- * This method also informs the MessagingService to wait for at least
- * <code>howManyResults</code> responses to determine success of failure.
- * @param messages messages to be sent.
- * @param to endpoints to which the message needs to be sent
- * @return an reference to IAsyncResult
- */
- public IAsyncResult sendRR(Message[] messages, EndPoint[] to);
-
- /**
- * Send a message to a given endpoint. The ith element in the <code>messages</code>
- * array is sent to the ith element in the <code>to</code> array.This method assumes
- * there is a one-one mapping between the <code>messages</code> array and
- * the <code>to</code> array. Otherwise an IllegalArgumentException will be thrown.
* The idea is that multi-groups of messages are grouped as one logical message
* whose results are harnessed via the <i>IAsyncResult</i>
* @param messages groups of grouped messages.
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Thu Aug 27 20:07:12 2009
@@ -379,25 +379,6 @@
return groupId;
}
- public IAsyncResult sendRR(Message[] messages, EndPoint[] to)
- {
- if ( messages.length != to.length )
- {
- throw new IllegalArgumentException("Number of messages and the number of endpoints need to be same.");
- }
-
- IAsyncResult iar = new MultiAsyncResult(messages.length);
- String groupId = GuidGenerator.guid();
- taskCompletionMap_.put(groupId, iar);
- for ( int i = 0; i < messages.length; ++i )
- {
- messages[i].setMessageId(groupId);
- sendOneWay(messages[i], to[i]);
- }
-
- return iar;
- }
-
public String sendRR(Message[][] messages, EndPoint[][] to, IAsyncCallback cb)
{
if ( messages.length != to.length )
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/CassandraServer.java Thu Aug 27 20:07:12 2009
@@ -36,7 +36,6 @@
import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.LogUtil;
-import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.Token;
import org.apache.thrift.TException;
@@ -73,10 +72,17 @@
storageService.start();
}
- protected ColumnFamily readColumnFamily(ReadCommand command, int consistency_level) throws InvalidRequestException
+ protected Map<String, ColumnFamily> readColumnFamily(List<ReadCommand> commands, int consistency_level) throws InvalidRequestException
{
- String cfName = command.getColumnFamilyName();
- ThriftValidation.validateKey(command.key);
+ // TODO - Support multiple column families per row, right now row only contains 1 column family
+ Map<String, ColumnFamily> columnFamilyKeyMap = new HashMap<String,ColumnFamily>();
+ if (commands.isEmpty())
+ return columnFamilyKeyMap; // e.g. multiget_* with an empty key list: nothing to read, and get(0) below would throw
+ String cfName = commands.get(0).getColumnFamilyName();
+ for (ReadCommand command: commands)
+ {
+ ThriftValidation.validateKey(command.key);
+ }
if (consistency_level == ConsistencyLevel.ZERO)
{
@@ -87,10 +93,10 @@
throw new InvalidRequestException("Consistency level all is not yet supported on read operations");
}
- Row row;
+ List<Row> rows;
try
{
- row = StorageProxy.readProtocol(command, consistency_level);
+ rows = StorageProxy.readProtocol(commands, consistency_level);
}
catch (IOException e)
{
@@ -101,11 +107,11 @@
throw new RuntimeException(e);
}
- if (row == null)
+ for (Row row: rows)
{
- return null;
+ columnFamilyKeyMap.put(row.key(), row.getColumnFamily(cfName));
}
- return row.getColumnFamily(cfName);
+ return columnFamilyKeyMap;
}
public List<Column> thriftifySubColumns(Collection<IColumn> columns)
@@ -169,50 +175,78 @@
return thriftSuperColumns;
}
- private List<ColumnOrSuperColumn> getSlice(ReadCommand command, int consistency_level) throws InvalidRequestException
+ private Map<String, List<ColumnOrSuperColumn>> getSlice(List<ReadCommand> commands, int consistency_level) throws InvalidRequestException
{
- ColumnFamily cfamily = readColumnFamily(command, consistency_level);
- boolean reverseOrder = command instanceof SliceFromReadCommand && ((SliceFromReadCommand)command).reversed;
-
- if (cfamily == null || cfamily.getColumnsMap().size() == 0)
- {
- return EMPTY_COLUMNS;
- }
- if (command.queryPath.superColumnName != null)
+ Map<String, ColumnFamily> cfamilies = readColumnFamily(commands, consistency_level);
+ Map<String, List<ColumnOrSuperColumn>> columnFamiliesMap = new HashMap<String, List<ColumnOrSuperColumn>>();
+ for (ReadCommand command: commands)
{
- IColumn column = cfamily.getColumnsMap().values().iterator().next();
- Collection<IColumn> subcolumns = column.getSubColumns();
- if (subcolumns == null || subcolumns.isEmpty())
+ ColumnFamily cfamily = cfamilies.get(command.key);
+ boolean reverseOrder = command instanceof SliceFromReadCommand && ((SliceFromReadCommand)command).reversed;
+
+ if (cfamily == null || cfamily.getColumnsMap().size() == 0)
{
- return EMPTY_COLUMNS;
+ columnFamiliesMap.put(command.key, EMPTY_COLUMNS);
+ continue;
}
- return thriftifyColumns(subcolumns, reverseOrder);
- }
- if (cfamily.isSuper())
- {
- return thriftifySuperColumns(cfamily.getSortedColumns(), reverseOrder);
+ if (command.queryPath.superColumnName != null)
+ {
+ IColumn column = cfamily.getColumnsMap().values().iterator().next();
+ Collection<IColumn> subcolumns = column.getSubColumns();
+ if (subcolumns == null || subcolumns.isEmpty())
+ {
+ columnFamiliesMap.put(command.key, EMPTY_COLUMNS);
+ continue;
+ }
+ columnFamiliesMap.put(command.key, thriftifyColumns(subcolumns, reverseOrder));
+ continue;
+ }
+ if (cfamily.isSuper())
+ columnFamiliesMap.put(command.key, thriftifySuperColumns(cfamily.getSortedColumns(), reverseOrder));
+ else
+ columnFamiliesMap.put(command.key, thriftifyColumns(cfamily.getSortedColumns(), reverseOrder));
}
- return thriftifyColumns(cfamily.getSortedColumns(), reverseOrder);
+
+ return columnFamiliesMap;
}
public List<ColumnOrSuperColumn> get_slice(String keyspace, String key, ColumnParent column_parent, SlicePredicate predicate, int consistency_level)
throws InvalidRequestException, NotFoundException
{
if (logger.isDebugEnabled())
- logger.debug("get_slice_from");
+ logger.debug("get_slice");
+ return multigetSliceInternal(keyspace, Arrays.asList(key), column_parent, predicate, consistency_level).get(key);
+ }
+
+ public Map<String, List<ColumnOrSuperColumn>> multiget_slice(String keyspace, List<String> keys, ColumnParent column_parent, SlicePredicate predicate, int consistency_level)
+ throws InvalidRequestException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("multiget_slice");
+ return multigetSliceInternal(keyspace, keys, column_parent, predicate, consistency_level);
+ }
+
+ private Map<String, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, List<String> keys, ColumnParent column_parent, SlicePredicate predicate, int consistency_level)
+ throws InvalidRequestException
+ {
ThriftValidation.validateColumnParent(keyspace, column_parent);
+ List<ReadCommand> commands = new ArrayList<ReadCommand>();
+ SliceRange range = predicate.slice_range;
if (predicate.column_names != null)
{
+ for (String key: keys)
+ commands.add(new SliceByNamesReadCommand(keyspace, key, column_parent, predicate.column_names));
ThriftValidation.validateColumns(keyspace, column_parent, predicate.column_names);
- return getSlice(new SliceByNamesReadCommand(keyspace, key, column_parent, predicate.column_names), consistency_level);
}
else
{
- SliceRange range = predicate.slice_range;
+ for (String key: keys)
+ commands.add(new SliceFromReadCommand(keyspace, key, column_parent, range.start, range.finish, range.reversed, range.count));
ThriftValidation.validateRange(keyspace, column_parent, range);
- return getSlice(new SliceFromReadCommand(keyspace, key, column_parent, range.start, range.finish, range.reversed, range.count), consistency_level);
}
+
+ return getSlice(commands, consistency_level);
}
public ColumnOrSuperColumn get(String table, String key, ColumnPath column_path, int consistency_level)
@@ -220,50 +254,127 @@
{
if (logger.isDebugEnabled())
logger.debug("get");
- ThriftValidation.validateColumnPath(table, column_path);
-
- QueryPath path = new QueryPath(column_path.column_family, column_path.super_column);
- List<byte[]> nameAsList = Arrays.asList(column_path.column == null ? column_path.super_column : column_path.column);
- ColumnFamily cfamily = readColumnFamily(new SliceByNamesReadCommand(table, key, path, nameAsList), consistency_level);
- if (cfamily == null)
+ ColumnOrSuperColumn column = multiget(table, Arrays.asList(key), column_path, consistency_level).get(key);
+ if (!column.isSetColumn() && !column.isSetSuper_column())
{
throw new NotFoundException();
}
- Collection<IColumn> columns = null;
- if (column_path.super_column != null && column_path.column != null)
+ return column;
+ }
+
+ /** no values will be mapped to keys with no data */
+ private Map<String, Collection<IColumn>> multigetColumns(List<ReadCommand> commands, int consistency_level)
+ throws InvalidRequestException
+ {
+ Map<String, ColumnFamily> cfamilies = readColumnFamily(commands, consistency_level);
+ Map<String, Collection<IColumn>> columnFamiliesMap = new HashMap<String, Collection<IColumn>>();
+
+ for (ReadCommand command: commands)
{
- IColumn column = cfamily.getColumn(column_path.super_column);
- if (column != null)
+ ColumnFamily cfamily = cfamilies.get(command.key);
+ if (cfamily == null)
+ continue;
+
+ Collection<IColumn> columns = null;
+ if (command.queryPath.superColumnName != null)
{
- columns = column.getSubColumns();
+ IColumn column = cfamily.getColumn(command.queryPath.superColumnName);
+ if (column != null)
+ {
+ columns = column.getSubColumns();
+ }
+ }
+ else
+ {
+ columns = cfamily.getSortedColumns();
+ }
+
+ if (columns != null && columns.size() != 0)
+ {
+ columnFamiliesMap.put(command.key, columns);
}
}
- else
- {
- columns = cfamily.getSortedColumns();
- }
- if (columns == null || columns.size() == 0)
+ return columnFamiliesMap;
+ }
+
+ /** always returns a ColumnOrSuperColumn for each key, even if there is no data for it */
+ public Map<String, ColumnOrSuperColumn> multiget(String table, List<String> keys, ColumnPath column_path, int consistency_level)
+ throws InvalidRequestException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("multiget");
+ return multigetInternal(table, keys, column_path, consistency_level);
+ }
+
+ private Map<String, ColumnOrSuperColumn> multigetInternal(String table, List<String> keys, ColumnPath column_path, int consistency_level)
+ throws InvalidRequestException
+ {
+ ThriftValidation.validateColumnPath(table, column_path);
+
+ QueryPath path = new QueryPath(column_path.column_family, column_path.super_column);
+ List<byte[]> nameAsList = Arrays.asList(column_path.column == null ? column_path.super_column : column_path.column);
+ List<ReadCommand> commands = new ArrayList<ReadCommand>();
+ for (String key: keys)
{
- throw new NotFoundException();
+ commands.add(new SliceByNamesReadCommand(table, key, path, nameAsList));
}
- assert columns.size() == 1;
- IColumn column = columns.iterator().next();
- if (column.isMarkedForDelete())
+ Map<String, ColumnOrSuperColumn> columnFamiliesMap = new HashMap<String, ColumnOrSuperColumn>();
+ Map<String, Collection<IColumn>> columnsMap = multigetColumns(commands, consistency_level);
+
+ for (ReadCommand command: commands)
{
- throw new NotFoundException();
+ ColumnOrSuperColumn columnorsupercolumn;
+
+ Collection<IColumn> columns = columnsMap.get(command.key);
+ if (columns == null)
+ {
+ columnorsupercolumn = new ColumnOrSuperColumn();
+ }
+ else
+ {
+ assert columns.size() == 1;
+ IColumn column = columns.iterator().next();
+
+
+ if (column.isMarkedForDelete())
+ {
+ columnorsupercolumn = new ColumnOrSuperColumn();
+ }
+ else
+ {
+ columnorsupercolumn = column instanceof org.apache.cassandra.db.Column
+ ? new ColumnOrSuperColumn(new Column(column.name(), column.value(), column.timestamp()), null)
+ : new ColumnOrSuperColumn(null, new SuperColumn(column.name(), thriftifySubColumns(column.getSubColumns())));
+ }
+
+ }
+ columnFamiliesMap.put(command.key, columnorsupercolumn);
}
- return column instanceof org.apache.cassandra.db.Column
- ? new ColumnOrSuperColumn(new Column(column.name(), column.value(), column.timestamp()), null)
- : new ColumnOrSuperColumn(null, new SuperColumn(column.name(), thriftifySubColumns(column.getSubColumns())));
+ return columnFamiliesMap;
}
public int get_count(String table, String key, ColumnParent column_parent, int consistency_level)
throws InvalidRequestException
{
if (logger.isDebugEnabled())
- logger.debug("get_column_count");
+ logger.debug("get_count");
+ return multigetCountInternal(table, Arrays.asList(key), column_parent, consistency_level).get(key);
+ }
+
+ public Map<String, Integer> multiget_count(String table, List<String> keys, ColumnParent column_parent, int consistency_level)
+ throws InvalidRequestException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("multiget_count");
+ return multigetCountInternal(table, keys, column_parent, consistency_level);
+
+ }
+
+ private Map<String, Integer> multigetCountInternal(String table, List<String> keys, ColumnParent column_parent, int consistency_level)
+ throws InvalidRequestException
+ {
// validateColumnParent assumes we require simple columns; g_c_c is the only
// one of the columnParent-taking apis that can also work at the SC level.
// so we roll a one-off validator here.
@@ -273,31 +384,29 @@
throw new InvalidRequestException("columnfamily alone is required for standard CF " + column_parent.column_family);
}
- ColumnFamily cfamily;
- cfamily = readColumnFamily(new SliceFromReadCommand(table, key, column_parent, ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, Integer.MAX_VALUE), consistency_level);
- if (cfamily == null)
+ List<ReadCommand> commands = new ArrayList<ReadCommand>();
+ for (String key: keys)
{
- return 0;
+ commands.add(new SliceFromReadCommand(table, key, column_parent, ArrayUtils.EMPTY_BYTE_ARRAY, ArrayUtils.EMPTY_BYTE_ARRAY, true, Integer.MAX_VALUE));
}
- Collection<IColumn> columns = null;
- if (column_parent.super_column != null)
+
+ Map<String, Integer> columnFamiliesMap = new HashMap<String, Integer>();
+ Map<String, Collection<IColumn>> columnsMap = multigetColumns(commands, consistency_level);
+
+ for (ReadCommand command: commands)
{
- IColumn column = cfamily.getColumn(column_parent.super_column);
- if (column != null)
+ Collection<IColumn> columns = columnsMap.get(command.key);
+ if(columns == null)
{
- columns = column.getSubColumns();
+ columnFamiliesMap.put(command.key, 0);
+ }
+ else
+ {
+ columnFamiliesMap.put(command.key, columns.size());
}
}
- else
- {
- columns = cfamily.getSortedColumns();
- }
- if (columns == null || columns.size() == 0)
- {
- return 0;
- }
- return columns.size();
- }
+ return columnFamiliesMap;
+ }
public void insert(String table, String key, ColumnPath column_path, byte[] value, long timestamp, int consistency_level)
throws InvalidRequestException, UnavailableException
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/QuorumResponseHandler.java Thu Aug 27 20:07:12 2009
@@ -41,6 +41,7 @@
private List<Message> responses_ = new ArrayList<Message>();
private IResponseResolver<T> responseResolver_;
private AtomicBoolean done_ = new AtomicBoolean(false);
+ private long startTime_;
public QuorumResponseHandler(int responseCount, IResponseResolver<T> responseResolver) throws InvalidRequestException
{
@@ -51,6 +52,7 @@
condition_ = lock_.newCondition();
responseCount_ = responseCount;
responseResolver_ = responseResolver;
+ startTime_ = System.currentTimeMillis();
}
public T get() throws TimeoutException, DigestMismatchException
@@ -62,8 +64,12 @@
try
{
if ( !done_.get() )
- {
- bVal = condition_.await(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ {
+ long timeout = DatabaseDescriptor.getRpcTimeout() - (System.currentTimeMillis() - startTime_); // remaining rpc budget: full timeout minus time elapsed since construction
+ if(timeout > 0)
+ bVal = condition_.await(timeout, TimeUnit.MILLISECONDS);
+ else
+ bVal = false;
}
}
catch ( InterruptedException ex )
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Thu Aug 27 20:07:12 2009
@@ -238,171 +238,101 @@
insertBlocking(rm, ConsistencyLevel.QUORUM);
}
- private static Map<String, Message> constructMessages(Map<String, ReadCommand> readMessages) throws IOException
- {
- Map<String, Message> messages = new HashMap<String, Message>();
- Set<String> keys = readMessages.keySet();
- for ( String key : keys )
- {
- Message message = readMessages.get(key).makeReadMessage();
- messages.put(key, message);
- }
- return messages;
- }
-
- private static IAsyncResult dispatchMessages(Map<String, EndPoint> endPoints, Map<String, Message> messages)
- {
- Set<String> keys = endPoints.keySet();
- EndPoint[] eps = new EndPoint[keys.size()];
- Message[] msgs = new Message[keys.size()];
-
- int i = 0;
- for ( String key : keys )
- {
- eps[i] = endPoints.get(key);
- msgs[i] = messages.get(key);
- ++i;
- }
-
- IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(msgs, eps);
- return iar;
- }
-
- /**
- * This is an implementation for the multiget version.
- * @param readMessages map of key --> ReadMessage to be sent
- * @return map of key --> Row
- * @throws IOException
- * @throws TimeoutException
- */
- public static Map<String, Row> doReadProtocol(Map<String, ReadCommand> readMessages) throws IOException,TimeoutException
- {
- Map<String, Row> rows = new HashMap<String, Row>();
- Set<String> keys = readMessages.keySet();
- /* Find all the suitable endpoints for the keys */
- Map<String, EndPoint> endPoints = StorageService.instance().findSuitableEndPoints(keys.toArray( new String[0] ));
- /* Construct the messages to be sent out */
- Map<String, Message> messages = constructMessages(readMessages);
- /* Dispatch the messages to the respective endpoints */
- IAsyncResult iar = dispatchMessages(endPoints, messages);
- List<byte[]> results = iar.multiget(2*DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
-
- for ( byte[] body : results )
- {
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
- ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
- Row row = response.row();
- rows.put(row.key(), row);
- }
- return rows;
- }
/**
- * Read the data from one replica. If there is no reply, read the data from another. In the event we get
+ * Read the data for each command from one replica (no retry yet: a timeout fails the whole read). In the event we get
* the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
- * @param command the read to perform
+ * @param commands a set of commands to perform reads
- * @return the row associated with command.key
+ * @return the rows read for the commands; a command whose response carries no row contributes nothing
- * @throws Exception
+ * @throws IOException
*/
- private static Row weakReadRemote(ReadCommand command) throws IOException
+ private static List<Row> weakReadRemote(List<ReadCommand> commands) throws IOException
{
- EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key);
- assert endPoint != null;
- Message message = command.makeReadMessage();
if (logger.isDebugEnabled())
- logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint);
- message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
- IAsyncResult iar = MessagingService.getMessagingInstance().sendRR(message, endPoint);
- byte[] body;
- try
+ logger.debug("weakreadremote reading " + StringUtils.join(commands, ", "));
+
+ List<Row> rows = new ArrayList<Row>();
+ List<IAsyncResult> iars = new ArrayList<IAsyncResult>();
+ int commandIndex = 0;
+
+ for (ReadCommand command: commands)
{
- body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ EndPoint endPoint = StorageService.instance().findSuitableEndPoint(command.key);
+ assert endPoint != null;
+ Message message = command.makeReadMessage();
+
+ if (logger.isDebugEnabled())
+ logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint);
+ message.addHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
+ iars.add(MessagingService.getMessagingInstance().sendRR(message, endPoint));
}
- catch (TimeoutException e)
+
+ for (IAsyncResult iar: iars)
{
- throw new RuntimeException("error reading key " + command.key, e);
- // TODO retry to a different endpoint?
+ byte[] body;
+ try
+ {
+ body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ }
+ catch (TimeoutException e)
+ {
+ throw new RuntimeException("error reading key " + commands.get(commandIndex).key, e);
+ // TODO retry to a different endpoint?
+ }
+ DataInputBuffer bufIn = new DataInputBuffer();
+ bufIn.reset(body, body.length);
+ ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
+ if (response.row() != null)
+ rows.add(response.row());
+ commandIndex++;
}
- DataInputBuffer bufIn = new DataInputBuffer();
- bufIn.reset(body, body.length);
- ReadResponse response = ReadResponse.serializer().deserialize(bufIn);
- return response.row();
+ return rows;
}
/**
* Performs the actual reading of a row out of the StorageService, fetching
* a specific set of column names from a given column family.
*/
- public static Row readProtocol(ReadCommand command, int consistency_level)
+ public static List<Row> readProtocol(List<ReadCommand> commands, int consistency_level)
throws IOException, TimeoutException, InvalidRequestException
{
long startTime = System.currentTimeMillis();
- Row row;
- EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(command.key);
+ List<Row> rows = new ArrayList<Row>();
if (consistency_level == ConsistencyLevel.ONE)
{
- boolean foundLocal = Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint());
- //TODO: Throw InvalidRequest if we're in bootstrap mode?
- if (foundLocal && !StorageService.instance().isBootstrapMode())
- {
- row = weakReadLocal(command);
- }
- else
+ List<ReadCommand> localCommands = new ArrayList<ReadCommand>();
+ List<ReadCommand> remoteCommands = new ArrayList<ReadCommand>();
+
+ for (ReadCommand command: commands)
{
- row = weakReadRemote(command);
+ EndPoint[] endpoints = StorageService.instance().getNStorageEndPoint(command.key);
+ boolean foundLocal = Arrays.asList(endpoints).contains(StorageService.getLocalStorageEndPoint());
+ //TODO: Throw InvalidRequest if we're in bootstrap mode?
+ if (foundLocal && !StorageService.instance().isBootstrapMode())
+ {
+ localCommands.add(command);
+ }
+ else
+ {
+ remoteCommands.add(command);
+ }
}
+ if (localCommands.size() > 0)
+ rows.addAll(weakReadLocal(localCommands));
+
+ if (remoteCommands.size() > 0)
+ rows.addAll(weakReadRemote(remoteCommands));
}
else
{
assert consistency_level == ConsistencyLevel.QUORUM;
- row = strongRead(command);
+ rows = strongRead(commands);
}
readStats.add(System.currentTimeMillis() - startTime);
- return row;
- }
-
- public static Map<String, Row> readProtocol(String[] keys, ReadCommand readCommand, StorageService.ConsistencyLevel consistencyLevel) throws Exception
- {
- Map<String, Row> rows = new HashMap<String, Row>();
- switch ( consistencyLevel )
- {
- case WEAK:
- rows = weakReadProtocol(keys, readCommand);
- break;
-
- case STRONG:
- rows = strongReadProtocol(keys, readCommand);
- break;
-
- default:
- rows = weakReadProtocol(keys, readCommand);
- break;
- }
- return rows;
- }
-
- /**
- * This is a multiget version of the above method.
- */
- public static Map<String, Row> strongReadProtocol(String[] keys, ReadCommand readCommand) throws IOException, TimeoutException
- {
- Map<String, Row> rows;
- // TODO: throw a thrift exception if we do not have N nodes
- Map<String, ReadCommand[]> readMessages = new HashMap<String, ReadCommand[]>();
- for (String key : keys )
- {
- ReadCommand[] readParameters = new ReadCommand[2];
- readParameters[0] = readCommand.copy();
- readParameters[1] = readCommand.copy();
- readParameters[1].setDigestQuery(true);
- readMessages.put(key, readParameters);
- }
- rows = doStrongReadProtocol(readMessages);
return rows;
}
@@ -418,80 +348,100 @@
* 7. else carry out read repair by getting data from all the nodes.
// 5. return success
*/
- private static Row strongRead(ReadCommand command) throws IOException, TimeoutException, InvalidRequestException
+ private static List<Row> strongRead(List<ReadCommand> commands) throws IOException, TimeoutException, InvalidRequestException
{
- // TODO: throw a thrift exception if we do not have N nodes
- assert !command.isDigestQuery();
- ReadCommand readMessageDigestOnly = command.copy();
- readMessageDigestOnly.setDigestQuery(true);
-
- Row row = null;
- Message message = command.makeReadMessage();
- Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
-
- IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
- QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
- DatabaseDescriptor.getQuorum(),
- readResponseResolver);
- EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
- List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getNStorageEndPoint(command.key)));
- /* Remove the local storage endpoint from the list. */
- endpointList.remove(dataPoint);
- EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
- Message messages[] = new Message[endpointList.size() + 1];
-
- /*
- * First message is sent to the node that will actually get
- * the data for us. The other two replicas are only sent a
- * digest query.
- */
- endPoints[0] = dataPoint;
- messages[0] = message;
- if (logger.isDebugEnabled())
- logger.debug("strongread reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
- for (int i = 1; i < endPoints.length; i++)
- {
- EndPoint digestPoint = endpointList.get(i - 1);
- endPoints[i] = digestPoint;
- messages[i] = messageDigestOnly;
+ List<QuorumResponseHandler<Row>> quorumResponseHandlers = new ArrayList<QuorumResponseHandler<Row>>();
+ List<EndPoint[]> commandEndPoints = new ArrayList<EndPoint[]>();
+ List<Row> rows = new ArrayList<Row>();
+
+ int commandIndex = 0;
+
+ for (ReadCommand command: commands)
+ {
+ // TODO: throw a thrift exception if we do not have N nodes
+ assert !command.isDigestQuery();
+ ReadCommand readMessageDigestOnly = command.copy();
+ readMessageDigestOnly.setDigestQuery(true);
+
+ Message message = command.makeReadMessage();
+ Message messageDigestOnly = readMessageDigestOnly.makeReadMessage();
+
+ IResponseResolver<Row> readResponseResolver = new ReadResponseResolver();
+ QuorumResponseHandler<Row> quorumResponseHandler = new QuorumResponseHandler<Row>(
+ DatabaseDescriptor.getQuorum(),
+ readResponseResolver);
+ EndPoint dataPoint = StorageService.instance().findSuitableEndPoint(command.key);
+ List<EndPoint> endpointList = new ArrayList<EndPoint>(Arrays.asList(StorageService.instance().getNStorageEndPoint(command.key)));
+ /* Remove the local storage endpoint from the list. */
+ endpointList.remove(dataPoint);
+ EndPoint[] endPoints = new EndPoint[endpointList.size() + 1];
+ Message messages[] = new Message[endpointList.size() + 1];
+
+ /*
+ * First message is sent to the node that will actually get
+ * the data for us. The other two replicas are only sent a
+ * digest query.
+ */
+ endPoints[0] = dataPoint;
+ messages[0] = message;
if (logger.isDebugEnabled())
- logger.debug("strongread reading digest for " + command + " from " + messageDigestOnly.getMessageId() + "@" + digestPoint);
+ logger.debug("strongread reading data for " + command + " from " + message.getMessageId() + "@" + dataPoint);
+ for (int i = 1; i < endPoints.length; i++)
+ {
+ EndPoint digestPoint = endpointList.get(i - 1);
+ endPoints[i] = digestPoint;
+ messages[i] = messageDigestOnly;
+ if (logger.isDebugEnabled())
+ logger.debug("strongread reading digest for " + command + " from " + messageDigestOnly.getMessageId() + "@" + digestPoint);
+ }
+ MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);
+ quorumResponseHandlers.add(quorumResponseHandler);
+ commandEndPoints.add(endPoints);
}
- try
+ for (QuorumResponseHandler<Row> quorumResponseHandler: quorumResponseHandlers)
{
- MessagingService.getMessagingInstance().sendRR(messages, endPoints, quorumResponseHandler);
+ Row row = null;
+ ReadCommand command = commands.get(commandIndex);
+ try
+ {
+ long startTime2 = System.currentTimeMillis();
+ row = quorumResponseHandler.get();
+ if (row != null)
+ rows.add(row);
- long startTime2 = System.currentTimeMillis();
- row = quorumResponseHandler.get();
- if (logger.isDebugEnabled())
- logger.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms.");
- }
- catch (DigestMismatchException ex)
- {
- if ( DatabaseDescriptor.getConsistencyCheck())
+ if (logger.isDebugEnabled())
+ logger.debug("quorumResponseHandler: " + (System.currentTimeMillis() - startTime2) + " ms.");
+ }
+ catch (DigestMismatchException ex)
{
- IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
- QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
- DatabaseDescriptor.getQuorum(),
- readResponseResolverRepair);
- logger.info("DigestMismatchException: " + command.key);
- Message messageRepair = command.makeReadMessage();
- MessagingService.getMessagingInstance().sendRR(messageRepair, endPoints,
- quorumResponseHandlerRepair);
- try
- {
- row = quorumResponseHandlerRepair.get();
- }
- catch (DigestMismatchException e)
+ if ( DatabaseDescriptor.getConsistencyCheck())
{
- // TODO should this be a thrift exception?
- throw new RuntimeException("digest mismatch reading key " + command.key, e);
+ IResponseResolver<Row> readResponseResolverRepair = new ReadResponseResolver();
+ QuorumResponseHandler<Row> quorumResponseHandlerRepair = new QuorumResponseHandler<Row>(
+ DatabaseDescriptor.getQuorum(),
+ readResponseResolverRepair);
+ logger.info("DigestMismatchException: " + command.key);
+ Message messageRepair = command.makeReadMessage();
+ MessagingService.getMessagingInstance().sendRR(messageRepair, commandEndPoints.get(commandIndex),
+ quorumResponseHandlerRepair);
+ try
+ {
+ row = quorumResponseHandlerRepair.get();
+ if (row != null)
+ rows.add(row);
+ }
+ catch (DigestMismatchException e)
+ {
+ // TODO should this be a thrift exception?
+ throw new RuntimeException("digest mismatch reading key " + command.key, e);
+ }
}
}
+ commandIndex++;
}
- return row;
+ return rows;
}
private static Map<String, Message[]> constructReplicaMessages(Map<String, ReadCommand[]> readMessages) throws IOException
@@ -557,69 +507,6 @@
MessagingService.getMessagingInstance().sendRR(msgList, epList, quorumResponseHandlers);
return quorumResponseHandlers;
}
-
- /**
- * This method performs the read from the replicas for a bunch of keys.
- * @param readMessages map of key --> readMessage[] of two entries where
- * the first entry is the readMessage for the data and the second
- * is the entry for the digest
- * @return map containing key ---> Row
- * @throws IOException, TimeoutException
- */
- private static Map<String, Row> doStrongReadProtocol(Map<String, ReadCommand[]> readMessages) throws IOException
- {
- Map<String, Row> rows = new HashMap<String, Row>();
- /* Construct the messages to be sent to the replicas */
- Map<String, Message[]> replicaMessages = constructReplicaMessages(readMessages);
- /* Dispatch the messages to the different replicas */
- MultiQuorumResponseHandler cb = dispatchMessagesMulti(readMessages, replicaMessages);
- try
- {
- Row[] rows2 = cb.get();
- for ( Row row : rows2 )
- {
- rows.put(row.key(), row);
- }
- }
- catch (TimeoutException e)
- {
- throw new RuntimeException("timeout reading keys " + StringUtils.join(rows.keySet(), ", "), e);
- }
- return rows;
- }
-
- /**
- * This version is used when results for multiple keys needs to be
- * retrieved.
- *
- * @return a mapping of key --> Row
- * @throws Exception
- */
- public static Map<String, Row> weakReadProtocol(String[] keys, ReadCommand readCommand) throws Exception
- {
- Row row = null;
- Map<String, ReadCommand> readMessages = new HashMap<String, ReadCommand>();
- for ( String key : keys )
- {
- ReadCommand readCmd = readCommand.copy();
- readMessages.put(key, readCmd);
- }
- /* Performs the multiget in parallel */
- Map<String, Row> rows = doReadProtocol(readMessages);
- /*
- * Do the consistency checks for the keys that are being queried
- * in the background.
- */
- for ( String key : keys )
- {
- List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(key);
- /* Remove the local storage endpoint from the list. */
- endpoints.remove( StorageService.getLocalStorageEndPoint() );
- if ( endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
- StorageService.instance().doConsistencyCheck(row, endpoints, readMessages.get(key));
- }
- return rows;
- }
/*
* This function executes the read protocol locally and should be used only if consistency is not a concern.
@@ -627,25 +514,33 @@
* one of the other replicas (in the same data center if possible) till we get the data. In the event we get
* the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
*/
- private static Row weakReadLocal(ReadCommand command) throws IOException
+ private static List<Row> weakReadLocal(List<ReadCommand> commands) throws IOException
{
- if (logger.isDebugEnabled())
- logger.debug("weakreadlocal reading " + command);
- List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(command.key);
- /* Remove the local storage endpoint from the list. */
- endpoints.remove(StorageService.getLocalStorageEndPoint());
- // TODO: throw a thrift exception if we do not have N nodes
-
- Table table = Table.open(command.table);
- Row row = command.getRow(table);
-
- /*
- * Do the consistency checks in the background and return the
- * non NULL row.
- */
- if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
- StorageService.instance().doConsistencyCheck(row, endpoints, command);
- return row;
+ List<Row> rows = new ArrayList<Row>();
+ for (ReadCommand command: commands)
+ {
+ List<EndPoint> endpoints = StorageService.instance().getNLiveStorageEndPoint(command.key);
+ /* Remove the local storage endpoint from the list. */
+ endpoints.remove(StorageService.getLocalStorageEndPoint());
+ // TODO: throw a thrift exception if we do not have N nodes
+
+ if (logger.isDebugEnabled())
+ logger.debug("weakreadlocal reading " + command);
+
+ Table table = Table.open(command.table);
+ Row row = command.getRow(table);
+ if (row != null)
+ rows.add(row);
+ /*
+ * Do the consistency checks in the background and return the
+ * non NULL row.
+ */
+ if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+ StorageService.instance().doConsistencyCheck(row, endpoints, command);
+
+ }
+
+ return rows;
}
static List<String> getKeyRange(RangeCommand rawCommand) throws IOException
Modified: incubator/cassandra/trunk/test/system/test_server.py
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/test/system/test_server.py?rev=808589&r1=808588&r2=808589&view=diff
==============================================================================
--- incubator/cassandra/trunk/test/system/test_server.py (original)
+++ incubator/cassandra/trunk/test/system/test_server.py Thu Aug 27 20:07:12 2009
@@ -35,14 +35,22 @@
Column(_i64(6), 'value6', 0)])]
def _insert_simple(block=True):
+ return _insert_multi(['key1'], block)
+
+def _insert_batch(block):
+ return _insert_multi_batch(['key1'], block)
+
+def _insert_multi(keys, block=True):
if block:
consistencyLevel = ConsistencyLevel.ONE
else:
consistencyLevel = ConsistencyLevel.ZERO
- client.insert('Keyspace1', 'key1', ColumnPath('Standard1', column='c1'), 'value1', 0, consistencyLevel)
- client.insert('Keyspace1', 'key1', ColumnPath('Standard1', column='c2'), 'value2', 0, consistencyLevel)
-def _insert_batch(block):
+ for key in keys:
+ client.insert('Keyspace1', key, ColumnPath('Standard1', column='c1'), 'value1', 0, consistencyLevel)
+ client.insert('Keyspace1', key, ColumnPath('Standard1', column='c2'), 'value2', 0, consistencyLevel)
+
+def _insert_multi_batch(keys, block):
cfmap = {'Standard1': [ColumnOrSuperColumn(c) for c in _SIMPLE_COLUMNS],
'Standard2': [ColumnOrSuperColumn(c) for c in _SIMPLE_COLUMNS]}
if block:
@@ -50,12 +58,17 @@
else:
consistencyLevel = ConsistencyLevel.ZERO
- client.batch_insert('Keyspace1', BatchMutation(key='key1', cfmap=cfmap), consistencyLevel)
+ for key in keys:
+ client.batch_insert('Keyspace1', BatchMutation(key=key, cfmap=cfmap), consistencyLevel)
def _big_slice(keyspace, key, column_parent):
p = SlicePredicate(slice_range=SliceRange('', '', False, 1000))
return client.get_slice(keyspace, key, column_parent, p, ConsistencyLevel.ONE)
+def _big_multislice(keyspace, keys, column_parent):
+ p = SlicePredicate(slice_range=SliceRange('', '', False, 1000))
+ return client.multiget_slice(keyspace, keys, column_parent, p, ConsistencyLevel.ONE)
+
def _verify_batch():
_verify_simple()
L = [result.column
@@ -492,3 +505,59 @@
result = client.get_slice('Keyspace1', 'key1', ColumnParent('Super1', 'sc1'), p, ConsistencyLevel.ONE)
assert len(result) == 1
assert result[0].column.name == _i64(4)
+
+ def test_multiget(self):
+ """Insert multiple keys and retrieve them using the multiget interface"""
+
+ """Generate a list of 10 keys and insert them"""
+ num_keys = 10
+ keys = ['key'+str(i) for i in range(1, num_keys+1)]
+ _insert_multi(keys)
+
+ """Retrieve all 10 keys"""
+ rows = client.multiget('Keyspace1', keys, ColumnPath('Standard1', column='c1'), ConsistencyLevel.ONE)
+ keys1 = rows.keys().sort()
+ keys2 = keys.sort()
+
+ """Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted"""
+ for key in keys:
+ assert rows.has_key(key) == True
+ assert rows[key] == ColumnOrSuperColumn(column=Column(timestamp=0, name='c1', value='value1'))
+
+ def test_multiget_slice(self):
+ """Insert multiple keys and retrieve them using the multiget_slice interface"""
+
+ """Generate a list of 10 keys and insert them"""
+ num_keys = 10
+ keys = ['key'+str(i) for i in range(1, num_keys+1)]
+ _insert_multi(keys)
+
+ """Retrieve all 10 key slices"""
+ rows = _big_multislice('Keyspace1', keys, ColumnParent('Standard1'))
+ keys1 = rows.keys().sort()
+ keys2 = keys.sort()
+
+ columns = [ColumnOrSuperColumn(c) for c in _SIMPLE_COLUMNS]
+ """Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted"""
+ for key in keys:
+ assert rows.has_key(key) == True
+ assert columns == rows[key]
+
+ def test_multiget_count(self):
+ """Insert multiple keys and retrieve them using the multiget_count interface"""
+
+ """Generate a list of 10 keys and insert them"""
+ num_keys = 10
+ keys = ['key'+str(i) for i in range(1, num_keys+1)]
+ _insert_multi(keys)
+
+ """Retrieve all 10 key slices"""
+ rows = client.multiget_count('Keyspace1', keys, ColumnParent('Standard1'), ConsistencyLevel.ONE)
+ keys1 = rows.keys().sort()
+ keys2 = keys.sort()
+
+ columns = [ColumnOrSuperColumn(c) for c in _SIMPLE_COLUMNS]
+ """Validate if the returned rows have the keys requested and if the ColumnOrSuperColumn is what was inserted"""
+ for key in keys:
+ assert rows.has_key(key) == True
+ assert rows[key] == 2
\ No newline at end of file