You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2016/08/16 19:35:53 UTC

[1/3] cassandra git commit: Count entire coordinated request against timeout

Repository: cassandra
Updated Branches:
  refs/heads/trunk e83f9e69e -> aa83c942a


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index a189000..910f334 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -85,7 +85,7 @@ public class CassandraServer implements Cassandra.Iface
         return ThriftSessionManager.instance.currentSession();
     }
 
-    protected PartitionIterator read(List<SinglePartitionReadCommand> commands, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState)
+    protected PartitionIterator read(List<SinglePartitionReadCommand> commands, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState, long queryStartNanoTime)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
         try
@@ -93,7 +93,7 @@ public class CassandraServer implements Cassandra.Iface
             schedule(DatabaseDescriptor.getReadRpcTimeout());
             try
             {
-                return StorageProxy.read(new SinglePartitionReadCommand.Group(commands, DataLimits.NONE), consistency_level, cState);
+                return StorageProxy.read(new SinglePartitionReadCommand.Group(commands, DataLimits.NONE), consistency_level, cState, queryStartNanoTime);
             }
             finally
             {
@@ -257,10 +257,10 @@ public class CassandraServer implements Cassandra.Iface
              : result;
     }
 
-    private Map<ByteBuffer, List<ColumnOrSuperColumn>> getSlice(List<SinglePartitionReadCommand> commands, boolean subColumnsOnly, int cellLimit, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState)
+    private Map<ByteBuffer, List<ColumnOrSuperColumn>> getSlice(List<SinglePartitionReadCommand> commands, boolean subColumnsOnly, int cellLimit, org.apache.cassandra.db.ConsistencyLevel consistency_level, ClientState cState, long queryStartNanoTime)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
-        try (PartitionIterator results = read(commands, consistency_level, cState))
+        try (PartitionIterator results = read(commands, consistency_level, cState, queryStartNanoTime))
         {
             Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = new HashMap<>();
             while (results.hasNext())
@@ -278,6 +278,7 @@ public class CassandraServer implements Cassandra.Iface
     public List<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
@@ -296,7 +297,7 @@ public class CassandraServer implements Cassandra.Iface
             ClientState cState = state();
             String keyspace = cState.getKeyspace();
             state().hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
-            List<ColumnOrSuperColumn> result = getSliceInternal(keyspace, key, column_parent, FBUtilities.nowInSeconds(), predicate, consistency_level, cState);
+            List<ColumnOrSuperColumn> result = getSliceInternal(keyspace, key, column_parent, FBUtilities.nowInSeconds(), predicate, consistency_level, cState, queryStartNanoTime);
             return result == null ? Collections.<ColumnOrSuperColumn>emptyList() : result;
         }
         catch (RequestValidationException e)
@@ -315,15 +316,17 @@ public class CassandraServer implements Cassandra.Iface
                                                        int nowInSec,
                                                        SlicePredicate predicate,
                                                        ConsistencyLevel consistency_level,
-                                                       ClientState cState)
+                                                       ClientState cState,
+                                                       long queryStartNanoTime)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
-        return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, nowInSec, predicate, consistency_level, cState).get(key);
+        return multigetSliceInternal(keyspace, Collections.singletonList(key), column_parent, nowInSec, predicate, consistency_level, cState, queryStartNanoTime).get(key);
     }
 
     public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             List<String> keysList = Lists.newArrayList();
@@ -345,7 +348,7 @@ public class CassandraServer implements Cassandra.Iface
             ClientState cState = state();
             String keyspace = cState.getKeyspace();
             cState.hasColumnFamilyAccess(keyspace, column_parent.column_family, Permission.SELECT);
-            return multigetSliceInternal(keyspace, keys, column_parent, FBUtilities.nowInSeconds(), predicate, consistency_level, cState);
+            return multigetSliceInternal(keyspace, keys, column_parent, FBUtilities.nowInSeconds(), predicate, consistency_level, cState, queryStartNanoTime);
         }
         catch (RequestValidationException e)
         {
@@ -541,7 +544,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                              int nowInSec,
                                                                              SlicePredicate predicate,
                                                                              ConsistencyLevel consistency_level,
-                                                                             ClientState cState)
+                                                                             ClientState cState,
+                                                                             long queryStartNanoTime)
     throws org.apache.cassandra.exceptions.InvalidRequestException, UnavailableException, TimedOutException
     {
         CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
@@ -563,12 +567,13 @@ public class CassandraServer implements Cassandra.Iface
             commands.add(SinglePartitionReadCommand.create(true, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, dk, filter));
         }
 
-        return getSlice(commands, column_parent.isSetSuper_column(), limits.perPartitionCount(), consistencyLevel, cState);
+        return getSlice(commands, column_parent.isSetSuper_column(), limits.perPartitionCount(), consistencyLevel, cState, queryStartNanoTime);
     }
 
     public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
     throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
@@ -643,7 +648,7 @@ public class CassandraServer implements Cassandra.Iface
             DecoratedKey dk = metadata.decorateKey(key);
             SinglePartitionReadCommand command = SinglePartitionReadCommand.create(true, metadata, FBUtilities.nowInSeconds(), columns, RowFilter.NONE, DataLimits.NONE, dk, filter);
 
-            try (RowIterator result = PartitionIterators.getOnlyElement(read(Arrays.asList(command), consistencyLevel, cState), command))
+            try (RowIterator result = PartitionIterators.getOnlyElement(read(Arrays.asList(command), consistencyLevel, cState, queryStartNanoTime), command))
             {
                 if (!result.hasNext())
                     throw new NotFoundException();
@@ -672,6 +677,7 @@ public class CassandraServer implements Cassandra.Iface
     public int get_count(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
@@ -695,7 +701,7 @@ public class CassandraServer implements Cassandra.Iface
             int nowInSec = FBUtilities.nowInSeconds();
 
             if (predicate.column_names != null)
-                return getSliceInternal(keyspace, key, column_parent, nowInSec, predicate, consistency_level, cState).size();
+                return getSliceInternal(keyspace, key, column_parent, nowInSec, predicate, consistency_level, cState, queryStartNanoTime).size();
 
             int pageSize;
             // request by page if this is a large row
@@ -742,7 +748,8 @@ public class CassandraServer implements Cassandra.Iface
                                           cState,
                                           pageSize,
                                           nowInSec,
-                                          true);
+                                          true,
+                                          queryStartNanoTime);
         }
         catch (IllegalArgumentException e)
         {
@@ -766,6 +773,7 @@ public class CassandraServer implements Cassandra.Iface
     public Map<ByteBuffer, Integer> multiget_count(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             List<String> keysList = Lists.newArrayList();
@@ -797,7 +805,8 @@ public class CassandraServer implements Cassandra.Iface
                                                                                                  FBUtilities.nowInSeconds(),
                                                                                                  predicate,
                                                                                                  consistency_level,
-                                                                                                 cState);
+                                                                                                 cState,
+                                                                                                 queryStartNanoTime);
 
             for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> cf : columnFamiliesMap.entrySet())
                 counts.put(cf.getKey(), cf.getValue().size());
@@ -833,7 +842,7 @@ public class CassandraServer implements Cassandra.Iface
         return column.ttl;
     }
 
-    private void internal_insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
+    private void internal_insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level, long queryStartNanoTime)
     throws RequestValidationException, UnavailableException, TimedOutException
     {
         ThriftClientState cState = state();
@@ -870,12 +879,13 @@ public class CassandraServer implements Cassandra.Iface
         {
             throw new org.apache.cassandra.exceptions.InvalidRequestException(e.getMessage());
         }
-        doInsert(consistency_level, Collections.singletonList(mutation));
+        doInsert(consistency_level, Collections.singletonList(mutation), queryStartNanoTime);
     }
 
     public void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
@@ -891,7 +901,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            internal_insert(key, column_parent, column, consistency_level);
+            internal_insert(key, column_parent, column, consistency_level, queryStartNanoTime);
         }
         catch (RequestValidationException e)
         {
@@ -911,6 +921,7 @@ public class CassandraServer implements Cassandra.Iface
                          ConsistencyLevel commit_consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             ImmutableMap.Builder<String,String> builder = ImmutableMap.builder();
@@ -964,7 +975,8 @@ public class CassandraServer implements Cassandra.Iface
                                                        new ThriftCASRequest(toLegacyCells(metadata, expected, nowInSec), partitionUpdates, nowInSec),
                                                        ThriftConversion.fromThrift(serial_consistency_level),
                                                        ThriftConversion.fromThrift(commit_consistency_level),
-                                                       cState))
+                                                       cState,
+                                                       queryStartNanoTime))
             {
                 return result == null
                      ? new CASResult(true)
@@ -1276,6 +1288,7 @@ public class CassandraServer implements Cassandra.Iface
     public void batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = Maps.newLinkedHashMap();
@@ -1294,7 +1307,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            doInsert(consistency_level, createMutationList(consistency_level, mutation_map, true));
+            doInsert(consistency_level, createMutationList(consistency_level, mutation_map, true), queryStartNanoTime);
         }
         catch (RequestValidationException e)
         {
@@ -1309,6 +1322,7 @@ public class CassandraServer implements Cassandra.Iface
     public void atomic_batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = Maps.newLinkedHashMap();
@@ -1327,7 +1341,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            doInsert(consistency_level, createMutationList(consistency_level, mutation_map, false), true);
+            doInsert(consistency_level, createMutationList(consistency_level, mutation_map, false), true, queryStartNanoTime);
         }
         catch (RequestValidationException e)
         {
@@ -1339,7 +1353,7 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
-    private void internal_remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level, boolean isCommutativeOp)
+    private void internal_remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level, boolean isCommutativeOp, long queryStartNanoTime)
     throws RequestValidationException, UnavailableException, TimedOutException
     {
         ThriftClientState cState = state();
@@ -1386,14 +1400,15 @@ public class CassandraServer implements Cassandra.Iface
         org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update);
 
         if (isCommutativeOp)
-            doInsert(consistency_level, Collections.singletonList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
+            doInsert(consistency_level, Collections.singletonList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))), queryStartNanoTime);
         else
-            doInsert(consistency_level, Collections.singletonList(mutation));
+            doInsert(consistency_level, Collections.singletonList(mutation), queryStartNanoTime);
     }
 
     public void remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
@@ -1409,7 +1424,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            internal_remove(key, column_path, timestamp, consistency_level, false);
+            internal_remove(key, column_path, timestamp, consistency_level, false, queryStartNanoTime);
         }
         catch (RequestValidationException e)
         {
@@ -1421,13 +1436,13 @@ public class CassandraServer implements Cassandra.Iface
         }
     }
 
-    private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations)
+    private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations, long queryStartNanoTime)
     throws UnavailableException, TimedOutException, org.apache.cassandra.exceptions.InvalidRequestException
     {
-        doInsert(consistency_level, mutations, false);
+        doInsert(consistency_level, mutations, false, queryStartNanoTime);
     }
 
-    private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations, boolean mutateAtomically)
+    private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations, boolean mutateAtomically, long queryStartNanoTime)
     throws UnavailableException, TimedOutException, org.apache.cassandra.exceptions.InvalidRequestException
     {
         org.apache.cassandra.db.ConsistencyLevel consistencyLevel = ThriftConversion.fromThrift(consistency_level);
@@ -1442,7 +1457,7 @@ public class CassandraServer implements Cassandra.Iface
         schedule(timeout);
         try
         {
-            StorageProxy.mutateWithTriggers(mutations, consistencyLevel, mutateAtomically);
+            StorageProxy.mutateWithTriggers(mutations, consistencyLevel, mutateAtomically, queryStartNanoTime);
         }
         catch (RequestExecutionException e)
         {
@@ -1480,6 +1495,7 @@ public class CassandraServer implements Cassandra.Iface
     public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of(
@@ -1541,7 +1557,7 @@ public class CassandraServer implements Cassandra.Iface
                                                                               limits,
                                                                               new DataRange(bounds, filter),
                                                                               Optional.empty());
-                try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
+                try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime))
                 {
                     assert results != null;
                     return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());
@@ -1569,6 +1585,7 @@ public class CassandraServer implements Cassandra.Iface
     public List<KeySlice> get_paged_slice(String column_family, KeyRange range, ByteBuffer start_column, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("column_family", column_family,
@@ -1635,7 +1652,7 @@ public class CassandraServer implements Cassandra.Iface
                                                                               limits,
                                                                               new DataRange(bounds, filter).forPaging(bounds, metadata.comparator, pageFrom, true),
                                                                               Optional.empty());
-                try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
+                try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime))
                 {
                     return thriftifyKeySlices(results, new ColumnParent(column_family), limits.perPartitionCount());
                 }
@@ -1684,6 +1701,7 @@ public class CassandraServer implements Cassandra.Iface
     public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("column_parent", column_parent.toString(),
@@ -1734,7 +1752,7 @@ public class CassandraServer implements Cassandra.Iface
             // further lookups.
             cmd.maybeValidateIndex();
 
-            try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel))
+            try (PartitionIterator results = StorageProxy.getRangeSlice(cmd, consistencyLevel, queryStartNanoTime))
             {
                 return thriftifyKeySlices(results, column_parent, limits.perPartitionCount());
             }
