You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by go...@apache.org on 2009/11/14 22:05:57 UTC
svn commit: r836264 -
/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Author: goffinet
Date: Sat Nov 14 21:05:56 2009
New Revision: 836264
URL: http://svn.apache.org/viewvc?rev=836264&view=rev
Log:
Fixed multi-get to put localCommands onto the StorageService.readStage when a node gets commands to process, instead of running them serially.
patch by goffinet, minor cleanup and review by jbellis for CASSANDRA-555
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=836264&r1=836263&r2=836264&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Sat Nov 14 21:05:56 2009
@@ -21,6 +21,8 @@
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
import java.lang.management.ManagementFactory;
import org.apache.commons.lang.StringUtils;
@@ -37,6 +39,7 @@
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.concurrent.StageManager;
import org.apache.log4j.Logger;
@@ -253,7 +256,7 @@
private static List<Row> weakReadRemote(List<ReadCommand> commands) throws IOException, UnavailableException
{
if (logger.isDebugEnabled())
- logger.debug("weakreadlocal reading " + StringUtils.join(commands, ", "));
+ logger.debug("weakreadremote reading " + StringUtils.join(commands, ", "));
List<Row> rows = new ArrayList<Row>();
List<IAsyncResult> iars = new ArrayList<IAsyncResult>();
@@ -449,29 +452,26 @@
private static List<Row> weakReadLocal(List<ReadCommand> commands) throws IOException
{
List<Row> rows = new ArrayList<Row>();
+ List<Future<Object>> futures = new ArrayList<Future<Object>>();
+
for (ReadCommand command: commands)
{
- List<InetAddress> endpoints = StorageService.instance().getLiveNaturalEndpoints(command.key);
- /* Remove the local storage endpoint from the list. */
- endpoints.remove(FBUtilities.getLocalAddress());
- // TODO: throw a thrift exception if we do not have N nodes
-
- if (logger.isDebugEnabled())
- logger.debug("weakreadlocal reading " + command);
-
- Table table = Table.open(command.table);
- Row row = command.getRow(table);
- if (row != null)
- rows.add(row);
- /*
- * Do the consistency checks in the background and return the
- * non NULL row.
- */
- if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
- StorageService.instance().doConsistencyCheck(row, endpoints, command);
-
+ Callable<Object> callable = new weakReadLocalCallable(command);
+ futures.add(StageManager.getStage(StorageService.readStage_).execute(callable));
+ }
+ for (Future<Object> future : futures)
+ {
+ Row row;
+ try
+ {
+ row = (Row) future.get();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ rows.add(row);
}
-
return rows;
}
@@ -608,4 +608,34 @@
{
return writeStats.size();
}
+
+ static class weakReadLocalCallable implements Callable<Object>
+ {
+ private ReadCommand command;
+
+ weakReadLocalCallable(ReadCommand command)
+ {
+ this.command = command;
+ }
+
+ public Object call() throws IOException
+ {
+ List<InetAddress> endpoints = StorageService.instance().getLiveNaturalEndpoints(command.key);
+ /* Remove the local storage endpoint from the list. */
+ endpoints.remove(FBUtilities.getLocalAddress());
+ // TODO: throw a thrift exception if we do not have N nodes
+
+ if (logger.isDebugEnabled())
+ logger.debug("weakreadlocal reading " + command);
+
+ Table table = Table.open(command.table);
+ Row row = command.getRow(table);
+
+ // Do the consistency checks in the background and return the non NULL row
+ if (endpoints.size() > 0 && DatabaseDescriptor.getConsistencyCheck())
+ StorageService.instance().doConsistencyCheck(row, endpoints, command);
+
+ return row;
+ }
+ }
}