You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/07/08 21:16:14 UTC

[cassandra] branch trunk updated: Add a virtual table that exposes currently running queries

This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 89f3978dcd Add a virtual table that exposes currently running queries
89f3978dcd is described below

commit 89f3978dcde958fbad191b8cf628fd89ace64d7a
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Mon Jun 27 18:40:40 2022 -0500

    Add a virtual table that exposes currently running queries
    
    patch by Chris Lohfink; reviewed by Caleb Rackliffe and Benedict Elliott Smith for CASSANDRA-15241
    
    Co-authored-by: Chris Lohfink <cl...@apple.com>
    Co-authored-by: Caleb Rackliffe <ca...@gmail.com>
    Co-authored-by: Benedict Elliott Smith <be...@apache.org>
---
 CHANGES.txt                                        |   1 +
 .../cassandra/concurrent/DebuggableTask.java       |  85 +++++++++++++++
 .../cassandra/concurrent/ExecutionFailure.java     |  61 ++++++++++-
 .../apache/cassandra/concurrent/FutureTask.java    |  21 +++-
 .../org/apache/cassandra/concurrent/SEPWorker.java |  52 +++++++++-
 .../cassandra/concurrent/SharedExecutorPool.java   |  26 ++++-
 .../apache/cassandra/concurrent/TaskFactory.java   |   4 +
 .../apache/cassandra/db/virtual/QueriesTable.java  |  94 +++++++++++++++++
 .../cassandra/db/virtual/SystemViewsKeyspace.java  |   1 +
 .../org/apache/cassandra/service/StorageProxy.java | 115 ++++++++++++++++-----
 .../org/apache/cassandra/transport/Dispatcher.java |  68 +++++++++---
 .../transport/InitialConnectionHandler.java        |   5 +-
 .../org/apache/cassandra/transport/Message.java    |   3 +-
 .../cassandra/transport/messages/QueryMessage.java |   3 +-
 .../distributed/test/QueriesTableTest.java         |  89 ++++++++++++++++
 15 files changed, 581 insertions(+), 47 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 4623b48c3a..7693eb4891 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Add a virtual table that exposes currently running queries (CASSANDRA-15241)
  * Allow sstableloader to specify table without relying on path (CASSANDRA-16584)
  * Fix TestGossipingPropertyFileSnitch.test_prefer_local_reconnect_on_listen_address (CASSANDRA-17700)
  * Add ByteComparable API (CASSANDRA-6936)
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableTask.java b/src/java/org/apache/cassandra/concurrent/DebuggableTask.java
new file mode 100644
index 0000000000..ac04eb4c34
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+/**
+ * Interface to include on a Runnable or Callable submitted to the {@link SharedExecutorPool} to provide more
+ * detailed diagnostics.
+ */
+@Shared(scope = SIMULATION, inner = INTERFACES)
+public interface DebuggableTask
+{
+    public long creationTimeNanos();
+
+    public long startTimeNanos();
+
+    public String description();
+    
+    interface RunnableDebuggableTask extends Runnable, DebuggableTask {}
+
+    /**
+     * Wraps a {@link DebuggableTask} to include the name of the thread running it.
+     */
+    public static class RunningDebuggableTask implements DebuggableTask
+    {
+        private final DebuggableTask task;
+        private final String threadId;
+
+        public RunningDebuggableTask(String threadId, DebuggableTask task)
+        {
+            this.task = task;
+            this.threadId = threadId;
+        }
+
+        public String threadId()
+        {
+            return threadId;
+        }
+
+        public boolean hasTask()
+        {
+            return task != null;
+        }
+
+        @Override
+        public long creationTimeNanos()
+        {
+            assert hasTask();
+            return task.creationTimeNanos();
+        }
+
+        @Override
+        public long startTimeNanos()
+        {
+            assert hasTask();
+            return task.startTimeNanos();
+        }
+
+        @Override
+        public String description()
+        {
+            assert hasTask();
+            return task.description();
+        }
+    }
+}
diff --git a/src/java/org/apache/cassandra/concurrent/ExecutionFailure.java b/src/java/org/apache/cassandra/concurrent/ExecutionFailure.java
index 7fa7dcbd54..27ab885e23 100644
--- a/src/java/org/apache/cassandra/concurrent/ExecutionFailure.java
+++ b/src/java/org/apache/cassandra/concurrent/ExecutionFailure.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.concurrent;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
 
+import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,6 +106,14 @@ public class ExecutionFailure
         return enforceOptions(withResources, wrap, false);
     }
 