@@ -2139,6 +2157,7 @@ public class CassandraServer implements Cassandra.Iface
     public void add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level)
             throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("column_parent", column_parent.toString(),
@@ -2183,7 +2202,7 @@ public class CassandraServer implements Cassandra.Iface
                 PartitionUpdate update = PartitionUpdate.singleRowUpdate(metadata, key, BTreeRow.singleCellRow(name.clustering, cell));
 
                 org.apache.cassandra.db.Mutation mutation = new org.apache.cassandra.db.Mutation(update);
-                doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))));
+                doInsert(consistency_level, Arrays.asList(new CounterMutation(mutation, ThriftConversion.fromThrift(consistency_level))), queryStartNanoTime);
             }
             catch (MarshalException|UnknownColumnException e)
             {
@@ -2203,6 +2222,7 @@ public class CassandraServer implements Cassandra.Iface
     public void remove_counter(ByteBuffer key, ColumnPath path, ConsistencyLevel consistency_level)
     throws InvalidRequestException, UnavailableException, TimedOutException, TException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
@@ -2217,7 +2237,7 @@ public class CassandraServer implements Cassandra.Iface
 
         try
         {
-            internal_remove(key, path, FBUtilities.timestampMicros(), consistency_level, true);
+            internal_remove(key, path, FBUtilities.timestampMicros(), consistency_level, true, queryStartNanoTime);
         }
         catch (RequestValidationException e)
         {
@@ -2296,6 +2316,7 @@ public class CassandraServer implements Cassandra.Iface
     {
         try
         {
+            long queryStartNanoTime = System.nanoTime();
             String queryString = uncompress(query, compression);
             if (startSessionIfRequested())
             {
@@ -2313,7 +2334,8 @@ public class CassandraServer implements Cassandra.Iface
                                                             cState.getQueryState(),
                                                             QueryOptions.fromThrift(ThriftConversion.fromThrift(cLevel),
                                                             Collections.<ByteBuffer>emptyList()),
-                                                            null).toThriftResult();
+                                                            null,
+                                                            queryStartNanoTime).toThriftResult();
         }
         catch (RequestExecutionException e)
         {
@@ -2359,6 +2381,7 @@ public class CassandraServer implements Cassandra.Iface
 
     public CqlResult execute_prepared_cql3_query(int itemId, List<ByteBuffer> bindVariables, ConsistencyLevel cLevel) throws TException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             // TODO we don't have [typed] access to CQL bind variables here.  CASSANDRA-4560 is open to add support.
@@ -2384,7 +2407,8 @@ public class CassandraServer implements Cassandra.Iface
             return ClientState.getCQLQueryHandler().processPrepared(prepared.statement,
                                                                     cState.getQueryState(),
                                                                     QueryOptions.fromThrift(ThriftConversion.fromThrift(cLevel), bindVariables),
-                                                                    null).toThriftResult();
+                                                                    null,
+                                                                    queryStartNanoTime).toThriftResult();
         }
         catch (RequestExecutionException e)
         {
@@ -2404,6 +2428,7 @@ public class CassandraServer implements Cassandra.Iface
     public List<ColumnOrSuperColumn> get_multi_slice(MultiSliceRequest request)
             throws InvalidRequestException, UnavailableException, TimedOutException
     {
+        long queryStartNanoTime = System.nanoTime();
         if (startSessionIfRequested())
         {
             Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(request.key),
@@ -2457,7 +2482,8 @@ public class CassandraServer implements Cassandra.Iface
                             false,
                             limits.perPartitionCount(),
                             consistencyLevel,
-                            cState).entrySet().iterator().next().getValue();
+                            cState,
+                            queryStartNanoTime).entrySet().iterator().next().getValue();
         }
         catch (RequestValidationException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
index e20f99b..fe78e64 100644
--- a/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
+++ b/src/java/org/apache/cassandra/tracing/TraceStateImpl.java
@@ -116,7 +116,7 @@ public class TraceStateImpl extends TraceState
     {
         try
         {
-            StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY);
+            StorageProxy.mutate(Collections.singletonList(mutation), ConsistencyLevel.ANY, System.nanoTime());
         }
         catch (OverloadedException e)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/Message.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 66e0014..2463306 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -210,7 +210,7 @@ public abstract class Message
                 throw new IllegalArgumentException();
         }
 
-        public abstract Response execute(QueryState queryState);
+        public abstract Response execute(QueryState queryState, long queryStartNanoTime);
 
         public void setTracingRequested()
         {
@@ -501,6 +501,7 @@ public abstract class Message
 
             final Response response;
             final ServerConnection connection;
+            long queryStartNanoTime = System.nanoTime();
 
             try
             {
@@ -512,7 +513,7 @@ public abstract class Message
                 QueryState qstate = connection.validateNewMessage(request.type, connection.getVersion(), request.getStreamId());
 
                 logger.trace("Received: {}, v={}", request, connection.getVersion());
-                response = request.execute(qstate);
+                response = request.execute(qstate, queryStartNanoTime);
                 response.setStreamId(request.getStreamId());
                 response.setWarnings(ClientWarn.instance.getWarnings());
                 response.attach(connection);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
index 8b3e866..e90f740 100644
--- a/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
+++ b/src/java/org/apache/cassandra/transport/messages/AuthResponse.java
@@ -68,7 +68,7 @@ public class AuthResponse extends Message.Request
     }
 
     @Override
-    public Response execute(QueryState queryState)
+    public Response execute(QueryState queryState, long queryStartNanoTime)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
index 9d1047f..6675565 100644
--- a/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/BatchMessage.java
@@ -147,7 +147,7 @@ public class BatchMessage extends Message.Request
         this.options = options;
     }
 
-    public Message.Response execute(QueryState state)
+    public Message.Response execute(QueryState state, long queryStartNanoTime)
     {
         try
         {
@@ -214,7 +214,7 @@ public class BatchMessage extends Message.Request
             // Note: It's ok at this point to pass a bogus value for the number of bound terms in the BatchStatement ctor
             // (and no value would be really correct, so we prefer passing a clearly wrong one).
             BatchStatement batch = new BatchStatement(-1, batchType, statements, Attributes.none());
-            Message.Response response = handler.processBatch(batch, state, batchOptions, getCustomPayload());
+            Message.Response response = handler.processBatch(batch, state, batchOptions, getCustomPayload(), queryStartNanoTime);
 
             if (tracingId != null)
                 response.setTracingId(tracingId);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
index 4c51cce..4ecaffd 100644
--- a/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/CredentialsMessage.java
@@ -71,7 +71,7 @@ public class CredentialsMessage extends Message.Request
         this.credentials = credentials;
     }
 
-    public Message.Response execute(QueryState state)
+    public Message.Response execute(QueryState state, long queryStartNanoTime)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
index a5348a4..088f278 100644
--- a/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/ExecuteMessage.java
@@ -87,7 +87,7 @@ public class ExecuteMessage extends Message.Request
         this.options = options;
     }
 
-    public Message.Response execute(QueryState state)
+    public Message.Response execute(QueryState state, long queryStartNanoTime)
     {
         try
         {
@@ -143,7 +143,7 @@ public class ExecuteMessage extends Message.Request
             // Some custom QueryHandlers are interested in the bound names. We provide them this information
             // by wrapping the QueryOptions.
             QueryOptions queryOptions = QueryOptions.addColumnSpecifications(options, prepared.boundNames);
-            Message.Response response = handler.processPrepared(statement, state, queryOptions, getCustomPayload());
+            Message.Response response = handler.processPrepared(statement, state, queryOptions, getCustomPayload(), queryStartNanoTime);
             if (options.skipMetadata() && response instanceof ResultMessage.Rows)
                 ((ResultMessage.Rows)response).result.metadata.setSkipMetadata();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
index 2f6e3da..4e95342 100644
--- a/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/OptionsMessage.java
@@ -56,7 +56,7 @@ public class OptionsMessage extends Message.Request
         super(Message.Type.OPTIONS);
     }
 
-    public Message.Response execute(QueryState state)
+    public Message.Response execute(QueryState state, long queryStartNanoTime)
     {
         List<String> cqlVersions = new ArrayList<String>();
         cqlVersions.add(QueryProcessor.CQL_VERSION.toString());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
index f54d1d9..f5192de 100644
--- a/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/PrepareMessage.java
@@ -58,7 +58,7 @@ public class PrepareMessage extends Message.Request
         this.query = query;
     }
 
-    public Message.Response execute(QueryState state)
+    public Message.Response execute(QueryState state, long queryStartNanoTime)
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 3b48d52..2bd5efc 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -82,7 +82,7 @@ public class QueryMessage extends Message.Request
         this.options = options;
     }
 
-    public Message.Response execute(QueryState state)
+    public Message.Response execute(QueryState state, long queryStartNanoTime)
     {
         try
         {
@@ -112,7 +112,7 @@ public class QueryMessage extends Message.Request
                 Tracing.instance.begin("Execute CQL3 query", state.getClientAddress(), builder.build());
             }
 
-            Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload());
+            Message.Response response = ClientState.getCQLQueryHandler().process(query, state, options, getCustomPayload(), queryStartNanoTime);
             if (options.skipMetadata() && response instanceof ResultMessage.Rows)
                 ((ResultMessage.Rows)response).result.metadata.setSkipMetadata();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
index 928e676..c8e48b0 100644
--- a/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/RegisterMessage.java
@@ -62,7 +62,7 @@ public class RegisterMessage extends Message.Request
         this.eventTypes = eventTypes;
     }
 
-    public Response execute(QueryState state)
+    public Response execute(QueryState state, long queryStartNanoTime)
     {
         assert connection instanceof ServerConnection;
         Connection.Tracker tracker = connection.getTracker();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
index 04d8e62..8966aeb 100644
--- a/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/StartupMessage.java
@@ -62,7 +62,7 @@ public class StartupMessage extends Message.Request
         this.options = options;
     }
 
-    public Message.Response execute(QueryState state)
+    public Message.Response execute(QueryState state, long queryStartNanoTime)
     {
         String cqlVersion = options.get(CQL_VERSION);
         if (cqlVersion == null)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
index 17e426b..6a075c9 100644
--- a/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
+++ b/test/unit/org/apache/cassandra/cql3/PstmtPersistenceTest.java
@@ -125,6 +125,6 @@ public class PstmtPersistenceTest extends CQLTester
     {
         ParsedStatement.Prepared prepared = handler.getPrepared(stmtId);
         Assert.assertNotNull(prepared);
-        handler.processPrepared(prepared.statement, QueryState.forInternalCalls(), options, Collections.emptyMap());
+        handler.processPrepared(prepared.statement, QueryState.forInternalCalls(), options, Collections.emptyMap(), System.nanoTime());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/test/unit/org/apache/cassandra/service/DataResolverTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/DataResolverTest.java b/test/unit/org/apache/cassandra/service/DataResolverTest.java
index 3fee5f9..156bd66 100644
--- a/test/unit/org/apache/cassandra/service/DataResolverTest.java
+++ b/test/unit/org/apache/cassandra/service/DataResolverTest.java
@@ -132,7 +132,7 @@ public class DataResolverTest
     @Test
     public void testResolveNewerSingleRow() throws UnknownHostException
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
         InetAddress peer1 = peer();
         resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
                                                                                                        .add("c1", "v1")
@@ -161,7 +161,7 @@ public class DataResolverTest
     @Test
     public void testResolveDisjointSingleRow()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
         InetAddress peer1 = peer();
         resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
                                                                                                        .add("c1", "v1")
@@ -196,7 +196,7 @@ public class DataResolverTest
     public void testResolveDisjointMultipleRows() throws UnknownHostException
     {
 
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
         InetAddress peer1 = peer();
         resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
                                                                                                        .add("c1", "v1")
@@ -242,7 +242,7 @@ public class DataResolverTest
     @Test
     public void testResolveDisjointMultipleRowsWithRangeTombstones()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime());
 
         RangeTombstone tombstone1 = tombstone("1", "11", 1, nowInSec);
         RangeTombstone tombstone2 = tombstone("3", "31", 1, nowInSec);
@@ -322,7 +322,7 @@ public class DataResolverTest
     @Test
     public void testResolveWithOneEmpty()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
         InetAddress peer1 = peer();
         resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 1L, dk).clustering("1")
                                                                                                        .add("c2", "v2")
@@ -349,7 +349,7 @@ public class DataResolverTest
     @Test
     public void testResolveWithBothEmpty()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
         resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm, false)));
         resolver.preprocess(readResponseMessage(peer(), EmptyIterators.unfilteredPartition(cfm, false)));
 
@@ -364,7 +364,7 @@ public class DataResolverTest
     @Test
     public void testResolveDeleted()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
         // one response with columns timestamped before a delete in another response
         InetAddress peer1 = peer();
         resolver.preprocess(readResponseMessage(peer1, iter(new RowUpdateBuilder(cfm, nowInSec, 0L, dk).clustering("1")
@@ -389,7 +389,7 @@ public class DataResolverTest
     @Test
     public void testResolveMultipleDeleted()
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 4, System.nanoTime());
         // deletes and columns with interleaved timestamp, with out of order return sequence
         InetAddress peer1 = peer();
         resolver.preprocess(readResponseMessage(peer1, fullPartitionDelete(cfm, dk, 0, nowInSec)));
@@ -471,7 +471,7 @@ public class DataResolverTest
      */
     private void resolveRangeTombstonesOnBoundary(long timestamp1, long timestamp2)
     {
-        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, command, ConsistencyLevel.ALL, 2, System.nanoTime());
         InetAddress peer1 = peer();
         InetAddress peer2 = peer();
 
@@ -562,7 +562,7 @@ public class DataResolverTest
     public void testResolveComplexDelete()
     {
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
 
         long[] ts = {100, 200};
 
@@ -611,7 +611,7 @@ public class DataResolverTest
     {
 
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
 
         long[] ts = {100, 200};
 
@@ -654,7 +654,7 @@ public class DataResolverTest
     public void testResolveNewCollection()
     {
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
 
         long[] ts = {100, 200};
 
@@ -700,7 +700,7 @@ public class DataResolverTest
     public void testResolveNewCollectionOverwritingDeleted()
     {
         ReadCommand cmd = Util.cmd(cfs2, dk).withNowInSeconds(nowInSec).build();
-        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2);
+        DataResolver resolver = new DataResolver(ks, cmd, ConsistencyLevel.ALL, 2, System.nanoTime());
 
         long[] ts = {100, 200};
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
index 260c507..f8a77e1 100644
--- a/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
+++ b/test/unit/org/apache/cassandra/transport/MessagePayloadTest.java
@@ -316,12 +316,13 @@ public class MessagePayloadTest extends CQLTester
         public ResultMessage process(String query,
                                      QueryState state,
                                      QueryOptions options,
-                                     Map<String, ByteBuffer> customPayload)
-                                             throws RequestExecutionException, RequestValidationException
+                                     Map<String, ByteBuffer> customPayload,
+                                     long queryStartNanoTime)
+                                            throws RequestExecutionException, RequestValidationException
         {
             if (customPayload != null)
                 requestPayload = customPayload;
-            ResultMessage result = QueryProcessor.instance.process(query, state, options, customPayload);
+            ResultMessage result = QueryProcessor.instance.process(query, state, options, customPayload, queryStartNanoTime);
             if (customPayload != null)
             {
                 result.setCustomPayload(responsePayload);
@@ -333,12 +334,13 @@ public class MessagePayloadTest extends CQLTester
         public ResultMessage processBatch(BatchStatement statement,
                                           QueryState state,
                                           BatchQueryOptions options,
-                                          Map<String, ByteBuffer> customPayload)
+                                          Map<String, ByteBuffer> customPayload,
+                                          long queryStartNanoTime)
                                                   throws RequestExecutionException, RequestValidationException
         {
             if (customPayload != null)
                 requestPayload = customPayload;
-            ResultMessage result = QueryProcessor.instance.processBatch(statement, state, options, customPayload);
+            ResultMessage result = QueryProcessor.instance.processBatch(statement, state, options, customPayload, queryStartNanoTime);
             if (customPayload != null)
             {
                 result.setCustomPayload(responsePayload);
@@ -350,12 +352,13 @@ public class MessagePayloadTest extends CQLTester
         public ResultMessage processPrepared(CQLStatement statement,
                                              QueryState state,
                                              QueryOptions options,
-                                             Map<String, ByteBuffer> customPayload)
-                                                     throws RequestExecutionException, RequestValidationException
+                                             Map<String, ByteBuffer> customPayload,
+                                             long queryStartNanoTime)
+                                                    throws RequestExecutionException, RequestValidationException
         {
             if (customPayload != null)
                 requestPayload = customPayload;
-            ResultMessage result = QueryProcessor.instance.processPrepared(statement, state, options, customPayload);
+            ResultMessage result = QueryProcessor.instance.processPrepared(statement, state, options, customPayload, queryStartNanoTime);
             if (customPayload != null)
             {
                 result.setCustomPayload(responsePayload);


[3/3] cassandra git commit: Count entire coordinated request against timeout

Posted by ty...@apache.org.
Count entire coordinated request against timeout

Patch by Geoffrey Yu; reviewed by Tyler Hobbs for CASSANDRA-12256


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/aa83c942
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/aa83c942
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/aa83c942

Branch: refs/heads/trunk
Commit: aa83c942a51323d4a38bc023979ba70801c875b3
Parents: e83f9e6
Author: Geoffrey Yu <ge...@apple.com>
Authored: Tue Aug 16 14:35:11 2016 -0500
Committer: Tyler Hobbs <ty...@gmail.com>
Committed: Tue Aug 16 14:35:11 2016 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 NEWS.txt                                        |   9 +
 .../cassandra/auth/CassandraAuthorizer.java     |  11 +-
 .../cassandra/auth/CassandraRoleManager.java    |   3 +-
 .../cassandra/auth/PasswordAuthenticator.java   |   3 +-
 .../cassandra/batchlog/BatchlogManager.java     |   6 +-
 .../batchlog/LegacyBatchlogMigrator.java        |   5 +-
 .../org/apache/cassandra/cql3/CQLStatement.java |   3 +-
 .../CustomPayloadMirroringQueryHandler.java     |  15 +-
 .../org/apache/cassandra/cql3/QueryHandler.java |   9 +-
 .../apache/cassandra/cql3/QueryProcessor.java   |  44 ++---
 .../statements/AuthenticationStatement.java     |   2 +-
 .../cql3/statements/AuthorizationStatement.java |   2 +-
 .../cql3/statements/BatchStatement.java         |  33 ++--
 .../cql3/statements/DropIndexStatement.java     |   2 +-
 .../cql3/statements/ModificationStatement.java  |  54 +++---
 .../statements/SchemaAlteringStatement.java     |   2 +-
 .../cql3/statements/SelectStatement.java        |  29 +--
 .../cql3/statements/TruncateStatement.java      |   2 +-
 .../cassandra/cql3/statements/UseStatement.java |   4 +-
 .../db/CounterMutationVerbHandler.java          |   3 +-
 .../cassandra/db/PartitionRangeReadCommand.java |   4 +-
 src/java/org/apache/cassandra/db/ReadQuery.java |   4 +-
 .../db/SinglePartitionReadCommand.java          |   8 +-
 .../org/apache/cassandra/db/SystemKeyspace.java |   2 +-
 .../apache/cassandra/db/view/TableViews.java    |   3 +-
 .../apache/cassandra/db/view/ViewBuilder.java   |   2 +-
 .../locator/AbstractReplicationStrategy.java    |  15 +-
 .../apache/cassandra/repair/RepairRunnable.java |   2 +-
 .../cassandra/service/AbstractReadExecutor.java |  28 +--
 .../service/AbstractWriteResponseHandler.java   |  10 +-
 .../service/BatchlogResponseHandler.java        |   4 +-
 .../apache/cassandra/service/DataResolver.java  |  14 +-
 .../DatacenterSyncWriteResponseHandler.java     |   5 +-
 .../service/DatacenterWriteResponseHandler.java |   5 +-
 .../apache/cassandra/service/ReadCallback.java  |  23 ++-
 .../apache/cassandra/service/StorageProxy.java  | 183 ++++++++++---------
 .../cassandra/service/WriteResponseHandler.java |  13 +-
 .../service/pager/AbstractQueryPager.java       |   4 +-
 .../service/pager/AggregationQueryPager.java    |  39 ++--
 .../service/pager/MultiPartitionPager.java      |  12 +-
 .../cassandra/service/pager/QueryPager.java     |   4 +-
 .../cassandra/service/pager/QueryPagers.java    |   5 +-
 .../service/paxos/AbstractPaxosCallback.java    |   7 +-
 .../service/paxos/PrepareCallback.java          |   4 +-
 .../service/paxos/ProposeCallback.java          |   4 +-
 .../cassandra/thrift/CassandraServer.java       |  98 ++++++----
 .../cassandra/tracing/TraceStateImpl.java       |   2 +-
 .../org/apache/cassandra/transport/Message.java |   5 +-
 .../transport/messages/AuthResponse.java        |   2 +-
 .../transport/messages/BatchMessage.java        |   4 +-
 .../transport/messages/CredentialsMessage.java  |   2 +-
 .../transport/messages/ExecuteMessage.java      |   4 +-
 .../transport/messages/OptionsMessage.java      |   2 +-
 .../transport/messages/PrepareMessage.java      |   2 +-
 .../transport/messages/QueryMessage.java        |   4 +-
 .../transport/messages/RegisterMessage.java     |   2 +-
 .../transport/messages/StartupMessage.java      |   2 +-
 .../cassandra/cql3/PstmtPersistenceTest.java    |   2 +-
 .../cassandra/service/DataResolverTest.java     |  26 +--
 .../cassandra/transport/MessagePayloadTest.java |  19 +-
 61 files changed, 468 insertions(+), 354 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 700dd48..5fbcc6b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.10
+ * Count full coordinated request against timeout (CASSANDRA-12256)
  * Allow TTL with null value on insert and update (CASSANDRA-12216)
  * Make decommission operation resumable (CASSANDRA-12008)
  * Add support to one-way targeted repair (CASSANDRA-9876)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/NEWS.txt
----------------------------------------------------------------------
diff --git a/NEWS.txt b/NEWS.txt
index 9cfc58b..d25be0d 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -62,6 +62,15 @@ New features
 
 Upgrading
 ---------
+    - Request timeouts in cassandra.yaml (read_request_timeout_in_ms, etc) now apply to the
+      "full" request time on the coordinator.  Previously, they only covered the time from
+      when the coordinator sent a message to a replica until the time that the replica
+      responded.  Additionally, the previous behavior was to reset the timeout when performing
+      a read repair, making a second read to fix a short read, and when subranges were read
+      as part of a range scan or secondary index query.  In 3.10 and higher, the timeout
+      is no longer reset for these "subqueries".  The entire request must complete within
+      the specified timeout.  As a consequence, your timeouts may need to be adjusted
+      to account for this.  See CASSANDRA-12256 for more details.
     - Logs written to stdout are now consistent with logs written to files.
       Time is now local (it was UTC on the console and local in files). Date, thread, file
       and line info were added to stdout. (see CASSANDRA-12004)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
index a6c11d2..65ee7ec 100644
--- a/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
+++ b/src/java/org/apache/cassandra/auth/CassandraAuthorizer.java
@@ -201,7 +201,8 @@ public class CassandraAuthorizer implements IAuthorizer
                                                   Attributes.none());
         QueryProcessor.instance.processBatch(batch,
                                              QueryState.forInternalCalls(),
-                                             BatchQueryOptions.withoutPerStatementVariables(QueryOptions.DEFAULT));
+                                             BatchQueryOptions.withoutPerStatementVariables(QueryOptions.DEFAULT),
+                                             System.nanoTime());
 
     }
 
@@ -218,7 +219,7 @@ public class CassandraAuthorizer implements IAuthorizer
         SelectStatement statement = Schema.instance.getCFMetaData(AuthKeyspace.NAME, USER_PERMISSIONS) == null
                                     ? authorizeRoleStatement
                                     : legacyAuthorizeRoleStatement;
-        ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options) ;
+        ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options, System.nanoTime());
         UntypedResultSet result = UntypedResultSet.create(rows.result);
 
         if (!result.isEmpty() && result.one().has(PERMISSIONS))
@@ -428,12 +429,14 @@ public class CassandraAuthorizer implements IAuthorizer
                                             QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
                                                                           Lists.newArrayList(row.getBytes("username"),
                                                                                              row.getBytes("resource"),
-                                                                                             serializer.serialize(filteredPerms))));
+                                                                                             serializer.serialize(filteredPerms))),
+                                            System.nanoTime());
 
                     indexStatement.execute(QueryState.forInternalCalls(),
                                            QueryOptions.forInternalCalls(ConsistencyLevel.ONE,
                                                                          Lists.newArrayList(row.getBytes("resource"),
-                                                                                            row.getBytes("username"))));
+                                                                                            row.getBytes("username"))),
+                                           System.nanoTime());
 
                 }
                 logger.info("Completed conversion of legacy permissions");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
