You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ty...@apache.org on 2014/07/30 22:59:14 UTC

[1/2] git commit: Improve cqlsh waiting for a trace to complete

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1.0 cc3e0dbda -> 948ae016f


Improve cqlsh waiting for a trace to complete

Patch by Tyler Hobbs; review by Aleksey Yeschenko for CASSANDRA-7626


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2107e30b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2107e30b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2107e30b

Branch: refs/heads/cassandra-2.1.0
Commit: 2107e30bddf54fed5e7245a263eafba71917cc4a
Parents: cc3e0db
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Wed Jul 30 15:57:06 2014 -0500
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Wed Jul 30 15:57:06 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt               |  1 +
 bin/cqlsh                 | 21 ++++++++++++++++-----
 conf/cqlshrc.sample       |  3 +++
 pylib/cqlshlib/tracing.py | 29 +++++++++++++++++++++--------
 4 files changed, 41 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/2107e30b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c4772a7..b8990d9 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -17,6 +17,7 @@
  * Set -Dcassandra.storagedir for all tool shell scripts (CASSANDRA-7587)
  * Don't swap max/min col names when mutating sstable metadata (CASSANDRA-7596)
  * (cqlsh) Correctly handle paged result sets (CASSANDRA-7625)
+ * (cqlsh) Improve waiting for a trace to complete (CASSANDRA-7626)
 Merged from 2.0:
  * Fix ReversedType(DateType) mapping to native protocol (CASSANDRA-7576)
  * Always merge ranges owned by a single node (CASSANDRA-6930)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2107e30b/bin/cqlsh
----------------------------------------------------------------------
diff --git a/bin/cqlsh b/bin/cqlsh
index 551ad54..ba0c886 100755
--- a/bin/cqlsh
+++ b/bin/cqlsh
@@ -123,7 +123,7 @@ from cqlshlib.displaying import (RED, BLUE, ANSI_RESET, COLUMN_NAME_COLORS,
                                  FormattedValue, colorme)
 from cqlshlib.formatting import format_by_type, formatter_for, format_value_utype
 from cqlshlib.util import trim_if_present
-from cqlshlib.tracing import print_trace_session
+from cqlshlib.tracing import print_trace_session, print_trace
 
 HISTORY_DIR = os.path.expanduser(os.path.join('~', '.cassandra'))
 CONFIG_FILE = os.path.join(HISTORY_DIR, 'cqlshrc')
@@ -149,6 +149,7 @@ DEFAULT_PROTOCOL_VERSION = 3
 DEFAULT_TIME_FORMAT = '%Y-%m-%d %H:%M:%S%z'
 DEFAULT_FLOAT_PRECISION = 5
 DEFAULT_SELECT_LIMIT = 10000
+DEFAULT_MAX_TRACE_WAIT = 10
 
 if readline is not None and readline.__doc__ is not None and 'libedit' in readline.__doc__:
     DEFAULT_COMPLETEKEY = '\t'
@@ -491,6 +492,7 @@ class Shell(cmd.Cmd):
                  tracing_enabled=False, expand_enabled=False,
                  display_time_format=DEFAULT_TIME_FORMAT,
                  display_float_precision=DEFAULT_FLOAT_PRECISION,
+                 max_trace_wait=DEFAULT_MAX_TRACE_WAIT,
                  ssl=False,
                  single_statement=None):
         cmd.Cmd.__init__(self, completekey=completekey)
@@ -526,6 +528,8 @@ class Shell(cmd.Cmd):
         self.color = color
         self.display_time_format = display_time_format
         self.display_float_precision = display_float_precision
+        self.max_trace_wait = max_trace_wait
+        self.session.max_trace_wait = max_trace_wait
         if encoding is None:
             encoding = locale.getpreferredencoding()
         self.encoding = encoding
@@ -886,8 +890,11 @@ class Shell(cmd.Cmd):
         result = self.perform_simple_statement(stmt,
                                                 with_default_limit=with_default_limit)
         if self.tracing_enabled:
-            time.sleep(0.5) # trace writes are async so we wait a little.
-            print_trace_session(self, self.session, stmt.trace.trace_id)
+            if stmt.trace:
+                print_trace(self, stmt.trace)
+            else:
+                msg = "Statement trace did not complete within %d seconds" % (self.session.max_trace_wait)
+                self.writeresult(msg, color=RED)
 
         return result
 