+    /**
+     * @see #suppressing(WithResources, Runnable)
+     */
+    static RunnableDebuggableTask suppressingDebuggable(WithResources withResources, RunnableDebuggableTask debuggable)
+    {
+        return enforceOptionsDebuggable(withResources, debuggable, false);
+    }
+
     /**
      * Encapsulate the execution, propagating or suppressing any exceptions as requested.
      *
@@ -119,7 +128,7 @@ public class ExecutionFailure
             @Override
             public void run()
             {
-                try (Closeable close = withResources.get())
+                try (@SuppressWarnings("unused") Closeable close = withResources.get())
                 {
                     wrap.run();
                 }
@@ -139,6 +148,54 @@ public class ExecutionFailure
         };
     }
 
+    /**
+     * @see #enforceOptions(WithResources, Runnable, boolean)
+     */
+    private static RunnableDebuggableTask enforceOptionsDebuggable(WithResources withResources, RunnableDebuggableTask debuggable, boolean propagate)
+    {
+        return new RunnableDebuggableTask()
+        {
+            @Override
+            public void run()
+            {
+                try (@SuppressWarnings("unused") Closeable close = withResources.get())
+                {
+                    debuggable.run();
+                }
+                catch (Throwable t)
+                {
+                    handle(t);
+                    if (propagate)
+                        throw t;
+                }
+            }
+
+            @Override
+            public String toString()
+            {
+                return debuggable.toString();
+            }
+
+            @Override
+            public long creationTimeNanos()
+            {
+                return debuggable.creationTimeNanos();
+            }
+
+            @Override
+            public long startTimeNanos()
+            {
+                return debuggable.startTimeNanos();
+            }
+
+            @Override
+            public String description()
+            {
+                return debuggable.description();
+            }
+        };
+    }
+
     /**
      * See {@link #enforceOptions(WithResources, Callable)}
      */