index 826e89d..f2a2cfb 100644
--- a/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
+++ b/src/java/org/apache/cassandra/auth/CassandraRoleManager.java
@@ -496,7 +496,8 @@ public class CassandraRoleManager implements IRoleManager
         ResultMessage.Rows rows =
             statement.execute(QueryState.forInternalCalls(),
                               QueryOptions.forInternalCalls(consistencyForRole(name),
-                                                            Collections.singletonList(ByteBufferUtil.bytes(name))));
+                                                            Collections.singletonList(ByteBufferUtil.bytes(name))),
+                              System.nanoTime());
         if (rows.result.isEmpty())
             return NULL_ROLE;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
index 74eb10d..0f79cd2 100644
--- a/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
+++ b/src/java/org/apache/cassandra/auth/PasswordAuthenticator.java
@@ -121,7 +121,8 @@ public class PasswordAuthenticator implements IAuthenticator
             ResultMessage.Rows rows =
                 authenticationStatement.execute(QueryState.forInternalCalls(),
                                                 QueryOptions.forInternalCalls(consistencyForRole(username),
-                                                                              Lists.newArrayList(ByteBufferUtil.bytes(username))));
+                                                                              Lists.newArrayList(ByteBufferUtil.bytes(username))),
+                                                System.nanoTime());
 
             // If either a non-existent role name was supplied, or no credentials
             // were found for that role we don't want to cache the result so we throw

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
index 0bc9185..ffff235 100644
--- a/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
+++ b/src/java/org/apache/cassandra/batchlog/BatchlogManager.java
@@ -442,7 +442,7 @@ public class BatchlogManager implements BatchlogManagerMBean
             if (liveEndpoints.isEmpty())
                 return null;
 
-            ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints);
+            ReplayWriteResponseHandler<Mutation> handler = new ReplayWriteResponseHandler<>(liveEndpoints, System.nanoTime());
             MessageOut<Mutation> message = mutation.createMessage();
             for (InetAddress endpoint : liveEndpoints)
                 MessagingService.instance().sendRR(message, endpoint, handler, false);
@@ -465,9 +465,9 @@ public class BatchlogManager implements BatchlogManagerMBean
         {
             private final Set<InetAddress> undelivered = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
-            ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints)
+            ReplayWriteResponseHandler(Collection<InetAddress> writeEndpoints, long queryStartNanoTime)
             {
-                super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH);
+                super(writeEndpoints, Collections.<InetAddress>emptySet(), null, null, null, WriteType.UNLOGGED_BATCH, queryStartNanoTime);
                 undelivered.addAll(writeEndpoints);
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
index 3a8bf83..1a70f9f 100644
--- a/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
+++ b/src/java/org/apache/cassandra/batchlog/LegacyBatchlogMigrator.java
@@ -137,14 +137,15 @@ public final class LegacyBatchlogMigrator
         }
     }
 
-    public static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
+    public static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid, long queryStartNanoTime)
     {
         AbstractWriteResponseHandler<IMutation> handler = new WriteResponseHandler<>(endpoints,
                                                                                      Collections.<InetAddress>emptyList(),
                                                                                      ConsistencyLevel.ANY,
                                                                                      Keyspace.open(SystemKeyspace.NAME),
                                                                                      null,
-                                                                                     WriteType.SIMPLE);
+                                                                                     WriteType.SIMPLE,
+                                                                                     queryStartNanoTime);
         Mutation mutation = getRemoveMutation(uuid);
 
         for (InetAddress target : endpoints)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/CQLStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CQLStatement.java b/src/java/org/apache/cassandra/cql3/CQLStatement.java
index 02292ad..901ecd4 100644
--- a/src/java/org/apache/cassandra/cql3/CQLStatement.java
+++ b/src/java/org/apache/cassandra/cql3/CQLStatement.java
@@ -50,8 +50,9 @@ public interface CQLStatement
      *
      * @param state the current query state
      * @param options options for this query (consistency, variables, pageSize, ...)
+     * @param queryStartNanoTime the timestamp returned by System.nanoTime() when this statement was received
      */
-    public ResultMessage execute(QueryState state, QueryOptions options) throws RequestValidationException, RequestExecutionException;
+    public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException;
 
     /**
      * Variant of execute used for internal query against the system tables, and thus only query the local node.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
index 02a6df9..643c54b 100644
--- a/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
+++ b/src/java/org/apache/cassandra/cql3/CustomPayloadMirroringQueryHandler.java
@@ -38,9 +38,10 @@ public class CustomPayloadMirroringQueryHandler implements QueryHandler
     public ResultMessage process(String query,
                                  QueryState state,
                                  QueryOptions options,
-                                 Map<String, ByteBuffer> customPayload)
+                                 Map<String, ByteBuffer> customPayload,
+                                 long queryStartNanoTime)
     {
-        ResultMessage result = queryProcessor.process(query, state, options, customPayload);
+        ResultMessage result = queryProcessor.process(query, state, options, customPayload, queryStartNanoTime);
         result.setCustomPayload(customPayload);
         return result;
     }
@@ -65,9 +66,10 @@ public class CustomPayloadMirroringQueryHandler implements QueryHandler
     public ResultMessage processPrepared(CQLStatement statement,
                                          QueryState state,
                                          QueryOptions options,
-                                         Map<String, ByteBuffer> customPayload)
+                                         Map<String, ByteBuffer> customPayload,
+                                         long queryStartNanoTime)
     {
-        ResultMessage result = queryProcessor.processPrepared(statement, state, options, customPayload);
+        ResultMessage result = queryProcessor.processPrepared(statement, state, options, customPayload, queryStartNanoTime);
         result.setCustomPayload(customPayload);
         return result;
     }
@@ -75,9 +77,10 @@ public class CustomPayloadMirroringQueryHandler implements QueryHandler
     public ResultMessage processBatch(BatchStatement statement,
                                       QueryState state,
                                       BatchQueryOptions options,
-                                      Map<String, ByteBuffer> customPayload)
+                                      Map<String, ByteBuffer> customPayload,
+                                      long queryStartNanoTime)
     {
-        ResultMessage result = queryProcessor.processBatch(statement, state, options, customPayload);
+        ResultMessage result = queryProcessor.processBatch(statement, state, options, customPayload, queryStartNanoTime);
         result.setCustomPayload(customPayload);
         return result;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/QueryHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryHandler.java b/src/java/org/apache/cassandra/cql3/QueryHandler.java
index 3c11c0e..2108d4c 100644
--- a/src/java/org/apache/cassandra/cql3/QueryHandler.java
+++ b/src/java/org/apache/cassandra/cql3/QueryHandler.java
@@ -33,7 +33,8 @@ public interface QueryHandler
     ResultMessage process(String query,
                           QueryState state,
                           QueryOptions options,
-                          Map<String, ByteBuffer> customPayload) throws RequestExecutionException, RequestValidationException;
+                          Map<String, ByteBuffer> customPayload,
+                          long queryStartNanoTime) throws RequestExecutionException, RequestValidationException;
 
     ResultMessage.Prepared prepare(String query,
                                    QueryState state,
@@ -46,10 +47,12 @@ public interface QueryHandler
     ResultMessage processPrepared(CQLStatement statement,
                                   QueryState state,
                                   QueryOptions options,
-                                  Map<String, ByteBuffer> customPayload) throws RequestExecutionException, RequestValidationException;
+                                  Map<String, ByteBuffer> customPayload,
+                                  long queryStartNanoTime) throws RequestExecutionException, RequestValidationException;
 
     ResultMessage processBatch(BatchStatement statement,
                                QueryState state,
                                BatchQueryOptions options,
-                               Map<String, ByteBuffer> customPayload) throws RequestExecutionException, RequestValidationException;
+                               Map<String, ByteBuffer> customPayload,
+                               long queryStartNanoTime) throws RequestExecutionException, RequestValidationException;
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index 899b36d..47462e4 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -203,7 +203,7 @@ public class QueryProcessor implements QueryHandler
         }
     }
 
-    public ResultMessage processStatement(CQLStatement statement, QueryState queryState, QueryOptions options)
+    public ResultMessage processStatement(CQLStatement statement, QueryState queryState, QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         logger.trace("Process {} @CL.{}", statement, options.getConsistency());
@@ -211,26 +211,26 @@ public class QueryProcessor implements QueryHandler
         statement.checkAccess(clientState);
         statement.validate(clientState);
 
-        ResultMessage result = statement.execute(queryState, options);
+        ResultMessage result = statement.execute(queryState, options, queryStartNanoTime);
         return result == null ? new ResultMessage.Void() : result;
     }
 
-    public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState)
+    public static ResultMessage process(String queryString, ConsistencyLevel cl, QueryState queryState, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
-        return instance.process(queryString, queryState, QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()));
+        return instance.process(queryString, queryState, QueryOptions.forInternalCalls(cl, Collections.<ByteBuffer>emptyList()), queryStartNanoTime);
     }
 
     public ResultMessage process(String query,
                                  QueryState state,
                                  QueryOptions options,
-                                 Map<String, ByteBuffer> customPayload)
-                                         throws RequestExecutionException, RequestValidationException
+                                 Map<String, ByteBuffer> customPayload,
+                                 long queryStartNanoTime) throws RequestExecutionException, RequestValidationException
     {
-        return process(query, state, options);
+        return process(query, state, options, queryStartNanoTime);
     }
 
-    public ResultMessage process(String queryString, QueryState queryState, QueryOptions options)
+    public ResultMessage process(String queryString, QueryState queryState, QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         ParsedStatement.Prepared p = getStatement(queryString, queryState.getClientState());
@@ -242,7 +242,7 @@ public class QueryProcessor implements QueryHandler
         if (!queryState.getClientState().isInternal)
             metrics.regularStatementsExecuted.inc();
 
-        return processStatement(prepared, queryState, options);
+        return processStatement(prepared, queryState, options, queryStartNanoTime);
     }
 
     public static ParsedStatement.Prepared parseStatement(String queryStr, QueryState queryState) throws RequestValidationException
@@ -257,7 +257,7 @@ public class QueryProcessor implements QueryHandler
 
     public static UntypedResultSet process(String query, ConsistencyLevel cl, List<ByteBuffer> values) throws RequestExecutionException
     {
-        ResultMessage result = instance.process(query, QueryState.forInternalCalls(), QueryOptions.forInternalCalls(cl, values));
+        ResultMessage result = instance.process(query, QueryState.forInternalCalls(), QueryOptions.forInternalCalls(cl, values), System.nanoTime());
         if (result instanceof ResultMessage.Rows)
             return UntypedResultSet.create(((ResultMessage.Rows)result).result);
         else
@@ -319,7 +319,7 @@ public class QueryProcessor implements QueryHandler
         try
         {
             ParsedStatement.Prepared prepared = prepareInternal(query);
-            ResultMessage result = prepared.statement.execute(state, makeInternalOptions(prepared, values, cl));
+            ResultMessage result = prepared.statement.execute(state, makeInternalOptions(prepared, values, cl), System.nanoTime());
             if (result instanceof ResultMessage.Rows)
                 return UntypedResultSet.create(((ResultMessage.Rows)result).result);
             else
@@ -362,12 +362,12 @@ public class QueryProcessor implements QueryHandler
      * Note that this only make sense for Selects so this only accept SELECT statements and is only useful in rare
      * cases.
      */
