You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2022/07/08 21:16:14 UTC
[cassandra] branch trunk updated: Add a virtual table that exposes currently running queries
This is an automated email from the ASF dual-hosted git repository.
maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 89f3978dcd Add a virtual table that exposes currently running queries
89f3978dcd is described below
commit 89f3978dcde958fbad191b8cf628fd89ace64d7a
Author: Caleb Rackliffe <ca...@gmail.com>
AuthorDate: Mon Jun 27 18:40:40 2022 -0500
Add a virtual table that exposes currently running queries
patch by Chris Lohfink; reviewed by Caleb Rackliffe and Benedict Elliott Smith for CASSANDRA-15241
Co-authored-by: Chris Lohfink <cl...@apple.com>
Co-authored-by: Caleb Rackliffe <ca...@gmail.com>
Co-authored-by: Benedict Elliott Smith <be...@apache.org>
---
CHANGES.txt | 1 +
.../cassandra/concurrent/DebuggableTask.java | 85 +++++++++++++++
.../cassandra/concurrent/ExecutionFailure.java | 61 ++++++++++-
.../apache/cassandra/concurrent/FutureTask.java | 21 +++-
.../org/apache/cassandra/concurrent/SEPWorker.java | 52 +++++++++-
.../cassandra/concurrent/SharedExecutorPool.java | 26 ++++-
.../apache/cassandra/concurrent/TaskFactory.java | 4 +
.../apache/cassandra/db/virtual/QueriesTable.java | 94 +++++++++++++++++
.../cassandra/db/virtual/SystemViewsKeyspace.java | 1 +
.../org/apache/cassandra/service/StorageProxy.java | 115 ++++++++++++++++-----
.../org/apache/cassandra/transport/Dispatcher.java | 68 +++++++++---
.../transport/InitialConnectionHandler.java | 5 +-
.../org/apache/cassandra/transport/Message.java | 3 +-
.../cassandra/transport/messages/QueryMessage.java | 3 +-
.../distributed/test/QueriesTableTest.java | 89 ++++++++++++++++
15 files changed, 581 insertions(+), 47 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 4623b48c3a..7693eb4891 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Add a virtual table that exposes currently running queries (CASSANDRA-15241)
* Allow sstableloader to specify table without relying on path (CASSANDRA-16584)
* Fix TestGossipingPropertyFileSnitch.test_prefer_local_reconnect_on_listen_address (CASSANDRA-17700)
* Add ByteComparable API (CASSANDRA-6936)
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableTask.java b/src/java/org/apache/cassandra/concurrent/DebuggableTask.java
new file mode 100644
index 0000000000..ac04eb4c34
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableTask.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.concurrent;
+
+import org.apache.cassandra.utils.Shared;
+
+import static org.apache.cassandra.utils.Shared.Recursive.INTERFACES;
+import static org.apache.cassandra.utils.Shared.Scope.SIMULATION;
+
+/**
+ * Interface to include on a Runnable or Callable submitted to the {@link SharedExecutorPool} to provide more
+ * detailed diagnostics.
+ */
+@Shared(scope = SIMULATION, inner = INTERFACES)
+public interface DebuggableTask
+{
+ public long creationTimeNanos();
+
+ public long startTimeNanos();
+
+ public String description();
+
+ interface RunnableDebuggableTask extends Runnable, DebuggableTask {}
+
+ /**
+ * Wraps a {@link DebuggableTask} to include the name of the thread running it.
+ */
+ public static class RunningDebuggableTask implements DebuggableTask
+ {
+ private final DebuggableTask task;
+ private final String threadId;
+
+ public RunningDebuggableTask(String threadId, DebuggableTask task)
+ {
+ this.task = task;
+ this.threadId = threadId;
+ }
+
+ public String threadId()
+ {
+ return threadId;
+ }
+
+ public boolean hasTask()
+ {
+ return task != null;
+ }
+
+ @Override
+ public long creationTimeNanos()
+ {
+ assert hasTask();
+ return task.creationTimeNanos();
+ }
+
+ @Override
+ public long startTimeNanos()
+ {
+ assert hasTask();
+ return task.startTimeNanos();
+ }
+
+ @Override
+ public String description()
+ {
+ assert hasTask();
+ return task.description();
+ }
+ }
+}
diff --git a/src/java/org/apache/cassandra/concurrent/ExecutionFailure.java b/src/java/org/apache/cassandra/concurrent/ExecutionFailure.java
index 7fa7dcbd54..27ab885e23 100644
--- a/src/java/org/apache/cassandra/concurrent/ExecutionFailure.java
+++ b/src/java/org/apache/cassandra/concurrent/ExecutionFailure.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.concurrent;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
+import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -105,6 +106,14 @@ public class ExecutionFailure
return enforceOptions(withResources, wrap, false);
}
+ /**
+ * @see #suppressing(WithResources, Runnable)
+ */
+ static RunnableDebuggableTask suppressingDebuggable(WithResources withResources, RunnableDebuggableTask debuggable)
+ {
+ return enforceOptionsDebuggable(withResources, debuggable, false);
+ }
+
/**
* Encapsulate the execution, propagating or suppressing any exceptions as requested.
*
@@ -119,7 +128,7 @@ public class ExecutionFailure
@Override
public void run()
{
- try (Closeable close = withResources.get())
+ try (@SuppressWarnings("unused") Closeable close = withResources.get())
{
wrap.run();
}
@@ -139,6 +148,54 @@ public class ExecutionFailure
};
}
+ /**
+ * @see #enforceOptions(WithResources, Runnable, boolean)
+ */
+ private static RunnableDebuggableTask enforceOptionsDebuggable(WithResources withResources, RunnableDebuggableTask debuggable, boolean propagate)
+ {
+ return new RunnableDebuggableTask()
+ {
+ @Override
+ public void run()
+ {
+ try (@SuppressWarnings("unused") Closeable close = withResources.get())
+ {
+ debuggable.run();
+ }
+ catch (Throwable t)
+ {
+ handle(t);
+ if (propagate)
+ throw t;
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return debuggable.toString();
+ }
+
+ @Override
+ public long creationTimeNanos()
+ {
+ return debuggable.creationTimeNanos();
+ }
+
+ @Override
+ public long startTimeNanos()
+ {
+ return debuggable.startTimeNanos();
+ }
+
+ @Override
+ public String description()
+ {
+ return debuggable.description();
+ }
+ };
+ }
+
/**
* See {@link #enforceOptions(WithResources, Callable)}
*/
@@ -158,7 +215,7 @@ public class ExecutionFailure
@Override
public V call() throws Exception
{
- try (Closeable close = withResources.get())
+ try (@SuppressWarnings("unused") Closeable close = withResources.get())
{
return wrap.call();
}
diff --git a/src/java/org/apache/cassandra/concurrent/FutureTask.java b/src/java/org/apache/cassandra/concurrent/FutureTask.java
index 2348ff6bf8..763884a2da 100644
--- a/src/java/org/apache/cassandra/concurrent/FutureTask.java
+++ b/src/java/org/apache/cassandra/concurrent/FutureTask.java
@@ -20,9 +20,10 @@ package org.apache.cassandra.concurrent;
import java.util.concurrent.Callable;
-import org.apache.cassandra.utils.concurrent.RunnableFuture;
+import javax.annotation.Nullable;
import org.apache.cassandra.utils.concurrent.AsyncFuture;
+import org.apache.cassandra.utils.concurrent.RunnableFuture;
/**
* A FutureTask that utilises Cassandra's {@link AsyncFuture}, making it compatible with {@link ExecutorPlus}.
@@ -31,15 +32,28 @@ import org.apache.cassandra.utils.concurrent.AsyncFuture;
public class FutureTask<V> extends AsyncFuture<V> implements RunnableFuture<V>
{
private Callable<? extends V> call;
+ private volatile DebuggableTask debuggable;
public FutureTask(Callable<? extends V> call)
{
- this.call = call;
+ this(call, call instanceof DebuggableTask ? (DebuggableTask) call : null);
}
public FutureTask(Runnable run)
{
- this.call = callable(run);
+ this(callable(run), run instanceof DebuggableTask ? (DebuggableTask) run : null);
+ }
+
+ private FutureTask(Callable<? extends V> call, DebuggableTask debuggable)
+ {
+ this.call = call;
+ this.debuggable = debuggable;
+ }
+
+ @Nullable
+ DebuggableTask debuggableTask()
+ {
+ return debuggable;
}
V call() throws Exception
@@ -63,6 +77,7 @@ public class FutureTask<V> extends AsyncFuture<V> implements RunnableFuture<V>
finally
{
call = null;
+ debuggable = null;
}
}
diff --git a/src/java/org/apache/cassandra/concurrent/SEPWorker.java b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
index c7b9abf719..fe16c950df 100644
--- a/src/java/org/apache/cassandra/concurrent/SEPWorker.java
+++ b/src/java/org/apache/cassandra/concurrent/SEPWorker.java
@@ -48,6 +48,8 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
long prevStopCheck = 0;
long soleSpinnerSpinTime = 0;
+ private final AtomicReference<Runnable> currentTask = new AtomicReference<>();
+
SEPWorker(ThreadGroup threadGroup, Long workerId, Work initialState, SharedExecutorPool pool)
{
this.pool = pool;
@@ -58,9 +60,27 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
thread.start();
}
+ /**
+ * @return the current {@link DebuggableTask}, if one exists
+ */
+ public DebuggableTask currentDebuggableTask()
+ {
+ // currentTask can change concurrently, so read it once and work off the local reference
+ Runnable task = currentTask.get();
+
+ // Local read and mutation Runnables are themselves debuggable
+ if (task instanceof DebuggableTask)
+ return (DebuggableTask) task;
+
+ if (task instanceof FutureTask)
+ return ((FutureTask<?>) task).debuggableTask();
+
+ return null;
+ }
+
public void run()
{
- /**
+ /*
* we maintain two important invariants:
* 1) after exiting spinning phase, we ensure at least one more task on _each_ queue will be processed
* promptly after we begin, assuming any are outstanding on any pools. this is to permit producers to
@@ -101,8 +121,10 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
if (assigned == null)
continue;
if (SET_THREAD_NAME)
- Thread.currentThread().setName(assigned.name + "-" + workerId);
+ Thread.currentThread().setName(assigned.name + '-' + workerId);
+
task = assigned.tasks.poll();
+ currentTask.lazySet(task);
// if we do have tasks assigned, nobody will change our state so we can simply set it to WORKING
// (which is also a state that will never be interrupted externally)
@@ -128,9 +150,12 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
break;
task = assigned.tasks.poll();
+ currentTask.lazySet(task);
}
// return our work permit, and maybe signal shutdown
+ currentTask.lazySet(null);
+
if (status != RETURNED_WORK_PERMIT)
assigned.returnWorkPermit();
@@ -173,6 +198,11 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
logger.error("Unexpected exception killed worker", t);
}
}
+ finally
+ {
+ currentTask.lazySet(null);
+ pool.workerEnded(this);
+ }
}
// try to assign this worker the provided work
@@ -420,4 +450,22 @@ final class SEPWorker extends AtomicReference<SEPWorker.Work> implements Runnabl
return assigned != null;
}
}
+
+ @Override
+ public String toString()
+ {
+ return thread.getName();
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return workerId.intValue();
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ return obj == this;
+ }
}
diff --git a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
index f74854f9cb..0631ec61da 100644
--- a/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
+++ b/src/java/org/apache/cassandra/concurrent/SharedExecutorPool.java
@@ -17,8 +17,11 @@
*/
package org.apache.cassandra.concurrent;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
@@ -26,6 +29,9 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.concurrent.DebuggableTask.RunningDebuggableTask;
import static org.apache.cassandra.concurrent.ExecutorFactory.Global.executorFactory;
import static org.apache.cassandra.concurrent.SEPWorker.Work;
@@ -77,6 +83,8 @@ public class SharedExecutorPool
final ConcurrentSkipListMap<Long, SEPWorker> spinning = new ConcurrentSkipListMap<>();
// the collection of threads that have been asked to stop/deschedule - new workers are scheduled from here last
final ConcurrentSkipListMap<Long, SEPWorker> descheduled = new ConcurrentSkipListMap<>();
+ // All SEPWorkers that are currently running
+ private final Set<SEPWorker> allWorkers = Collections.newSetFromMap(new ConcurrentHashMap<>());
volatile boolean shuttingDown = false;
@@ -102,7 +110,23 @@ public class SharedExecutorPool
return;
if (!work.isStop())
- new SEPWorker(threadGroup, workerId.incrementAndGet(), work, this);
+ {
+ SEPWorker worker = new SEPWorker(threadGroup, workerId.incrementAndGet(), work, this);
+ allWorkers.add(worker);
+ }
+ }
+
+ void workerEnded(SEPWorker worker)
+ {
+ allWorkers.remove(worker);
+ }
+
+ public List<RunningDebuggableTask> runningTasks()
+ {
+ return allWorkers.stream()
+ .map(worker -> new RunningDebuggableTask(worker.toString(), worker.currentDebuggableTask()))
+ .filter(RunningDebuggableTask::hasTask)
+ .collect(Collectors.toList());
}
void maybeStartSpinningWorker()
diff --git a/src/java/org/apache/cassandra/concurrent/TaskFactory.java b/src/java/org/apache/cassandra/concurrent/TaskFactory.java
index 56087d950b..faeabe6c4c 100644
--- a/src/java/org/apache/cassandra/concurrent/TaskFactory.java
+++ b/src/java/org/apache/cassandra/concurrent/TaskFactory.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.concurrent;
import java.util.concurrent.Callable;
+import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask;
import org.apache.cassandra.utils.Shared;
import org.apache.cassandra.utils.WithResources;
import org.apache.cassandra.utils.concurrent.RunnableFuture;
@@ -127,6 +128,9 @@ public interface TaskFactory
@Override
public Runnable toExecute(Runnable runnable)
{
+ if (runnable instanceof RunnableDebuggableTask)
+ return ExecutionFailure.suppressingDebuggable(ExecutorLocals.propagate(), (RunnableDebuggableTask) runnable);
+
// no reason to propagate exception when it is inaccessible to caller
return ExecutionFailure.suppressing(ExecutorLocals.propagate(), runnable);
}
diff --git a/src/java/org/apache/cassandra/db/virtual/QueriesTable.java b/src/java/org/apache/cassandra/db/virtual/QueriesTable.java
new file mode 100644
index 0000000000..aeba61c004
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/virtual/QueriesTable.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.virtual;
+
+import org.apache.cassandra.concurrent.DebuggableTask;
+import org.apache.cassandra.concurrent.SharedExecutorPool;
+import org.apache.cassandra.db.marshal.LongType;
+import org.apache.cassandra.db.marshal.UTF8Type;
+import org.apache.cassandra.dht.LocalPartitioner;
+import org.apache.cassandra.schema.TableMetadata;
+
+import static java.lang.Long.max;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
+
+/**
+ * Virtual table that lists queries currently running on the Native-Transport-Requests (coordinator) and Read/Mutation (local) stages.
+ *
+ * Example:
+ * <pre>
+ * cqlsh> SELECT * FROM system_views.queries;
+ *
+ * thread_id | queued_micros | running_micros | task
+ * ------------------------------+---------------+-----------------+--------------------------------------------------------------------------------
+ * Native-Transport-Requests-7 | 72923 | 7611 | QUERY select * from system_views.queries; [pageSize = 100]
+ * MutationStage-2 | 18249 | 2084 | Mutation(keyspace='distributed_test_keyspace', key='000000f8', modifications...
+ * ReadStage-2 | 72447 | 10121 | SELECT * FROM keyspace.table LIMIT 5000
+ * </pre>
+ */
+final class QueriesTable extends AbstractVirtualTable
+{
+ private static final String TABLE_NAME = "queries";
+ private static final String ID = "thread_id";
+ private static final String QUEUED = "queued_micros";
+ private static final String RUNNING = "running_micros";
+ private static final String DESC = "task";
+
+ QueriesTable(String keyspace)
+ {
+ super(TableMetadata.builder(keyspace, TABLE_NAME)
+ .comment("Lists currently running queries")
+ .kind(TableMetadata.Kind.VIRTUAL)
+ .partitioner(new LocalPartitioner(UTF8Type.instance))
+ // The thread name is unique since the id given to each SEPWorker is unique
+ .addPartitionKeyColumn(ID, UTF8Type.instance)
+ .addRegularColumn(QUEUED, LongType.instance)
+ .addRegularColumn(RUNNING, LongType.instance)
+ .addRegularColumn(DESC, UTF8Type.instance)
+ .build());
+ }
+
+ /**
+ * Walks the {@link SharedExecutorPool} workers for any {@link DebuggableTask} instances and populates the table.
+ */
+ @Override
+ public DataSet data()
+ {
+ SimpleDataSet result = new SimpleDataSet(metadata());
+
+ for (DebuggableTask.RunningDebuggableTask task : SharedExecutorPool.SHARED.runningTasks())
+ {
+ if (!task.hasTask()) continue;
+
+ long creationTimeNanos = task.creationTimeNanos();
+ long startTimeNanos = task.startTimeNanos();
+ long now = approxTime.now();
+
+ long queuedMicros = NANOSECONDS.toMicros(max((startTimeNanos > 0 ? startTimeNanos : now) - creationTimeNanos, 0));
+ long runningMicros = startTimeNanos > 0 ? NANOSECONDS.toMicros(now - startTimeNanos) : 0;
+
+ result.row(task.threadId())
+ .column(QUEUED, queuedMicros)
+ .column(RUNNING, runningMicros)
+ .column(DESC, task.description());
+ }
+
+ return result;
+ }
+}
diff --git a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
index f13e61ce73..59a0aba809 100644
--- a/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
+++ b/src/java/org/apache/cassandra/db/virtual/SystemViewsKeyspace.java
@@ -47,6 +47,7 @@ public final class SystemViewsKeyspace extends VirtualKeyspace
.add(new BatchMetricsTable(VIRTUAL_VIEWS))
.add(new StreamingVirtualTable(VIRTUAL_VIEWS))
.add(new GossipInfoTable(VIRTUAL_VIEWS))
+ .add(new QueriesTable(VIRTUAL_VIEWS))
.addAll(LocalRepairTables.getAll(VIRTUAL_VIEWS))
.build());
}
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 31d54771fe..557382df54 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -39,25 +39,25 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.cassandra.config.Config;
-import org.apache.cassandra.service.paxos.*;
+import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.ContentionStrategy;
import org.apache.cassandra.service.paxos.Paxos;
-import org.apache.cassandra.utils.TimeUUID;
-import org.apache.cassandra.utils.concurrent.CountDownLatch;
-
+import org.apache.cassandra.service.paxos.PaxosState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.batchlog.Batch;
import org.apache.cassandra.batchlog.BatchlogManager;
+import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.config.CassandraRelevantProperties;
+import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ConsistencyLevel;
@@ -140,13 +140,17 @@ import org.apache.cassandra.utils.MBeanWrapper;
import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.NoSpamLogger;
import org.apache.cassandra.utils.Pair;
+import org.apache.cassandra.utils.TimeUUID;
+import org.apache.cassandra.utils.concurrent.CountDownLatch;
import org.apache.cassandra.utils.concurrent.UncheckedInterruptedException;
-import static com.google.common.collect.Iterables.concat;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+import static com.google.common.collect.Iterables.concat;
+import static org.apache.commons.lang3.StringUtils.join;
+
import static org.apache.cassandra.db.ConsistencyLevel.SERIAL;
-import static org.apache.cassandra.net.Message.out;
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casReadMetrics;
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.casWriteMetrics;
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetrics;
@@ -154,8 +158,15 @@ import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.readMetri
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.viewWriteMetrics;
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetrics;
import static org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetricsForLevel;
+import static org.apache.cassandra.net.Message.out;
import static org.apache.cassandra.net.NoPayload.noPayload;
-import static org.apache.cassandra.net.Verb.*;
+import static org.apache.cassandra.net.Verb.BATCH_STORE_REQ;
+import static org.apache.cassandra.net.Verb.MUTATION_REQ;
+import static org.apache.cassandra.net.Verb.PAXOS_COMMIT_REQ;
+import static org.apache.cassandra.net.Verb.PAXOS_PREPARE_REQ;
+import static org.apache.cassandra.net.Verb.PAXOS_PROPOSE_REQ;
+import static org.apache.cassandra.net.Verb.SCHEMA_VERSION_REQ;
+import static org.apache.cassandra.net.Verb.TRUNCATE_REQ;
import static org.apache.cassandra.service.BatchlogResponseHandler.BatchlogCleanup;
import static org.apache.cassandra.service.paxos.Ballot.Flag.GLOBAL;
import static org.apache.cassandra.service.paxos.Ballot.Flag.LOCAL;
@@ -166,7 +177,6 @@ import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.TimeUUID.Generator.nextTimeUUID;
import static org.apache.cassandra.utils.concurrent.CountDownLatch.newCountDownLatch;
-import static org.apache.commons.lang3.StringUtils.join;
public class StorageProxy implements StorageProxyMBean
{
@@ -829,6 +839,12 @@ public class StorageProxy implements StorageProxyMBean
}
}
+ @Override
+ public String description()
+ {
+ return "Paxos " + message.payload.toString();
+ }
+
@Override
protected Verb verb()
{
@@ -1264,7 +1280,7 @@ public class StorageProxy implements StorageProxyMBean
logger.trace("Sending batchlog store request {} to {} for {} mutations", batch.id, replica, batch.size());
if (replica.isSelf())
- performLocally(Stage.MUTATION, replica, () -> BatchlogManager.store(batch), handler);
+ performLocally(Stage.MUTATION, replica, () -> BatchlogManager.store(batch), handler, "Batchlog store");
else
MessagingService.instance().sendWithCallback(message, replica.endpoint(), handler);
}
@@ -1280,7 +1296,7 @@ public class StorageProxy implements StorageProxyMBean
logger.trace("Sending batchlog remove request {} to {}", uuid, target);
if (target.isSelf())
- performLocally(Stage.MUTATION, target, () -> BatchlogManager.remove(uuid));
+ performLocally(Stage.MUTATION, target, () -> BatchlogManager.remove(uuid), "Batchlog remove");
else
MessagingService.instance().send(message, target.endpoint());
}
@@ -1524,7 +1540,7 @@ public class StorageProxy implements StorageProxyMBean
if (insertLocal)
{
Preconditions.checkNotNull(localReplica);
- performLocally(stage, localReplica, mutation::apply, responseHandler);
+ performLocally(stage, localReplica, mutation::apply, responseHandler, mutation);
}
if (localDc != null)
@@ -1591,7 +1607,7 @@ public class StorageProxy implements StorageProxyMBean
logger.trace("Sending message to {}@{}", message.id(), target);
}
- private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable)
+ private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable, String description)
{
stage.maybeExecuteImmediately(new LocalMutationRunnable(localReplica)
{
@@ -1607,6 +1623,12 @@ public class StorageProxy implements StorageProxyMBean
}
}
+ @Override
+ public String description()
+ {
+ return description;
+ }
+
@Override
protected Verb verb()
{
@@ -1615,7 +1637,7 @@ public class StorageProxy implements StorageProxyMBean
});
}
- private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable, final RequestCallback<?> handler)
+ private static void performLocally(Stage stage, Replica localReplica, final Runnable runnable, final RequestCallback<?> handler, Object description)
{
stage.maybeExecuteImmediately(new LocalMutationRunnable(localReplica)
{
@@ -1634,6 +1656,14 @@ public class StorageProxy implements StorageProxyMBean
}
}
+ @Override
+ public String description()
+ {
+ // description is held as an Object and toString() is only called here, so Mutation.toString() is not
+ // evaluated unless the description is explicitly requested
+ return description.toString();
+ }
+
@Override
protected Verb verb()
{
@@ -2088,7 +2118,7 @@ public class StorageProxy implements StorageProxyMBean
return concatAndBlockOnRepair(results, repairs);
}
- public static class LocalReadRunnable extends DroppableRunnable
+ public static class LocalReadRunnable extends DroppableRunnable implements RunnableDebuggableTask
{
private final ReadCommand command;
private final ReadCallback handler;
@@ -2158,6 +2188,24 @@ public class StorageProxy implements StorageProxyMBean
}
}
}
+
+ @Override
+ public long creationTimeNanos()
+ {
+ return approxCreationTimeNanos;
+ }
+
+ @Override
+ public long startTimeNanos()
+ {
+ return approxStartTimeNanos;
+ }
+
+ @Override
+ public String description()
+ {
+ return command.toCQLString();
+ }
}
public static PartitionIterator getRangeSlice(PartitionRangeReadCommand command,
@@ -2468,7 +2516,9 @@ public class StorageProxy implements StorageProxyMBean
*/
private static abstract class DroppableRunnable implements Runnable
{
- final long approxCreationTimeNanos;
+ protected final long approxCreationTimeNanos;
+ protected volatile long approxStartTimeNanos;
+
final Verb verb;
public DroppableRunnable(Verb verb)
@@ -2479,11 +2529,11 @@ public class StorageProxy implements StorageProxyMBean
public final void run()
{
- long approxCurrentTimeNanos = MonotonicClock.Global.approxTime.now();
+ approxStartTimeNanos = MonotonicClock.Global.approxTime.now();
long expirationTimeNanos = verb.expiresAtNanos(approxCreationTimeNanos);
- if (approxCurrentTimeNanos > expirationTimeNanos)
+ if (approxStartTimeNanos > expirationTimeNanos)
{
- long timeTakenNanos = approxCurrentTimeNanos - approxCreationTimeNanos;
+ long timeTakenNanos = approxStartTimeNanos - approxCreationTimeNanos;
MessagingService.instance().metrics.recordSelfDroppedMessage(verb, timeTakenNanos, NANOSECONDS);
return;
}
@@ -2504,9 +2554,10 @@ public class StorageProxy implements StorageProxyMBean
* Like DroppableRunnable, but if it aborts, it will rerun (on the mutation stage) after
* marking itself as a hint in progress so that the hint backpressure mechanism can function.
*/
- private static abstract class LocalMutationRunnable implements Runnable
+ private static abstract class LocalMutationRunnable implements RunnableDebuggableTask
{
private final long approxCreationTimeNanos = MonotonicClock.Global.approxTime.now();
+ private volatile long approxStartTimeNanos;
private final Replica localReplica;
@@ -2518,11 +2569,12 @@ public class StorageProxy implements StorageProxyMBean
public final void run()
{
final Verb verb = verb();
- long nowNanos = MonotonicClock.Global.approxTime.now();
+ approxStartTimeNanos = MonotonicClock.Global.approxTime.now();
long expirationTimeNanos = verb.expiresAtNanos(approxCreationTimeNanos);
- if (nowNanos > expirationTimeNanos)
+
+ if (approxStartTimeNanos > expirationTimeNanos)
{
- long timeTakenNanos = nowNanos - approxCreationTimeNanos;
+ long timeTakenNanos = approxStartTimeNanos - approxCreationTimeNanos;
MessagingService.instance().metrics.recordSelfDroppedMessage(Verb.MUTATION_REQ, timeTakenNanos, NANOSECONDS);
HintRunnable runnable = new HintRunnable(EndpointsForToken.of(localReplica.range().right, localReplica))
@@ -2546,6 +2598,21 @@ public class StorageProxy implements StorageProxyMBean
}
}
+ @Override
+ public long creationTimeNanos()
+ {
+ return approxCreationTimeNanos;
+ }
+
+ @Override
+ public long startTimeNanos()
+ {
+ return approxStartTimeNanos;
+ }
+
+ @Override
+ abstract public String description();
+
abstract protected Verb verb();
abstract protected void runMayThrow() throws Exception;
}
diff --git a/src/java/org/apache/cassandra/transport/Dispatcher.java b/src/java/org/apache/cassandra/transport/Dispatcher.java
index da79c3d2c4..8f8a607c77 100644
--- a/src/java/org/apache/cassandra/transport/Dispatcher.java
+++ b/src/java/org/apache/cassandra/transport/Dispatcher.java
@@ -24,15 +24,16 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import com.google.common.base.Predicate;
-import org.apache.cassandra.metrics.ClientMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.channel.Channel;
import io.netty.channel.EventLoop;
import io.netty.util.AttributeKey;
+import org.apache.cassandra.concurrent.DebuggableTask.RunnableDebuggableTask;
import org.apache.cassandra.concurrent.LocalAwareExecutorPlus;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.metrics.ClientMetrics;
import org.apache.cassandra.net.FrameEncoder;
import org.apache.cassandra.service.ClientWarn;
import org.apache.cassandra.service.QueryState;
@@ -42,10 +43,10 @@ import org.apache.cassandra.transport.Flusher.FlushItem;
import org.apache.cassandra.transport.messages.ErrorMessage;
import org.apache.cassandra.transport.messages.EventMessage;
import org.apache.cassandra.utils.JVMStabilityInspector;
+import org.apache.cassandra.utils.MonotonicClock;
import org.apache.cassandra.utils.NoSpamLogger;
import static org.apache.cassandra.concurrent.SharedExecutorPool.SHARED;
-import static org.apache.cassandra.utils.Clock.Global.nanoTime;
public class Dispatcher
{
@@ -79,17 +80,60 @@ public class Dispatcher
+    /**
+     * Submits {@code request} to {@code requestExecutor} wrapped in a {@link RequestProcessor}, so the queued task
+     * carries its creation/start times and a description of the request, then records the dispatch in
+     * {@link ClientMetrics}.
+     */
public void dispatch(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure)
{
- requestExecutor.submit(() -> processRequest(channel, request, forFlusher, backpressure));
+ requestExecutor.submit(new RequestProcessor(channel, request, forFlusher, backpressure));
ClientMetrics.instance.markRequestDispatched();
}
+    /**
+     * Task submitted to the request executor for a single dispatched request. Besides delegating to
+     * {@code processRequest}, it implements {@link RunnableDebuggableTask} so that the time it was created, the time
+     * it started running and a description of the request can be reported while it is queued or executing.
+     */
+    public class RequestProcessor implements RunnableDebuggableTask
+    {
+        // Sampled from the approximate clock when this task is constructed (dispatch() builds it just before submitting).
+        private final long approxCreationTimeNanos = MonotonicClock.Global.approxTime.now();
+
+        // Written by run(); holds the default 0 until then. Volatile so threads other than the executing one see it.
+        private volatile long approxStartTimeNanos;
+
+        private final Channel channel;
+        private final Message.Request request;
+        private final FlushItemConverter forFlusher;
+        private final Overload backpressure;
+
+        public RequestProcessor(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure)
+        {
+            this.channel = channel;
+            this.request = request;
+            this.forFlusher = forFlusher;
+            this.backpressure = backpressure;
+        }
+
+        /**
+         * Publishes the start time, then processes the request using that same timestamp as its start time.
+         */
+        @Override
+        public void run()
+        {
+            long startedAtNanos = MonotonicClock.Global.approxTime.now();
+            approxStartTimeNanos = startedAtNanos;
+            processRequest(channel, request, forFlusher, backpressure, startedAtNanos);
+        }
+
+        /** @return the approximate time at which this task was constructed */
+        @Override
+        public long creationTimeNanos()
+        {
+            return approxCreationTimeNanos;
+        }
+
+        /** @return the approximate time at which {@link #run()} began, or 0 if it has not started yet */
+        @Override
+        public long startTimeNanos()
+        {
+            return approxStartTimeNanos;
+        }
+
+        /** @return the {@code toString()} of the wrapped request */
+        @Override
+        public String description()
+        {
+            return request.toString();
+        }
+    }
+
/**
* Note: this method may be executed on the netty event loop, during initial protocol negotiation; the caller is
* responsible for cleaning up any global or thread-local state. (ex. tracing, client warnings, etc.).
*/
- private static Message.Response processRequest(ServerConnection connection, Message.Request request, Overload backpressure)
+ private static Message.Response processRequest(ServerConnection connection, Message.Request request, Overload backpressure, long startTimeNanos)
{
- long queryStartNanoTime = nanoTime();
if (connection.getVersion().isGreaterOrEqualTo(ProtocolVersion.V4))
ClientWarn.instance.captureWarnings();
@@ -119,7 +163,7 @@ public class Dispatcher
Message.logger.trace("Received: {}, v={}", request, connection.getVersion());
connection.requests.inc();
- Message.Response response = request.execute(qstate, queryStartNanoTime);
+ Message.Response response = request.execute(qstate, startTimeNanos);
if (request.isTrackable())
CoordinatorWarnings.done();
@@ -130,15 +174,15 @@ public class Dispatcher
connection.applyStateTransition(request.type, response.type);
return response;
}
-
+
/**
* Note: this method may be executed on the netty event loop.
*/
- static Message.Response processRequest(Channel channel, Message.Request request, Overload backpressure)
+ static Message.Response processRequest(Channel channel, Message.Request request, Overload backpressure, long approxStartTimeNanos)
{
try
{
- return processRequest((ServerConnection) request.connection(), request, backpressure);
+ return processRequest((ServerConnection) request.connection(), request, backpressure, approxStartTimeNanos);
}
catch (Throwable t)
{
@@ -163,9 +207,9 @@ public class Dispatcher
/**
* Note: this method is not expected to execute on the netty event loop.
*/
- void processRequest(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure)
+ void processRequest(Channel channel, Message.Request request, FlushItemConverter forFlusher, Overload backpressure, long approxStartTimeNanos)
{
- Message.Response response = processRequest(channel, request, backpressure);
+ Message.Response response = processRequest(channel, request, backpressure, approxStartTimeNanos);
FlushItem<?> toFlush = forFlusher.toFlushItem(channel, request, response);
Message.logger.trace("Responding: {}, v={}", response, request.connection().getVersion());
flush(toFlush);
@@ -201,7 +245,7 @@ public class Dispatcher
* for delivering events to registered clients is dependent on protocol version and the configuration
* of the pipeline. For v5 and newer connections, the event message is encoded into an Envelope,
* wrapped in a FlushItem and then delivered via the pipeline's flusher, in a similar way to
- * a Response returned from {@link #processRequest(Channel, Message.Request, FlushItemConverter, Overload)}.
+ * a Response returned from {@link #processRequest(Channel, Message.Request, FlushItemConverter, Overload, long)}.
* It's worth noting that events are not generally fired as a direct response to a client request,
* so this flush item has a null request attribute. The dispatcher itself is created when the
* pipeline is first configured during protocol negotiation and is attached to the channel for
diff --git a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
index 75cb72e8b5..e4cff99acb 100644
--- a/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
+++ b/src/java/org/apache/cassandra/transport/InitialConnectionHandler.java
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.Map;
import org.apache.cassandra.transport.ClientResourceLimits.Overload;
+import org.apache.cassandra.utils.MonotonicClock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,7 +149,9 @@ public class InitialConnectionHandler extends ByteToMessageDecoder
promise = new VoidChannelPromise(ctx.channel(), false);
}
- final Message.Response response = Dispatcher.processRequest(ctx.channel(), startup, Overload.NONE);
+ long approxStartTimeNanos = MonotonicClock.Global.approxTime.now();
+ final Message.Response response = Dispatcher.processRequest(ctx.channel(), startup, Overload.NONE, approxStartTimeNanos);
+
outbound = response.encode(inbound.header.version);
ctx.writeAndFlush(outbound, promise);
logger.trace("Configured pipeline: {}", ctx.pipeline());
diff --git a/src/java/org/apache/cassandra/transport/Message.java b/src/java/org/apache/cassandra/transport/Message.java
index 75c997e38c..2c91a76c3b 100644
--- a/src/java/org/apache/cassandra/transport/Message.java
+++ b/src/java/org/apache/cassandra/transport/Message.java
@@ -193,7 +193,8 @@ public abstract class Message
this.customPayload = customPayload;
}
- public String debugString()
+    /**
+     * Formats this message as {@code (type:streamId:version)}. The version is the connection's protocol version as
+     * an int, or {@code "null"} when the message has no connection.
+     * NOTE(review): this replaces the former public debugString(); confirm nothing outside this diff still calls it.
+     */
+ @Override
+ public String toString()
{
return String.format("(%s:%s:%s)", type, streamId, connection == null ? "null" : connection.getVersion().asInt());
}
diff --git a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
index 9a296e442b..c295216d2c 100644
--- a/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
+++ b/src/java/org/apache/cassandra/transport/messages/QueryMessage.java
@@ -148,6 +148,7 @@ public class QueryMessage extends Message.Request
+    /**
+     * Describes this message as its CQL text, page size and consistency level.
+     * NOTE(review): the query string is included verbatim, literals and all. Dispatcher.RequestProcessor#description()
+     * returns request.toString(), so this text is presumably what system_views.queries reports for a running request;
+     * confirm that is acceptable for statements that can embed secrets (e.g. role passwords).
+     */
@Override
public String toString()
{
- return String.format("QUERY %s [pageSize = %d]", query, options.getPageSize());
+ return String.format("QUERY %s [pageSize = %d] at consistency %s",
+ query, options.getPageSize(), options.getConsistency());
}
}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
new file mode 100644
index 0000000000..09e56e0b61
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/QueriesTableTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Row;
+import org.apache.cassandra.distributed.api.SimpleQueryResult;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class QueriesTableTest extends TestBaseImpl
+{
+    public static final int ITERATIONS = 256;
+
+    /**
+     * Issues {@link #ITERATIONS} rounds of concurrent inserts, conditional (Paxos) updates and reads against a
+     * single-node cluster while polling {@code system_views.queries} from the same pool. Checks that each kind of
+     * operation was observed in the table at least once.
+     */
+    @Test
+    public void shouldExposeReadsAndWrites() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.build(1).start()))
+        {
+            ExecutorService executor = Executors.newFixedThreadPool(16);
+            try
+            {
+                cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (k int primary key, v int)");
+
+                // These tally observations summed over every poll, not distinct queries. An in-flight query may be
+                // seen by several polls, so no upper bound (such as ITERATIONS) holds; only a lower bound is checked.
+                AtomicInteger reads = new AtomicInteger(0);
+                AtomicInteger writes = new AtomicInteger(0);
+                AtomicInteger paxos = new AtomicInteger(0);
+
+                for (int i = 0; i < ITERATIONS; i++)
+                {
+                    int k = i;
+                    executor.execute(() -> cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (k, v) VALUES (" + k + ", 0)", ConsistencyLevel.ALL));
+                    executor.execute(() -> cluster.coordinator(1).execute("UPDATE " + KEYSPACE + ".tbl SET v = 10 WHERE k = " + (k - 1) + " IF v = 0", ConsistencyLevel.ALL));
+                    executor.execute(() -> cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE k = " + (k - 1), ConsistencyLevel.ALL));
+                    executor.execute(() -> pollQueriesTable(cluster, reads, writes, paxos));
+                }
+
+                executor.shutdown();
+                assertTrue(executor.awaitTermination(1, TimeUnit.MINUTES));
+
+                // We should see at least one read, write, and conditional update in the "queries" table.
+                assertThat(reads.get()).isGreaterThan(0);
+                assertThat(writes.get()).isGreaterThan(0);
+                assertThat(paxos.get()).isGreaterThan(0);
+            }
+            finally
+            {
+                // Stop the pool even if schema creation, the termination wait or an assertion above fails. Otherwise
+                // its threads outlive the test and keep running against a cluster that is about to be closed.
+                executor.shutdownNow();
+            }
+        }
+    }
+
+    /**
+     * Reads {@code system_views.queries} once and increments the matching counter for each row whose thread id and
+     * task description identify it as a read, a plain mutation or a Paxos operation.
+     */
+    private static void pollQueriesTable(Cluster cluster, AtomicInteger reads, AtomicInteger writes, AtomicInteger paxos)
+    {
+        SimpleQueryResult result = cluster.get(1).executeInternalWithResult("SELECT * FROM system_views.queries");
+
+        while (result.hasNext())
+        {
+            Row row = result.next();
+            String threadId = row.get("thread_id").toString();
+            String task = row.get("task").toString();
+
+            if (threadId.contains("Read") && task.contains("SELECT"))
+                reads.incrementAndGet();
+            else if (threadId.contains("Mutation") && task.contains("Mutation"))
+                writes.incrementAndGet();
+            else if (threadId.contains("Mutation") && task.contains("Paxos"))
+                paxos.incrementAndGet();
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org