You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2010/07/26 23:23:17 UTC
svn commit: r979449 - in /cassandra/trunk: ./
interface/thrift/gen-java/org/apache/cassandra/thrift/
src/java/org/apache/cassandra/service/
Author: jbellis
Date: Mon Jul 26 21:23:17 2010
New Revision: 979449
URL: http://svn.apache.org/viewvc?rev=979449&view=rev
Log:
merge from 0.6
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/CHANGES.txt
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java (props changed)
cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 21:23:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6:922689-979402
+/cassandra/branches/cassandra-0.6:922689-979441
/cassandra/trunk:978791
/incubator/cassandra/branches/cassandra-0.3:774578-796573
/incubator/cassandra/branches/cassandra-0.4:810145-834239,834349-834350
Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=979449&r1=979448&r2=979449&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Mon Jul 26 21:23:17 2010
@@ -65,6 +65,8 @@ dev
(CASSANDRA-1280, CASSANDRA-1047)
* log thread pool stats when GC is excessive (CASSANDRA-1275)
* remove gossip message size limit (CASSANDRA-1138)
+ * parallelize local and remote reads during multiget, and respect snitch
+ when determining whether to do local read for CL.ONE (CASSANDRA-1317)
0.6.3
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 21:23:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-979402
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-979441
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/Cassandra.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Cassandra.java:810145-834239,834349-834350
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 21:23:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-979402
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-979441
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/column_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/Column.java:810145-834239,834349-834350
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 21:23:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-979402
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-979441
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/InvalidRequestException.java:810145-834239,834349-834350
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 21:23:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-979402
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-979441
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:774578-796573
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/NotFoundException.java:810145-834239,834349-834350
Propchange: cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Mon Jul 26 21:23:17 2010
@@ -1,4 +1,4 @@
-/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-979402
+/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-979441
/cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:978791
/incubator/cassandra/branches/cassandra-0.3/interface/gen-java/org/apache/cassandra/service/superColumn_t.java:774578-792198
/incubator/cassandra/branches/cassandra-0.4/interface/gen-java/org/apache/cassandra/service/SuperColumn.java:810145-834239,834349-834350
Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java?rev=979449&r1=979448&r2=979449&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageProxy.java Mon Jul 26 21:23:17 2010
@@ -35,6 +35,7 @@ import org.apache.commons.lang.StringUti
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.DatabaseDescriptor;
@@ -51,6 +52,7 @@ import org.apache.cassandra.net.IAsyncRe
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.LatencyTracker;
@@ -284,88 +286,94 @@ public class StorageProxy implements Sto
}
/**
- * Read the data from one replica. When we get
- * the data we perform consistency checks and figure out if any repairs need to be done to the replicas.
- * @param commands a set of commands to perform reads
- * @return the row associated with command.key
- * @throws Exception
+ * Performs the actual reading of a row out of the StorageService, fetching
+ * a specific set of column names from a given column family.
*/
- private static List<Row> weakReadRemote(List<ReadCommand> commands) throws IOException, UnavailableException, TimeoutException
+ public static List<Row> readProtocol(List<ReadCommand> commands, ConsistencyLevel consistency_level)
+ throws IOException, UnavailableException, TimeoutException, InvalidRequestException
{
- if (logger.isDebugEnabled())
- logger.debug("weakreadremote reading " + StringUtils.join(commands, ", "));
-
- List<Row> rows = new ArrayList<Row>();
- List<IAsyncResult> iars = new ArrayList<IAsyncResult>();
+ if (StorageService.instance.isBootstrapMode())
+ throw new InvalidRequestException("This node cannot accept reads until it has bootstrapped");
+ long startTime = System.nanoTime();
- for (ReadCommand command: commands)
+ List<Row> rows;
+ if (consistency_level == ConsistencyLevel.ONE)
{
- InetAddress endpoint = StorageService.instance.findSuitableEndpoint(command.table, command.key);
- Message message = command.makeReadMessage();
-
- if (logger.isDebugEnabled())
- logger.debug("weakreadremote reading " + command + " from " + message.getMessageId() + "@" + endpoint);
- if (randomlyReadRepair(command))
- message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
- iars.add(MessagingService.instance.sendRR(message, endpoint));
+ rows = weakRead(commands);
}
-
- for (IAsyncResult iar: iars)
+ else
{
- byte[] body;
- body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
- ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
- ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
- if (response.row() != null)
- rows.add(response.row());
+ assert consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue();
+ rows = strongRead(commands, consistency_level);
}
+
+ readStats.addNano(System.nanoTime() - startTime);
return rows;
}
- /**
- * Performs the actual reading of a row out of the StorageService, fetching
- * a specific set of column names from a given column family.
- */
- public static List<Row> readProtocol(List<ReadCommand> commands, ConsistencyLevel consistency_level)
- throws IOException, UnavailableException, TimeoutException
+ private static List<Row> weakRead(List<ReadCommand> commands) throws IOException, UnavailableException, TimeoutException
{
- long startTime = System.nanoTime();
-
List<Row> rows = new ArrayList<Row>();
- if (consistency_level == ConsistencyLevel.ONE)
+ // send off all the commands asynchronously
+ List<Future<Object>> localFutures = null;
+ List<IAsyncResult> remoteResults = null;
+ for (ReadCommand command: commands)
{
- List<ReadCommand> localCommands = new ArrayList<ReadCommand>();
- List<ReadCommand> remoteCommands = new ArrayList<ReadCommand>();
+ InetAddress endPoint = StorageService.instance.findSuitableEndpoint(command.table, command.key);
+ if (endPoint.equals(FBUtilities.getLocalAddress()))
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("weakread reading " + command + " locally");
- for (ReadCommand command: commands)
+ if (localFutures == null)
+ localFutures = new ArrayList<Future<Object>>();
+ Callable<Object> callable = new weakReadLocalCallable(command);
+ localFutures.add(StageManager.getStage(StageManager.READ_STAGE).submit(callable));
+ }
+ else
{
- List<InetAddress> endpoints = StorageService.instance.getNaturalEndpoints(command.table, command.key);
- boolean foundLocal = endpoints.contains(FBUtilities.getLocalAddress());
- //TODO: Throw InvalidRequest if we're in bootstrap mode?
- if (foundLocal && !StorageService.instance.isBootstrapMode())
+ if (remoteResults == null)
+ remoteResults = new ArrayList<IAsyncResult>();
+ Message message = command.makeReadMessage();
+ if (logger.isDebugEnabled())
+ logger.debug("weakread reading " + command + " from " + message.getMessageId() + "@" + endPoint);
+ if (randomlyReadRepair(command))
+ message.setHeader(ReadCommand.DO_REPAIR, ReadCommand.DO_REPAIR.getBytes());
+ remoteResults.add(MessagingService.instance.sendRR(message, endPoint));
+ }
+ }
+
+ // wait for results
+ if (localFutures != null)
+ {
+ for (Future<Object> future : localFutures)
+ {
+ Row row;
+ try
{
- localCommands.add(command);
+ row = (Row) future.get();
}
- else
+ catch (Exception e)
{
- remoteCommands.add(command);
+ throw new RuntimeException(e);
}
+ rows.add(row);
}
- if (localCommands.size() > 0)
- rows.addAll(weakReadLocal(localCommands));
-
- if (remoteCommands.size() > 0)
- rows.addAll(weakReadRemote(remoteCommands));
}
- else
+ if (remoteResults != null)
{
- assert consistency_level.getValue() >= ConsistencyLevel.QUORUM.getValue();
- rows = strongRead(commands, consistency_level);
+ for (IAsyncResult iar: remoteResults)
+ {
+ byte[] body;
+ body = iar.get(DatabaseDescriptor.getRpcTimeout(), TimeUnit.MILLISECONDS);
+ ByteArrayInputStream bufIn = new ByteArrayInputStream(body);
+ ReadResponse response = ReadResponse.serializer().deserialize(new DataInputStream(bufIn));
+ if (response.row() != null)
+ rows.add(response.row());
+ }
}
- readStats.addNano(System.nanoTime() - startTime);
-
return rows;
}
@@ -467,31 +475,6 @@ public class StorageProxy implements Sto
/*
* This function executes the read protocol locally. Consistency checks are performed in the background.
*/
- private static List<Row> weakReadLocal(List<ReadCommand> commands)
- {
- List<Row> rows = new ArrayList<Row>();
- List<Future<Object>> futures = new ArrayList<Future<Object>>();
-
- for (ReadCommand command: commands)
- {
- Callable<Object> callable = new weakReadLocalCallable(command);
- futures.add(StageManager.getStage(StageManager.READ_STAGE).submit(callable));
- }
- for (Future<Object> future : futures)
- {
- Row row;
- try
- {
- row = (Row) future.get();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
- rows.add(row);
- }
- return rows;
- }
public static List<Row> getRangeSlice(RangeSliceCommand command, ConsistencyLevel consistency_level)
throws IOException, UnavailableException, TimeoutException