-    public static UntypedResultSet executeInternalWithNow(int nowInSec, String query, Object... values)
+    public static UntypedResultSet executeInternalWithNow(int nowInSec, long queryStartNanoTime, String query, Object... values)
     {
         ParsedStatement.Prepared prepared = prepareInternal(query);
         assert prepared.statement instanceof SelectStatement;
         SelectStatement select = (SelectStatement)prepared.statement;
-        ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), nowInSec);
+        ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), nowInSec, queryStartNanoTime);
         assert result instanceof ResultMessage.Rows;
         return UntypedResultSet.create(((ResultMessage.Rows)result).result);
     }
@@ -480,13 +480,14 @@ public class QueryProcessor implements QueryHandler
     public ResultMessage processPrepared(CQLStatement statement,
                                          QueryState state,
                                          QueryOptions options,
-                                         Map<String, ByteBuffer> customPayload)
+                                         Map<String, ByteBuffer> customPayload,
+                                         long queryStartNanoTime)
                                                  throws RequestExecutionException, RequestValidationException
     {
-        return processPrepared(statement, state, options);
+        return processPrepared(statement, state, options, queryStartNanoTime);
     }
 
-    public ResultMessage processPrepared(CQLStatement statement, QueryState queryState, QueryOptions options)
+    public ResultMessage processPrepared(CQLStatement statement, QueryState queryState, QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         List<ByteBuffer> variables = options.getValues();
@@ -506,26 +507,27 @@ public class QueryProcessor implements QueryHandler
         }
 
         metrics.preparedStatementsExecuted.inc();
-        return processStatement(statement, queryState, options);
+        return processStatement(statement, queryState, options, queryStartNanoTime);
     }
 
     public ResultMessage processBatch(BatchStatement statement,
                                       QueryState state,
                                       BatchQueryOptions options,
-                                      Map<String, ByteBuffer> customPayload)
+                                      Map<String, ByteBuffer> customPayload,
+                                      long queryStartNanoTime)
                                               throws RequestExecutionException, RequestValidationException
     {
-        return processBatch(statement, state, options);
+        return processBatch(statement, state, options, queryStartNanoTime);
     }
 
-    public ResultMessage processBatch(BatchStatement batch, QueryState queryState, BatchQueryOptions options)
+    public ResultMessage processBatch(BatchStatement batch, QueryState queryState, BatchQueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         ClientState clientState = queryState.getClientState();
         batch.checkAccess(clientState);
         batch.validate();
         batch.validate(clientState);
-        return batch.execute(queryState, options);
+        return batch.execute(queryState, options, queryStartNanoTime);
     }
 
     public static ParsedStatement.Prepared getStatement(String queryStr, ClientState clientState)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
index 151e4f0..0283009 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthenticationStatement.java
@@ -41,7 +41,7 @@ public abstract class AuthenticationStatement extends ParsedStatement implements
         return 0;
     }
 
-    public ResultMessage execute(QueryState state, QueryOptions options)
+    public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         return execute(state.getClientState());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
index 098e22c..83081c8 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AuthorizationStatement.java
@@ -42,7 +42,7 @@ public abstract class AuthorizationStatement extends ParsedStatement implements
         return 0;
     }
 
-    public ResultMessage execute(QueryState state, QueryOptions options)
+    public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime)
     throws RequestValidationException, RequestExecutionException
     {
         return execute(state.getClientState());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 14638e2..ae64e7a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -217,7 +217,7 @@ public class BatchStatement implements CQLStatement
         return statements;
     }
 
-    private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now)
+    private Collection<? extends IMutation> getMutations(BatchQueryOptions options, boolean local, long now, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         Set<String> tablesWithZeroGcGs = null;
@@ -233,7 +233,7 @@ public class BatchStatement implements CQLStatement
             }
             QueryOptions statementOptions = options.forStatement(i);
             long timestamp = attrs.getTimestamp(now, statementOptions);
-            statement.addUpdates(collector, statementOptions, local, timestamp);
+            statement.addUpdates(collector, statementOptions, local, timestamp, queryStartNanoTime);
         }
 
         if (tablesWithZeroGcGs != null)
@@ -335,17 +335,17 @@ public class BatchStatement implements CQLStatement
     }
 
 
-    public ResultMessage execute(QueryState queryState, QueryOptions options) throws RequestExecutionException, RequestValidationException
+    public ResultMessage execute(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException
     {
-        return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options));
+        return execute(queryState, BatchQueryOptions.withoutPerStatementVariables(options), queryStartNanoTime);
     }
 
-    public ResultMessage execute(QueryState queryState, BatchQueryOptions options) throws RequestExecutionException, RequestValidationException
+    public ResultMessage execute(QueryState queryState, BatchQueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException
     {
-        return execute(queryState, options, false, options.getTimestamp(queryState));
+        return execute(queryState, options, false, options.getTimestamp(queryState), queryStartNanoTime);
     }
 
-    private ResultMessage execute(QueryState queryState, BatchQueryOptions options, boolean local, long now)
+    private ResultMessage execute(QueryState queryState, BatchQueryOptions options, boolean local, long now, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         if (options.getConsistency() == null)
@@ -354,13 +354,13 @@ public class BatchStatement implements CQLStatement
             throw new InvalidRequestException("Invalid empty serial consistency level");
 
         if (hasConditions)
-            return executeWithConditions(options, queryState);
+            return executeWithConditions(options, queryState, queryStartNanoTime);
 
-        executeWithoutConditions(getMutations(options, local, now), options.getConsistency());
+        executeWithoutConditions(getMutations(options, local, now, queryStartNanoTime), options.getConsistency(), queryStartNanoTime);
         return new ResultMessage.Void();
     }
 
-    private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl) throws RequestExecutionException, RequestValidationException
+    private void executeWithoutConditions(Collection<? extends IMutation> mutations, ConsistencyLevel cl, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException
     {
         if (mutations.isEmpty())
             return;
@@ -369,10 +369,10 @@ public class BatchStatement implements CQLStatement
         verifyBatchType(mutations);
 
         boolean mutateAtomic = (isLogged() && mutations.size() > 1);
-        StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic);
+        StorageProxy.mutateWithTriggers(mutations, cl, mutateAtomic, queryStartNanoTime);
     }
 
-    private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state)
+    private ResultMessage executeWithConditions(BatchQueryOptions options, QueryState state, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         Pair<CQL3CasRequest, Set<ColumnDefinition>> p = makeCasRequest(options, state);
@@ -388,7 +388,8 @@ public class BatchStatement implements CQLStatement
                                                    casRequest,
                                                    options.getSerialConsistency(),
                                                    options.getConsistency(),
-                                                   state.getClientState()))
+                                                   state.getClientState(),
+                                                   queryStartNanoTime))
         {
             return new ResultMessage.Rows(ModificationStatement.buildCasResultSet(ksName, tableName, result, columnsWithConditions, true, options.forStatement(0)));
         }
@@ -447,13 +448,13 @@ public class BatchStatement implements CQLStatement
         if (hasConditions)
             return executeInternalWithConditions(BatchQueryOptions.withoutPerStatementVariables(options), queryState);
 
-        executeInternalWithoutCondition(queryState, options);
+        executeInternalWithoutCondition(queryState, options, System.nanoTime());
         return new ResultMessage.Void();
     }
 
-    private ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
+    private ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
     {
-        for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp()))
+        for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp(), queryStartNanoTime))
             mutation.apply();
         return null;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
index 85f5f0d..70a86db 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropIndexStatement.java
@@ -64,7 +64,7 @@ public class DropIndexStatement extends SchemaAlteringStatement
     }
 
     @Override
-    public ResultMessage execute(QueryState state, QueryOptions options) throws RequestValidationException
+    public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestValidationException
     {
         Event.SchemaChange ce = announceMigration(false);
         return ce == null ? null : new ResultMessage.SchemaChange(ce);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 822664a..d32a689 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -345,7 +345,8 @@ public abstract class ModificationStatement implements CQLStatement
                                                            ClusteringIndexFilter filter,
                                                            DataLimits limits,
                                                            boolean local,
-                                                           ConsistencyLevel cl)
+                                                           ConsistencyLevel cl,
+                                                           long queryStartNanoTime)
     {
         if (!requiresRead())
             return null;
@@ -381,7 +382,7 @@ public abstract class ModificationStatement implements CQLStatement
             }
         }
 
-        try (PartitionIterator iter = group.execute(cl, null))
+        try (PartitionIterator iter = group.execute(cl, null, queryStartNanoTime))
         {
             return asMaterializedMap(iter);
         }
@@ -405,18 +406,18 @@ public abstract class ModificationStatement implements CQLStatement
         return !conditions.isEmpty();
     }
 
-    public ResultMessage execute(QueryState queryState, QueryOptions options)
+    public ResultMessage execute(QueryState queryState, QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         if (options.getConsistency() == null)
             throw new InvalidRequestException("Invalid empty consistency level");
 
         return hasConditions()
-             ? executeWithCondition(queryState, options)
-             : executeWithoutCondition(queryState, options);
+             ? executeWithCondition(queryState, options, queryStartNanoTime)
+             : executeWithoutCondition(queryState, options, queryStartNanoTime);
     }
 
-    private ResultMessage executeWithoutCondition(QueryState queryState, QueryOptions options)
+    private ResultMessage executeWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         ConsistencyLevel cl = options.getConsistency();
@@ -425,14 +426,14 @@ public abstract class ModificationStatement implements CQLStatement
         else
             cl.validateForWrite(cfm.ksName);
 
-        Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState));
+        Collection<? extends IMutation> mutations = getMutations(options, false, options.getTimestamp(queryState), queryStartNanoTime);
         if (!mutations.isEmpty())
-            StorageProxy.mutateWithTriggers(mutations, cl, false);
+            StorageProxy.mutateWithTriggers(mutations, cl, false, queryStartNanoTime);
 
         return null;
     }
 
-    public ResultMessage executeWithCondition(QueryState queryState, QueryOptions options)
+    public ResultMessage executeWithCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
         CQL3CasRequest request = makeCasRequest(queryState, options);
@@ -443,7 +444,8 @@ public abstract class ModificationStatement implements CQLStatement
                                                    request,
                                                    options.getSerialConsistency(),
                                                    options.getConsistency(),
-                                                   queryState.getClientState()))
+                                                   queryState.getClientState(),
+                                                   queryStartNanoTime))
         {
             return new ResultMessage.Rows(buildCasResultSet(result, options));
         }
@@ -561,12 +563,12 @@ public abstract class ModificationStatement implements CQLStatement
     {
         return hasConditions()
                ? executeInternalWithCondition(queryState, options)
-               : executeInternalWithoutCondition(queryState, options);
+               : executeInternalWithoutCondition(queryState, options, System.nanoTime());
     }
 
-    public ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException
+    public ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
     {
-        for (IMutation mutation : getMutations(options, true, queryState.getTimestamp()))
+        for (IMutation mutation : getMutations(options, true, queryState.getTimestamp(), queryStartNanoTime))
             mutation.apply();
         return null;
     }
@@ -612,10 +614,10 @@ public abstract class ModificationStatement implements CQLStatement
      *
      * @return list of the mutations
      */
-    private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now)
+    private Collection<? extends IMutation> getMutations(QueryOptions options, boolean local, long now, long queryStartNanoTime)
     {
         UpdatesCollector collector = new UpdatesCollector(Collections.singletonMap(cfm.cfId, updatedColumns), 1);
-        addUpdates(collector, options, local, now);
+        addUpdates(collector, options, local, now, queryStartNanoTime);
         collector.validateIndexedColumns();
 
         return collector.toMutations();
@@ -624,7 +626,8 @@ public abstract class ModificationStatement implements CQLStatement
     final void addUpdates(UpdatesCollector collector,
                           QueryOptions options,
                           boolean local,
-                          long now)
+                          long now,
+                          long queryStartNanoTime)
     {
         List<ByteBuffer> keys = buildPartitionKeyNames(options);
 
@@ -643,7 +646,8 @@ public abstract class ModificationStatement implements CQLStatement
                                                            options,
                                                            DataLimits.NONE,
                                                            local,
-                                                           now);
+                                                           now,
+                                                           queryStartNanoTime);
             for (ByteBuffer key : keys)
             {
                 ThriftValidation.validateKey(cfm, key);
@@ -659,7 +663,7 @@ public abstract class ModificationStatement implements CQLStatement
         {
             NavigableSet<Clustering> clusterings = createClustering(options);
 
-            UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, now);
+            UpdateParameters params = makeUpdateParameters(keys, clusterings, options, local, now, queryStartNanoTime);
 
             for (ByteBuffer key : keys)
             {
@@ -703,7 +707,8 @@ public abstract class ModificationStatement implements CQLStatement
                                                   NavigableSet<Clustering> clusterings,
                                                   QueryOptions options,
                                                   boolean local,
-                                                  long now)
+                                                  long now,
+                                                  long queryStartNanoTime)
     {
         if (clusterings.contains(Clustering.STATIC_CLUSTERING))
             return makeUpdateParameters(keys,
@@ -711,14 +716,16 @@ public abstract class ModificationStatement implements CQLStatement
                                         options,
                                         DataLimits.cqlLimits(1),
                                         local,
-                                        now);
+                                        now,
+                                        queryStartNanoTime);
 
         return makeUpdateParameters(keys,
                                     new ClusteringIndexNamesFilter(clusterings, false),
                                     options,
                                     DataLimits.NONE,
                                     local,
-                                    now);
+                                    now,
+                                    queryStartNanoTime);
     }
 
     private UpdateParameters makeUpdateParameters(Collection<ByteBuffer> keys,
@@ -726,10 +733,11 @@ public abstract class ModificationStatement implements CQLStatement
                                                   QueryOptions options,
                                                   DataLimits limits,
                                                   boolean local,
-                                                  long now)
+                                                  long now,
+                                                  long queryStartNanoTime)
     {
         // Some lists operation requires reading
-        Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, limits, local, options.getConsistency());
+        Map<DecoratedKey, Partition> lists = readRequiredLists(keys, filter, limits, local, options.getConsistency(), queryStartNanoTime);
         return new UpdateParameters(cfm, updatedColumns(), options, getTimestamp(now, options), getTimeToLive(options), lists);
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
index 10c004c..139c566 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SchemaAlteringStatement.java
@@ -86,7 +86,7 @@ public abstract class SchemaAlteringStatement extends CFStatement implements CQL
      */
     public abstract Event.SchemaChange announceMigration(boolean isLocalOnly) throws RequestValidationException;
 
-    public ResultMessage execute(QueryState state, QueryOptions options) throws RequestValidationException
+    public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestValidationException
     {
         // If an IF [NOT] EXISTS clause was used, this may not result in an actual schema change.  To avoid doing
         // extra work in the drivers to handle schema changes, we return an empty message in this case. (CASSANDRA-7600)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 7cd2be0..6afaa20 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -228,7 +228,7 @@ public class SelectStatement implements CQLStatement
         // Nothing to do, all validation has been done by RawStatement.prepare()
     }
 
-    public ResultMessage.Rows execute(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
+    public ResultMessage.Rows execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException
     {
         ConsistencyLevel cl = options.getConsistency();
         checkNotNull(cl, "Invalid empty consistency level");
@@ -242,11 +242,11 @@ public class SelectStatement implements CQLStatement
         ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit, pageSize);
 
         if (aggregationSpec == null && (pageSize <= 0 || (query.limits().count() <= pageSize)))
-            return execute(query, options, state, nowInSec, userLimit);
+            return execute(query, options, state, nowInSec, userLimit, queryStartNanoTime);
 
         QueryPager pager = getPager(query, options);
 
-        return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec, userLimit);
+        return execute(Pager.forDistributedQuery(pager, cl, state.getClientState()), options, pageSize, nowInSec, userLimit, queryStartNanoTime);
     }
 
     public ReadQuery getQuery(QueryOptions options, int nowInSec) throws RequestValidationException