@@ -920,7 +927,7 @@ class Shell(cmd.Cmd):
         self.decoding_errors = []
 
         self.writeresult("")
-        if rows :
+        if rows:
             rows = list(rows)  # this may be an iterator if the result is large enough to page
             self.print_static_result(rows)
         self.writeresult("(%d rows)" % len(rows or []))
@@ -1493,7 +1500,8 @@ class Shell(cmd.Cmd):
                          color=self.color, encoding=self.encoding, stdin=f,
                          tty=False, use_conn=self.conn, cqlver=self.cql_version,
                          display_time_format=self.display_time_format,
-                         display_float_precision=self.display_float_precision)
+                         display_float_precision=self.display_float_precision,
+                         max_trace_wait=self.max_trace_wait)
         subshell.cmdloop()
         f.close()
 
@@ -1788,6 +1796,8 @@ def read_options(cmdlineargs, environment):
                                                     DEFAULT_TIME_FORMAT)
     optvalues.float_precision = option_with_default(configs.getint, 'ui', 'float_precision',
                                                     DEFAULT_FLOAT_PRECISION)
+    optvalues.max_trace_wait = option_with_default(configs.getfloat, 'tracing', 'max_trace_wait',
+                                                   DEFAULT_MAX_TRACE_WAIT)
     optvalues.debug = False
     optvalues.file = None
     optvalues.ssl = False
@@ -1895,6 +1905,7 @@ def main(options, hostname, port):
                       keyspace=options.keyspace,
                       display_time_format=options.time_format,
                       display_float_precision=options.float_precision,
+                      max_trace_wait=options.max_trace_wait,
                       ssl=options.ssl,
                       single_statement=options.execute)
     except KeyboardInterrupt:

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2107e30b/conf/cqlshrc.sample
----------------------------------------------------------------------
diff --git a/conf/cqlshrc.sample b/conf/cqlshrc.sample
index ad59787..6558ad2 100644
--- a/conf/cqlshrc.sample
+++ b/conf/cqlshrc.sample
@@ -32,6 +32,9 @@ version = 3.1.5
 hostname = 127.0.0.1
 port = 9042
 
+[tracing]
+max_trace_wait = 10.0
+
 ;[ssl]
 ;certfile = ~/keys/cassandra.cert
 ;; optional - true by default.

http://git-wip-us.apache.org/repos/asf/cassandra/blob/2107e30b/pylib/cqlshlib/tracing.py
----------------------------------------------------------------------
diff --git a/pylib/cqlshlib/tracing.py b/pylib/cqlshlib/tracing.py
index 456ea31..40d22f0 100644
--- a/pylib/cqlshlib/tracing.py
+++ b/pylib/cqlshlib/tracing.py
@@ -17,14 +17,29 @@
 from cqlshlib.displaying import MAGENTA
 from datetime import datetime
 import time
-from cassandra.query import QueryTrace
+from cassandra.query import QueryTrace, TraceUnavailable
 
 
 def print_trace_session(shell, session, session_id):
+    """
+    Lookup a trace by session and trace session ID, then print it.
+    """
     trace = QueryTrace(session_id, session)
-    rows = fetch_trace_session(trace)
-    if not rows:
+    try:
+        trace.populate()
+    except TraceUnavailable:
         shell.printerr("Session %s wasn't found." % session_id)
+    else:
+        print_trace(shell, trace)
+
+
+def print_trace(shell, trace):
+    """
+    Print an already populated cassandra.query.QueryTrace instance.
+    """
+    rows = make_trace_rows(trace)
+    if not rows:
+        shell.printerr("No rows for session %s found." % (trace.trace_id,))
         return
     names = ['activity', 'timestamp', 'source', 'source_elapsed']
 
@@ -33,14 +48,13 @@ def print_trace_session(shell, session, session_id):
 
     shell.writeresult('')
     shell.writeresult('Tracing session: ', color=MAGENTA, newline=False)
-    shell.writeresult(session_id)
+    shell.writeresult(trace.trace_id)
     shell.writeresult('')
     shell.print_formatted_result(formatted_names, formatted_values)
     shell.writeresult('')
 
 
-def fetch_trace_session(trace):
-    trace.populate()
+def make_trace_rows(trace):
     if not trace.events:
         return []
 
