You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/08/16 22:43:12 UTC
[1/2] git commit: run local range scans on the read stage;
patch by jbellis; reviewed by vijay for CASSANDRA-3687
Updated Branches:
refs/heads/trunk fe784f58e -> 5577ff626
run local range scans on the read stage
patch by jbellis; reviewed by vijay for CASSANDRA-3687
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5577ff62
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5577ff62
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5577ff62
Branch: refs/heads/trunk
Commit: 5577ff626bb38d419a3540e0c0ccb1a9d8b8680f
Parents: 29fed1f
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Aug 16 15:43:02 2012 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Thu Aug 16 15:43:02 2012 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/AbstractRowResolver.java | 11 --
.../org/apache/cassandra/service/ReadCallback.java | 27 ++---
.../org/apache/cassandra/service/StorageProxy.java | 91 ++++++++-------
4 files changed, 59 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5577ff62/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5a2848d..75de54e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2-dev
+ * run local range scans on the read stage (CASSANDRA-3687)
* clean up ioexceptions (CASSANDRA-2116)
* Introduce new json format with row level deletion (CASSANDRA-4054)
* remove redundant "name" column from schema_keyspaces (CASSANDRA-4433)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5577ff62/src/java/org/apache/cassandra/service/AbstractRowResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractRowResolver.java b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
index b1647a2..beaf73c 100644
--- a/src/java/org/apache/cassandra/service/AbstractRowResolver.java
+++ b/src/java/org/apache/cassandra/service/AbstractRowResolver.java
@@ -51,17 +51,6 @@ public abstract class AbstractRowResolver implements IResponseResolver<ReadRespo
replies.add(message);
}
- /** hack so local reads don't force de/serialization of an extra real Message */
- public void injectPreProcessed(ReadResponse result)
- {
- MessageIn<ReadResponse> message = MessageIn.create(FBUtilities.getBroadcastAddress(),
- result,
- Collections.<String, byte[]>emptyMap(),
- MessagingService.Verb.INTERNAL_RESPONSE,
- MessagingService.current_version);
- replies.add(message);
- }
-
public Iterable<MessageIn<ReadResponse>> getMessages()
{
return replies;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5577ff62/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index a3d273c..bfd0044 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.service;
import java.io.IOException;
import java.net.InetAddress;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -165,32 +166,20 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
/**
* @return true if the message counts towards the blockfor threshold
- * TODO turn the Message into a response so we don't need two versions of this method
*/
protected boolean waitingFor(MessageIn message)
{
return true;
}
- /**
- * @return true if the response counts towards the blockfor threshold
- */
- protected boolean waitingFor(ReadResponse response)
+ public void response(TMessage result)
{
- return true;
- }
-
- public void response(ReadResponse result)
- {
- ((RowDigestResolver) resolver).injectPreProcessed(result);
- int n = waitingFor(result)
- ? received.incrementAndGet()
- : received.get();
- if (n >= blockfor && resolver.isDataPresent())
- {
- condition.signal();
- maybeResolveForRepair();
- }
+ MessageIn<TMessage> message = MessageIn.create(FBUtilities.getBroadcastAddress(),
+ result,
+ Collections.<String, byte[]>emptyMap(),
+ MessagingService.Verb.INTERNAL_RESPONSE,
+ MessagingService.current_version);
+ response(message);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5577ff62/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 9d55739..1fb84cd 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -831,6 +831,30 @@ public class StorageProxy implements StorageProxyMBean
}
}
+ static class LocalRangeSliceRunnable extends DroppableRunnable
+ {
+ private final RangeSliceCommand command;
+ private final ReadCallback<RangeSliceReply, Iterable<Row>> handler;
+ private final long start = System.currentTimeMillis();
+
+ LocalRangeSliceRunnable(RangeSliceCommand command, ReadCallback<RangeSliceReply, Iterable<Row>> handler)
+ {
+ super(MessagingService.Verb.READ);
+ this.command = command;
+ this.handler = handler;
+ }
+
+ protected void runMayThrow() throws ExecutionException, InterruptedException
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("LocalReadRunnable reading " + command);
+
+ RangeSliceReply result = new RangeSliceReply(RangeSliceVerbHandler.executeLocally(command));
+ MessagingService.instance().addLatency(FBUtilities.getBroadcastAddress(), System.currentTimeMillis() - start);
+ handler.response(result);
+ }
+ }
+
static <TMessage, TResolved> ReadCallback<TMessage, TResolved> getReadCallback(IResponseResolver<TMessage, TResolved> resolver, IReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> endpoints)
{
if (consistencyLevel == ConsistencyLevel.LOCAL_QUORUM || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
@@ -868,33 +892,18 @@ public class StorageProxy implements StorageProxyMBean
List<InetAddress> liveEndpoints = StorageService.instance.getLiveNaturalEndpoints(nodeCmd.keyspace, range.right);
DatabaseDescriptor.getEndpointSnitch().sortByProximity(FBUtilities.getBroadcastAddress(), liveEndpoints);
- if (consistency_level == ConsistencyLevel.ONE && !liveEndpoints.isEmpty() && liveEndpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
+ // collect replies and resolve according to consistency level
+ RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace);
+ ReadCallback<RangeSliceReply, Iterable<Row>> handler = getReadCallback(resolver, nodeCmd, consistency_level, liveEndpoints);
+ handler.assureSufficientLiveNodes();
+ resolver.setSources(handler.endpoints);
+ if (handler.endpoints.size() == 1 && handler.endpoints.get(0).equals(FBUtilities.getBroadcastAddress()))
{
- if (logger.isDebugEnabled())
- logger.debug("local range slice");
-
- try
- {
- rows.addAll(RangeSliceVerbHandler.executeLocally(nodeCmd));
- for (Row row : rows)
- columnsCount += row.getLiveColumnCount();
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e.getCause());
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
+ logger.debug("reading data locally");
+ StageManager.getStage(Stage.READ).execute(new LocalRangeSliceRunnable(nodeCmd, handler));
}
else
{
- // collect replies and resolve according to consistency level
- RangeSliceResponseResolver resolver = new RangeSliceResponseResolver(nodeCmd.keyspace);
- ReadCallback<RangeSliceReply, Iterable<Row>> handler = getReadCallback(resolver, nodeCmd, consistency_level, liveEndpoints);
- handler.assureSufficientLiveNodes();
- resolver.setSources(handler.endpoints);
MessageOut<RangeSliceCommand> message = nodeCmd.createMessage();
for (InetAddress endpoint : handler.endpoints)
{
@@ -902,27 +911,27 @@ public class StorageProxy implements StorageProxyMBean
if (logger.isDebugEnabled())
logger.debug("reading " + nodeCmd + " from " + endpoint);
}
+ }
- try
- {
- for (Row row : handler.get())
- {
- rows.add(row);
- columnsCount += row.getLiveColumnCount();
- logger.debug("range slices read {}", row.key);
- }
- FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
- }
- catch (TimeoutException ex)
- {
- if (logger.isDebugEnabled())
- logger.debug("Range slice timeout: {}", ex.toString());
- throw ex;
- }
- catch (DigestMismatchException e)
+ try
+ {
+ for (Row row : handler.get())
{
- throw new AssertionError(e); // no digests in range slices yet
+ rows.add(row);
+ columnsCount += row.getLiveColumnCount();
+ logger.debug("range slices read {}", row.key);
}
+ FBUtilities.waitOnFutures(resolver.repairResults, DatabaseDescriptor.getWriteRpcTimeout());
+ }
+ catch (TimeoutException ex)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug("Range slice timeout: {}", ex.toString());
+ throw ex;
+ }
+ catch (DigestMismatchException e)
+ {
+ throw new AssertionError(e); // no digests in range slices yet
}
// if we're done, great, otherwise, move to the next range