@@ -158,7 +215,7 @@ public class ExecutionFailure
             @Override
             public V call() throws Exception
             {
-                try (Closeable close = withResources.get())
+                try (@SuppressWarnings("unused") Closeable close = withResources.get())
                 {
                     return wrap.call();
                 }
diff --git a/src/java/org/apache/cassandra/concurrent/FutureTask.java b/src/java/org/apache/cassandra/concurrent/FutureTask.java
index 2348ff6bf8..763884a2da 100644
--- a/src/java/org/apache/cassandra/concurrent/FutureTask.java
+++ b/src/java/org/apache/cassandra/concurrent/FutureTask.java
@@ -20,9 +20,10 @@ package org.apache.cassandra.concurrent;
 
 import java.util.concurrent.Callable;
 
-import org.apache.cassandra.utils.concurrent.RunnableFuture;
+import javax.annotation.Nullable;
 
 import org.apache.cassandra.utils.concurrent.AsyncFuture;
+import org.apache.cassandra.utils.concurrent.RunnableFuture;
 
 /**
  * A FutureTask that utilises Cassandra's {@link AsyncFuture}, making it compatible with {@link ExecutorPlus}.
@@ -31,15 +32,28 @@ import org.apache.cassandra.utils.concurrent.AsyncFuture;
 public class FutureTask<V> extends AsyncFuture<V> implements RunnableFuture<V>
 {
     private Callable<? extends V> call;
+    private volatile DebuggableTask debuggable;
 
     public FutureTask(Callable<? extends V> call)
     {
-        this.call = call;
+        this(call, call instanceof DebuggableTask ? (DebuggableTask) call : null);
     }
 
     public FutureTask(Runnable run)
     {
-        this.call = callable(run);
+        this(callable(run), run instanceof DebuggableTask ? (DebuggableTask) run : null);
+    }
+
+    private FutureTask(Callable<? extends V> call, DebuggableTask debuggable)
+    {
+        this.call = call;
+        this.debuggable = debuggable;
+    }
+
+    @Nullable
+    DebuggableTask debuggableTask()
+    {
+        return debuggable;
     }
 
     V call() throws Exception
@@ -63,6 +77,7 @@ public class FutureTask<V> extends AsyncFuture<V> implements RunnableFuture<V>
         finally
         {
             call = null;
+            debuggable = null;
         }
     }
 
diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
index c7b9abf719..fe16c950df 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
@@ -48,6 +48,8 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
     long prevStopCheck = 0;
     long soleSpinnerSpinTime = 0;
 
+    private final AtomicReference<Runnable> currentTask = new AtomicReference<>();
+
     SEPWorker(ThreadGroup threadGroup, Long workerId, Work initialState, SharedExecutorPool pool)
     {
         this.pool = pool;
@@ -58,9 +60,27 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
         thread.start();
     }
 
+    /**
+     * @return the current {@link DebuggableTask}, if one exists
+     */
+    public DebuggableTask currentDebuggableTask()
+    {
+        // can change after null check so go off local reference
+        Runnable task = currentTask.get();
+
+        // Local read and mutation Runnables are themselves debuggable
+        if (task instanceof DebuggableTask)
+            return (DebuggableTask) task;
+
+        if (task instanceof FutureTask)
+            return ((FutureTask<?>) task).debuggableTask();
+            
+        return null;
+    }
+
     public void run()
     {
-        /**
+        /*
          * we maintain two important invariants:
          * 1)   after exiting spinning phase, we ensure at least one more task on _each_ queue will be processed
          *      promptly after we begin, assuming any are outstanding on any pools. this is to permit producers to
@@ -101,8 +121,10 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
                 if (assigned == null)
                     continue;
                 if (SET_THREAD_NAME)
-                    Thread.currentThread().setName(assigned.name + "-" + workerId);
+                    Thread.currentThread().setName(assigned.name + '-' + workerId);
+
                 task = assigned.tasks.poll();
+                currentTask.lazySet(task);
 
                 // if we do have tasks assigned, nobody will change our state so we can simply set it to WORKING
                 // (which is also a state that will never be interrupted externally)
@@ -128,9 +150,12 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
                         break;
 
                     task = assigned.tasks.poll();
+                    currentTask.lazySet(task);
                 }
 
                 // return our work permit, and maybe signal shutdown
+                currentTask.lazySet(null);
+
                 if (status != RETURNED_WORK_PERMIT)
                     assigned.returnWorkPermit();
 
@@ -173,6 +198,11 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
                 logger.error("Unexpected exception killed worker", t);
             }
         }
+        finally
+        {
+            currentTask.lazySet(null);
+            pool.workerEnded(this);
+        }
     }
 
     // try to assign this worker the provided work
@@ -420,4 +450,22 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
             return assigned != null;
         }
     }
+
+    @Override
+    public String toString()
+    {
+        return thread.getName();
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return workerId.intValue();
+    }
+
+    @Override
+    public boolean equals(Object obj)
+    {
+        return obj == this;
+    }
 }
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index f74854f9cb..0631ec61da 100644
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -17,8 +17,11 @@
  */
 package org.apache.cassandra.concurrent;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
@@ -26,6 +29,9 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.concurrent.DebuggableTask.RunningDebuggableTask;
 
 import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
 import static org.apache.cassandra.concurrent.SEPWorker.Work;
@@ -77,6 +83,8 @@ public class SharedExecutorPool
     final ConcurrentSkipListMap<Long, SEPWorker> spinning = new ConcurrentSkipListMap<>();
     // the collection of threads that have been asked to stop/deschedule - new workers are scheduled from here last
     final ConcurrentSkipListMap<Long, SEPWorker> descheduled = new ConcurrentSkipListMap<>();
+    // All SEPWorkers that are currently running
+    private final Set<SEPWorker> allWorkers = Collections.newSetFromMap(new ConcurrentHashMap<>());
 
     volatile boolean shuttingDown = false;
 
@@ -102,7 +110,23 @@ public class SharedExecutorPool
                 return;
 
         if (!work.isStop())
-            new SEPWorker(threadGroup, workerId.incrementAndGet(), work, this);
+        {
+            SEPWorker worker = new SEPWorker(threadGroup, workerId.incrementAndGet(), work, this);
+            allWorkers.add(worker);
+        }
+    }
+
+    void workerEnded(SEPWorker worker)
+    {
+        allWorkers.remove(worker);
+    }
+
+    public List<RunningDebuggableTask> runningTasks()
+    {
+        return allWorkers.stream()
+                         .map(worker -> new RunningDebuggableTask(worker.toString(), worker.currentDebuggableTask()))
+                         .filter(RunningDebuggableTask::hasTask)
+                         .collect(Collectors.toList());
     }
 
     void maybeStartSpinningWorker()
diff --git a/src/java/org/apache/cassandra/concurrent/TaskFactory.java b/src/java/org/apache/cassandra/concurrent/TaskFactory.java
index 56087d950b..faeabe6c4c 100644
--- a/src/java/org/apache/cassandra/concurrent/TaskFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/TaskFactory.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.concurrent;
 
 import java.util.concurrent.Callable;
 
+import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask;
 import org.apache.cassandra.utils.Shared;
 import org.apache.cassandra.utils.WithResources;
 import org.apache.cassandra.utils.concurrent.RunnableFuture;
@@ -127,6 +128,9 @@ public interface TaskFactory
         @Override
         public Runnable toExecute(Runnable runnable)
         {
+            if (runnable instanceof RunnableDebuggableTask)
+                return ExecutionFailure.suppressingDebuggable(ExecutorLocals.propagate(), (RunnableDebuggableTask) runnable);
+
             // no reason to propagate exception when it is inaccessible to caller
             return ExecutionFailure.suppressing(ExecutorLocals.propagate(), runnable);
         }
diff --git a/src/java/org/apache/cassandra/db/virtual/QueriesTable.java b/src/java/org/apache/cassandra/db/virtual/QueriesTable.java
new file mode 100644
index 0000000000..aeba61c004
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/QueriesTable.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import org.apache.cassandra.concurrent.DebuggableTask;
+import org.apache.cassandra.concurrent.SharedExecutorPool;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static java.lang.Long.max;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
+
+/**
+ * Virtual table that lists currently running queries on the NTR (coordinator) and Read/Mutation (local) stages
+ *
+ * Example:
+ * <pre>
+ * cqlsh> SELECT * FROM system_views.queries;
+ *
+ *  thread_id                   | queued_micros |  running_micros | task
+ * ------------------------------+---------------+-----------------+--------------------------------------------------------------------------------
+ *  Native-Transport-Requests-7 |         72923 |            7611 |                      QUERY select * from system_views.queries; [pageSize = 100]
+ *              MutationStage-2 |         18249 |            2084 | Mutation(keyspace='distributed_test_keyspace', key='000000f8', modifications...
+ *                  ReadStage-2 |         72447 |           10121 |                                         SELECT * FROM keyspace.table LIMIT 5000
+ * </pre>
+ */    
+final class QueriesTable extends AbstractVirtualTable
+{
+    private static final String TABLE_NAME = "queries";
+    private static final String ID = "thread_id";
+    private static final String QUEUED = "queued_micros";
+    private static final String RUNNING = "running_micros";
+    private static final String DESC = "task";
+
+    QueriesTable(String keyspace)
+    {
+        super(TableMetadata.builder(keyspace, TABLE_NAME)
+                           .comment("Lists currently running queries")
+                           .kind(TableMetadata.Kind.VIRTUAL)
+                           .partitioner(new LocalPartitioner(UTF8Type.instance))
+                           // The thread name is unique since the id given to each SEPWorker is unique
+                           .addPartitionKeyColumn(ID, UTF8Type.instance)
+                           .addRegularColumn(QUEUED, LongType.instance)
+                           .addRegularColumn(RUNNING, LongType.instance)
+                           .addRegularColumn(DESC, UTF8Type.instance)
+                           .build());
+    }
+
+    /**
+     * Walks the {@link SharedExecutorPool} workers for any {@link DebuggableTask} instances and populates the table.
+     */
+    @Override
+    public DataSet data()
+    {
+        SimpleDataSet result = new SimpleDataSet(metadata());
+        
+        for (DebuggableTask.RunningDebuggableTask task : SharedExecutorPool.SHARED.runningTasks())
+        {
+            if (!task.hasTask()) continue;
+            
+            long creationTimeNanos = task.creationTimeNanos();
+            long startTimeNanos = task.startTimeNanos();
+            long now = approxTime.now();
+
+            long queuedMicros = NANOSECONDS.toMicros(max((startTimeNanos > 0 ? startTimeNanos : now) - creationTimeNanos, 0));
+            long runningMicros = startTimeNanos > 0 ? NANOSECONDS.toMicros(now - startTimeNanos) : 0;
+            
+            result.row(task.threadId())
+                  .column(QUEUED, queuedMicros)
+                  .column(RUNNING, runningMicros)
+                  .column(DESC, task.description());
+        }
+        
+        return result;
+    }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
index f13e61ce73..59a0aba809 100644
--- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -47,6 +47,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace
                     .add(new BatchMetricsTable(VIRTUAL_VIEWS))
                     .add(new StreamingVirtualTable(VIRTUAL_VIEWS))
                     .add(new GossipInfoTable(VIRTUAL_VIEWS))
+                    .add(new QueriesTable(VIRTUAL_VIEWS))
                     .addAll(LocalRepairTables.getAll(VIRTUAL_VIEWS))
                     .build());
     }
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 31d54771fe..557382df54 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -39,25 +39,25 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.Uninterruptibles;
 
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.service.paxos.*;
+import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.ContentionStrategy;
 import org.apache.cassandra.service.paxos.Paxos;