@@ -51,7 +65,7 @@ def fetch_trace_session(trace):
         rows.append(["%s [%s]" % (event.description, event.thread_name),
                      str(datetime_from_utc_to_local(event.datetime)),
                      event.source,
-                     event.source_elapsed.microseconds])
+                     event.source_elapsed.microseconds if event.source_elapsed else "--"])
     # append footer row (from sessions table).
     if trace.duration:
         finished_at = (datetime_from_utc_to_local(trace.started_at) + trace.duration)
@@ -67,4 +81,3 @@ def datetime_from_utc_to_local(utc_datetime):
     now_timestamp = time.time()
     offset = datetime.fromtimestamp(now_timestamp) - datetime.utcfromtimestamp(now_timestamp)
     return utc_datetime + offset
-


[2/2] git commit: Fix tracing of concurrent range slices

Posted by ty...@apache.org.
Fix tracing of concurrent range slices

Patch by Tyler Hobbs; review by Aleksey Yeschenko for CASSANDRA-7626


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/948ae016
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/948ae016
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/948ae016

Branch: refs/heads/cassandra-2.1.0
Commit: 948ae016f4bf17c01e7b3320d7095c0ed9347840
Parents: 2107e30
Author: Tyler Hobbs <ty...@datastax.com>
Authored: Wed Jul 30 15:58:26 2014 -0500
Committer: Tyler Hobbs <ty...@datastax.com>
Committed: Wed Jul 30 15:58:26 2014 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/net/OutboundTcpConnection.java    |  2 +-
 .../apache/cassandra/tracing/TraceState.java    | 22 ++++++++++++++++++++
 .../org/apache/cassandra/tracing/Tracing.java   |  7 ++++---
 4 files changed, 28 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/948ae016/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b8990d9..d74db1a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -18,6 +18,7 @@
  * Don't swap max/min col names when mutating sstable metadata (CASSANDRA-7596)
  * (cqlsh) Correctly handle paged result sets (CASSANDRA-7625)
  * (cqlsh) Improve waiting for a trace to complete (CASSANDRA-7626)
+ * Fix tracing of concurrent range slices and 2ary index queries (CASSANDRA-7626)
 Merged from 2.0:
  * Fix ReversedType(DateType) mapping to native protocol (CASSANDRA-7576)
  * Always merge ranges owned by a single node (CASSANDRA-6930)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/948ae016/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 1781a5d..a0db992 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -219,7 +219,7 @@ public class OutboundTcpConnection extends Thread
                 {
                     state.trace(message);
                     if (qm.message.verb == MessagingService.Verb.REQUEST_RESPONSE)
-                        Tracing.instance.stopNonLocal(state);
+                        Tracing.instance.doneWithNonLocalSession(state);
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/948ae016/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index 62eb891..cfff295 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -21,6 +21,7 @@ import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.base.Stopwatch;
 import org.slf4j.helpers.MessageFormatter;
@@ -47,6 +48,10 @@ public class TraceState
     public final Stopwatch watch;
     public final ByteBuffer sessionIdBytes;
 
+    // Multiple requests can use the same TraceState at a time, so we need to reference count.
+    // See CASSANDRA-7626 for more details.
+    private final AtomicInteger references = new AtomicInteger(1);
+
     public TraceState(InetAddress coordinator, UUID sessionId)
     {
         assert coordinator != null;
@@ -104,4 +109,21 @@ public class TraceState
             }
         });
     }
+
+    public boolean acquireReference()
+    {
+        while (true)
+        {
+            int n = references.get();
+            if (n <= 0)
+                return false;
+            if (references.compareAndSet(n, n + 1))
+                return true;
+        }
+    }
+
+    public int releaseReference()
+    {
+        return references.decrementAndGet();
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/948ae016/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index f650d16..e377c6e 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -140,9 +140,10 @@ public class Tracing
         return sessionId;
     }
 
-    public void stopNonLocal(TraceState state)
+    public void doneWithNonLocalSession(TraceState state)
     {
-        sessions.remove(state.sessionId);
+        if (state.releaseReference() == 0)
+            sessions.remove(state.sessionId);
     }
 
     /**
@@ -229,7 +230,7 @@ public class Tracing
         assert sessionBytes.length == 16;
         UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
         TraceState ts = sessions.get(sessionId);
-        if (ts != null)
+        if (ts != null && ts.acquireReference())
             return ts;
 
         if (message.verb == MessagingService.Verb.REQUEST_RESPONSE)