You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sn...@apache.org on 2015/06/28 10:40:27 UTC
[1/3] cassandra git commit: Improve trace messages for RR
Repository: cassandra
Updated Branches:
refs/heads/cassandra-2.2 2a4ab8716 -> 14d7a63b8
Improve trace messages for RR
patch by Robert Stupp; reviewed by Jason Brown for CASSANDRA-9479
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/353d4a05
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/353d4a05
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/353d4a05
Branch: refs/heads/cassandra-2.2
Commit: 353d4a052c866cb230e06e69e99d9c5c8c8d955c
Parents: f2db756
Author: Robert Stupp <sn...@snazy.de>
Authored: Sun Jun 28 10:24:34 2015 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Sun Jun 28 10:24:34 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/net/MessagingService.java | 2 +-
.../cassandra/net/OutboundTcpConnection.java | 2 +-
.../cassandra/service/AbstractReadExecutor.java | 17 +++++++++++++++++
.../apache/cassandra/service/ReadCallback.java | 19 ++++++++++++++++++-
.../cassandra/service/RowDataResolver.java | 2 ++
6 files changed, 40 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 32f0873..6a137a3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.17
+ * Improve trace messages for RR (CASSANDRA-9479)
* Fix suboptimal secondary index selection when restricted
clustering column is also indexed (CASSANDRA-9631)
* (cqlsh) Add min_threshold to DTCS option autocomplete (CASSANDRA-9385)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index d570faf..ee6b87b 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -722,7 +722,7 @@ public final class MessagingService implements MessagingServiceMBean
{
TraceState state = Tracing.instance.initializeFromMessage(message);
if (state != null)
- state.trace("Message received from {}", message.from);
+ state.trace("{} message received from {}", message.verb, message.from);
Verb verb = message.verb;
message = SinkManager.processInboundMessage(message, id);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 5559df2..af61dd4 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -186,7 +186,7 @@ public class OutboundTcpConnection extends Thread
{
UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
TraceState state = Tracing.instance.get(sessionId);
- String message = String.format("Sending message to %s", poolReference.endPoint());
+ String message = String.format("Sending %s message to %s", qm.message.verb, poolReference.endPoint());
// session may have already finished; see CASSANDRA-5668
if (state == null)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 3f57e73..2f2370d 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -43,6 +43,8 @@ import org.apache.cassandra.metrics.ReadRepairMetrics;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -61,12 +63,14 @@ public abstract class AbstractReadExecutor
protected final List<InetAddress> targetReplicas;
protected final RowDigestResolver resolver;
protected final ReadCallback<ReadResponse, Row> handler;
+ protected final TraceState traceState;
AbstractReadExecutor(ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
{
this.command = command;
this.targetReplicas = targetReplicas;
resolver = new RowDigestResolver(command.ksName, command.key);
+ traceState = Tracing.instance.get();
handler = new ReadCallback<>(resolver, consistencyLevel, command, targetReplicas);
}
@@ -81,11 +85,15 @@ public abstract class AbstractReadExecutor
{
if (isLocalRequest(endpoint))
{
+ if (traceState != null)
+ traceState.trace("reading data locally");
logger.trace("reading data locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
}
else
{
+ if (traceState != null)
+ traceState.trace("reading data from {}", endpoint);
logger.trace("reading data from {}", endpoint);
MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
}
@@ -101,11 +109,15 @@ public abstract class AbstractReadExecutor
{
if (isLocalRequest(endpoint))
{
+ if (traceState != null)
+ traceState.trace("reading digest locally");
logger.trace("reading digest locally");
StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
}
else
{
+ if (traceState != null)
+ traceState.trace("reading digest from {}", endpoint);
logger.trace("reading digest from {}", endpoint);
MessagingService.instance().sendRR(message, endpoint, handler);
}
@@ -158,7 +170,10 @@ public abstract class AbstractReadExecutor
return new NeverSpeculatingReadExecutor(command, consistencyLevel, targetReplicas);
if (repairDecision != ReadRepairDecision.NONE)
+ {
+ Tracing.trace("Read-repair {}", repairDecision);
ReadRepairMetrics.attempted.mark();
+ }
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.cfName);
RetryType retryType = cfs.metadata.getSpeculativeRetry().type;
@@ -279,6 +294,8 @@ public abstract class AbstractReadExecutor
}
InetAddress extraReplica = Iterables.getLast(targetReplicas);
+ if (traceState != null)
+ traceState.trace("speculating read retry on {}", extraReplica);
logger.trace("speculating read retry on {}", extraReplica);
MessagingService.instance().sendRR(retryCommand.createMessage(), extraReplica, handler);
speculated = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 150fabe..1315102 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -40,6 +40,8 @@ import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SimpleCondition;
@@ -99,6 +101,7 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
{
// Same as for writes, see AbstractWriteResponseHandler
ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received.get(), blockfor, resolver.isDataPresent());
+ Tracing.trace("Read timeout: {}", ex.toString());
if (logger.isDebugEnabled())
logger.debug("Read timeout: {}", ex.toString());
throw ex;
@@ -119,7 +122,12 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
// kick off a background digest comparison if this is a result that (may have) arrived after
// the original resolve that get() kicks off as soon as the condition is signaled
if (blockfor < endpoints.size() && n == endpoints.size())
- StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner());
+ {
+ TraceState traceState = Tracing.instance.get();
+ if (traceState != null)
+ traceState.trace("Initiating read-repair");
+ StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner(traceState));
+ }
}
}
@@ -163,6 +171,13 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
private class AsyncRepairRunner implements Runnable
{
+ private final TraceState traceState;
+
+ public AsyncRepairRunner(TraceState traceState)
+ {
+ this.traceState = traceState;
+ }
+
public void run()
{
// If the resolver is a RowDigestResolver, we need to do a full data read if there is a mismatch.
@@ -176,6 +191,8 @@ public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessag
{
assert resolver instanceof RowDigestResolver;
+ if (traceState != null)
+ traceState.trace("Digest mismatch: {}", e.toString());
if (logger.isDebugEnabled())
logger.debug("Digest mismatch:", e);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/353d4a05/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/RowDataResolver.java b/src/java/org/apache/cassandra/service/RowDataResolver.java
index 00f8753..bf4afbe 100644
--- a/src/java/org/apache/cassandra/service/RowDataResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDataResolver.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.filter.IDiskAtomFilter;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.net.*;
+import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.FBUtilities;
@@ -120,6 +121,7 @@ public class RowDataResolver extends AbstractRowResolver
MessageOut repairMessage;
// use a separate verb here because we don't want these to be get the white glove hint-
// on-timeout behavior that a "real" mutation gets
+ Tracing.trace("Sending read-repair-mutation to {}", endpoints.get(i));
repairMessage = rowMutation.createMessage(MessagingService.Verb.READ_REPAIR);
results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i)));
}
[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into cassandra-2.2
Posted by sn...@apache.org.
Merge branch 'cassandra-2.1' into cassandra-2.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/14d7a63b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/14d7a63b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/14d7a63b
Branch: refs/heads/cassandra-2.2
Commit: 14d7a63b8a29b15831d035182d12cfacc7518687
Parents: 2a4ab87 8a56868
Author: Robert Stupp <sn...@snazy.de>
Authored: Sun Jun 28 10:36:54 2015 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Sun Jun 28 10:36:54 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 3 +++
.../apache/cassandra/net/MessagingService.java | 2 +-
.../cassandra/net/OutboundTcpConnection.java | 2 +-
.../cassandra/service/AbstractReadExecutor.java | 12 ++++++++++++
.../apache/cassandra/service/ReadCallback.java | 20 ++++++++++++++++++--
.../cassandra/service/RowDataResolver.java | 2 ++
6 files changed, 37 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d7a63b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 811e955,3e4fd36..8f4f752
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,14 +1,33 @@@
-2.1.8
+2.2
+ * Allow JMX over SSL directly from nodetool (CASSANDRA-9090)
+ * Update cqlsh for UDFs (CASSANDRA-7556)
+ * Change Windows kernel default timer resolution (CASSANDRA-9634)
+ * Deprecated sstable2json and json2sstable (CASSANDRA-9618)
+ * Allow native functions in user-defined aggregates (CASSANDRA-9542)
+ * Don't repair system_distributed by default (CASSANDRA-9621)
+ * Fix mixing min, max, and count aggregates for blob type (CASSANDRA-9622)
+ * Rename class for DATE type in Java driver (CASSANDRA-9563)
+ * Duplicate compilation of UDFs on coordinator (CASSANDRA-9475)
+ * Fix connection leak in CqlRecordWriter (CASSANDRA-9576)
+ * Mlockall before opening system sstables & remove boot_without_jna option (CASSANDRA-9573)
+ * Add functions to convert timeuuid to date or time, deprecate dateOf and unixTimestampOf (CASSANDRA-9229)
+ * Make sure we cancel non-compacting sstables from LifecycleTransaction (CASSANDRA-9566)
+ * Fix deprecated repair JMX API (CASSANDRA-9570)
+ * Add logback metrics (CASSANDRA-9378)
+ * Update and refactor ant test/test-compression to run the tests in parallel (CASSANDRA-9583)
+Merged from 2.1:
* Fix IndexOutOfBoundsException when inserting tuple with too many
elements using the string literal notation (CASSANDRA-9559)
- * Allow JMX over SSL directly from nodetool (CASSANDRA-9090)
- * Fix incorrect result for IN queries where column not found (CASSANDRA-9540)
* Enable describe on indices (CASSANDRA-7814)
+ * Fix incorrect result for IN queries where column not found (CASSANDRA-9540)
* ColumnFamilyStore.selectAndReference may block during compaction (CASSANDRA-9637)
+ * Fix bug in cardinality check when compacting (CASSANDRA-9580)
+ * Fix memory leak in Ref due to ConcurrentLinkedQueue.remove() behaviour (CASSANDRA-9549)
+ * Make rebuild only run one at a time (CASSANDRA-9119)
Merged from 2.0
+ * Improve trace messages for RR (CASSANDRA-9479)
+ * Fix suboptimal secondary index selection when restricted
+ clustering column is also indexed (CASSANDRA-9631)
* (cqlsh) Add min_threshold to DTCS option autocomplete (CASSANDRA-9385)
* Fix error message when attempting to create an index on a column
in a COMPACT STORAGE table with clustering columns (CASSANDRA-9527)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d7a63b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 293a27c,1820c5c..83bc337
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -745,12 -728,15 +745,12 @@@ public final class MessagingService imp
{
TraceState state = Tracing.instance.initializeFromMessage(message);
if (state != null)
- state.trace("Message received from {}", message.from);
+ state.trace("{} message received from {}", message.verb, message.from);
- Verb verb = message.verb;
- message = SinkManager.processInboundMessage(message, id);
- if (message == null)
- {
- incrementRejectedMessages(verb);
- return;
- }
+ // message sinks are a testing hook
+ for (IMessageSink ms : messageSinks)
+ if (!ms.allowIncomingMessage(message, id))
+ return;
Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d7a63b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d7a63b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index ec96d81,2d02e34..3aab12f
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@@ -67,7 -69,8 +70,8 @@@ public abstract class AbstractReadExecu
{
this.command = command;
this.targetReplicas = targetReplicas;
- resolver = new RowDigestResolver(command.ksName, command.key);
+ resolver = new RowDigestResolver(command.ksName, command.key, targetReplicas.size());
+ traceState = Tracing.instance.get();
handler = new ReadCallback<>(resolver, consistencyLevel, command, targetReplicas);
}
@@@ -153,8 -159,16 +160,11 @@@
// Throw UAE early if we don't have enough replicas.
consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas);
- // Fat client. Speculating read executors need access to cfs metrics and sampled latency, and fat clients
- // can't provide that. So, for now, fat clients will always use NeverSpeculatingReadExecutor.
- if (StorageService.instance.isClientMode())
- return new NeverSpeculatingReadExecutor(command, consistencyLevel, targetReplicas);
-
if (repairDecision != ReadRepairDecision.NONE)
+ {
+ Tracing.trace("Read-repair {}", repairDecision);
ReadRepairMetrics.attempted.mark();
+ }
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.cfName);
RetryType retryType = cfs.metadata.getSpeculativeRetry().type;
@@@ -272,8 -286,10 +282,10 @@@
retryCommand = command.copy().setIsDigestQuery(true);
InetAddress extraReplica = Iterables.getLast(targetReplicas);
+ if (traceState != null)
+ traceState.trace("speculating read retry on {}", extraReplica);
logger.trace("speculating read retry on {}", extraReplica);
- MessagingService.instance().sendRR(retryCommand.createMessage(), extraReplica, handler);
+ MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler);
speculated = true;
cfs.metric.speculativeRetries.inc();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d7a63b/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/14d7a63b/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
[2/3] cassandra git commit: Merge branch 'cassandra-2.0' into cassandra-2.1
Posted by sn...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/8a56868b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/8a56868b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/8a56868b
Branch: refs/heads/cassandra-2.2
Commit: 8a56868bcaa7d58c907410a1821e83ada72ee0a9
Parents: 2c58581 353d4a0
Author: Robert Stupp <sn...@snazy.de>
Authored: Sun Jun 28 10:27:20 2015 +0200
Committer: Robert Stupp <sn...@snazy.de>
Committed: Sun Jun 28 10:33:59 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/net/MessagingService.java | 2 +-
.../cassandra/net/OutboundTcpConnection.java | 2 +-
.../cassandra/service/AbstractReadExecutor.java | 12 ++++++++++++
.../apache/cassandra/service/ReadCallback.java | 20 ++++++++++++++++++--
.../cassandra/service/RowDataResolver.java | 2 ++
6 files changed, 35 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a56868b/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 0b0cf83,6a137a3..3e4fd36
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
-2.0.17
+2.1.8
+ * Fix IndexOutOfBoundsException when inserting tuple with too many
+ elements using the string literal notation (CASSANDRA-9559)
+ * Allow JMX over SSL directly from nodetool (CASSANDRA-9090)
+ * Fix incorrect result for IN queries where column not found (CASSANDRA-9540)
+ * Enable describe on indices (CASSANDRA-7814)
+ * ColumnFamilyStore.selectAndReference may block during compaction (CASSANDRA-9637)
+Merged from 2.0
+ * Improve trace messages for RR (CASSANDRA-9479)
* Fix suboptimal secondary index selection when restricted
clustering column is also indexed (CASSANDRA-9631)
* (cqlsh) Add min_threshold to DTCS option autocomplete (CASSANDRA-9385)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a56868b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a56868b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a56868b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 0546e27,2f2370d..2d02e34
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@@ -77,7 -81,23 +81,8 @@@ public abstract class AbstractReadExecu
protected void makeDataRequests(Iterable<InetAddress> endpoints)
{
- for (InetAddress endpoint : endpoints)
- {
- if (isLocalRequest(endpoint))
- {
- if (traceState != null)
- traceState.trace("reading data locally");
- logger.trace("reading data locally");
- StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(command, handler));
- }
- else
- {
- if (traceState != null)
- traceState.trace("reading data from {}", endpoint);
- logger.trace("reading data from {}", endpoint);
- MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
- }
- }
+ makeRequests(command, endpoints);
++
}
protected void makeDigestRequests(Iterable<InetAddress> endpoints)
@@@ -94,21 -109,18 +99,23 @@@
{
if (isLocalRequest(endpoint))
{
- if (traceState != null)
- traceState.trace("reading digest locally");
- logger.trace("reading digest locally");
- StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
- }
- else
- {
- if (traceState != null)
- traceState.trace("reading digest from {}", endpoint);
- logger.trace("reading digest from {}", endpoint);
- MessagingService.instance().sendRR(message, endpoint, handler);
+ hasLocalEndpoint = true;
+ continue;
}
+
++ if (traceState != null)
++ traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
+ logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
+ if (message == null)
+ message = readCommand.createMessage();
+ MessagingService.instance().sendRR(message, endpoint, handler);
+ }
+
+ // We delay the local (potentially blocking) read till the end to avoid stalling remote requests.
+ if (hasLocalEndpoint)
+ {
+ logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
}
}
@@@ -273,9 -288,14 +283,11 @@@
// Could be waiting on the data, or on enough digests.
ReadCommand retryCommand = command;
if (resolver.getData() != null)
- {
- retryCommand = command.copy();
- retryCommand.setDigestQuery(true);
- }
+ retryCommand = command.copy().setIsDigestQuery(true);
InetAddress extraReplica = Iterables.getLast(targetReplicas);
+ if (traceState != null)
+ traceState.trace("speculating read retry on {}", extraReplica);
logger.trace("speculating read retry on {}", extraReplica);
MessagingService.instance().sendRR(retryCommand.createMessage(), extraReplica, handler);
speculated = true;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a56868b/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ReadCallback.java
index 29eaadf,1315102..cf9be55
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@@ -40,8 -40,10 +40,10 @@@ import org.apache.cassandra.net.Message
import org.apache.cassandra.net.MessageOut;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.db.ConsistencyLevel;
+ import org.apache.cassandra.tracing.TraceState;
+ import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
-import org.apache.cassandra.utils.SimpleCondition;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
public class ReadCallback<TMessage, TResolved> implements IAsyncCallback<TMessage>
{
@@@ -100,8 -100,8 +102,8 @@@
if (!await(command.getTimeout(), TimeUnit.MILLISECONDS))
{
// Same as for writes, see AbstractWriteResponseHandler
- ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received.get(), blockfor, resolver.isDataPresent());
+ ReadTimeoutException ex = new ReadTimeoutException(consistencyLevel, received, blockfor, resolver.isDataPresent());
-
+ Tracing.trace("Read timeout: {}", ex.toString());
if (logger.isDebugEnabled())
logger.debug("Read timeout: {}", ex.toString());
throw ex;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/8a56868b/src/java/org/apache/cassandra/service/RowDataResolver.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/RowDataResolver.java
index e92dad7,bf4afbe..394a4c4
--- a/src/java/org/apache/cassandra/service/RowDataResolver.java
+++ b/src/java/org/apache/cassandra/service/RowDataResolver.java
@@@ -116,12 -116,14 +117,13 @@@ public class RowDataResolver extends Ab
if (diffCf == null) // no repair needs to happen
continue;
- // create and send the row mutation message based on the diff
- RowMutation rowMutation = new RowMutation(keyspaceName, key.key, diffCf);
- MessageOut repairMessage;
+ // create and send the mutation message based on the diff
+ Mutation mutation = new Mutation(keyspaceName, key.getKey(), diffCf);
// use a separate verb here because we don't want these to be get the white glove hint-
// on-timeout behavior that a "real" mutation gets
+ Tracing.trace("Sending read-repair-mutation to {}", endpoints.get(i));
- repairMessage = rowMutation.createMessage(MessagingService.Verb.READ_REPAIR);
- results.add(MessagingService.instance().sendRR(repairMessage, endpoints.get(i)));
+ results.add(MessagingService.instance().sendRR(mutation.createMessage(MessagingService.Verb.READ_REPAIR),
+ endpoints.get(i)));
}
return results;