-import org.apache.cassandra.utils.TimeUUID;
-import org.apache.cassandra.utils.concurrent.CountDownLatch;
-
+import org.apache.cassandra.service.paxos.PaxosState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.batchlog.Batch;
 import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask;
 import org.apache.cassandra.concurrent.Stage;
 import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.Config;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.ConsistencyLevel;
@@ -140,13 +140,17 @@ import org.apache.cassandra.utils.MBeanWrapper;
 import org.apache.cassandra.utils.MonotonicClock;
 import org.apache.cassandra.utils.NoSpamLogger;
 import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.concurrent.CountDownLatch;
 import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
 
-import static com.google.common.collect.Iterables.concat;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import static com.google.common.collect.Iterables.concat;
+import static org.apache.commons.lang3.StringUtils.join;
+
 import static org.apache.cassandra.db.ConsistencyLevel.SERIAL;
-import static org.apache.cassandra.net.Message.out;
 import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casReadMetrics;
 import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casWriteMetrics;
 import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetrics;
@@ -154,8 +158,15 @@ import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetri
 import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.viewWriteMetrics;
 import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetrics;
 import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetricsForLevel;
+import static org.apache.cassandra.net.Message.out;
 import static org.apache.cassandra.net.NoPayload.noPayload;
-import static org.apache.cassandra.net.Verb.*;
+import static org.apache.cassandra.net.Verb.BATCH_STORE_REQ;
+import static org.apache.cassandra.net.Verb.MUTATION_REQ;
+import static org.apache.cassandra.net.Verb.PAXOS_COMMIT_REQ;
+import static org.apache.cassandra.net.Verb.PAXOS_PREPARE_REQ;
+import static org.apache.cassandra.net.Verb.PAXOS_PROPOSE_REQ;
+import static org.apache.cassandra.net.Verb.SCHEMA_VERSION_REQ;
+import static org.apache.cassandra.net.Verb.TRUNCATE_REQ;
 import static org.apache.cassandra.service.BatchlogResponseHandler.BatchlogCleanup;
 import static org.apache.cassandra.service.paxos.Ballot.Flag.GLOBAL;
 import static org.apache.cassandra.service.paxos.Ballot.Flag.LOCAL;