@@ -270,9 +270,9 @@ public class SelectStatement implements CQLStatement
                                        QueryOptions options,
                                        QueryState state,
                                        int nowInSec,
-                                       int userLimit) throws RequestValidationException, RequestExecutionException
+                                       int userLimit, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
     {
-        try (PartitionIterator data = query.execute(options.getConsistency(), state.getClientState()))
+        try (PartitionIterator data = query.execute(options.getConsistency(), state.getClientState(), queryStartNanoTime))
         {
             return processResults(data, options, nowInSec, userLimit);
         }
@@ -308,7 +308,7 @@ public class SelectStatement implements CQLStatement
             return pager.state();
         }
 
-        public abstract PartitionIterator fetchPage(int pageSize);
+        public abstract PartitionIterator fetchPage(int pageSize, long queryStartNanoTime);
 
         public static class NormalPager extends Pager
         {
@@ -322,9 +322,9 @@ public class SelectStatement implements CQLStatement
                 this.clientState = clientState;
             }
 
-            public PartitionIterator fetchPage(int pageSize)
+            public PartitionIterator fetchPage(int pageSize, long queryStartNanoTime)
             {
-                return pager.fetchPage(pageSize, consistency, clientState);
+                return pager.fetchPage(pageSize, consistency, clientState, queryStartNanoTime);
             }
         }
 
@@ -338,7 +338,7 @@ public class SelectStatement implements CQLStatement
                 this.executionController = executionController;
             }
 
-            public PartitionIterator fetchPage(int pageSize)
+            public PartitionIterator fetchPage(int pageSize, long queryStartNanoTime)
             {
                 return pager.fetchPageInternal(pageSize, executionController);
             }
@@ -349,7 +349,8 @@ public class SelectStatement implements CQLStatement
                                        QueryOptions options,
                                        int pageSize,
                                        int nowInSec,
-                                       int userLimit) throws RequestValidationException, RequestExecutionException
+                                       int userLimit,
+                                       long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
     {
         if (aggregationSpec != null)
         {
@@ -370,7 +371,7 @@ public class SelectStatement implements CQLStatement
                   + " you must either remove the ORDER BY or the IN and sort client side, or disable paging for this query");
 
         ResultMessage.Rows msg;
-        try (PartitionIterator page = pager.fetchPage(pageSize))
+        try (PartitionIterator page = pager.fetchPage(pageSize, queryStartNanoTime))
         {
             msg = processResults(page, options, nowInSec, userLimit);
         }
@@ -400,10 +401,10 @@ public class SelectStatement implements CQLStatement
 
     public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
-        return executeInternal(state, options, FBUtilities.nowInSeconds());
+        return executeInternal(state, options, FBUtilities.nowInSeconds(), System.nanoTime());
     }
 
-    public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, int nowInSec) throws RequestExecutionException, RequestValidationException
+    public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, int nowInSec, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException
     {
         int userLimit = getLimit(options);
         int userPerPartitionLimit = getPerPartitionLimit(options);
@@ -423,7 +424,7 @@ public class SelectStatement implements CQLStatement
             {
                 QueryPager pager = getPager(query, options);
 
-                return execute(Pager.forInternalQuery(pager, executionController), options, pageSize, nowInSec, userLimit);
+                return execute(Pager.forInternalQuery(pager, executionController), options, pageSize, nowInSec, userLimit, queryStartNanoTime);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index fa3c0f3..1478efd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -59,7 +59,7 @@ public class TruncateStatement extends CFStatement implements CQLStatement
         ThriftValidation.validateColumnFamily(keyspace(), columnFamily());
     }
 
-    public ResultMessage execute(QueryState state, QueryOptions options) throws InvalidRequestException, TruncateException
+    public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws InvalidRequestException, TruncateException
     {
         try
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
index fe3d518..02a678a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/UseStatement.java
@@ -53,7 +53,7 @@ public class UseStatement extends ParsedStatement implements CQLStatement
     {
     }
 
-    public ResultMessage execute(QueryState state, QueryOptions options) throws InvalidRequestException
+    public ResultMessage execute(QueryState state, QueryOptions options, long queryStartNanoTime) throws InvalidRequestException
     {
         state.getClientState().setKeyspace(keyspace);
         return new ResultMessage.SetKeyspace(keyspace);
@@ -63,6 +63,6 @@ public class UseStatement extends ParsedStatement implements CQLStatement
     {
         // In production, internal queries are exclusively on the system keyspace and 'use' is thus useless
         // but for some unit tests we need to set the keyspace (e.g. for tests with DROP INDEX)
-        return execute(state, options);
+        return execute(state, options, System.nanoTime());
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
index d1b1fa2..bd273e4 100644
--- a/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/CounterMutationVerbHandler.java
@@ -33,6 +33,7 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
 
     public void doVerb(final MessageIn<CounterMutation> message, final int id)
     {
+        long queryStartNanoTime = System.nanoTime();
         final CounterMutation cm = message.payload;
         logger.trace("Applying forwarded {}", cm);
 
@@ -50,6 +51,6 @@ public class CounterMutationVerbHandler implements IVerbHandler<CounterMutation>
             {
                 MessagingService.instance().sendReply(WriteResponse.createMessage(), id, message.from);
             }
-        });
+        }, queryStartNanoTime);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 439dc30..9e7a9d0 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -179,9 +179,9 @@ public class PartitionRangeReadCommand extends ReadCommand
         return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
     }
 
-    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
     {
-        return StorageProxy.getRangeSlice(this, consistency);
+        return StorageProxy.getRangeSlice(this, consistency, queryStartNanoTime);
     }
 
     public QueryPager getPager(PagingState pagingState, int protocolVersion)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/ReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ReadQuery.java b/src/java/org/apache/cassandra/db/ReadQuery.java
index 39b0662..d74834c 100644
--- a/src/java/org/apache/cassandra/db/ReadQuery.java
+++ b/src/java/org/apache/cassandra/db/ReadQuery.java
@@ -40,7 +40,7 @@ public interface ReadQuery
             return ReadExecutionController.empty();
         }
 
-        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
         {
             return EmptyIterators.partition();
         }
@@ -94,7 +94,7 @@ public interface ReadQuery
      *
      * @return the result of the query.
      */
-    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException;
+    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException;
 
     /**
      * Execute the query for internal queries (that is, it basically executes the query locally).

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
index c6cbdb4..cea8c0c 100644
--- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
+++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
@@ -343,9 +343,9 @@ public class SinglePartitionReadCommand extends ReadCommand
                                               clusteringIndexFilter);
     }
 
-    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+    public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
     {
-        return StorageProxy.read(Group.one(this), consistency, clientState);
+        return StorageProxy.read(Group.one(this), consistency, clientState, queryStartNanoTime);
     }
 
     public SinglePartitionPager getPager(PagingState pagingState, int protocolVersion)
@@ -983,9 +983,9 @@ public class SinglePartitionReadCommand extends ReadCommand
             return new Group(Collections.singletonList(command), command.limits());
         }
 
-        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException
+        public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException
         {
-            return StorageProxy.read(this, consistency, clientState);
+            return StorageProxy.read(this, consistency, clientState, queryStartNanoTime);
         }
 
         public int nowInSec()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 2083d54..def21bf 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1131,7 +1131,7 @@ public final class SystemKeyspace
     public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata, int nowInSec)
     {
         String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
-        UntypedResultSet results = QueryProcessor.executeInternalWithNow(nowInSec, String.format(req, PAXOS), key.getKey(), metadata.cfId);
+        UntypedResultSet results = QueryProcessor.executeInternalWithNow(nowInSec, System.nanoTime(), String.format(req, PAXOS), key.getKey(), metadata.cfId);
         if (results.isEmpty())
             return new PaxosState(key, metadata);
         UntypedResultSet.Row row = results.one();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/view/TableViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TableViews.java b/src/java/org/apache/cassandra/db/view/TableViews.java
index e4cdde3..a57a949 100644
--- a/src/java/org/apache/cassandra/db/view/TableViews.java
+++ b/src/java/org/apache/cassandra/db/view/TableViews.java
@@ -126,6 +126,7 @@ public class TableViews extends AbstractCollection<View>
 
         // Read modified rows
         int nowInSec = FBUtilities.nowInSeconds();
+        long queryStartNanoTime = System.nanoTime();
         SinglePartitionReadCommand command = readExistingRowsCommand(update, views, nowInSec);
         if (command == null)
             return;
@@ -142,7 +143,7 @@ public class TableViews extends AbstractCollection<View>
         Keyspace.openAndGetStore(update.metadata()).metric.viewReadTime.update(System.nanoTime() - start, TimeUnit.NANOSECONDS);
 
         if (!mutations.isEmpty())
-            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog, baseComplete);
+            StorageProxy.mutateMV(update.partitionKey().getKey(), mutations, writeCommitLog, baseComplete, queryStartNanoTime);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/db/view/ViewBuilder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/ViewBuilder.java b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
index 4bd95d4..8ce3d9f 100644
--- a/src/java/org/apache/cassandra/db/view/ViewBuilder.java
+++ b/src/java/org/apache/cassandra/db/view/ViewBuilder.java
@@ -91,7 +91,7 @@ public class ViewBuilder extends CompactionInfo.Holder
         if (!mutations.isEmpty())
         {
             AtomicLong noBase = new AtomicLong(Long.MAX_VALUE);
-            StorageProxy.mutateMV(key.getKey(), mutations, true, noBase);
+            StorageProxy.mutateMV(key.getKey(), mutations, true, noBase, System.nanoTime());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
index c90c6a1..038ac9f 100644
--- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
+++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java
@@ -129,21 +129,22 @@ public abstract class AbstractReplicationStrategy
     public abstract List<InetAddress> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata);
 
     public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddress> naturalEndpoints,
-                                                                Collection<InetAddress> pendingEndpoints,
-                                                                ConsistencyLevel consistency_level,
-                                                                Runnable callback,
-                                                                WriteType writeType)
+                                                                       Collection<InetAddress> pendingEndpoints,
+                                                                       ConsistencyLevel consistency_level,
+                                                                       Runnable callback,
+                                                                       WriteType writeType,
+                                                                       long queryStartNanoTime)
     {
         if (consistency_level.isDatacenterLocal())
         {
             // block for in this context will be localnodes block.
-            return new DatacenterWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
+            return new DatacenterWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime);
         }
         else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy))
         {
-            return new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
+            return new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime);
         }
-        return new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType);
+        return new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime);
     }
 
     private Keyspace getKeyspace()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index b69d8ce..a34401a 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -410,7 +410,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
                     QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes,
                                                                                                                   tminBytes,
                                                                                                                   tmaxBytes));
-                    ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options);
+                    ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options, System.nanoTime());
                     UntypedResultSet result = UntypedResultSet.create(rows.result);
 
                     for (UntypedResultSet.Row r : result)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index cae1f1a..7aa926e 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -63,11 +63,11 @@ public abstract class AbstractReadExecutor
     protected final ReadCallback handler;
     protected final TraceState traceState;
 
-    AbstractReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
+    AbstractReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime)
     {
         this.command = command;
         this.targetReplicas = targetReplicas;
-        this.handler = new ReadCallback(new DigestResolver(keyspace, command, consistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplicas);
+        this.handler = new ReadCallback(new DigestResolver(keyspace, command, consistencyLevel, targetReplicas.size()), consistencyLevel, command, targetReplicas, queryStartNanoTime);
         this.traceState = Tracing.instance.get();
 
         // Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes
@@ -148,7 +148,7 @@ public abstract class AbstractReadExecutor
     /**
      * @return an executor appropriate for the configured speculative read policy
      */
-    public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel) throws UnavailableException
+    public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException
     {
         Keyspace keyspace = Keyspace.open(command.metadata().ksName);
         List<InetAddress> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
@@ -175,14 +175,14 @@ public abstract class AbstractReadExecutor
         if (retry.equals(SpeculativeRetryParam.NONE)
             || consistencyLevel == ConsistencyLevel.EACH_QUORUM
             || consistencyLevel.blockFor(keyspace) == allReplicas.size())
-            return new NeverSpeculatingReadExecutor(keyspace, command, consistencyLevel, targetReplicas);
+            return new NeverSpeculatingReadExecutor(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime);
 
         if (targetReplicas.size() == allReplicas.size())
         {
             // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
             // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
             // (same amount of requests in total, but we turn 1 digest request into a full blown data request).
-            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
+            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
         }
 
         // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
@@ -203,16 +203,16 @@ public abstract class AbstractReadExecutor
         targetReplicas.add(extraReplica);
 
         if (retry.equals(SpeculativeRetryParam.ALWAYS))
-            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
+            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
         else // PERCENTILE or CUSTOM.
-            return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
+            return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
     }
 
     public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
     {
-        public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
+        public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas, long queryStartNanoTime)
         {
-            super(keyspace, command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime);
         }
 
         public void executeAsync()
@@ -242,9 +242,10 @@ public abstract class AbstractReadExecutor
                                        ColumnFamilyStore cfs,
                                        ReadCommand command,
                                        ConsistencyLevel consistencyLevel,
-                                       List<InetAddress> targetReplicas)
+                                       List<InetAddress> targetReplicas,
+                                       long queryStartNanoTime)
         {
-            super(keyspace, command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime);
             this.cfs = cfs;
         }
 
@@ -314,9 +315,10 @@ public abstract class AbstractReadExecutor
                                              ColumnFamilyStore cfs,
                                              ReadCommand command,
                                              ConsistencyLevel consistencyLevel,
-                                             List<InetAddress> targetReplicas)
+                                             List<InetAddress> targetReplicas,
+                                             long queryStartNanoTime)
         {
-            super(keyspace, command, consistencyLevel, targetReplicas);
+            super(keyspace, command, consistencyLevel, targetReplicas, queryStartNanoTime);
             this.cfs = cfs;
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index 19b3de0..f412515 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -41,7 +41,6 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
 
     private final SimpleCondition condition = new SimpleCondition();
     protected final Keyspace keyspace;
-    protected final long start;
     protected final Collection<InetAddress> naturalEndpoints;
     public final ConsistencyLevel consistencyLevel;
     protected final Runnable callback;
@@ -50,24 +49,27 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
     private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
         = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures");
     private volatile int failures = 0;
+    private final long queryStartNanoTime;
 
     /**
      * @param callback A callback to be called when the write is successful.
+     * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
      */
     protected AbstractWriteResponseHandler(Keyspace keyspace,
                                            Collection<InetAddress> naturalEndpoints,
                                            Collection<InetAddress> pendingEndpoints,
                                            ConsistencyLevel consistencyLevel,
                                            Runnable callback,
-                                           WriteType writeType)
+                                           WriteType writeType,
+                                           long queryStartNanoTime)
     {
         this.keyspace = keyspace;
         this.pendingEndpoints = pendingEndpoints;
-        this.start = System.nanoTime();
         this.consistencyLevel = consistencyLevel;
         this.naturalEndpoints = naturalEndpoints;
         this.callback = callback;
         this.writeType = writeType;
+        this.queryStartNanoTime = queryStartNanoTime;
     }
 
     public void get() throws WriteTimeoutException, WriteFailureException
@@ -76,7 +78,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
                             ? DatabaseDescriptor.getCounterWriteRpcTimeout()
                             : DatabaseDescriptor.getWriteRpcTimeout();
 
-        long timeout = TimeUnit.MILLISECONDS.toNanos(requestTimeout) - (System.nanoTime() - start);
+        long timeout = TimeUnit.MILLISECONDS.toNanos(requestTimeout) - (System.nanoTime() - queryStartNanoTime);
 
         boolean success;
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
index ac44923..3b31794 100644
--- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
@@ -33,9 +33,9 @@ public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
     private static final AtomicIntegerFieldUpdater<BatchlogResponseHandler> requiredBeforeFinishUpdater
             = AtomicIntegerFieldUpdater.newUpdater(BatchlogResponseHandler.class, "requiredBeforeFinish");
 
