You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by st...@apache.org on 2016/07/29 07:34:10 UTC

[02/10] cassandra git commit: Wait for tracing events before returning response and query at same consistency level client side

Wait for tracing events before returning response and query at same consistency level client side

patch by Stefania Alborghetti; reviewed by Paulo Motta for CASSANDRA-11465


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/7bd65a12
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/7bd65a12
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/7bd65a12

Branch: refs/heads/cassandra-3.0
Commit: 7bd65a129c63091d6885f92afe77a41c4fc46a6f
Parents: 04ef62c
Author: Stefania Alborghetti <st...@datastax.com>
Authored: Thu Jul 14 14:15:39 2016 +0800
Committer: Stefania Alborghetti <st...@datastax.com>
Committed: Fri Jul 29 15:28:03 2016 +0800

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 bin/cqlsh.py                                    |  2 +-
 .../cassandra/concurrent/StageManager.java      | 18 ++++++++-
 .../apache/cassandra/tracing/TraceState.java    | 41 +++++++++++++++++++-
 4 files changed, 58 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bd65a12/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ebebbdc..9a16ee3 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.8
+ * Wait for tracing events before returning response and query at same consistency level client side (CASSANDRA-11465)
  * cqlsh copyutil should get host metadata by connected address (CASSANDRA-11979)
  * Fixed cqlshlib.test.remove_test_db (CASSANDRA-12214)
  * Synchronize ThriftServer::stop() (CASSANDRA-12105)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bd65a12/bin/cqlsh.py
----------------------------------------------------------------------
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index a3eacdd..b631450 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -1211,7 +1211,7 @@ class Shell(cmd.Cmd):
 
             if self.tracing_enabled:
                 try:
-                    for trace in future.get_all_query_traces(self.max_trace_wait):
+                    for trace in future.get_all_query_traces(max_wait_per=self.max_trace_wait, query_cl=self.consistency_level):
                         print_trace(self, trace)
                 except TraceUnavailable:
                     msg = "Statement trace did not complete within %d seconds; trace data may be incomplete." % (self.session.max_trace_wait,)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bd65a12/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 114795e..343648c 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -24,7 +24,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.net.MessagingService;
-import org.apache.cassandra.tracing.TraceState;
 import org.apache.cassandra.utils.FBUtilities;
 
 import static org.apache.cassandra.config.DatabaseDescriptor.*;
@@ -112,9 +111,19 @@ public class StageManager
         }
     }
 
+    public final static Runnable NO_OP_TASK = new Runnable()
+    {
+        public void run()
+        {
+
+        }
+    };
+
     /**
      * A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the
-     * tracing stage.  See CASSANDRA-1123 for background.
+     * tracing stage.  See CASSANDRA-1123 for background. We allow submitting NO_OP tasks, to allow
+     * a final wait on pending trace events since typically the tracing executor is single-threaded, see
+     * CASSANDRA-11465.
      */
     private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements LocalAwareExecutorService
     {
@@ -137,6 +146,11 @@ public class StageManager
         @Override
         public Future<?> submit(Runnable task)
         {
+            if (task.equals(NO_OP_TASK))
+            {
+                assert getMaximumPoolSize() == 1 : "Cannot wait for pending tasks if running more than 1 thread";
+                return super.submit(task);
+            }
             throw new UnsupportedOperationException();
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/7bd65a12/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index e882e67..03e510f 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Stopwatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.slf4j.helpers.MessageFormatter;
 
 import org.apache.cassandra.concurrent.Stage;
@@ -36,6 +38,7 @@ import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.exceptions.OverloadedException;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.JVMStabilityInspector;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.apache.cassandra.utils.progress.ProgressEvent;
 import org.apache.cassandra.utils.progress.ProgressEventNotifier;
@@ -47,6 +50,10 @@ import org.apache.cassandra.utils.progress.ProgressListener;
  */
 public class TraceState implements ProgressEventNotifier
 {
+    private static final Logger logger = LoggerFactory.getLogger(TraceState.class);
+    private static final int WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS =
+    Integer.valueOf(System.getProperty("cassandra.wait_for_tracing_events_timeout_secs", "1"));
+
     public final UUID sessionId;
     public final InetAddress coordinator;
     public final Stopwatch watch;
@@ -119,6 +126,8 @@ public class TraceState implements ProgressEventNotifier
 
     public synchronized void stop()
     {
+        waitForPendingEvents();
+
         status = Status.STOPPED;
         notifyAll();
     }
@@ -181,6 +190,8 @@ public class TraceState implements ProgressEventNotifier
         final int elapsed = elapsed();
 
         executeMutation(TraceKeyspace.makeEventMutation(sessionIdBytes, message, elapsed, threadName, ttl));
+        if (logger.isTraceEnabled())
+            logger.trace("Adding <{}> to trace events", message);
 
         for (ProgressListener listener : listeners)
         {
@@ -194,7 +205,7 @@ public class TraceState implements ProgressEventNotifier
         {
             protected void runMayThrow() throws Exception
             {
-            mutateWithCatch(mutation);
+                mutateWithCatch(mutation);
             }
         });
     }
@@ -228,6 +239,33 @@ public class TraceState implements ProgressEventNotifier
         }
     }
 
+    /**
+     * Post a no-op event to the TRACING stage, so that we can be sure that any previous mutations
+     * have at least been applied to one replica. This works because the tracking executor only
+     * has one thread in its pool, see {@link StageManager#tracingExecutor()}.
+     */
+    protected void waitForPendingEvents()
+    {
+        if (WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS <= 0)
+            return;
+
+        try
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Waiting for up to {} seconds for trace events to complete",
+                             +WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS);
+
+            StageManager.getStage(Stage.TRACING).submit(StageManager.NO_OP_TASK)
+                        .get(WAIT_FOR_PENDING_EVENTS_TIMEOUT_SECS, TimeUnit.SECONDS);
+        }
+        catch (Throwable t)
+        {
+            JVMStabilityInspector.inspectThrowable(t);
+            logger.debug("Failed to wait for tracing events to complete: {}", t);
+        }
+    }
+
+
     public boolean acquireReference()
     {
         while (true)
@@ -242,6 +280,7 @@ public class TraceState implements ProgressEventNotifier
 
     public int releaseReference()
     {
+        waitForPendingEvents();
         return references.decrementAndGet();
     }
 }