@@ -166,7 +177,6 @@ import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
 import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
 import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
-import static org.apache.commons.lang3.StringUtils.join;
 
 public class StorageProxy implements StorageProxyMBean
 {
@@ -829,6 +839,12 @@ public class StorageProxy implements StorageProxyMBean
                 }
             }
 
+            @Override
+            public String description()
+            {
+                return "Paxos " + message.payload.toString();
+            }
+
             @Override
             protected Verb verb()
             {
@@ -1264,7 +1280,7 @@ public class StorageProxy implements StorageProxyMBean
             logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size());
 
             if (replica.isSelf())
-                performLocally(Stage.MUTATION, replica, () -> BatchlogManager.store(batch), handler);
+                performLocally(Stage.MUTATION, replica, () -> BatchlogManager.store(batch), handler, "Batchlog store");
             else
                 MessagingService.instance().sendWithCallback(message, replica.endpoint(), handler);
         }
@@ -1280,7 +1296,7 @@ public class StorageProxy implements StorageProxyMBean
                 logger.trace("Sending batchlog remove request {} to {}", uuid, target);
 
             if (target.isSelf())
-                performLocally(Stage.MUTATION, target, () -> BatchlogManager.remove(uuid));
+                performLocally(Stage.MUTATION, target, () -> BatchlogManager.remove(uuid), "Batchlog remove");
             else
                 MessagingService.instance().send(message, target.endpoint());
         }
@@ -1524,7 +1540,7 @@ public class StorageProxy implements StorageProxyMBean
         if (insertLocal)
         {
             Preconditions.checkNotNull(localReplica);
-            performLocally(stage, localReplica, mutation::apply, responseHandler);
+            performLocally(stage, localReplica, mutation::apply, responseHandler, mutation);
         }
 
         if (localDc != null)
@@ -1591,7 +1607,7 @@ public class StorageProxy implements StorageProxyMBean
         logger.trace("Sending message to {}@{}", message.id(), target);
     }
 
-    private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable)
+    private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable, String description)
     {
         stage.maybeExecuteImmediately(new LocalMutationRunnable(localReplica)
         {
@@ -1607,6 +1623,12 @@ public class StorageProxy implements StorageProxyMBean
                 }
             }
 
+            @Override
+            public String description()
+            {
+                return description;
+            }
+
             @Override
             protected Verb verb()
             {
@@ -1615,7 +1637,7 @@ public class StorageProxy implements StorageProxyMBean
         });
     }
 