-    public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup)
+    public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup, long queryStartNanoTime)
     {
-        super(wrapped.keyspace, wrapped.naturalEndpoints, wrapped.pendingEndpoints, wrapped.consistencyLevel, wrapped.callback, wrapped.writeType);
+        super(wrapped.keyspace, wrapped.naturalEndpoints, wrapped.pendingEndpoints, wrapped.consistencyLevel, wrapped.callback, wrapped.writeType, queryStartNanoTime);
         this.wrapped = wrapped;
         this.requiredBeforeFinish = requiredBeforeFinish;
         this.cleanup = cleanup;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/DataResolver.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DataResolver.java b/src/java/org/apache/cassandra/service/DataResolver.java
index b9ae933..be8eca1 100644
--- a/src/java/org/apache/cassandra/service/DataResolver.java
+++ b/src/java/org/apache/cassandra/service/DataResolver.java
@@ -43,10 +43,12 @@ import org.apache.cassandra.utils.FBUtilities;
 public class DataResolver extends ResponseResolver
 {
     private final List<AsyncOneResponse> repairResults = Collections.synchronizedList(new ArrayList<>());
+    private final long queryStartNanoTime;
 
-    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount)
+    public DataResolver(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistency, int maxResponseCount, long queryStartNanoTime)
     {
         super(keyspace, command, consistency, maxResponseCount);
+        this.queryStartNanoTime = queryStartNanoTime;
     }
 
     public PartitionIterator getData()
@@ -88,7 +90,7 @@ public class DataResolver extends ResponseResolver
         if (!command.limits().isUnlimited())
         {
             for (int i = 0; i < results.size(); i++)
-                results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter)));
+                results.set(i, Transformation.apply(results.get(i), new ShortReadProtection(sources[i], resultCounter, queryStartNanoTime)));
         }
 
         return UnfilteredPartitionIterators.mergeAndFilter(results, command.nowInSec(), listener);
@@ -385,12 +387,14 @@ public class DataResolver extends ResponseResolver
         private final InetAddress source;
         private final DataLimits.Counter counter;
         private final DataLimits.Counter postReconciliationCounter;
+        private final long queryStartNanoTime;
 
-        private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter)
+        private ShortReadProtection(InetAddress source, DataLimits.Counter postReconciliationCounter, long queryStartNanoTime)
         {
             this.source = source;
             this.counter = command.limits().newCounter(command.nowInSec(), false).onlyCount();
             this.postReconciliationCounter = postReconciliationCounter;
+            this.queryStartNanoTime = queryStartNanoTime;
         }
 
         @Override
@@ -503,8 +507,8 @@ public class DataResolver extends ResponseResolver
 
             private UnfilteredRowIterator doShortReadRetry(SinglePartitionReadCommand retryCommand)
             {
-                DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1);
-                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source));
+                DataResolver resolver = new DataResolver(keyspace, retryCommand, ConsistencyLevel.ONE, 1, queryStartNanoTime);
+                ReadCallback handler = new ReadCallback(resolver, ConsistencyLevel.ONE, retryCommand, Collections.singletonList(source), queryStartNanoTime);
                 if (StorageProxy.canDoLocalRequest(source))
                       StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(retryCommand, handler));
                 else

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
index b095c7f..9584611 100644
--- a/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterSyncWriteResponseHandler.java
@@ -46,10 +46,11 @@ public class DatacenterSyncWriteResponseHandler<T> extends AbstractWriteResponse
                                               ConsistencyLevel consistencyLevel,
                                               Keyspace keyspace,
                                               Runnable callback,
-                                              WriteType writeType)
+                                              WriteType writeType,
+                                              long queryStartNanoTime)
     {
         // Response is been managed by the map so make it 1 for the superclass.
-        super(keyspace, naturalEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
+        super(keyspace, naturalEndpoints, pendingEndpoints, consistencyLevel, callback, writeType, queryStartNanoTime);
         assert consistencyLevel == ConsistencyLevel.EACH_QUORUM;
 
         NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();


[2/3] cassandra git commit: Count entire coordinated request against timeout

Posted by ty...@apache.org.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
index b1b7b10..2309e87 100644
--- a/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/DatacenterWriteResponseHandler.java
@@ -35,9 +35,10 @@ public class DatacenterWriteResponseHandler<T> extends WriteResponseHandler<T>
                                           ConsistencyLevel consistencyLevel,
                                           Keyspace keyspace,
                                           Runnable callback,
-                                          WriteType writeType)
+                                          WriteType writeType,
+                                          long queryStartNanoTime)
     {
-        super(naturalEndpoints, pendingEndpoints, consistencyLevel, keyspace, callback, writeType);
+        super(naturalEndpoints, pendingEndpoints, consistencyLevel, keyspace, callback, writeType, queryStartNanoTime);
         assert consistencyLevel.isDatacenterLocal();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/ReadCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ReadCallback.java b/src/java/org/apache/cassandra/service/ReadCallback.java
index 47eacdf..3f1ff3c 100644
--- a/src/java/org/apache/cassandra/service/ReadCallback.java
+++ b/src/java/org/apache/cassandra/service/ReadCallback.java
@@ -52,7 +52,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
 
     public final ResponseResolver resolver;
     private final SimpleCondition condition = new SimpleCondition();
-    private final long start;
+    private final long queryStartNanoTime;
     final int blockfor;
     final List<InetAddress> endpoints;
     private final ReadCommand command;
@@ -69,24 +69,25 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
     /**
      * Constructor when response count has to be calculated and blocked for.
      */
-    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddress> filteredEndpoints)
+    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, ReadCommand command, List<InetAddress> filteredEndpoints, long queryStartNanoTime)
     {
         this(resolver,
              consistencyLevel,
              consistencyLevel.blockFor(Keyspace.open(command.metadata().ksName)),
              command,
              Keyspace.open(command.metadata().ksName),
-             filteredEndpoints);
+             filteredEndpoints,
+             queryStartNanoTime);
     }
 
-    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddress> endpoints)
+    public ReadCallback(ResponseResolver resolver, ConsistencyLevel consistencyLevel, int blockfor, ReadCommand command, Keyspace keyspace, List<InetAddress> endpoints, long queryStartNanoTime)
     {
         this.command = command;
         this.keyspace = keyspace;
         this.blockfor = blockfor;
         this.consistencyLevel = consistencyLevel;
         this.resolver = resolver;
-        this.start = System.nanoTime();
+        this.queryStartNanoTime = queryStartNanoTime;
         this.endpoints = endpoints;
         // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897)
         assert !(command instanceof PartitionRangeReadCommand) || blockfor >= endpoints.size();
@@ -97,7 +98,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
 
     public boolean await(long timePastStart, TimeUnit unit)
     {
-        long time = unit.toNanos(timePastStart) - (System.nanoTime() - start);
+        long time = unit.toNanos(timePastStart) - (System.nanoTime() - queryStartNanoTime);
         try
         {
             return condition.await(time, TimeUnit.NANOSECONDS);
@@ -138,7 +139,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
 
         PartitionIterator result = blockfor == 1 ? resolver.getData() : resolver.resolve();
         if (logger.isTraceEnabled())
-            logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+            logger.trace("Read: {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queryStartNanoTime));
         return result;
     }
 
@@ -163,7 +164,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
                 TraceState traceState = Tracing.instance.get();
                 if (traceState != null)
                     traceState.trace("Initiating read-repair");
-                StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner(traceState));
+                StageManager.getStage(Stage.READ_REPAIR).execute(new AsyncRepairRunner(traceState, queryStartNanoTime));
             }
         }
     }
@@ -210,10 +211,12 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
     private class AsyncRepairRunner implements Runnable
     {
         private final TraceState traceState;
+        private final long queryStartNanoTime;
 
-        public AsyncRepairRunner(TraceState traceState)
+        public AsyncRepairRunner(TraceState traceState, long queryStartNanoTime)
         {
             this.traceState = traceState;
+            this.queryStartNanoTime = queryStartNanoTime;
         }
 
         public void run()
@@ -236,7 +239,7 @@ public class ReadCallback implements IAsyncCallbackWithFailure<ReadResponse>
                 
                 ReadRepairMetrics.repairedBackground.mark();
                 
-                final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size());
+                final DataResolver repairResolver = new DataResolver(keyspace, command, consistencyLevel, endpoints.size(), queryStartNanoTime);
                 AsyncRepairCallback repairHandler = new AsyncRepairCallback(repairResolver, endpoints.size());
 
                 for (InetAddress endpoint : endpoints)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 9283c04..9bf90dc 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -223,10 +223,11 @@ public class StorageProxy implements StorageProxyMBean
                                   CASRequest request,
                                   ConsistencyLevel consistencyForPaxos,
                                   ConsistencyLevel consistencyForCommit,
-                                  ClientState state)
+                                  ClientState state,
+                                  long queryStartNanoTime)
     throws UnavailableException, IsBootstrappingException, RequestFailureException, RequestTimeoutException, InvalidRequestException
     {
-        final long start = System.nanoTime();
+        final long startTimeForMetrics = System.nanoTime();
         int contentions = 0;
         try
         {
@@ -236,14 +237,14 @@ public class StorageProxy implements StorageProxyMBean
             CFMetaData metadata = Schema.instance.getCFMetaData(keyspaceName, cfName);
 
             long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getCasContentionTimeout());
-            while (System.nanoTime() - start < timeout)
+            while (System.nanoTime() - queryStartNanoTime < timeout)
             {
                 // for simplicity, we'll do a single liveness check at the start of each attempt
                 Pair<List<InetAddress>, Integer> p = getPaxosParticipants(metadata, key, consistencyForPaxos);
                 List<InetAddress> liveEndpoints = p.left;
                 int requiredParticipants = p.right;
 
-                final Pair<UUID, Integer> pair = beginAndRepairPaxos(start, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
+                final Pair<UUID, Integer> pair = beginAndRepairPaxos(queryStartNanoTime, key, metadata, liveEndpoints, requiredParticipants, consistencyForPaxos, consistencyForCommit, true, state);
                 final UUID ballot = pair.left;
                 contentions += pair.right;
 
@@ -253,7 +254,7 @@ public class StorageProxy implements StorageProxyMBean
                 ConsistencyLevel readConsistency = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM;
 
                 FilteredPartition current;
-                try (RowIterator rowIter = readOne(readCommand, readConsistency))
+                try (RowIterator rowIter = readOne(readCommand, readConsistency, queryStartNanoTime))
                 {
                     current = FilteredPartition.create(rowIter);
                 }
@@ -281,9 +282,9 @@ public class StorageProxy implements StorageProxyMBean
 
                 Commit proposal = Commit.newProposal(ballot, updates);
                 Tracing.trace("CAS precondition is met; proposing client-requested updates for {}", ballot);
-                if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos))
+                if (proposePaxos(proposal, liveEndpoints, requiredParticipants, true, consistencyForPaxos, queryStartNanoTime))
                 {
-                    commitPaxos(proposal, consistencyForCommit, true);
+                    commitPaxos(proposal, consistencyForCommit, true, queryStartNanoTime);
                     Tracing.trace("CAS successful");
                     return null;
                 }
@@ -318,7 +319,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             if(contentions > 0)
                 casWriteMetrics.contention.update(contentions);
-            final long latency = System.nanoTime() - start;
+            final long latency = System.nanoTime() - startTimeForMetrics;
             casWriteMetrics.addNano(latency);
             writeMetricsMap.get(consistencyForPaxos).addNano(latency);
         }
@@ -373,7 +374,7 @@ public class StorageProxy implements StorageProxyMBean
      * @return the Paxos ballot promised by the replicas if no in-progress requests were seen and a quorum of
      * nodes have seen the mostRecentCommit.  Otherwise, return null.
      */
-    private static Pair<UUID, Integer> beginAndRepairPaxos(long start,
+    private static Pair<UUID, Integer> beginAndRepairPaxos(long queryStartNanoTime,
                                                            DecoratedKey key,
                                                            CFMetaData metadata,
                                                            List<InetAddress> liveEndpoints,
@@ -388,7 +389,7 @@ public class StorageProxy implements StorageProxyMBean
 
         PrepareCallback summary = null;
         int contentions = 0;
-        while (System.nanoTime() - start < timeout)
+        while (System.nanoTime() - queryStartNanoTime < timeout)
         {
             // We want a timestamp that is guaranteed to be unique for that node (so that the ballot is globally unique), but if we've got a prepare rejected
             // already we also want to make sure we pick a timestamp that has a chance to be promised, i.e. one that is greater that the most recently known
@@ -403,7 +404,7 @@ public class StorageProxy implements StorageProxyMBean
             // prepare
             Tracing.trace("Preparing {}", ballot);
             Commit toPrepare = Commit.newPrepare(key, metadata, ballot);
-            summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants, consistencyForPaxos);
+            summary = preparePaxos(toPrepare, liveEndpoints, requiredParticipants, consistencyForPaxos, queryStartNanoTime);
             if (!summary.promised)
             {
                 Tracing.trace("Some replicas have already promised a higher ballot than ours; aborting");
@@ -426,11 +427,11 @@ public class StorageProxy implements StorageProxyMBean
                 else
                     casReadMetrics.unfinishedCommit.inc();
                 Commit refreshedInProgress = Commit.newProposal(ballot, inProgress.update);
-                if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos))
+                if (proposePaxos(refreshedInProgress, liveEndpoints, requiredParticipants, false, consistencyForPaxos, queryStartNanoTime))
                 {
                     try
                     {
-                        commitPaxos(refreshedInProgress, consistencyForCommit, false);
+                        commitPaxos(refreshedInProgress, consistencyForCommit, false, queryStartNanoTime);
                     }
                     catch (WriteTimeoutException e)
                     {
@@ -481,10 +482,10 @@ public class StorageProxy implements StorageProxyMBean
             MessagingService.instance().sendOneWay(message, target);
     }
 
-    private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos)
+    private static PrepareCallback preparePaxos(Commit toPrepare, List<InetAddress> endpoints, int requiredParticipants, ConsistencyLevel consistencyForPaxos, long queryStartNanoTime)
     throws WriteTimeoutException
     {
-        PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos);
+        PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), requiredParticipants, consistencyForPaxos, queryStartNanoTime);
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PREPARE, toPrepare, Commit.serializer);
         for (InetAddress target : endpoints)
             MessagingService.instance().sendRR(message, target, callback);
@@ -492,10 +493,10 @@ public class StorageProxy implements StorageProxyMBean
         return callback;
     }
 
-    private static boolean proposePaxos(Commit proposal, List<InetAddress> endpoints, int requiredParticipants, boolean timeoutIfPartial, ConsistencyLevel consistencyLevel)
+    private static boolean proposePaxos(Commit proposal, List<InetAddress> endpoints, int requiredParticipants, boolean timeoutIfPartial, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws WriteTimeoutException
     {
-        ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial, consistencyLevel);
+        ProposeCallback callback = new ProposeCallback(endpoints.size(), requiredParticipants, !timeoutIfPartial, consistencyLevel, queryStartNanoTime);
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_PROPOSE, proposal, Commit.serializer);
         for (InetAddress target : endpoints)
             MessagingService.instance().sendRR(message, target, callback);
@@ -511,7 +512,7 @@ public class StorageProxy implements StorageProxyMBean
         return false;
     }
 
-    private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean shouldHint) throws WriteTimeoutException
+    private static void commitPaxos(Commit proposal, ConsistencyLevel consistencyLevel, boolean shouldHint, long queryStartNanoTime) throws WriteTimeoutException
     {
         boolean shouldBlock = consistencyLevel != ConsistencyLevel.ANY;
         Keyspace keyspace = Keyspace.open(proposal.update.metadata().ksName);
@@ -524,7 +525,7 @@ public class StorageProxy implements StorageProxyMBean
         if (shouldBlock)
         {
             AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-            responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE);
+            responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistencyLevel, null, WriteType.SIMPLE, queryStartNanoTime);
         }
 
         MessageOut<Commit> message = new MessageOut<Commit>(MessagingService.Verb.PAXOS_COMMIT, proposal, Commit.serializer);
@@ -597,8 +598,9 @@ public class StorageProxy implements StorageProxyMBean
      *
      * @param mutations the mutations to be applied across the replicas
      * @param consistency_level the consistency level for the operation
+     * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
      */
