You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/26 23:05:25 UTC
svn commit: r979441 - in /cassandra/branches/cassandra-0.6: CHANGES.txt
src/java/org/apache/cassandra/service/StorageProxy.java
Author: jbellis
Date: Mon Jul 26 21:05:24 2010
New Revision: 979441
URL: http://svn.apache.org/viewvc?rev=979441&view=rev
Log:
Parallelize local and remote reads during multiget, and respect the snitch when determining whether to do a local read for CL.ONE. Patch by jbellis; tested by Tupshin Harper for CASSANDRA-1317.
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=979441&r1=979440&r2=979441&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Mon Jul 26 21:05:24 2010
@@ -20,6 +20,8 @@
* add ack to Binary write verb and update CassandraBulkLoader
to wait for acks for each row (CASSANDRA-1093)
* remove gossip message size limit (CASSANDRA-1138)
+ * parallelize local and remote reads during multiget, and respect snitch
+ when determining whether to do local read for CL.ONE (CASSANDRA-1317)
0.6.3
Modified: cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java?rev=979441&r1=979440&r2=979441&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/StorageProxy.java Mon Jul 26 21:05:24 2010
@@ -36,6 +36,7 @@ import org.apache.commons.lang.StringUti
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Multimap;
+
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
@@ -49,10 +50,10 @@ import org.apache.cassandra.net.IAsyncRe
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LatencyTracker;
-import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -335,88 +336,94 @@ public class StorageProxy implements Sto
}
/**
- * Read the data from one replica. When we get
- * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
- * @param commands a set of commands to perform reads
- * @return the row associated with command.key
- * @throws Exception
+ * Performs the actual reading of a row out of the StorageService, fetching
+ * a specific set of column names from a given column family.
*/
- private static List<Row> weakReadRemote(List<ReadCommand> commands) throws IOException, UnavailableException, TimeoutException
+ public static List<Row> readProtocol(List<ReadCommand> commands, ConsistencyLevel consistency_level)
+ throws IOException, UnavailableException, TimeoutException, InvalidRequestException
{
- if (logger.isDebugEnabled())
- logger.debug("weakreadremote reading " + StringUtils.join(commands, ", "));
-
- List<Row> rows = new ArrayList<Row>();
- List<IAsyncResult> iars = new ArrayList<IAsyncResult>();
+ if (StorageService.instance.isBootstrapMode())
+ throw new InvalidRequestException("This node cannot accept reads until it has bootstrapped");
+ long startTime = System.nanoTime();
- for (ReadCommand command: commands)
+ List<Row> rows;
+ if (consistency_level == ConsistencyLevel.ONE)
{
- InetAddress endPoint = StorageService.instance.findSuitableEndPoint(command.table, command.key);
- Message message = command.makeReadMessage();
-
- if (logger.isDebugEnabled())
- logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endPoint);
- if (DatabaseDescriptor.getConsistencyCheck())
- message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
- iars.add(MessagingService.instance.sendRR(message, endPoint));
+ rows = weakRead(commands);
}
-
- for (IAsyncResult iar: iars)
+ else
{
- byte[] body;
- body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
- ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
- ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
- if (response.row() != null)
- rows.add(response.row());
+ assert consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue();
+ rows = strongRead(commands, consistency_level);
}
+
+ readStats.addNano(System.nanoTime() - startTime);
return rows;
}
- /**
- * Performs the actual reading of a row out of the StorageService, fetching
- * a specific set of column names from a given column family.
- */
- public static List<Row> readProtocol(List<ReadCommand> commands, ConsistencyLevel consistency_level)
- throws IOException, UnavailableException, TimeoutException
+ private static List<Row> weakRead(List<ReadCommand> commands) throws IOException, UnavailableException, TimeoutException
{
- long startTime = System.nanoTime();
-
List<Row> rows = new ArrayList<Row>();
- if (consistency_level == ConsistencyLevel.ONE)
+ // send off all the commands asynchronously
+ List<Future<Object>> localFutures = null;
+ List<IAsyncResult> remoteResults = null;
+ for (ReadCommand command: commands)
{
- List<ReadCommand> localCommands = new ArrayList<ReadCommand>();
- List<ReadCommand> remoteCommands = new ArrayList<ReadCommand>();
+ InetAddress endPoint = StorageService.instance.findSuitableEndPoint(command.table, command.key);
+ if (endPoint.equals(FBUtilities.getLocalAddress()))
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("weakread reading " + command + " locally");
- for (ReadCommand command: commands)
+ if (localFutures == null)
+ localFutures = new ArrayList<Future<Object>>();
+ Callable<Object> callable = new weakReadLocalCallable(command);
+ localFutures.add(StageManager.getStage(StageManager.READ_STAGE).submit(callable));
+ }
+ else
{
- List<InetAddress> endpoints = StorageService.instance.getNaturalEndpoints(command.table, command.key);
- boolean foundLocal = endpoints.contains(FBUtilities.getLocalAddress());
- //TODO: Throw InvalidRequest if we're in bootstrap mode?
- if (foundLocal && !StorageService.instance.isBootstrapMode())
+ if (remoteResults == null)
+ remoteResults = new ArrayList<IAsyncResult>();
+ Message message = command.makeReadMessage();
+ if (logger.isDebugEnabled())
+ logger.debug("weakread reading " + command + " from " + message.getMessageId() + "@" + endPoint);
+ if (DatabaseDescriptor.getConsistencyCheck())
+ message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
+ remoteResults.add(MessagingService.instance.sendRR(message, endPoint));
+ }
+ }
+
+ // wait for results
+ if (localFutures != null)
+ {
+ for (Future<Object> future : localFutures)
+ {
+ Row row;
+ try
{
- localCommands.add(command);
+ row = (Row) future.get();
}
- else
+ catch (Exception e)
{
- remoteCommands.add(command);
+ throw new RuntimeException(e);
}
+ rows.add(row);
}
- if (localCommands.size() > 0)
- rows.addAll(weakReadLocal(localCommands));
-
- if (remoteCommands.size() > 0)
- rows.addAll(weakReadRemote(remoteCommands));
}
- else
+ if (remoteResults != null)
{
- assert consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue();
- rows = strongRead(commands, consistency_level);
+ for (IAsyncResult iar: remoteResults)
+ {
+ byte[] body;
+ body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+ if (response.row() != null)
+ rows.add(response.row());
+ }
}
- readStats.addNano(System.nanoTime() - startTime);
-
return rows;
}
@@ -521,31 +528,6 @@ public class StorageProxy implements Sto
/*
* This function executes the read protocol locally. Consistency checks are performed in the background.
*/
- private static List<Row> weakReadLocal(List<ReadCommand> commands)
- {
- List<Row> rows = new ArrayList<Row>();
- List<Future<Object>> futures = new ArrayList<Future<Object>>();
-
- for (ReadCommand command: commands)
- {
- Callable<Object> callable = new weakReadLocalCallable(command);
- futures.add(StageManager.getStage(StageManager.READ_STAGE).submit(callable));
- }
- for (Future<Object> future : futures)
- {
- Row row;
- try
- {
- row = (Row) future.get();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- rows.add(row);
- }
- return rows;
- }
public static List<Row> getRangeSlice(RangeSliceCommand command, ConsistencyLevel consistency_level)
throws IOException, UnavailableException, TimeoutException