-    private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable, final RequestCallback<?> handler)
+    private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable, final RequestCallback<?> handler, Object description)
     {
         stage.maybeExecuteImmediately(new LocalMutationRunnable(localReplica)
         {
@@ -1634,6 +1656,14 @@ public class StorageProxy implements StorageProxyMBean
                 }
             }
 
+            @Override
+            public String description()
+            {
+                // description is an Object and toString() is only called here, so we do not have to evaluate
+                // Mutation.toString() unless the description is explicitly requested
+                return description.toString();
+            }
+
             @Override
             protected Verb verb()
             {
@@ -2088,7 +2118,7 @@ public class StorageProxy implements StorageProxyMBean
         return concatAndBlockOnRepair(results, repairs);
     }
 
-    public static class LocalReadRunnable extends DroppableRunnable
+    public static class LocalReadRunnable extends DroppableRunnable implements RunnableDebuggableTask
     {
         private final ReadCommand command;
         private final ReadCallback handler;
@@ -2158,6 +2188,24 @@ public class StorageProxy implements StorageProxyMBean
                 }
             }
         }
+
+        @Override
+        public long creationTimeNanos()
+        {
+            return approxCreationTimeNanos;
+        }
+
+        @Override
+        public long startTimeNanos()
+        {
+            return approxStartTimeNanos;
+        }
+
+        @Override
+        public String description()
+        {
+            return command.toCQLString();
+        }
     }
 
     public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command,
