You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2016/08/16 19:35:54 UTC
[2/3] cassandra git commit: Count entire coordinated request against timeout
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index b1b7b10..2309e87 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -35,9 +35,10 @@ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
ConsistencyLevel consistencyLevel,
Keyspace keyspace,
Runnable callback,
- WriteType writeType)
+ WriteType writeType,
+ long queryStartNanoTime)
{
- super(naturalEndpoints, pendingEndpoints, consistencyLevel, keyspace, callback, writeType);
+ super(naturalEndpoints, pendingEndpoints, consistencyLevel, keyspace, callback, writeType, queryStartNanoTime);
assert consistencyLevel.isDatacenterLocal();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 47eacdf..3f1ff3c 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -52,7 +52,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
public final ResponseResolver resolver;
private final SimpleCondition condition = new SimpleCondition();
- private final long start;
+ private final long queryStartNanoTime;
final int blockfor;
final List<InetAddress> endpoints;
private final ReadCommand command;
@@ -69,24 +69,25 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
/**
* Constructor when response count has to be calculated and blocked for.
*/
- public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddress> filteredEndpoints)
+ public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddress> filteredEndpoints, long queryStartNanoTime)
{
this(resolver,
consistencyLevel,
consistencyLevel.blockFor(Keyspace.open(command.metadata().ksName)),
command,
Keyspace.open(command.metadata().ksName),
- filteredEndpoints);
+ filteredEndpoints,
+ queryStartNanoTime);
}
- public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddress> endpoints)
+ public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddress> endpoints, long queryStartNanoTime)
{
this.command = command;
this.keyspace = keyspace;
this.blockfor = blockfor;
this.consistencyLevel = consistencyLevel;
this.resolver = resolver;
- this.start = System.nanoTime();
+ this.queryStartNanoTime = queryStartNanoTime;
this.endpoints = endpoints;
// we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
assert !(command instanceof PartitionRangeReadCommand) || blockfor >= endpoints.size();
@@ -97,7 +98,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
public boolean await(long timePastStart, TimeUnit unit)
{
- long time = unit.toNanos(timePastStart) - (System.nanoTime() - start);
+ long time = unit.toNanos(timePastStart) - (System.nanoTime() - queryStartNanoTime);
try
{
return condition.await(time, TimeUnit.NANOSECONDS);
@@ -138,7 +139,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
PartitionIterator result = blockfor == 1 ? resolver.getData() : resolver.resolve();
if (logger.isTraceEnabled())
- logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+ logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanoTime));
return result;
}
@@ -163,7 +164,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
TraceState traceState = Tracing.instance.get();
if (traceState != null)
traceState.trace("Initiating read-repair");
- StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner(traceState));
+ StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner(traceState, queryStartNanoTime));
}
}
}
@@ -210,10 +211,12 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
private class AsyncRepairRunner implements Runnable
{
private final TraceState traceState;
+ private final long queryStartNanoTime;
- public AsyncRepairRunner(TraceState traceState)
+ public AsyncRepairRunner(TraceState traceState, long queryStartNanoTime)
{
this.traceState = traceState;
+ this.queryStartNanoTime = queryStartNanoTime;
}
public void run()
@@ -236,7 +239,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
ReadRepairMetrics.repairedBackground.mark();
- final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size());
+ final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size(), queryStartNanoTime);
AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
for (InetAddress endpoint : endpoints)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 9283c04..9bf90dc 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -223,10 +223,11 @@ public class StorageProxy implements StorageProxyMBean
CASRequest request,
ConsistencyLevel consistencyForPaxos,
ConsistencyLevel consistencyForCommit,
- ClientState state)
+ ClientState state,
+ long queryStartNanoTime)
throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
{
- final long start = System.nanoTime();
+ final long startTimeForMetrics = System.nanoTime();
int contentions = 0;
try
{
@@ -236,14 +237,14 @@ public class StorageProxy implements StorageProxyMBean
CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
- while (System.nanoTime() - start < timeout)
+ while (System.nanoTime() - queryStartNanoTime < timeout)
{
// for simplicity, we'll do a single liveness check at the start of each attempt
Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos);
List<InetAddress> liveEndpoints = p.left;
int requiredParticipants = p.right;
- final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
+ final Pair<UUID, Integer> pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
final UUID ballot = pair.left;
contentions += pair.right;
@@ -253,7 +254,7 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel readConsistency = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
FilteredPartition current;
- try (RowIterator rowIter = readOne(readCommand, readConsistency))
+ try (RowIterator rowIter = readOne(readCommand, readConsistency, queryStartNanoTime))
{
current = FilteredPartition.create(rowIter);
}
@@ -281,9 +282,9 @@ public class StorageProxy implements StorageProxyMBean
Commit proposal = Commit.newProposal(ballot, updates);
Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
- if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos))
+ if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos, queryStartNanoTime))
{
- commitPaxos(proposal, consistencyForCommit, true);
+ commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime);
Tracing.trace("CAS successful");
return null;
}
@@ -318,7 +319,7 @@ public class StorageProxy implements StorageProxyMBean
{
if(contentions > 0)
casWriteMetrics.contention.update(contentions);
- final long latency = System.nanoTime() - start;
+ final long latency = System.nanoTime() - startTimeForMetrics;
casWriteMetrics.addNano(latency);
writeMetricsMap.get(consistencyForPaxos).addNano(latency);
}
@@ -373,7 +374,7 @@ public class StorageProxy implements StorageProxyMBean
* @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of
* nodes have seen the mostRecentCommit. Otherwise, return null.
*/
- private static Pair<UUID, Integer> beginAndRepairPaxos(long start,
+ private static Pair<UUID, Integer> beginAndRepairPaxos(long queryStartNanoTime,
DecoratedKey key,
CFMetaData metadata,
List<InetAddress> liveEndpoints,
@@ -388,7 +389,7 @@ public class StorageProxy implements StorageProxyMBean
PrepareCallback summary = null;
int contentions = 0;
- while (System.nanoTime() - start < timeout)
+ while (System.nanoTime() - queryStartNanoTime < timeout)
{
// We want a timestamp that is guaranteed to be unique for that node (so that the ballot is globally unique), but if we've got a prepare rejected
// already we also want to make sure we pick a timestamp that has a chance to be promised, i.e. one that is greater than the most recently known
@@ -403,7 +404,7 @@ public class StorageProxy implements StorageProxyMBean
// prepare
Tracing.trace("Preparing {}", ballot);
Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
- summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants, consistencyForPaxos);
+ summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants, consistencyForPaxos, queryStartNanoTime);
if (!summary.promised)
{
Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
@@ -426,11 +427,11 @@ public class StorageProxy implements StorageProxyMBean
else
casReadMetrics.unfinishedCommit.inc();
Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update);
- if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos))
+ if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos, queryStartNanoTime))
{
try
{
- commitPaxos(refreshedInProgress, consistencyForCommit, false);
+ commitPaxos(refreshedInProgress, consistencyForCommit, false, queryStartNanoTime);
}
catch (WriteTimeoutException e)
{
@@ -481,10 +482,10 @@ public class StorageProxy implements StorageProxyMBean
MessagingService.instance().sendOneWay(message, target);
}
- private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos)
+ private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, long queryStartNanoTime)
throws WriteTimeoutException
{
- PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos);
+ PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos, queryStartNanoTime);
MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer);
for (InetAddress target : endpoints)
MessagingService.instance().sendRR(message, target, callback);
@@ -492,10 +493,10 @@ public class StorageProxy implements StorageProxyMBean
return callback;
}
- private static boolean proposePaxos(Commit proposal, List<InetAddress> endpoints, int requiredParticipants, boolean timeoutIfPartial, ConsistencyLevel consistencyLevel)
+ private static boolean proposePaxos(Commit proposal, List<InetAddress> endpoints, int requiredParticipants, boolean timeoutIfPartial, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
throws WriteTimeoutException
{
- ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial, consistencyLevel);
+ ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial, consistencyLevel, queryStartNanoTime);
MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer);
for (InetAddress target : endpoints)
MessagingService.instance().sendRR(message, target, callback);
@@ -511,7 +512,7 @@ public class StorageProxy implements StorageProxyMBean
return false;
}
- private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean shouldHint) throws WriteTimeoutException
+ private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean shouldHint, long queryStartNanoTime) throws WriteTimeoutException
{
boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY;
Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName);
@@ -524,7 +525,7 @@ public class StorageProxy implements StorageProxyMBean
if (shouldBlock)
{
AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
- responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE);
+ responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE, queryStartNanoTime);
}
MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
@@ -597,8 +598,9 @@ public class StorageProxy implements StorageProxyMBean
*
* @param mutations the mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
+ * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
*/
- public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level)
+ public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level, long queryStartNanoTime)
throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException
{
Tracing.trace("Determining replicas for mutation");
@@ -613,12 +615,12 @@ public class StorageProxy implements StorageProxyMBean
{
if (mutation instanceof CounterMutation)
{
- responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter));
+ responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime));
}
else
{
WriteType wt = mutations.size() <= 1 ? WriteType.SIMPLE : WriteType.UNLOGGED_BATCH;
- responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, wt));
+ responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, wt, queryStartNanoTime));
}
}
@@ -728,8 +730,9 @@ public class StorageProxy implements StorageProxyMBean
* @param mutations the mutations to be applied across the replicas
* @param writeCommitLog if commitlog should be written
* @param baseComplete time from epoch in ms that the local base mutation was (or will be) completed
+ * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
*/
- public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations, boolean writeCommitLog, AtomicLong baseComplete)
+ public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations, boolean writeCommitLog, AtomicLong baseComplete, long queryStartNanoTime)
throws UnavailableException, OverloadedException, WriteTimeoutException
{
Tracing.trace("Determining replicas for mutation");
@@ -791,7 +794,8 @@ public class StorageProxy implements StorageProxyMBean
Collections.singletonList(pairedEndpoint.get()),
baseComplete,
WriteType.BATCH,
- cleanup));
+ cleanup,
+ queryStartNanoTime));
}
}
else
@@ -834,7 +838,8 @@ public class StorageProxy implements StorageProxyMBean
@SuppressWarnings("unchecked")
public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
ConsistencyLevel consistencyLevel,
- boolean mutateAtomically)
+ boolean mutateAtomically,
+ long queryStartNanoTime)
throws WriteTimeoutException, WriteFailureException, UnavailableException, OverloadedException, InvalidRequestException
{
Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations);
@@ -844,13 +849,13 @@ public class StorageProxy implements StorageProxyMBean
.updatesAffectView(mutations, true);
if (augmented != null)
- mutateAtomically(augmented, consistencyLevel, updatesView);
+ mutateAtomically(augmented, consistencyLevel, updatesView, queryStartNanoTime);
else
{
if (mutateAtomically || updatesView)
- mutateAtomically((Collection<Mutation>) mutations, consistencyLevel, updatesView);
+ mutateAtomically((Collection<Mutation>) mutations, consistencyLevel, updatesView, queryStartNanoTime);
else
- mutate(mutations, consistencyLevel);
+ mutate(mutations, consistencyLevel, queryStartNanoTime);
}
}
@@ -863,10 +868,12 @@ public class StorageProxy implements StorageProxyMBean
* @param mutations the Mutations to be applied across the replicas
* @param consistency_level the consistency level for the operation
* @param requireQuorumForRemove at least a quorum of nodes will see update before deleting batchlog
+ * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
*/
public static void mutateAtomically(Collection<Mutation> mutations,
ConsistencyLevel consistency_level,
- boolean requireQuorumForRemove)
+ boolean requireQuorumForRemove,
+ long queryStartNanoTime)
throws UnavailableException, OverloadedException, WriteTimeoutException
{
Tracing.trace("Determining replicas for atomic batch");
@@ -894,7 +901,7 @@ public class StorageProxy implements StorageProxyMBean
final BatchlogEndpoints batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
final UUID batchUUID = UUIDGen.getTimeUUID();
BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
- () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+ () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID, queryStartNanoTime));
// add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
for (Mutation mutation : mutations)
@@ -903,14 +910,15 @@ public class StorageProxy implements StorageProxyMBean
consistency_level,
batchConsistencyLevel,
WriteType.BATCH,
- cleanup);
+ cleanup,
+ queryStartNanoTime);
// exit early if we can't fulfill the CL at this time.
wrapper.handler.assureSufficientLiveNodes();
wrappers.add(wrapper);
}
// write to the batchlog
- syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID);
+ syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID, queryStartNanoTime);
// now actually perform the writes and wait for them to complete
syncWriteBatchedMutations(wrappers, localDataCenter, Stage.MUTATION);
@@ -950,7 +958,7 @@ public class StorageProxy implements StorageProxyMBean
return replica.equals(FBUtilities.getBroadcastAddress());
}
- private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid)
+ private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid, long queryStartNanoTime)
throws WriteTimeoutException, WriteFailureException
{
WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints.all,
@@ -958,7 +966,8 @@ public class StorageProxy implements StorageProxyMBean
endpoints.all.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO,
Keyspace.open(SystemKeyspace.NAME),
null,
- WriteType.BATCH_LOG);
+ WriteType.BATCH_LOG,
+ queryStartNanoTime);
Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
@@ -987,13 +996,13 @@ public class StorageProxy implements StorageProxyMBean
}
}
- private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid)
+ private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid, long queryStartNanoTime)
{
if (!endpoints.current.isEmpty())
asyncRemoveFromBatchlog(endpoints.current, uuid);
if (!endpoints.legacy.isEmpty())
- LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid);
+ LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid, queryStartNanoTime);
}
private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
@@ -1054,14 +1063,15 @@ public class StorageProxy implements StorageProxyMBean
* given the list of write endpoints (either standardWritePerformer for
* standard writes or counterWritePerformer for counter writes).
* @param callback an optional callback to be run if and when the write is
- * successful.
+ * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
*/
public static AbstractWriteResponseHandler<IMutation> performWrite(IMutation mutation,
- ConsistencyLevel consistency_level,
- String localDataCenter,
- WritePerformer performer,
- Runnable callback,
- WriteType writeType)
+ ConsistencyLevel consistency_level,
+ String localDataCenter,
+ WritePerformer performer,
+ Runnable callback,
+ WriteType writeType,
+ long queryStartNanoTime)
throws UnavailableException, OverloadedException
{
String keyspaceName = mutation.getKeyspaceName();
@@ -1071,7 +1081,7 @@ public class StorageProxy implements StorageProxyMBean
List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
- AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType);
+ AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType, queryStartNanoTime);
// exit early if we can't fulfill the CL at this time
responseHandler.assureSufficientLiveNodes();
@@ -1085,7 +1095,8 @@ public class StorageProxy implements StorageProxyMBean
ConsistencyLevel consistency_level,
ConsistencyLevel batchConsistencyLevel,
WriteType writeType,
- BatchlogResponseHandler.BatchlogCleanup cleanup)
+ BatchlogResponseHandler.BatchlogCleanup cleanup,
+ long queryStartNanoTime)
{
Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
@@ -1093,8 +1104,8 @@ public class StorageProxy implements StorageProxyMBean
Token tk = mutation.key().getToken();
List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
- AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
- BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
+ AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType, queryStartNanoTime);
+ BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
return new WriteResponseHandlerWrapper(batchHandler, mutation);
}
@@ -1108,7 +1119,8 @@ public class StorageProxy implements StorageProxyMBean
List<InetAddress> naturalEndpoints,
AtomicLong baseComplete,
WriteType writeType,
- BatchlogResponseHandler.BatchlogCleanup cleanup)
+ BatchlogResponseHandler.BatchlogCleanup cleanup,
+ long queryStartNanoTime)
{
Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
@@ -1118,8 +1130,8 @@ public class StorageProxy implements StorageProxyMBean
AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, () -> {
long delay = Math.max(0, System.currentTimeMillis() - baseComplete.get());
viewWriteMetrics.viewWriteLatency.update(delay, TimeUnit.MILLISECONDS);
- }, writeType);
- BatchlogResponseHandler<IMutation> batchHandler = new ViewWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
+ }, writeType, queryStartNanoTime);
+ BatchlogResponseHandler<IMutation> batchHandler = new ViewWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
return new WriteResponseHandlerWrapper(batchHandler, mutation);
}
@@ -1400,13 +1412,13 @@ public class StorageProxy implements StorageProxyMBean
* quicker response and because the WriteResponseHandlers don't make it easy to send back an error. We also always gather
* the write latencies at the coordinator node to make gathering point similar to the case of standard writes.
*/
- public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException
+ public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime) throws UnavailableException, OverloadedException
{
InetAddress endpoint = findSuitableEndpoint(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency());
if (endpoint.equals(FBUtilities.getBroadcastAddress()))
{
- return applyCounterMutationOnCoordinator(cm, localDataCenter);
+ return applyCounterMutationOnCoordinator(cm, localDataCenter, queryStartNanoTime);
}
else
{
@@ -1417,10 +1429,10 @@ public class StorageProxy implements StorageProxyMBean
List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
- rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER).assureSufficientLiveNodes();
+ rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER, queryStartNanoTime).assureSufficientLiveNodes();
// Forward the actual update to the chosen leader replica
- AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.COUNTER);
+ AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.COUNTER, queryStartNanoTime);
Tracing.trace("Enqueuing counter update to {}", endpoint);
MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler, false);
@@ -1467,18 +1479,18 @@ public class StorageProxy implements StorageProxyMBean
// Must be called on a replica of the mutation. This replica becomes the
// leader of this mutation.
- public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback)
+ public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback, long queryStartNanoTime)
throws UnavailableException, OverloadedException
{
- return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER);
+ return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER, queryStartNanoTime);
}
// Same as applyCounterMutationOnLeader, with the difference that it uses the MUTATION stage to execute the write (while
// applyCounterMutationOnLeader assumes it is on the MUTATION stage already)
- public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter)
+ public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter, long queryStartNanoTime)
throws UnavailableException, OverloadedException
{
- return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER);
+ return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER, queryStartNanoTime);
}
private static Runnable counterWriteTask(final IMutation mutation,
@@ -1512,31 +1524,31 @@ public class StorageProxy implements StorageProxyMBean
return true;
}
- public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel)
+ public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
{
- return readOne(command, consistencyLevel, null);
+ return readOne(command, consistencyLevel, null, queryStartNanoTime);
}
- public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, ClientState state)
+ public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime)
throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
{
- return PartitionIterators.getOnlyElement(read(SinglePartitionReadCommand.Group.one(command), consistencyLevel, state), command);
+ return PartitionIterators.getOnlyElement(read(SinglePartitionReadCommand.Group.one(command), consistencyLevel, state, queryStartNanoTime), command);
}
- public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel)
+ public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
{
// When using serial CL, the ClientState should be provided
assert !consistencyLevel.isSerialConsistency();
- return read(group, consistencyLevel, null);
+ return read(group, consistencyLevel, null, queryStartNanoTime);
}
/**
* Performs the actual reading of a row out of the StorageService, fetching
* a specific set of column names from a given column family.
*/
- public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state)
+ public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime)
throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
{
if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.commands))
@@ -1547,11 +1559,11 @@ public class StorageProxy implements StorageProxyMBean
}
return consistencyLevel.isSerialConsistency()
- ? readWithPaxos(group, consistencyLevel, state)
- : readRegular(group, consistencyLevel);
+ ? readWithPaxos(group, consistencyLevel, state, queryStartNanoTime)
+ : readRegular(group, consistencyLevel, queryStartNanoTime);
}
- private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state)
+ private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime)
throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
{
assert state != null;
@@ -1591,7 +1603,7 @@ public class StorageProxy implements StorageProxyMBean
throw new ReadFailureException(consistencyLevel, e.received, e.failures, e.blockFor, false);
}
- result = fetchRows(group.commands, consistencyForCommitOrFetch);
+ result = fetchRows(group.commands, consistencyForCommitOrFetch, queryStartNanoTime);
}
catch (UnavailableException e)
{
@@ -1627,13 +1639,13 @@ public class StorageProxy implements StorageProxyMBean
}
@SuppressWarnings("resource")
- private static PartitionIterator readRegular(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel)
+ private static PartitionIterator readRegular(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
throws UnavailableException, ReadFailureException, ReadTimeoutException
{
long start = System.nanoTime();
try
{
- PartitionIterator result = fetchRows(group.commands, consistencyLevel);
+ PartitionIterator result = fetchRows(group.commands, consistencyLevel, queryStartNanoTime);
// If we have more than one command, then despite each read command honoring the limit, the total result
// might not honor it and so we should enforce it
if (group.commands.size() > 1)
@@ -1680,14 +1692,14 @@ public class StorageProxy implements StorageProxyMBean
* 4. If the digests (if any) match the data return the data
* 5. else carry out read repair by getting data from all the nodes.
*/
- private static PartitionIterator fetchRows(List<SinglePartitionReadCommand> commands, ConsistencyLevel consistencyLevel)
+ private static PartitionIterator fetchRows(List<SinglePartitionReadCommand> commands, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
throws UnavailableException, ReadFailureException, ReadTimeoutException
{
int cmdCount = commands.size();
SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount];
for (int i = 0; i < cmdCount; i++)
- reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel);
+ reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel, queryStartNanoTime);
for (int i = 0; i < cmdCount; i++)
reads[i].doInitialQueries();
@@ -1717,15 +1729,17 @@ public class StorageProxy implements StorageProxyMBean
private final SinglePartitionReadCommand command;
private final AbstractReadExecutor executor;
private final ConsistencyLevel consistency;
+ private final long queryStartNanoTime;
private PartitionIterator result;
private ReadCallback repairHandler;
- SinglePartitionReadLifecycle(SinglePartitionReadCommand command, ConsistencyLevel consistency)
+ SinglePartitionReadLifecycle(SinglePartitionReadCommand command, ConsistencyLevel consistency, long queryStartNanoTime)
{
this.command = command;
- this.executor = AbstractReadExecutor.getReadExecutor(command, consistency);
+ this.executor = AbstractReadExecutor.getReadExecutor(command, consistency, queryStartNanoTime);
this.consistency = consistency;
+ this.queryStartNanoTime = queryStartNanoTime;
}
boolean isDone()
@@ -1757,13 +1771,14 @@ public class StorageProxy implements StorageProxyMBean
// Do a full data read to resolve the correct response (and repair node that need be)
Keyspace keyspace = Keyspace.open(command.metadata().ksName);
- DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size());
+ DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size(), queryStartNanoTime);
repairHandler = new ReadCallback(resolver,
ConsistencyLevel.ALL,
executor.getContactedReplicas().size(),
command,
keyspace,
- executor.handler.endpoints);
+ executor.handler.endpoints,
+ queryStartNanoTime);
for (InetAddress endpoint : executor.getContactedReplicas())
{
@@ -2052,6 +2067,7 @@ public class StorageProxy implements StorageProxyMBean
private final ConsistencyLevel consistency;
private final long startTime;
+ private final long queryStartNanoTime;
private DataLimits.Counter counter;
private PartitionIterator sentQueryIterator;
@@ -2061,7 +2077,7 @@ public class StorageProxy implements StorageProxyMBean
private int liveReturned;
private int rangesQueried;
- public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency)
+ public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency, long queryStartNanoTime)
{
this.command = command;
this.concurrencyFactor = concurrencyFactor;
@@ -2070,6 +2086,7 @@ public class StorageProxy implements StorageProxyMBean
this.totalRangeCount = ranges.rangeCount();
this.consistency = consistency;
this.keyspace = keyspace;
+ this.queryStartNanoTime = queryStartNanoTime;
}
public RowIterator computeNext()
@@ -2145,12 +2162,12 @@ public class StorageProxy implements StorageProxyMBean
{
PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range, isFirst);
- DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size());
+ DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size(), queryStartNanoTime);
int blockFor = consistency.blockFor(keyspace);
int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor);
List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses);
- ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints);
+ ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints, queryStartNanoTime);
handler.assureSufficientLiveNodes();
@@ -2204,7 +2221,7 @@ public class StorageProxy implements StorageProxyMBean
}
@SuppressWarnings("resource")
- public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel)
+ public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
{
Tracing.trace("Computing ranges to query");
@@ -2225,7 +2242,7 @@ public class StorageProxy implements StorageProxyMBean
// Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
- return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec());
+ return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel, queryStartNanoTime)), command.nowInSec());
}
public Map<String, List<String>> getSchemaVersions()
@@ -2485,9 +2502,9 @@ public class StorageProxy implements StorageProxyMBean
*/
private static class ViewWriteMetricsWrapped extends BatchlogResponseHandler<IMutation>
{
- public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup)
+ public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup, long queryStartNanoTime)
{
- super(writeHandler, i, cleanup);
+ super(writeHandler, i, cleanup, queryStartNanoTime);
viewWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 1dc03e0..46e4e93 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -47,20 +47,21 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
ConsistencyLevel consistencyLevel,
Keyspace keyspace,
Runnable callback,
- WriteType writeType)
+ WriteType writeType,
+ long queryStartNanoTime)
{
- super(keyspace, writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
+ super(keyspace, writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType, queryStartNanoTime);
responses = totalBlockFor();
}
- public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback)
+ public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback, long queryStartNanoTime)
{
- this(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, null, callback, writeType);
+ this(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, null, callback, writeType, queryStartNanoTime);
}
- public WriteResponseHandler(InetAddress endpoint, WriteType writeType)
+ public WriteResponseHandler(InetAddress endpoint, WriteType writeType, long queryStartNanoTime)
{
- this(endpoint, writeType, null);
+ this(endpoint, writeType, null, queryStartNanoTime);
}
public void response(MessageIn<T> m)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 01a56c4..d9b3632 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -55,7 +55,7 @@ abstract class AbstractQueryPager implements QueryPager
return command.executionController();
}
- public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState)
+ public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime)
{
if (isExhausted())
return EmptyIterators.partition();
@@ -63,7 +63,7 @@ abstract class AbstractQueryPager implements QueryPager
pageSize = Math.min(pageSize, remaining);
Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
- return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState), pager);
+ return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState, queryStartNanoTime), pager);
}
public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
index 5483d15..f9a8cda 100644
--- a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
@@ -52,12 +52,13 @@ public final class AggregationQueryPager implements QueryPager
@Override
public PartitionIterator fetchPage(int pageSize,
ConsistencyLevel consistency,
- ClientState clientState)
+ ClientState clientState,
+ long queryStartNanoTime)
{
if (limits.isGroupByLimit())
- return new GroupByPartitionIterator(pageSize, consistency, clientState);
+ return new GroupByPartitionIterator(pageSize, consistency, clientState, queryStartNanoTime);
- return new AggregationPartitionIterator(pageSize, consistency, clientState);
+ return new AggregationPartitionIterator(pageSize, consistency, clientState, queryStartNanoTime);
}
@Override
@@ -70,9 +71,9 @@ public final class AggregationQueryPager implements QueryPager
public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)
{
if (limits.isGroupByLimit())
- return new GroupByPartitionIterator(pageSize, executionController);
+ return new GroupByPartitionIterator(pageSize, executionController, System.nanoTime());
- return new AggregationPartitionIterator(pageSize, executionController);
+ return new AggregationPartitionIterator(pageSize, executionController, System.nanoTime());
}
@Override
@@ -152,28 +153,34 @@ public final class AggregationQueryPager implements QueryPager
*/
private int initialMaxRemaining;
+ private final long queryStartNanoTime; // assigned once by the private constructor; compared against System.nanoTime()
+
public GroupByPartitionIterator(int pageSize,
ConsistencyLevel consistency,
- ClientState clientState)
+ ClientState clientState,
+ long queryStartNanoTime)
{
- this(pageSize, consistency, clientState, null);
+ this(pageSize, consistency, clientState, null, queryStartNanoTime);
}
public GroupByPartitionIterator(int pageSize,
- ReadExecutionController executionController)
+ ReadExecutionController executionController,
+ long queryStartNanoTime)
{
- this(pageSize, null, null, executionController);
+ this(pageSize, null, null, executionController, queryStartNanoTime);
}
private GroupByPartitionIterator(int pageSize,
ConsistencyLevel consistency,
ClientState clientState,
- ReadExecutionController executionController)
+ ReadExecutionController executionController,
+ long queryStartNanoTime)
{
this.pageSize = handlePagingOff(pageSize);
this.consistency = consistency;
this.clientState = clientState;
this.executionController = executionController;
+ this.queryStartNanoTime = queryStartNanoTime;
}
private int handlePagingOff(int pageSize)
@@ -280,7 +287,7 @@ public final class AggregationQueryPager implements QueryPager
*/
private final PartitionIterator fetchSubPage(int subPageSize)
{
- return consistency != null ? subPager.fetchPage(subPageSize, consistency, clientState)
+ return consistency != null ? subPager.fetchPage(subPageSize, consistency, clientState, queryStartNanoTime)
: subPager.fetchPageInternal(subPageSize, executionController);
}
@@ -393,15 +400,17 @@ public final class AggregationQueryPager implements QueryPager
{
public AggregationPartitionIterator(int pageSize,
ConsistencyLevel consistency,
- ClientState clientState)
+ ClientState clientState,
+ long queryStartNanoTime)
{
- super(pageSize, consistency, clientState);
+ super(pageSize, consistency, clientState, queryStartNanoTime);
}
public AggregationPartitionIterator(int pageSize,
- ReadExecutionController executionController)
+ ReadExecutionController executionController,
+ long queryStartNanoTime)
{
- super(pageSize, executionController);
+ super(pageSize, executionController, queryStartNanoTime);
}
@Override
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 9670f28..75cc71f 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -142,17 +142,17 @@ public class MultiPartitionPager implements QueryPager
}
@SuppressWarnings("resource") // iter closed via countingIter
- public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+ public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
{
int toQuery = Math.min(remaining, pageSize);
- return new PagersIterator(toQuery, consistency, clientState, null);
+ return new PagersIterator(toQuery, consistency, clientState, null, queryStartNanoTime);
}
@SuppressWarnings("resource") // iter closed via countingIter
public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) throws RequestValidationException, RequestExecutionException
{
int toQuery = Math.min(remaining, pageSize);
- return new PagersIterator(toQuery, null, null, executionController);
+ return new PagersIterator(toQuery, null, null, executionController, System.nanoTime());
}
private class PagersIterator extends AbstractIterator<RowIterator> implements PartitionIterator
@@ -160,6 +160,7 @@ public class MultiPartitionPager implements QueryPager
private final int pageSize;
private PartitionIterator result;
private boolean closed;
+ private final long queryStartNanoTime;
// For "normal" queries
private final ConsistencyLevel consistency;
@@ -171,12 +172,13 @@ public class MultiPartitionPager implements QueryPager
private int pagerMaxRemaining;
private int counted;
- public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadExecutionController executionController)
+ public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadExecutionController executionController, long queryStartNanoTime)
{
this.pageSize = pageSize;
this.consistency = consistency;
this.clientState = clientState;
this.executionController = executionController;
+ this.queryStartNanoTime = queryStartNanoTime;
}
protected RowIterator computeNext()
@@ -205,7 +207,7 @@ public class MultiPartitionPager implements QueryPager
int toQuery = pageSize - counted;
result = consistency == null
? pagers[current].fetchPageInternal(toQuery, executionController)
- : pagers[current].fetchPage(toQuery, consistency, clientState);
+ : pagers[current].fetchPage(toQuery, consistency, clientState, queryStartNanoTime);
}
return result.next();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java
index edd2a55..5d23997 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@ -54,7 +54,7 @@ public interface QueryPager
return ReadExecutionController.empty();
}
- public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+ public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
{
return EmptyIterators.partition();
}
@@ -94,7 +94,7 @@ public interface QueryPager
* {@code consistency} is a serial consistency.
* @return the page of result.
*/
- public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException;
+ public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException;
/**
* Starts a new read operation.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 02b5de2..7fb4e70 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -45,7 +45,8 @@ public class QueryPagers
ClientState state,
final int pageSize,
int nowInSec,
- boolean isForThrift) throws RequestValidationException, RequestExecutionException
+ boolean isForThrift,
+ long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
{
SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter);
final SinglePartitionPager pager = new SinglePartitionPager(command, null, Server.CURRENT_VERSION);
@@ -53,7 +54,7 @@ public class QueryPagers
int count = 0;
while (!pager.isExhausted())
{
- try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state))
+ try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state, queryStartNanoTime))
{
DataLimits.Counter counter = limits.newCounter(nowInSec, true);
PartitionIterators.consume(counter.applyTo(iter));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java b/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
index 37defde..90bfc5d 100644
--- a/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
@@ -35,12 +35,14 @@ public abstract class AbstractPaxosCallback<T> implements IAsyncCallback<T>
protected final CountDownLatch latch;
protected final int targets;
private final ConsistencyLevel consistency;
+ private final long queryStartNanoTime;
- public AbstractPaxosCallback(int targets, ConsistencyLevel consistency)
+ public AbstractPaxosCallback(int targets, ConsistencyLevel consistency, long queryStartNanoTime)
{
this.targets = targets;
this.consistency = consistency;
latch = new CountDownLatch(targets);
+ this.queryStartNanoTime = queryStartNanoTime;
}
public boolean isLatencyForSnitch()
@@ -57,7 +59,8 @@ public abstract class AbstractPaxosCallback<T> implements IAsyncCallback<T>
{
try
{
- if (!latch.await(DatabaseDescriptor.getWriteRpcTimeout(), TimeUnit.MILLISECONDS))
+ long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getWriteRpcTimeout()) - (System.nanoTime() - queryStartNanoTime);
+ if (!latch.await(timeout, TimeUnit.NANOSECONDS))
throw new WriteTimeoutException(WriteType.CAS, consistency, getResponseCount(), targets);
}
catch (InterruptedException ex)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 544403a..5915eab 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -49,9 +49,9 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
- public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency)
+ public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime)
{
- super(targets, consistency);
+ super(targets, consistency, queryStartNanoTime);
// need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected
mostRecentCommit = Commit.emptyCommit(key, metadata);
mostRecentInProgressCommit = Commit.emptyCommit(key, metadata);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
index b0bd163..c9cb1f0 100644
--- a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
@@ -50,9 +50,9 @@ public class ProposeCallback extends AbstractPaxosCallback<Boolean>
private final int requiredAccepts;
private final boolean failFast;
- public ProposeCallback(int totalTargets, int requiredTargets, boolean failFast, ConsistencyLevel consistency)
+ public ProposeCallback(int totalTargets, int requiredTargets, boolean failFast, ConsistencyLevel consistency, long queryStartNanoTime)
{
- super(totalTargets, consistency);
+ super(totalTargets, consistency, queryStartNanoTime);
this.requiredAccepts = requiredTargets;
this.failFast = failFast;
}