-    public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level)
+    public static void mutate(Collection<? extends IMutation> mutations, ConsistencyLevel consistency_level, long queryStartNanoTime)
     throws UnavailableException, OverloadedException, WriteTimeoutException, WriteFailureException
     {
         Tracing.trace("Determining replicas for mutation");
@@ -613,12 +615,12 @@ public class StorageProxy implements StorageProxyMBean
             {
                 if (mutation instanceof CounterMutation)
                 {
-                    responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter));
+                    responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime));
                 }
                 else
                 {
                     WriteType wt = mutations.size() <= 1 ? WriteType.SIMPLE : WriteType.UNLOGGED_BATCH;
-                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, wt));
+                    responseHandlers.add(performWrite(mutation, consistency_level, localDataCenter, standardWritePerformer, null, wt, queryStartNanoTime));
                 }
             }
 
@@ -728,8 +730,9 @@ public class StorageProxy implements StorageProxyMBean
      * @param mutations the mutations to be applied across the replicas
      * @param writeCommitLog if commitlog should be written
      * @param baseComplete time from epoch in ms that the local base mutation was(or will be) completed
+     * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
      */
-    public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations, boolean writeCommitLog, AtomicLong baseComplete)
+    public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations, boolean writeCommitLog, AtomicLong baseComplete, long queryStartNanoTime)
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         Tracing.trace("Determining replicas for mutation");
@@ -791,7 +794,8 @@ public class StorageProxy implements StorageProxyMBean
                                                                       Collections.singletonList(pairedEndpoint.get()),
                                                                       baseComplete,
                                                                       WriteType.BATCH,
-                                                                      cleanup));
+                                                                      cleanup,
+                                                                      queryStartNanoTime));
                         }
                     }
                     else
@@ -834,7 +838,8 @@ public class StorageProxy implements StorageProxyMBean
     @SuppressWarnings("unchecked")
     public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
                                           ConsistencyLevel consistencyLevel,
-                                          boolean mutateAtomically)
+                                          boolean mutateAtomically,
+                                          long queryStartNanoTime)
     throws WriteTimeoutException, WriteFailureException, UnavailableException, OverloadedException, InvalidRequestException
     {
         Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations);
@@ -844,13 +849,13 @@ public class StorageProxy implements StorageProxyMBean
                               .updatesAffectView(mutations, true);
 
         if (augmented != null)
-            mutateAtomically(augmented, consistencyLevel, updatesView);
+            mutateAtomically(augmented, consistencyLevel, updatesView, queryStartNanoTime);
         else
         {
             if (mutateAtomically || updatesView)
-                mutateAtomically((Collection<Mutation>) mutations, consistencyLevel, updatesView);
+                mutateAtomically((Collection<Mutation>) mutations, consistencyLevel, updatesView, queryStartNanoTime);
             else
-                mutate(mutations, consistencyLevel);
+                mutate(mutations, consistencyLevel, queryStartNanoTime);
         }
     }
 
@@ -863,10 +868,12 @@ public class StorageProxy implements StorageProxyMBean
      * @param mutations the Mutations to be applied across the replicas
      * @param consistency_level the consistency level for the operation
      * @param requireQuorumForRemove at least a quorum of nodes will see update before deleting batchlog
+     * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
      */
     public static void mutateAtomically(Collection<Mutation> mutations,
                                         ConsistencyLevel consistency_level,
-                                        boolean requireQuorumForRemove)
+                                        boolean requireQuorumForRemove,
+                                        long queryStartNanoTime)
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         Tracing.trace("Determining replicas for atomic batch");
@@ -894,7 +901,7 @@ public class StorageProxy implements StorageProxyMBean
             final BatchlogEndpoints batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
             final UUID batchUUID = UUIDGen.getTimeUUID();
             BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
-                                                                                                          () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
+                                                                                                          () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID, queryStartNanoTime));
 
             // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
             for (Mutation mutation : mutations)
@@ -903,14 +910,15 @@ public class StorageProxy implements StorageProxyMBean
                                                                                consistency_level,
                                                                                batchConsistencyLevel,
                                                                                WriteType.BATCH,
-                                                                               cleanup);
+                                                                               cleanup,
+                                                                               queryStartNanoTime);
                 // exit early if we can't fulfill the CL at this time.
                 wrapper.handler.assureSufficientLiveNodes();
                 wrappers.add(wrapper);
             }
 
             // write to the batchlog
-            syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID);
+            syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID, queryStartNanoTime);
 
             // now actually perform the writes and wait for them to complete
             syncWriteBatchedMutations(wrappers, localDataCenter, Stage.MUTATION);
@@ -950,7 +958,7 @@ public class StorageProxy implements StorageProxyMBean
         return replica.equals(FBUtilities.getBroadcastAddress());
     }
 
-    private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid)
+    private static void syncWriteToBatchlog(Collection<Mutation> mutations, BatchlogEndpoints endpoints, UUID uuid, long queryStartNanoTime)
     throws WriteTimeoutException, WriteFailureException
     {
         WriteResponseHandler<?> handler = new WriteResponseHandler<>(endpoints.all,
@@ -958,7 +966,8 @@ public class StorageProxy implements StorageProxyMBean
                                                                      endpoints.all.size() == 1 ? ConsistencyLevel.ONE : ConsistencyLevel.TWO,
                                                                      Keyspace.open(SystemKeyspace.NAME),
                                                                      null,
-                                                                     WriteType.BATCH_LOG);
+                                                                     WriteType.BATCH_LOG,
+                                                                     queryStartNanoTime);
 
         Batch batch = Batch.createLocal(uuid, FBUtilities.timestampMicros(), mutations);
 
@@ -987,13 +996,13 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid)
+    private static void asyncRemoveFromBatchlog(BatchlogEndpoints endpoints, UUID uuid, long queryStartNanoTime)
     {
         if (!endpoints.current.isEmpty())
             asyncRemoveFromBatchlog(endpoints.current, uuid);
 
         if (!endpoints.legacy.isEmpty())
-            LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid);
+            LegacyBatchlogMigrator.asyncRemoveFromBatchlog(endpoints.legacy, uuid, queryStartNanoTime);
     }
 
     private static void asyncRemoveFromBatchlog(Collection<InetAddress> endpoints, UUID uuid)
@@ -1054,14 +1063,16 @@ public class StorageProxy implements StorageProxyMBean
      * given the list of write endpoints (either standardWritePerformer for
      * standard writes or counterWritePerformer for counter writes).
      * @param callback an optional callback to be run if and when the write is
      * successful.
+     * @param queryStartNanoTime the value of System.nanoTime() when the query started to be processed
      */
     public static AbstractWriteResponseHandler<IMutation> performWrite(IMutation mutation,
-                                                            ConsistencyLevel consistency_level,
-                                                            String localDataCenter,
-                                                            WritePerformer performer,
-                                                            Runnable callback,
-                                                            WriteType writeType)
+                                                                       ConsistencyLevel consistency_level,
+                                                                       String localDataCenter,
+                                                                       WritePerformer performer,
+                                                                       Runnable callback,
+                                                                       WriteType writeType,
+                                                                       long queryStartNanoTime)
     throws UnavailableException, OverloadedException
     {
         String keyspaceName = mutation.getKeyspaceName();
@@ -1071,7 +1081,7 @@ public class StorageProxy implements StorageProxyMBean
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
 
-        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType);
+        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType, queryStartNanoTime);
 
         // exit early if we can't fulfill the CL at this time
         responseHandler.assureSufficientLiveNodes();
@@ -1085,7 +1095,8 @@ public class StorageProxy implements StorageProxyMBean
                                                                         ConsistencyLevel consistency_level,
                                                                         ConsistencyLevel batchConsistencyLevel,
                                                                         WriteType writeType,
-                                                                        BatchlogResponseHandler.BatchlogCleanup cleanup)
+                                                                        BatchlogResponseHandler.BatchlogCleanup cleanup,
+                                                                        long queryStartNanoTime)
     {
         Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
@@ -1093,8 +1104,8 @@ public class StorageProxy implements StorageProxyMBean
         Token tk = mutation.key().getToken();
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
-        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
-        BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
+        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType, queryStartNanoTime);
+        BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
         return new WriteResponseHandlerWrapper(batchHandler, mutation);
     }
 
@@ -1108,7 +1119,8 @@ public class StorageProxy implements StorageProxyMBean
                                                                             List<InetAddress> naturalEndpoints,
                                                                             AtomicLong baseComplete,
                                                                             WriteType writeType,
-                                                                            BatchlogResponseHandler.BatchlogCleanup cleanup)
+                                                                            BatchlogResponseHandler.BatchlogCleanup cleanup,
+                                                                            long queryStartNanoTime)
     {
         Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
         AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
@@ -1118,8 +1130,8 @@ public class StorageProxy implements StorageProxyMBean
         AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, () -> {
             long delay = Math.max(0, System.currentTimeMillis() - baseComplete.get());
             viewWriteMetrics.viewWriteLatency.update(delay, TimeUnit.MILLISECONDS);
-        }, writeType);
-        BatchlogResponseHandler<IMutation> batchHandler = new ViewWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
+        }, writeType, queryStartNanoTime);
+        BatchlogResponseHandler<IMutation> batchHandler = new ViewWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup, queryStartNanoTime);
         return new WriteResponseHandlerWrapper(batchHandler, mutation);
     }
 
@@ -1400,13 +1412,13 @@ public class StorageProxy implements StorageProxyMBean
      * quicker response and because the WriteResponseHandlers don't make it easy to send back an error. We also always gather
      * the write latencies at the coordinator node to make gathering point similar to the case of standard writes.
      */
-    public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter) throws UnavailableException, OverloadedException
+    public static AbstractWriteResponseHandler<IMutation> mutateCounter(CounterMutation cm, String localDataCenter, long queryStartNanoTime) throws UnavailableException, OverloadedException
     {
         InetAddress endpoint = findSuitableEndpoint(cm.getKeyspaceName(), cm.key(), localDataCenter, cm.consistency());
 
         if (endpoint.equals(FBUtilities.getBroadcastAddress()))
         {
-            return applyCounterMutationOnCoordinator(cm, localDataCenter);
+            return applyCounterMutationOnCoordinator(cm, localDataCenter, queryStartNanoTime);
         }
         else
         {
@@ -1417,10 +1429,10 @@ public class StorageProxy implements StorageProxyMBean
             List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
             Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
 
-            rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER).assureSufficientLiveNodes();
+            rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, cm.consistency(), null, WriteType.COUNTER, queryStartNanoTime).assureSufficientLiveNodes();
 
             // Forward the actual update to the chosen leader replica
-            AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.COUNTER);
+            AbstractWriteResponseHandler<IMutation> responseHandler = new WriteResponseHandler<>(endpoint, WriteType.COUNTER, queryStartNanoTime);
 
             Tracing.trace("Enqueuing counter update to {}", endpoint);
             MessagingService.instance().sendRR(cm.makeMutationMessage(), endpoint, responseHandler, false);
@@ -1467,18 +1479,18 @@ public class StorageProxy implements StorageProxyMBean
 
     // Must be called on a replica of the mutation. This replica becomes the
     // leader of this mutation.
-    public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback)
+    public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnLeader(CounterMutation cm, String localDataCenter, Runnable callback, long queryStartNanoTime)
     throws UnavailableException, OverloadedException
     {
-        return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER);
+        return performWrite(cm, cm.consistency(), localDataCenter, counterWritePerformer, callback, WriteType.COUNTER, queryStartNanoTime);
     }
 
     // Same as applyCounterMutationOnLeader but must with the difference that it use the MUTATION stage to execute the write (while
     // applyCounterMutationOnLeader assumes it is on the MUTATION stage already)
-    public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter)
+    public static AbstractWriteResponseHandler<IMutation> applyCounterMutationOnCoordinator(CounterMutation cm, String localDataCenter, long queryStartNanoTime)
     throws UnavailableException, OverloadedException
     {
-        return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER);
+        return performWrite(cm, cm.consistency(), localDataCenter, counterWriteOnCoordinatorPerformer, null, WriteType.COUNTER, queryStartNanoTime);
     }
 
     private static Runnable counterWriteTask(final IMutation mutation,
@@ -1512,31 +1524,31 @@ public class StorageProxy implements StorageProxyMBean
         return true;
     }
 
-    public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel)
+    public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
     {
-        return readOne(command, consistencyLevel, null);
+        return readOne(command, consistencyLevel, null, queryStartNanoTime);
     }
 
-    public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, ClientState state)
+    public static RowIterator readOne(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime)
     throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
     {
-        return PartitionIterators.getOnlyElement(read(SinglePartitionReadCommand.Group.one(command), consistencyLevel, state), command);
+        return PartitionIterators.getOnlyElement(read(SinglePartitionReadCommand.Group.one(command), consistencyLevel, state, queryStartNanoTime), command);
     }
 
-    public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel)
+    public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
     {
         // When using serial CL, the ClientState should be provided
         assert !consistencyLevel.isSerialConsistency();
-        return read(group, consistencyLevel, null);
+        return read(group, consistencyLevel, null, queryStartNanoTime);
     }
 
     /**
      * Performs the actual reading of a row out of the StorageService, fetching
      * a specific set of column names from a given column family.
      */
-    public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state)
+    public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime)
     throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException
     {
         if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.commands))
@@ -1547,11 +1559,11 @@ public class StorageProxy implements StorageProxyMBean
         }
 
         return consistencyLevel.isSerialConsistency()
-             ? readWithPaxos(group, consistencyLevel, state)
-             : readRegular(group, consistencyLevel);
+             ? readWithPaxos(group, consistencyLevel, state, queryStartNanoTime)
+             : readRegular(group, consistencyLevel, queryStartNanoTime);
     }
 
-    private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state)
+    private static PartitionIterator readWithPaxos(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime)
     throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException
     {
         assert state != null;
@@ -1591,7 +1603,7 @@ public class StorageProxy implements StorageProxyMBean
                 throw new ReadFailureException(consistencyLevel, e.received, e.failures, e.blockFor, false);
             }
 