@@ -2468,7 +2516,9 @@ public class StorageProxy implements StorageProxyMBean
      */
     private static abstract class DroppableRunnable implements Runnable
     {
-        final long approxCreationTimeNanos;
+        protected final long approxCreationTimeNanos;
+        protected volatile long approxStartTimeNanos;
+        
         final Verb verb;
 
         public DroppableRunnable(Verb verb)
@@ -2479,11 +2529,11 @@ public class StorageProxy implements StorageProxyMBean
 
         public final void run()
         {
-            long approxCurrentTimeNanos = MonotonicClock.Global.approxTime.now();
+            approxStartTimeNanos = MonotonicClock.Global.approxTime.now();
             long expirationTimeNanos = verb.expiresAtNanos(approxCreationTimeNanos);
-            if (approxCurrentTimeNanos > expirationTimeNanos)
+            if (approxStartTimeNanos > expirationTimeNanos)
             {
-                long timeTakenNanos = approxCurrentTimeNanos - approxCreationTimeNanos;
+                long timeTakenNanos = approxStartTimeNanos - approxCreationTimeNanos;
                 MessagingService.instance().metrics.recordSelfDroppedMessage(verb, timeTakenNanos, NANOSECONDS);
                 return;
             }
@@ -2504,9 +2554,10 @@ public class StorageProxy implements StorageProxyMBean
      * Like DroppableRunnable, but if it aborts, it will rerun (on the mutation stage) after
      * marking itself as a hint in progress so that the hint backpressure mechanism can function.
      */
-    private static abstract class LocalMutationRunnable implements Runnable
+    private static abstract class LocalMutationRunnable implements RunnableDebuggableTask
     {
         private final long approxCreationTimeNanos = MonotonicClock.Global.approxTime.now();
+        private volatile long approxStartTimeNanos;
 
         private final Replica localReplica;
 
@@ -2518,11 +2569,12 @@ public class StorageProxy implements StorageProxyMBean
         public final void run()
         {
             final Verb verb = verb();
-            long nowNanos = MonotonicClock.Global.approxTime.now();
+            approxStartTimeNanos = MonotonicClock.Global.approxTime.now();
             long expirationTimeNanos = verb.expiresAtNanos(approxCreationTimeNanos);
-            if (nowNanos > expirationTimeNanos)
+            
+            if (approxStartTimeNanos > expirationTimeNanos)
             {
-                long timeTakenNanos = nowNanos - approxCreationTimeNanos;
+                long timeTakenNanos = approxStartTimeNanos - approxCreationTimeNanos;
                 MessagingService.instance().metrics.recordSelfDroppedMessage(Verb.MUTATION_REQ, timeTakenNanos, NANOSECONDS);
 
                 HintRunnable runnable = new HintRunnable(EndpointsForToken.of(localReplica.range().right, localReplica))
@@ -2546,6 +2598,21 @@ public class StorageProxy implements StorageProxyMBean
             }
         }
 
+        @Override
+        public long creationTimeNanos()
+        {
+            return approxCreationTimeNanos;
+        }
+
+        @Override
+        public long startTimeNanos()
+        {
+            return approxStartTimeNanos;
+        }
+
+        @Override
+        abstract public String description();
+
         abstract protected Verb verb();
         abstract protected void runMayThrow() throws Exception;
     }
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java b/src/java/org/apache/cassandra/transport/Dispatcher.java
index da79c3d2c4..8f8a607c77 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -24,15 +24,16 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import com.google.common.base.Predicate;
-import org.apache.cassandra.metrics.ClientMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import io.netty.channel.Channel;
 import io.netty.channel.EventLoop;
 import io.netty.util.AttributeKey;
+import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask;
 import org.apache.cassandra.concurrent.LocalAwareExecutorPlus;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.metrics.ClientMetrics;
 import org.apache.cassandra.net.FrameEncoder;
 import org.apache.cassandra.service.ClientWarn;
 import org.apache.cassandra.service.QueryState;
@@ -42,10 +43,10 @@ import org.apache.cassandra.transport.Flusher.FlushItem;
 import org.apache.cassandra.transport.messages.ErrorMessage;
 import org.apache.cassandra.transport.messages.EventMessage;
 import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.MonotonicClock;
 import org.apache.cassandra.utils.NoSpamLogger;
 
 import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
-import static org.apache.cassandra.utils.Clock.Global.nanoTime;
 
 public class Dispatcher
 {
@@ -79,17 +80,60 @@ public class Dispatcher
 
     public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure)
     {
-        requestExecutor.submit(() -> processRequest(channel, request, forFlusher, backpressure));
+        requestExecutor.submit(new RequestProcessor(channel, request, forFlusher, backpressure));
         ClientMetrics.instance.markRequestDispatched();
     }
 
+    public class RequestProcessor implements RunnableDebuggableTask
+    {
+        private final Channel channel;
+        private final Message.Request request;
+        private final FlushItemConverter forFlusher;
+        private final Overload backpressure;
+        
+        private final long approxCreationTimeNanos = MonotonicClock.Global.approxTime.now();
+        private volatile long approxStartTimeNanos;
+        
+        public RequestProcessor(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure)
+        {
+            this.channel = channel;
+            this.request = request;
+            this.forFlusher = forFlusher;
+            this.backpressure = backpressure;
+        }
+
+        @Override
+        public void run()
+        {
+            approxStartTimeNanos = MonotonicClock.Global.approxTime.now();
+            processRequest(channel, request, forFlusher, backpressure, approxStartTimeNanos);
+        }
+
+        @Override
+        public long creationTimeNanos()
+        {
+            return approxCreationTimeNanos;
+        }
+
+        @Override
+        public long startTimeNanos()
+        {
+            return approxStartTimeNanos;
+        }
+
+        @Override
+        public String description()
+        {
+            return request.toString();
+        }
+    }
+
     /**
      * Note: this method may be executed on the netty event loop, during initial protocol negotiation; the caller is
      * responsible for cleaning up any global or thread-local state. (ex. tracing, client warnings, etc.).
      */
-    private static Message.Response processRequest(ServerConnection connection, Message.Request request, Overload backpressure)
+    private static Message.Response processRequest(ServerConnection connection, Message.Request request, Overload backpressure, long startTimeNanos)
     {
-        long queryStartNanoTime = nanoTime();
         if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))
             ClientWarn.instance.captureWarnings();
 
@@ -119,7 +163,7 @@ public class Dispatcher
 
         Message.logger.trace("Received: {}, v={}", request, connection.getVersion());
         connection.requests.inc();
-        Message.Response response = request.execute(qstate, queryStartNanoTime);
+        Message.Response response = request.execute(qstate, startTimeNanos);
 
         if (request.isTrackable())
             CoordinatorWarnings.done();
@@ -130,15 +174,15 @@ public class Dispatcher
         connection.applyStateTransition(request.type, response.type);
         return response;
     }
-
+    
     /**
      * Note: this method may be executed on the netty event loop.
      */
