You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2015/01/20 22:59:48 UTC
[1/2] cassandra git commit: Clean up ARE sendXRequests() methods and
ReadCommand#setDigestQuery()
Repository: cassandra
Updated Branches:
refs/heads/trunk a792a7bae -> 9c6cf3cdf
Clean up ARE sendXRequests() methods and ReadCommand#setDigestQuery()
patch by Aleksey Yeschenko; reveiwed by Jonathan Ellis for
CASSANDRA-8647
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e2d140ff
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e2d140ff
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e2d140ff
Branch: refs/heads/trunk
Commit: e2d140fff752e757f40c812dcd1b7bed3ea5fed2
Parents: 2445d4d
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Jan 21 00:52:53 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Jan 21 00:54:34 2015 +0300
----------------------------------------------------------------------
.../org/apache/cassandra/db/ReadCommand.java | 3 +-
.../db/RetriedSliceFromReadCommand.java | 4 +-
.../cassandra/db/SliceByNamesReadCommand.java | 8 +--
.../cassandra/db/SliceFromReadCommand.java | 8 +--
.../cassandra/service/AbstractReadExecutor.java | 56 +++++++++-----------
5 files changed, 31 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/ReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java
index 299693e..dedff6f 100644
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@ -89,9 +89,10 @@ public abstract class ReadCommand implements IReadCommand, Pageable
return isDigestQuery;
}
- public void setDigestQuery(boolean isDigestQuery)
+ public ReadCommand setIsDigestQuery(boolean isDigestQuery)
{
this.isDigestQuery = isDigestQuery;
+ return this;
}
public String getColumnFamilyName()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
index fe54917..41f5a50 100644
--- a/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/RetriedSliceFromReadCommand.java
@@ -38,9 +38,7 @@ public class RetriedSliceFromReadCommand extends SliceFromReadCommand
@Override
public ReadCommand copy()
{
- ReadCommand readCommand = new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, filter, originalCount);
- readCommand.setDigestQuery(isDigestQuery());
- return readCommand;
+ return new RetriedSliceFromReadCommand(ksName, key, cfName, timestamp, filter, originalCount).setIsDigestQuery(isDigestQuery());
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
index b1829f3..22f795e 100644
--- a/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceByNamesReadCommand.java
@@ -44,9 +44,7 @@ public class SliceByNamesReadCommand extends ReadCommand
public ReadCommand copy()
{
- ReadCommand readCommand= new SliceByNamesReadCommand(ksName, key, cfName, timestamp, filter);
- readCommand.setDigestQuery(isDigestQuery());
- return readCommand;
+ return new SliceByNamesReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery());
}
public Row getRow(Keyspace keyspace)
@@ -97,9 +95,7 @@ class SliceByNamesReadCommandSerializer implements IVersionedSerializer<ReadComm
long timestamp = in.readLong();
CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
NamesQueryFilter filter = metadata.comparator.namesQueryFilterSerializer().deserialize(in, version);
- ReadCommand command = new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter);
- command.setDigestQuery(isDigest);
- return command;
+ return new SliceByNamesReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest);
}
public long serializedSize(ReadCommand cmd, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
index f06b9dc..2259f22 100644
--- a/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SliceFromReadCommand.java
@@ -48,9 +48,7 @@ public class SliceFromReadCommand extends ReadCommand
public ReadCommand copy()
{
- ReadCommand readCommand = new SliceFromReadCommand(ksName, key, cfName, timestamp, filter);
- readCommand.setDigestQuery(isDigestQuery());
- return readCommand;
+ return new SliceFromReadCommand(ksName, key, cfName, timestamp, filter).setIsDigestQuery(isDigestQuery());
}
public Row getRow(Keyspace keyspace)
@@ -151,9 +149,7 @@ class SliceFromReadCommandSerializer implements IVersionedSerializer<ReadCommand
long timestamp = in.readLong();
CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
SliceQueryFilter filter = metadata.comparator.sliceQueryFilterSerializer().deserialize(in, version);
- ReadCommand command = new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter);
- command.setDigestQuery(isDigest);
- return command;
+ return new SliceFromReadCommand(keyspaceName, key, cfName, timestamp, filter).setIsDigestQuery(isDigest);
}
public long serializedSize(ReadCommand cmd, int version)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/e2d140ff/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 2c3261f..0546e27 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -77,43 +77,38 @@ public abstract class AbstractReadExecutor
protected void makeDataRequests(Iterable<InetAddress> endpoints)
{
- boolean readLocal = false;
- for (InetAddress endpoint : endpoints)
- {
- if (isLocalRequest(endpoint))
- {
- readLocal = true;
- }
- else
- {
- logger.trace("reading data from {}", endpoint);
- MessagingService.instance().sendRR(command.createMessage(), endpoint, handler);
- }
- }
- if (readLocal)
- {
- logger.trace("reading data locally");
- StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
- }
+ makeRequests(command, endpoints);
}
protected void makeDigestRequests(Iterable<InetAddress> endpoints)
{
- ReadCommand digestCommand = command.copy();
- digestCommand.setDigestQuery(true);
- MessageOut<?> message = digestCommand.createMessage();
+ makeRequests(command.copy().setIsDigestQuery(true), endpoints);
+ }
+
+ private void makeRequests(ReadCommand readCommand, Iterable<InetAddress> endpoints)
+ {
+ MessageOut<ReadCommand> message = null;
+ boolean hasLocalEndpoint = false;
+
for (InetAddress endpoint : endpoints)
{
if (isLocalRequest(endpoint))
{
- logger.trace("reading digest locally");
- StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
- }
- else
- {
- logger.trace("reading digest from {}", endpoint);
- MessagingService.instance().sendRR(message, endpoint, handler);
+ hasLocalEndpoint = true;
+ continue;
}
+
+ logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
+ if (message == null)
+ message = readCommand.createMessage();
+ MessagingService.instance().sendRR(message, endpoint, handler);
+ }
+
+ // We delay the local (potentially blocking) read till the end to avoid stalling remote requests.
+ if (hasLocalEndpoint)
+ {
+ logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
}
}
@@ -278,10 +273,7 @@ public abstract class AbstractReadExecutor
// Could be waiting on the data, or on enough digests.
ReadCommand retryCommand = command;
if (resolver.getData() != null)
- {
- retryCommand = command.copy();
- retryCommand.setDigestQuery(true);
- }
+ retryCommand = command.copy().setIsDigestQuery(true);
InetAddress extraReplica = Iterables.getLast(targetReplicas);
logger.trace("speculating read retry on {}", extraReplica);
[2/2] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/service/AbstractReadExecutor.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c6cf3cd
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c6cf3cd
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c6cf3cd
Branch: refs/heads/trunk
Commit: 9c6cf3cdf4267b4aebb0f432d0776314f3c40c8c
Parents: a792a7b e2d140f
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Jan 21 00:59:40 2015 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Jan 21 00:59:40 2015 +0300
----------------------------------------------------------------------
.../org/apache/cassandra/db/ReadCommand.java | 3 +-
.../db/RetriedSliceFromReadCommand.java | 4 +-
.../cassandra/db/SliceByNamesReadCommand.java | 8 +--
.../cassandra/db/SliceFromReadCommand.java | 8 +--
.../cassandra/service/AbstractReadExecutor.java | 56 +++++++++-----------
5 files changed, 31 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c6cf3cd/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index d76a2cc,0546e27..ec96d81
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@@ -107,14 -94,21 +95,21 @@@ public abstract class AbstractReadExecu
{
if (isLocalRequest(endpoint))
{
- logger.trace("reading digest locally");
- StageManager.getStage(Stage.READ).execute(new LocalReadRunnable(digestCommand, handler));
- }
- else
- {
- logger.trace("reading digest from {}", endpoint);
- MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
+ hasLocalEndpoint = true;
+ continue;
}
+
+ logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
+ if (message == null)
+ message = readCommand.createMessage();
- MessagingService.instance().sendRR(message, endpoint, handler);
++ MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
+ }
+
+ // We delay the local (potentially blocking) read till the end to avoid stalling remote requests.
+ if (hasLocalEndpoint)
+ {
+ logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
+ StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
}
}