-            result = fetchRows(group.commands, consistencyForCommitOrFetch);
+            result = fetchRows(group.commands, consistencyForCommitOrFetch, queryStartNanoTime);
         }
         catch (UnavailableException e)
         {
@@ -1627,13 +1639,13 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     @SuppressWarnings("resource")
-    private static PartitionIterator readRegular(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel)
+    private static PartitionIterator readRegular(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws UnavailableException, ReadFailureException, ReadTimeoutException
     {
         long start = System.nanoTime();
         try
         {
-            PartitionIterator result = fetchRows(group.commands, consistencyLevel);
+            PartitionIterator result = fetchRows(group.commands, consistencyLevel, queryStartNanoTime);
             // If we have more than one command, then despite each read command honoring the limit, the total result
             // might not honor it and so we should enforce it
             if (group.commands.size() > 1)
@@ -1680,14 +1692,14 @@ public class StorageProxy implements StorageProxyMBean
      * 4. If the digests (if any) match the data return the data
      * 5. else carry out read repair by getting data from all the nodes.
      */
-    private static PartitionIterator fetchRows(List<SinglePartitionReadCommand> commands, ConsistencyLevel consistencyLevel)
+    private static PartitionIterator fetchRows(List<SinglePartitionReadCommand> commands, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     throws UnavailableException, ReadFailureException, ReadTimeoutException
     {
         int cmdCount = commands.size();
 
         SinglePartitionReadLifecycle[] reads = new SinglePartitionReadLifecycle[cmdCount];
         for (int i = 0; i < cmdCount; i++)
-            reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel);
+            reads[i] = new SinglePartitionReadLifecycle(commands.get(i), consistencyLevel, queryStartNanoTime);
 
         for (int i = 0; i < cmdCount; i++)
             reads[i].doInitialQueries();
@@ -1717,15 +1729,17 @@ public class StorageProxy implements StorageProxyMBean
         private final SinglePartitionReadCommand command;
         private final AbstractReadExecutor executor;
         private final ConsistencyLevel consistency;
+        private final long queryStartNanoTime;
 
         private PartitionIterator result;
         private ReadCallback repairHandler;
 
-        SinglePartitionReadLifecycle(SinglePartitionReadCommand command, ConsistencyLevel consistency)
+        SinglePartitionReadLifecycle(SinglePartitionReadCommand command, ConsistencyLevel consistency, long queryStartNanoTime)
         {
             this.command = command;
-            this.executor = AbstractReadExecutor.getReadExecutor(command, consistency);
+            this.executor = AbstractReadExecutor.getReadExecutor(command, consistency, queryStartNanoTime);
             this.consistency = consistency;
+            this.queryStartNanoTime = queryStartNanoTime;
         }
 
         boolean isDone()
@@ -1757,13 +1771,14 @@ public class StorageProxy implements StorageProxyMBean
 
                 // Do a full data read to resolve the correct response (and repair node that need be)
                 Keyspace keyspace = Keyspace.open(command.metadata().ksName);
-                DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size());
+                DataResolver resolver = new DataResolver(keyspace, command, ConsistencyLevel.ALL, executor.handler.endpoints.size(), queryStartNanoTime);
                 repairHandler = new ReadCallback(resolver,
                                                  ConsistencyLevel.ALL,
                                                  executor.getContactedReplicas().size(),
                                                  command,
                                                  keyspace,
-                                                 executor.handler.endpoints);
+                                                 executor.handler.endpoints,
+                                                 queryStartNanoTime);
 
                 for (InetAddress endpoint : executor.getContactedReplicas())
                 {
@@ -2052,6 +2067,7 @@ public class StorageProxy implements StorageProxyMBean
         private final ConsistencyLevel consistency;
 
         private final long startTime;
+        private final long queryStartNanoTime;
         private DataLimits.Counter counter;
         private PartitionIterator sentQueryIterator;
 
@@ -2061,7 +2077,7 @@ public class StorageProxy implements StorageProxyMBean
         private int liveReturned;
         private int rangesQueried;
 
-        public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency)
+        public RangeCommandIterator(RangeIterator ranges, PartitionRangeReadCommand command, int concurrencyFactor, Keyspace keyspace, ConsistencyLevel consistency, long queryStartNanoTime)
         {
             this.command = command;
             this.concurrencyFactor = concurrencyFactor;
@@ -2070,6 +2086,7 @@ public class StorageProxy implements StorageProxyMBean
             this.totalRangeCount = ranges.rangeCount();
             this.consistency = consistency;
             this.keyspace = keyspace;
+            this.queryStartNanoTime = queryStartNanoTime;
         }
 
         public RowIterator computeNext()
@@ -2145,12 +2162,12 @@ public class StorageProxy implements StorageProxyMBean
         {
             PartitionRangeReadCommand rangeCommand = command.forSubRange(toQuery.range, isFirst);
 
-            DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size());
+            DataResolver resolver = new DataResolver(keyspace, rangeCommand, consistency, toQuery.filteredEndpoints.size(), queryStartNanoTime);
 
             int blockFor = consistency.blockFor(keyspace);
             int minResponses = Math.min(toQuery.filteredEndpoints.size(), blockFor);
             List<InetAddress> minimalEndpoints = toQuery.filteredEndpoints.subList(0, minResponses);
-            ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints);
+            ReadCallback handler = new ReadCallback(resolver, consistency, rangeCommand, minimalEndpoints, queryStartNanoTime);
 
             handler.assureSufficientLiveNodes();
 
@@ -2204,7 +2221,7 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     @SuppressWarnings("resource")
-    public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel)
+    public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime)
     {
         Tracing.trace("Computing ranges to query");
 
@@ -2225,7 +2242,7 @@ public class StorageProxy implements StorageProxyMBean
 
         // Note that in general, a RangeCommandIterator will honor the command limit for each range, but will not enforce it globally.
 
-        return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel)), command.nowInSec());
+        return command.limits().filter(command.postReconciliationProcessing(new RangeCommandIterator(ranges, command, concurrencyFactor, keyspace, consistencyLevel, queryStartNanoTime)), command.nowInSec());
     }
 
     public Map<String, List<String>> getSchemaVersions()
@@ -2485,9 +2502,9 @@ public class StorageProxy implements StorageProxyMBean
      */
     private static class ViewWriteMetricsWrapped extends BatchlogResponseHandler<IMutation>
     {
-        public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup)
+        public ViewWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup, long queryStartNanoTime)
         {
-            super(writeHandler, i, cleanup);
+            super(writeHandler, i, cleanup, queryStartNanoTime);
             viewWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/WriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
index 1dc03e0..46e4e93 100644
--- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java
@@ -47,20 +47,21 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T>
                                 ConsistencyLevel consistencyLevel,
                                 Keyspace keyspace,
                                 Runnable callback,
-                                WriteType writeType)
+                                WriteType writeType,
+                                long queryStartNanoTime)
     {
-        super(keyspace, writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType);
+        super(keyspace, writeEndpoints, pendingEndpoints, consistencyLevel, callback, writeType, queryStartNanoTime);
         responses = totalBlockFor();
     }
 
-    public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback)
+    public WriteResponseHandler(InetAddress endpoint, WriteType writeType, Runnable callback, long queryStartNanoTime)
     {
-        this(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, null, callback, writeType);
+        this(Arrays.asList(endpoint), Collections.<InetAddress>emptyList(), ConsistencyLevel.ONE, null, callback, writeType, queryStartNanoTime);
     }
 
-    public WriteResponseHandler(InetAddress endpoint, WriteType writeType)
+    public WriteResponseHandler(InetAddress endpoint, WriteType writeType, long queryStartNanoTime)
     {
-        this(endpoint, writeType, null);
+        this(endpoint, writeType, null, queryStartNanoTime);
     }
 
     public void response(MessageIn<T> m)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
index 01a56c4..d9b3632 100644
--- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java
@@ -55,7 +55,7 @@ abstract class AbstractQueryPager implements QueryPager
         return command.executionController();
     }
 
-    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState)
+    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime)
     {
         if (isExhausted())
             return EmptyIterators.partition();
@@ -63,7 +63,7 @@ abstract class AbstractQueryPager implements QueryPager
         pageSize = Math.min(pageSize, remaining);
         Pager pager = new Pager(limits.forPaging(pageSize), command.nowInSec());
 
-        return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState), pager);
+        return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState, queryStartNanoTime), pager);
     }
 
     public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
index 5483d15..f9a8cda 100644
--- a/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/AggregationQueryPager.java
@@ -52,12 +52,13 @@ public final class AggregationQueryPager implements QueryPager
     @Override
     public PartitionIterator fetchPage(int pageSize,
                                        ConsistencyLevel consistency,
-                                       ClientState clientState)
+                                       ClientState clientState,
+                                       long queryStartNanoTime)
     {
         if (limits.isGroupByLimit())
-            return new GroupByPartitionIterator(pageSize, consistency, clientState);
+            return new GroupByPartitionIterator(pageSize, consistency, clientState, queryStartNanoTime);
 
-        return new AggregationPartitionIterator(pageSize, consistency, clientState);
+        return new AggregationPartitionIterator(pageSize, consistency, clientState, queryStartNanoTime);
     }
 
     @Override
@@ -70,9 +71,9 @@ public final class AggregationQueryPager implements QueryPager
     public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController)
     {
         if (limits.isGroupByLimit())
-            return new GroupByPartitionIterator(pageSize, executionController);
+            return new GroupByPartitionIterator(pageSize, executionController, System.nanoTime());
 
-        return new AggregationPartitionIterator(pageSize, executionController);
+        return new AggregationPartitionIterator(pageSize, executionController, System.nanoTime());
     }
 
     @Override
@@ -152,28 +153,34 @@ public final class AggregationQueryPager implements QueryPager
          */
         private int initialMaxRemaining;
 
+        private long queryStartNanoTime;
+
         public GroupByPartitionIterator(int pageSize,
                                          ConsistencyLevel consistency,
-                                         ClientState clientState)
+                                         ClientState clientState,
+                                        long queryStartNanoTime)
         {
-            this(pageSize, consistency, clientState, null);
+            this(pageSize, consistency, clientState, null, queryStartNanoTime);
         }
 
         public GroupByPartitionIterator(int pageSize,
-                                        ReadExecutionController executionController)
+                                        ReadExecutionController executionController,
+                                        long queryStartNanoTime)
        {
-           this(pageSize, null, null, executionController);
+           this(pageSize, null, null, executionController, queryStartNanoTime);
        }
 
         private GroupByPartitionIterator(int pageSize,
                                          ConsistencyLevel consistency,
                                          ClientState clientState,
-                                         ReadExecutionController executionController)
+                                         ReadExecutionController executionController,
+                                         long queryStartNanoTime)
         {
             this.pageSize = handlePagingOff(pageSize);
             this.consistency = consistency;
             this.clientState = clientState;
             this.executionController = executionController;
+            this.queryStartNanoTime = queryStartNanoTime;
         }
 
         private int handlePagingOff(int pageSize)
@@ -280,7 +287,7 @@ public final class AggregationQueryPager implements QueryPager
          */
         private final PartitionIterator fetchSubPage(int subPageSize)
         {
-            return consistency != null ? subPager.fetchPage(subPageSize, consistency, clientState)
+            return consistency != null ? subPager.fetchPage(subPageSize, consistency, clientState, queryStartNanoTime)
                                        : subPager.fetchPageInternal(subPageSize, executionController);
         }
 
@@ -393,15 +400,17 @@ public final class AggregationQueryPager implements QueryPager
     {
         public AggregationPartitionIterator(int pageSize,
                                             ConsistencyLevel consistency,
-                                            ClientState clientState)
+                                            ClientState clientState,
+                                            long queryStartNanoTime)
         {
-            super(pageSize, consistency, clientState);
+            super(pageSize, consistency, clientState, queryStartNanoTime);
         }
 
         public AggregationPartitionIterator(int pageSize,
-                                            ReadExecutionController executionController)
+                                            ReadExecutionController executionController,
+                                            long queryStartNanoTime)
         {
-            super(pageSize, executionController);
+            super(pageSize, executionController, queryStartNanoTime);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
index 9670f28..75cc71f 100644
--- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
+++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java
@@ -142,17 +142,17 @@ public class MultiPartitionPager implements QueryPager
     }
 
     @SuppressWarnings("resource") // iter closed via countingIter
-    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
     {
         int toQuery = Math.min(remaining, pageSize);
-        return new PagersIterator(toQuery, consistency, clientState, null);
+        return new PagersIterator(toQuery, consistency, clientState, null, queryStartNanoTime);
     }
 
     @SuppressWarnings("resource") // iter closed via countingIter
     public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) throws RequestValidationException, RequestExecutionException
     {
         int toQuery = Math.min(remaining, pageSize);
-        return new PagersIterator(toQuery, null, null, executionController);
+        return new PagersIterator(toQuery, null, null, executionController, System.nanoTime());
     }
 
     private class PagersIterator extends AbstractIterator<RowIterator> implements PartitionIterator
@@ -160,6 +160,7 @@ public class MultiPartitionPager implements QueryPager
         private final int pageSize;
         private PartitionIterator result;
         private boolean closed;
+        private final long queryStartNanoTime;
 
         // For "normal" queries
         private final ConsistencyLevel consistency;
@@ -171,12 +172,13 @@ public class MultiPartitionPager implements QueryPager
         private int pagerMaxRemaining;
         private int counted;
 
-        public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadExecutionController executionController)
+        public PagersIterator(int pageSize, ConsistencyLevel consistency, ClientState clientState, ReadExecutionController executionController, long queryStartNanoTime)
         {
             this.pageSize = pageSize;
             this.consistency = consistency;
             this.clientState = clientState;
             this.executionController = executionController;
+            this.queryStartNanoTime = queryStartNanoTime;
         }
 
         protected RowIterator computeNext()
@@ -205,7 +207,7 @@ public class MultiPartitionPager implements QueryPager
                 int toQuery = pageSize - counted;
                 result = consistency == null
                        ? pagers[current].fetchPageInternal(toQuery, executionController)
-                       : pagers[current].fetchPage(toQuery, consistency, clientState);
+                       : pagers[current].fetchPage(toQuery, consistency, clientState, queryStartNanoTime);
             }
             return result.next();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/QueryPager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPager.java b/src/java/org/apache/cassandra/service/pager/QueryPager.java
index edd2a55..5d23997 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPager.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPager.java
@@ -54,7 +54,7 @@ public interface QueryPager
             return ReadExecutionController.empty();
         }
 
-        public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException
+        public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
         {
             return EmptyIterators.partition();
         }
@@ -94,7 +94,7 @@ public interface QueryPager
      * {@code consistency} is a serial consistency.
      * @return the page of result.
      */
-    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState) throws RequestValidationException, RequestExecutionException;
+    public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException;
 
     /**
      * Starts a new read operation.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/pager/QueryPagers.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/pager/QueryPagers.java b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
index 02b5de2..7fb4e70 100644
--- a/src/java/org/apache/cassandra/service/pager/QueryPagers.java
+++ b/src/java/org/apache/cassandra/service/pager/QueryPagers.java
@@ -45,7 +45,8 @@ public class QueryPagers
                                  ClientState state,
                                  final int pageSize,
                                  int nowInSec,
-                                 boolean isForThrift) throws RequestValidationException, RequestExecutionException
+                                 boolean isForThrift,
+                                 long queryStartNanoTime) throws RequestValidationException, RequestExecutionException
     {
         SinglePartitionReadCommand command = SinglePartitionReadCommand.create(isForThrift, metadata, nowInSec, columnFilter, RowFilter.NONE, limits, key, filter);
         final SinglePartitionPager pager = new SinglePartitionPager(command, null, Server.CURRENT_VERSION);
@@ -53,7 +54,7 @@ public class QueryPagers
         int count = 0;
         while (!pager.isExhausted())
         {
-            try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state))
+            try (PartitionIterator iter = pager.fetchPage(pageSize, consistencyLevel, state, queryStartNanoTime))
             {
                 DataLimits.Counter counter = limits.newCounter(nowInSec, true);
                 PartitionIterators.consume(counter.applyTo(iter));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java b/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
index 37defde..90bfc5d 100644
--- a/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/AbstractPaxosCallback.java
@@ -35,12 +35,14 @@ public abstract class AbstractPaxosCallback<T> implements IAsyncCallback<T>
     protected final CountDownLatch latch;
     protected final int targets;
     private final ConsistencyLevel consistency;
+    private final long queryStartNanoTime;
 
-    public AbstractPaxosCallback(int targets, ConsistencyLevel consistency)
+    public AbstractPaxosCallback(int targets, ConsistencyLevel consistency, long queryStartNanoTime)
     {
         this.targets = targets;
         this.consistency = consistency;
         latch = new CountDownLatch(targets);
+        this.queryStartNanoTime = queryStartNanoTime;
     }
 
     public boolean isLatencyForSnitch()
@@ -57,7 +59,8 @@ public abstract class AbstractPaxosCallback<T> implements IAsyncCallback<T>
     {
         try
         {
-            if (!latch.await(DatabaseDescriptor.getWriteRpcTimeout(), TimeUnit.MILLISECONDS))
+            long timeout = TimeUnit.MILLISECONDS.toNanos(DatabaseDescriptor.getWriteRpcTimeout()) - (System.nanoTime() - queryStartNanoTime);
+            if (!latch.await(timeout, TimeUnit.NANOSECONDS))
                 throw new WriteTimeoutException(WriteType.CAS, consistency, getResponseCount(), targets);
         }
         catch (InterruptedException ex)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 544403a..5915eab 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -49,9 +49,9 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
 
     private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
 
-    public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency)
+    public PrepareCallback(DecoratedKey key, CFMetaData metadata, int targets, ConsistencyLevel consistency, long queryStartNanoTime)
     {
-        super(targets, consistency);
+        super(targets, consistency, queryStartNanoTime);
         // need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected
         mostRecentCommit = Commit.emptyCommit(key, metadata);
         mostRecentInProgressCommit = Commit.emptyCommit(key, metadata);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/aa83c942/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
index b0bd163..c9cb1f0 100644
--- a/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/ProposeCallback.java
@@ -50,9 +50,9 @@ public class ProposeCallback extends AbstractPaxosCallback<Boolean>
     private final int requiredAccepts;
     private final boolean failFast;
 
-    public ProposeCallback(int totalTargets, int requiredTargets, boolean failFast, ConsistencyLevel consistency)
+    public ProposeCallback(int totalTargets, int requiredTargets, boolean failFast, ConsistencyLevel consistency, long queryStartNanoTime)
     {
-        super(totalTargets, consistency);
+        super(totalTargets, consistency, queryStartNanoTime);
         this.requiredAccepts = requiredTargets;
         this.failFast = failFast;
     }