-    static Message.Response processRequest(Channel channel, Message.Request request, Overload backpressure)
+    static Message.Response processRequest(Channel channel, Message.Request request, Overload backpressure, long approxStartTimeNanos)
     {
         try
         {
-            return processRequest((ServerConnection) request.connection(), request, backpressure);
+            return processRequest((ServerConnection) request.connection(), request, backpressure, approxStartTimeNanos);
         }
         catch (Throwable t)
         {
@@ -163,9 +207,9 @@ public class Dispatcher
     /**
      * Note: this method is not expected to execute on the netty event loop.
      */
-    void processRequest(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure)
+    void processRequest(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure, long approxStartTimeNanos)
     {
-        Message.Response response = processRequest(channel, request, backpressure);
+        Message.Response response = processRequest(channel, request, backpressure, approxStartTimeNanos);
         FlushItem<?> toFlush = forFlusher.toFlushItem(channel, request, response);
         Message.logger.trace("Responding: {}, v={}", response, request.connection().getVersion());
         flush(toFlush);
@@ -201,7 +245,7 @@ public class Dispatcher
      * for delivering events to registered clients is dependent on protocol version and the configuration
      * of the pipeline. For v5 and newer connections, the event message is encoded into an Envelope,
      * wrapped in a FlushItem and then delivered via the pipeline's flusher, in a similar way to
-     * a Response returned from {@link #processRequest(Channel, Message.Request, FlushItemConverter, Overload)}.
+     * a Response returned from {@link #processRequest(Channel, Message.Request, FlushItemConverter, Overload, long)}.
      * It's worth noting that events are not generally fired as a direct response to a client request,
      * so this flush item has a null request attribute. The dispatcher itself is created when the
      * pipeline is first configured during protocol negotiation and is attached to the channel for
diff --git a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
index 75cb72e8b5..e4cff99acb 100644
--- a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
+++ b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.cassandra.transport.ClientResourceLimits.Overload;
+import org.apache.cassandra.utils.MonotonicClock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -148,7 +149,9 @@ public class InitialConnectionHandler extends ByteToMessageDecoder
                         promise = new VoidChannelPromise(ctx.channel(), false);
                     }
 
-                    final Message.Response response = Dispatcher.processRequest(ctx.channel(), startup, Overload.NONE);
+                    long approxStartTimeNanos = MonotonicClock.Global.approxTime.now();
+                    final Message.Response response = Dispatcher.processRequest(ctx.channel(), startup, Overload.NONE, approxStartTimeNanos);
+
                     outbound = response.encode(inbound.header.version);
                     ctx.writeAndFlush(outbound, promise);
                     logger.trace("Configured pipeline: {}", ctx.pipeline());
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 75c997e38c..2c91a76c3b 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -193,7 +193,8 @@ public abstract class Message
         this.customPayload = customPayload;
     }
 
-    public String debugString()
+    @Override
+    public String toString()
     {
         return String.format("(%s:%s:%s)", type, streamId, connection == null ? "null" :  connection.getVersion().asInt());
     }
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 9a296e442b..c295216d2c 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -148,6 +148,7 @@ public class QueryMessage extends Message.Request
     @Override
     public String toString()
     {
-        return String.format("QUERY %s [pageSize = %d]", query, options.getPageSize());
+        return String.format("QUERY %s [pageSize = %d] at consistency %s", 
+                             query, options.getPageSize(), options.getConsistency());
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
new file mode 100644
index 0000000000..09e56e0b61
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class QueriesTableTest extends TestBaseImpl
+{
+    public static final int ITERATIONS = 256;
+
+    @Test
+    public void shouldExposeReadsAndWrites() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.build(1).start()))
+        {
+            ExecutorService executor = Executors.newFixedThreadPool(16);
+            
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k int primary key, v int)");
+
+            AtomicInteger reads = new AtomicInteger(0);
+            AtomicInteger writes = new AtomicInteger(0);
+            AtomicInteger paxos = new AtomicInteger(0);
+            
+            for (int i = 0; i < ITERATIONS; i++)
+            {
+                int k = i;
+                executor.execute(() -> cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (k, v) VALUES (" + k + ", 0)", ConsistencyLevel.ALL));
+                executor.execute(() -> cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 10 WHERE k = " + (k - 1) + " IF v = 0", ConsistencyLevel.ALL));
+                executor.execute(() -> cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE k = " + (k - 1), ConsistencyLevel.ALL));
+
+                executor.execute(() ->
+                {
+                    SimpleQueryResult result = cluster.get(1).executeInternalWithResult("SELECT * FROM system_views.queries");
+                    
+                    while (result.hasNext())
+                    {
+                        Row row = result.next();
+                        String threadId = row.get("thread_id").toString();
+                        String task = row.get("task").toString();
+
+                        if (threadId.contains("Read") && task.contains("SELECT"))
+                            reads.incrementAndGet();
+                        else if (threadId.contains("Mutation") && task.contains("Mutation"))
+                            writes.incrementAndGet();
+                        else if (threadId.contains("Mutation") && task.contains("Paxos"))
+                            paxos.incrementAndGet();
+                    }
+                });
+            }
+
+            executor.shutdown();
+            assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));
+            
+            // We should see at least one read, write, and conditional update in the "queries" table.
+            assertThat(reads.get()).isGreaterThan(0).isLessThanOrEqualTo(ITERATIONS);
+            assertThat(writes.get()).isGreaterThan(0).isLessThanOrEqualTo(ITERATIONS);
+            assertThat(paxos.get()).isGreaterThan(0).isLessThanOrEqualTo(ITERATIONS);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org