You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by al...@apache.org on 2014/11/12 23:11:21 UTC
[1/3] cassandra git commit: Fix millisecond timestamps in Tracing
Repository: cassandra
Updated Branches:
refs/heads/trunk cf94e3c31 -> eb1c2831c
Fix millisecond timestamps in Tracing
patch by Aleksey Yeschenko; reviewed by Sam Tunnicliffe for
CASSANDRA-8297
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/be791422
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/be791422
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/be791422
Branch: refs/heads/trunk
Commit: be791422972a2ee485239e3d724ffcab3d8fdace
Parents: a6802aa
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Wed Nov 12 23:44:00 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Wed Nov 12 23:46:53 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/config/CFMetaData.java | 4 ++--
.../org/apache/cassandra/tracing/Tracing.java | 21 +++++---------------
3 files changed, 8 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/be791422/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 842643c..47e611c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.0.12:
+ * Fix millisecond timestamps in Tracing (CASSANDRA-8297)
* Include keyspace name in error message when there are insufficient
live nodes to stream from (CASSANDRA-8221)
* Avoid overlap in L1 when L0 contains many nonoverlapping
http://git-wip-us.apache.org/repos/asf/cassandra/blob/be791422/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index 391341f..9d69710 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -217,7 +217,7 @@ public final class CFMetaData
+ "started_at timestamp,"
+ "parameters map<text, text>,"
+ "duration int"
- + ") WITH COMMENT='traced sessions'",
+ + ") WITH COMMENT='traced sessions' AND default_time_to_live=86400",
Tracing.TRACE_KS);
public static final CFMetaData TraceEventsCf = compile("CREATE TABLE " + Tracing.EVENTS_CF + " ("
@@ -228,7 +228,7 @@ public final class CFMetaData
+ "activity text,"
+ "source_elapsed int,"
+ "PRIMARY KEY (session_id, event_id)"
- + ")",
+ + ") WITH default_time_to_live=86400",
Tracing.TRACE_KS);
public static final CFMetaData BatchlogCf = compile("CREATE TABLE " + SystemKeyspace.BATCHLOG_CF + " ("
http://git-wip-us.apache.org/repos/asf/cassandra/blob/be791422/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index 88239be..b38dc10 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -59,15 +59,13 @@ public class Tracing
public static final String SESSIONS_CF = "sessions";
public static final String TRACE_HEADER = "TraceSession";
- private static final int TTL = 24 * 3600;
-
private static final Logger logger = LoggerFactory.getLogger(Tracing.class);
private final InetAddress localAddress = FBUtilities.getLocalAddress();
- private final ThreadLocal<TraceState> state = new ThreadLocal<TraceState>();
+ private final ThreadLocal<TraceState> state = new ThreadLocal<>();
- private final ConcurrentMap<UUID, TraceState> sessions = new ConcurrentHashMap<UUID, TraceState>();
+ private final ConcurrentMap<UUID, TraceState> sessions = new ConcurrentHashMap<>();
public static final Tracing instance = new Tracing();
@@ -93,16 +91,7 @@ public class Tracing
private static void addColumn(ColumnFamily cf, ByteBuffer name, ByteBuffer value)
{
- cf.addColumn(new ExpiringColumn(name, value, System.currentTimeMillis(), TTL));
- }
-
- public void addParameterColumns(ColumnFamily cf, Map<String, String> rawPayload)
- {
- for (Map.Entry<String, String> entry : rawPayload.entrySet())
- {
- cf.addColumn(new ExpiringColumn(buildName(cf.metadata(), bytes("parameters"), bytes(entry.getKey())),
- bytes(entry.getValue()), System.currentTimeMillis(), TTL));
- }
+ cf.addColumn(name, value, FBUtilities.timestampMicros());
}
public static ByteBuffer buildName(CFMetaData meta, ByteBuffer... args)
@@ -208,10 +197,10 @@ public class Tracing
CFMetaData cfMeta = CFMetaData.TraceSessionsCf;
ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfMeta);
addColumn(cf, buildName(cfMeta, bytes("coordinator")), FBUtilities.getBroadcastAddress());
- addParameterColumns(cf, parameters);
+ for (Map.Entry<String, String> entry : parameters.entrySet())
+ addColumn(cf, buildName(cf.metadata(), bytes("parameters"), bytes(entry.getKey())), entry.getValue());
addColumn(cf, buildName(cfMeta, bytes("request")), request);
addColumn(cf, buildName(cfMeta, bytes("started_at")), started_at);
- addParameterColumns(cf, parameters);
mutateWithCatch(new RowMutation(TRACE_KS, sessionIdBytes, cf));
}
});
[2/3] cassandra git commit: Merge branch 'cassandra-2.0' into
cassandra-2.1
Posted by al...@apache.org.
Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/tracing/Tracing.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/26ea0f6d
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/26ea0f6d
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/26ea0f6d
Branch: refs/heads/trunk
Commit: 26ea0f6d7442760d1aeb57ce0b8ab6f86c3190a8
Parents: deff175 be79142
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Nov 13 00:34:14 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Nov 13 00:34:14 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/cassandra/config/CFMetaData.java | 4 +-
.../apache/cassandra/tracing/TraceState.java | 20 ++---
.../org/apache/cassandra/tracing/Tracing.java | 80 +++++---------------
4 files changed, 36 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26ea0f6d/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5b63f48,47e611c..51a3ed0
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,40 -1,11 +1,42 @@@
-2.0.12:
+2.1.3
+ * Support for frozen collections (CASSANDRA-7859)
+ * Fix overflow on histogram computation (CASSANDRA-8028)
+ * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
+Merged from 2.0:
+ * Fix millisecond timestamps in Tracing (CASSANDRA-8297)
* Include keyspace name in error message when there are insufficient
live nodes to stream from (CASSANDRA-8221)
* Avoid overlap in L1 when L0 contains many nonoverlapping
sstables (CASSANDRA-8211)
* Improve PropertyFileSnitch logging (CASSANDRA-8183)
- * Abort liveRatio calculation if the memtable is flushed (CASSANDRA-8164)
+
++
+2.1.2
+ * (cqlsh) parse_for_table_meta errors out on queries with undefined
+ grammars (CASSANDRA-8262)
+ * (cqlsh) Fix SELECT ... TOKEN() function broken in C* 2.1.1 (CASSANDRA-8258)
+ * Fix Cassandra crash when running on JDK8 update 40 (CASSANDRA-8209)
+ * Optimize partitioner tokens (CASSANDRA-8230)
+ * Improve compaction of repaired/unrepaired sstables (CASSANDRA-8004)
+ * Make cache serializers pluggable (CASSANDRA-8096)
+ * Fix issues with CONTAINS (KEY) queries on secondary indexes
+ (CASSANDRA-8147)
+ * Fix read-rate tracking of sstables for some queries (CASSANDRA-8239)
+ * Fix default timestamp in QueryOptions (CASSANDRA-8246)
+ * Set socket timeout when reading remote version (CASSANDRA-8188)
+ * Refactor how we track live size (CASSANDRA-7852)
+ * Make sure unfinished compaction files are removed (CASSANDRA-8124)
+ * Fix shutdown when run as Windows service (CASSANDRA-8136)
+ * Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)
+ * Fix race in RecoveryManagerTest (CASSANDRA-8176)
+ * Avoid IllegalArgumentException while sorting sstables in
+ IndexSummaryManager (CASSANDRA-8182)
+ * Shutdown JVM on file descriptor exhaustion (CASSANDRA-7579)
+ * Add 'die' policy for commit log and disk failure (CASSANDRA-7927)
+ * Fix installing as service on Windows (CASSANDRA-8115)
+ * Fix CREATE TABLE for CQL2 (CASSANDRA-8144)
+ * Avoid boxing in ColumnStats min/max trackers (CASSANDRA-8109)
+Merged from 2.0:
* Correctly handle non-text column names in cql3 (CASSANDRA-8178)
* Fix deletion for indexes on primary key columns (CASSANDRA-8206)
* Add 'nodetool statusgossip' (CASSANDRA-8125)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26ea0f6d/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26ea0f6d/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/TraceState.java
index cfff295,5fec633..399b6e9
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@@ -29,9 -28,9 +29,9 @@@ import org.slf4j.helpers.MessageFormatt
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
- import org.apache.cassandra.db.ArrayBackedSortedColumns;
++import org.apache.cassandra.db.CFRowAdder;
import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.RowMutation;
-import org.apache.cassandra.db.TreeMapBackedSortedColumns;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
@@@ -91,21 -87,21 +91,23 @@@ public class TraceStat
public static void trace(final ByteBuffer sessionIdBytes, final String message, final int elapsed)
{
-- final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
final String threadName = Thread.currentThread().getName();
StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
{
public void runMayThrow()
{
-- CFMetaData cfMeta = CFMetaData.TraceEventsCf;
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfMeta);
- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress());
- ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfMeta);
- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress());
++ Mutation mutation = new Mutation(Tracing.TRACE_KS, sessionIdBytes);
++ ColumnFamily cells = mutation.addOrGet(CFMetaData.TraceEventsCf);
++
++ CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.make(UUIDGen.getTimeUUID()), FBUtilities.timestampMicros());
++ adder.add("activity", message);
++ adder.add("source", FBUtilities.getBroadcastAddress());
if (elapsed >= 0)
-- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
-- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
- Tracing.mutateWithCatch(new Mutation(Tracing.TRACE_KS, sessionIdBytes, cf));
- Tracing.mutateWithCatch(new RowMutation(Tracing.TRACE_KS, sessionIdBytes, cf));
++ adder.add("source_elapsed", elapsed);
++ adder.add("thread", threadName);
++
++ Tracing.mutateWithCatch(mutation);
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/26ea0f6d/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/Tracing.java
index e377c6e,b38dc10..d74859a
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@@ -22,6 -22,6 +22,7 @@@ package org.apache.cassandra.tracing
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.Arrays;
++import java.util.Date;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@@ -33,8 -33,8 +34,7 @@@ import org.slf4j.LoggerFactory
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.cql3.ColumnNameBuilder;
import org.apache.cassandra.db.*;
- import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.exceptions.UnavailableException;
@@@ -42,12 -42,12 +42,9 @@@ import org.apache.cassandra.exceptions.
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
--import static org.apache.cassandra.utils.ByteBufferUtil.bytes;
--
/**
* A trace session context. Able to track and store trace sessions. A session is usually a user initiated query, and may
* have multiple local and remote events before it is completed. All events and sessions are stored at keyspace.
@@@ -71,45 -69,39 +66,6 @@@ public class Tracin
public static final Tracing instance = new Tracing();
- public static void addColumn(ColumnFamily cf, CellName name, InetAddress address)
- {
- addColumn(cf, name, ByteBufferUtil.bytes(address));
- }
-
- public static void addColumn(ColumnFamily cf, CellName name, int value)
- {
- addColumn(cf, name, ByteBufferUtil.bytes(value));
- }
-
- public static void addColumn(ColumnFamily cf, CellName name, long value)
- {
- addColumn(cf, name, ByteBufferUtil.bytes(value));
- }
-
- public static void addColumn(ColumnFamily cf, CellName name, String value)
- {
- addColumn(cf, name, ByteBufferUtil.bytes(value));
- }
-
- private static void addColumn(ColumnFamily cf, CellName name, ByteBuffer value)
- {
- cf.addColumn(new BufferExpiringCell(name, value, System.currentTimeMillis(), TTL));
- }
-
- public void addParameterColumns(ColumnFamily cf, Map<String, String> rawPayload)
- {
- for (Map.Entry<String, String> entry : rawPayload.entrySet())
- {
- cf.addColumn(new BufferExpiringCell(buildName(CFMetaData.TraceSessionsCf, "parameters", entry.getKey()),
- bytes(entry.getValue()), System.currentTimeMillis(), TTL));
- }
- }
-
- public static CellName buildName(CFMetaData meta, Object... args)
- {
- return meta.comparator.makeCellName(args);
- }
-
- public static void addColumn(ColumnFamily cf, ByteBuffer name, InetAddress address)
- {
- addColumn(cf, name, ByteBufferUtil.bytes(address));
- }
-
- public static void addColumn(ColumnFamily cf, ByteBuffer name, int value)
- {
- addColumn(cf, name, ByteBufferUtil.bytes(value));
- }
-
- public static void addColumn(ColumnFamily cf, ByteBuffer name, long value)
- {
- addColumn(cf, name, ByteBufferUtil.bytes(value));
- }
-
- public static void addColumn(ColumnFamily cf, ByteBuffer name, String value)
- {
- addColumn(cf, name, ByteBufferUtil.bytes(value));
- }
-
- private static void addColumn(ColumnFamily cf, ByteBuffer name, ByteBuffer value)
- {
- cf.addColumn(name, value, FBUtilities.timestampMicros());
- }
-
- public static ByteBuffer buildName(CFMetaData meta, ByteBuffer... args)
- {
- ColumnNameBuilder builder = meta.getCfDef().getColumnNameBuilder();
- for (ByteBuffer arg : args)
- builder.add(arg);
- return builder.build();
- }
-
public UUID getSessionId()
{
assert isTracing();
@@@ -165,10 -156,10 +121,13 @@@
{
public void run()
{
-- CFMetaData cfMeta = CFMetaData.TraceSessionsCf;
-- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfMeta);
- addColumn(cf, buildName(cfMeta, "duration"), elapsed);
- mutateWithCatch(new Mutation(TRACE_KS, sessionIdBytes, cf));
- addColumn(cf, buildName(cfMeta, bytes("duration")), elapsed);
- mutateWithCatch(new RowMutation(TRACE_KS, sessionIdBytes, cf));
++ Mutation mutation = new Mutation(TRACE_KS, sessionIdBytes);
++ ColumnFamily cells = mutation.addOrGet(CFMetaData.TraceSessionsCf);
++
++ CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros());
++ adder.add("duration", elapsed);
++
++ mutateWithCatch(mutation);
}
});
@@@ -203,14 -194,14 +162,17 @@@
{
public void run()
{
-- CFMetaData cfMeta = CFMetaData.TraceSessionsCf;
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfMeta);
- addColumn(cf, buildName(cfMeta, "coordinator"), FBUtilities.getBroadcastAddress());
- addParameterColumns(cf, parameters);
- addColumn(cf, buildName(cfMeta, bytes("request")), request);
- addColumn(cf, buildName(cfMeta, bytes("started_at")), started_at);
- addParameterColumns(cf, parameters);
- mutateWithCatch(new Mutation(TRACE_KS, sessionIdBytes, cf));
- ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(cfMeta);
- addColumn(cf, buildName(cfMeta, bytes("coordinator")), FBUtilities.getBroadcastAddress());
++ Mutation mutation = new Mutation(TRACE_KS, sessionIdBytes);
++ ColumnFamily cells = mutation.addOrGet(CFMetaData.TraceSessionsCf);
++
++ CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros());
++ adder.add("coordinator", FBUtilities.getBroadcastAddress());
+ for (Map.Entry<String, String> entry : parameters.entrySet())
- addColumn(cf, buildName(cf.metadata(), bytes("parameters"), bytes(entry.getKey())), entry.getValue());
- addColumn(cf, buildName(cfMeta, bytes("request")), request);
- addColumn(cf, buildName(cfMeta, bytes("started_at")), started_at);
- mutateWithCatch(new RowMutation(TRACE_KS, sessionIdBytes, cf));
++ adder.addMapEntry("parameters", entry.getKey(), entry.getValue());
++ adder.add("request", request);
++ adder.add("started_at", new Date(started_at));
++
++ mutateWithCatch(mutation);
}
});
}
[3/3] cassandra git commit: Merge branch 'cassandra-2.1' into trunk
Posted by al...@apache.org.
Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/config/CFMetaData.java
src/java/org/apache/cassandra/tracing/TraceState.java
src/java/org/apache/cassandra/tracing/Tracing.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eb1c2831
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eb1c2831
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eb1c2831
Branch: refs/heads/trunk
Commit: eb1c2831cdcd1b96710bdab4e4dad3e9ea48b5ab
Parents: cf94e3c 26ea0f6
Author: Aleksey Yeschenko <al...@apache.org>
Authored: Thu Nov 13 01:11:09 2014 +0300
Committer: Aleksey Yeschenko <al...@apache.org>
Committed: Thu Nov 13 01:11:09 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../apache/cassandra/tracing/TraceKeyspace.java | 57 ++++++++++++++++--
.../apache/cassandra/tracing/TraceState.java | 18 +-----
.../org/apache/cassandra/tracing/Tracing.java | 63 ++------------------
4 files changed, 62 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb1c2831/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb1c2831/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index a20fadd,0000000..4d234bd
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@@ -1,73 -1,0 +1,122 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tracing;
+
++import java.nio.ByteBuffer;
+import java.util.Arrays;
++import java.util.Date;
+import java.util.List;
++import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.KSMetaData;
++import org.apache.cassandra.db.CFRowAdder;
++import org.apache.cassandra.db.ColumnFamily;
++import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.locator.SimpleStrategy;
++import org.apache.cassandra.utils.FBUtilities;
++import org.apache.cassandra.utils.UUIDGen;
+
+public final class TraceKeyspace
+{
+ public static final String NAME = "system_traces";
+
- static final String SESSIONS_TABLE = "sessions";
- static final String EVENTS_TABLE = "events";
++ private static final String SESSIONS_TABLE = "sessions";
++ private static final String EVENTS_TABLE = "events";
+
+ private static final int DAY = (int) TimeUnit.DAYS.toSeconds(1);
+
- static final CFMetaData SessionsTable =
++ private static final CFMetaData SessionsTable =
+ compile(SESSIONS_TABLE, "tracing sessions",
+ "CREATE TABLE %s ("
+ + "session_id uuid,"
+ + "coordinator inet,"
+ + "duration int,"
+ + "parameters map<text, text>,"
+ + "request text,"
+ + "started_at timestamp,"
+ + "PRIMARY KEY ((session_id)))")
+ .defaultTimeToLive(DAY);
+
- static final CFMetaData EventsTable =
++ private static final CFMetaData EventsTable =
+ compile(EVENTS_TABLE, "tracing events",
+ "CREATE TABLE %s ("
+ + "session_id uuid,"
+ + "event_id timeuuid,"
+ + "activity text,"
+ + "source inet,"
+ + "source_elapsed int,"
+ + "thread text,"
+ + "PRIMARY KEY ((session_id), event_id))")
+ .defaultTimeToLive(DAY);
+
+ private static CFMetaData compile(String table, String comment, String cql)
+ {
+ return CFMetaData.compile(String.format(cql, table), NAME).comment(comment);
+ }
+
+ public static KSMetaData definition()
+ {
+ List<CFMetaData> tables = Arrays.asList(SessionsTable, EventsTable);
+ return new KSMetaData(NAME, SimpleStrategy.class, ImmutableMap.of("replication_factor", "2"), true, tables);
+ }
++
++ static Mutation toStopSessionMutation(ByteBuffer sessionId, int elapsed)
++ {
++ Mutation mutation = new Mutation(NAME, sessionId);
++ ColumnFamily cells = mutation.addOrGet(SessionsTable);
++
++ CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros());
++ adder.add("duration", elapsed);
++
++ return mutation;
++ }
++
++ static Mutation toStartSessionMutation(ByteBuffer sessionId, Map<String, String> parameters, String request, long startedAt)
++ {
++ Mutation mutation = new Mutation(NAME, sessionId);
++ ColumnFamily cells = mutation.addOrGet(TraceKeyspace.SessionsTable);
++
++ CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros());
++ adder.add("coordinator", FBUtilities.getBroadcastAddress());
++ for (Map.Entry<String, String> entry : parameters.entrySet())
++ adder.addMapEntry("parameters", entry.getKey(), entry.getValue());
++ adder.add("request", request);
++ adder.add("started_at", new Date(startedAt));
++
++ return mutation;
++ }
++
++ static Mutation toEventMutation(ByteBuffer sessionId, String message, int elapsed, String threadName)
++ {
++ Mutation mutation = new Mutation(NAME, sessionId);
++ ColumnFamily cells = mutation.addOrGet(EventsTable);
++
++ CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.make(UUIDGen.getTimeUUID()), FBUtilities.timestampMicros());
++ adder.add("activity", message);
++ adder.add("source", FBUtilities.getBroadcastAddress());
++ if (elapsed >= 0)
++ adder.add("source_elapsed", elapsed);
++ adder.add("thread", threadName);
++
++ return mutation;
++ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb1c2831/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/TraceState.java
index 2d89d39,399b6e9..04abce3
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@@ -28,13 -28,13 +28,7 @@@ import org.slf4j.helpers.MessageFormatt
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
--import org.apache.cassandra.config.CFMetaData;
- import org.apache.cassandra.db.ArrayBackedSortedColumns;
- import org.apache.cassandra.db.ColumnFamily;
- import org.apache.cassandra.db.Mutation;
-import org.apache.cassandra.db.CFRowAdder;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.utils.ByteBufferUtil;
--import org.apache.cassandra.utils.FBUtilities;
--import org.apache.cassandra.utils.UUIDGen;
import org.apache.cassandra.utils.WrappedRunnable;
/**
@@@ -89,23 -89,25 +83,15 @@@ public class TraceStat
TraceState.trace(sessionIdBytes, message, elapsed());
}
-- public static void trace(final ByteBuffer sessionIdBytes, final String message, final int elapsed)
++ public static void trace(final ByteBuffer sessionId, final String message, final int elapsed)
{
- final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
final String threadName = Thread.currentThread().getName();
StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
{
public void runMayThrow()
{
- CFMetaData cfMeta = TraceKeyspace.EventsTable;
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfMeta);
- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress());
- if (elapsed >= 0)
- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
- Tracing.mutateWithCatch(new Mutation(TraceKeyspace.NAME, sessionIdBytes, cf));
- Mutation mutation = new Mutation(Tracing.TRACE_KS, sessionIdBytes);
- ColumnFamily cells = mutation.addOrGet(CFMetaData.TraceEventsCf);
-
- CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.make(UUIDGen.getTimeUUID()), FBUtilities.timestampMicros());
- adder.add("activity", message);
- adder.add("source", FBUtilities.getBroadcastAddress());
- if (elapsed >= 0)
- adder.add("source_elapsed", elapsed);
- adder.add("thread", threadName);
-
- Tracing.mutateWithCatch(mutation);
++ Tracing.mutateWithCatch(TraceKeyspace.toEventMutation(sessionId, message, elapsed, threadName));
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eb1c2831/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/Tracing.java
index 509239a,d74859a..773ccd4
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@@ -32,9 -33,8 +32,7 @@@ import org.slf4j.LoggerFactory
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
--import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.db.*;
- import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.exceptions.OverloadedException;
import org.apache.cassandra.exceptions.UnavailableException;
@@@ -153,16 -115,19 +110,13 @@@ public class Tracin
else
{
final int elapsed = state.elapsed();
-- final ByteBuffer sessionIdBytes = state.sessionIdBytes;
++ final ByteBuffer sessionId = state.sessionIdBytes;
StageManager.getStage(Stage.TRACING).execute(new Runnable()
{
public void run()
{
- CFMetaData cfMeta = TraceKeyspace.SessionsTable;
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfMeta);
- addColumn(cf, buildName(cfMeta, "duration"), elapsed);
- mutateWithCatch(new Mutation(TraceKeyspace.NAME, sessionIdBytes, cf));
- Mutation mutation = new Mutation(TRACE_KS, sessionIdBytes);
- ColumnFamily cells = mutation.addOrGet(CFMetaData.TraceSessionsCf);
-
- CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros());
- adder.add("duration", elapsed);
-
- mutateWithCatch(mutation);
++ mutateWithCatch(TraceKeyspace.toStopSessionMutation(sessionId, elapsed));
}
});
@@@ -190,21 -155,24 +144,14 @@@
{
assert isTracing();
-- final long started_at = System.currentTimeMillis();
-- final ByteBuffer sessionIdBytes = state.get().sessionIdBytes;
++ final long startedAt = System.currentTimeMillis();
++ final ByteBuffer sessionId = state.get().sessionIdBytes;
StageManager.getStage(Stage.TRACING).execute(new Runnable()
{
public void run()
{
- CFMetaData cfMeta = TraceKeyspace.SessionsTable;
- ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfMeta);
- addColumn(cf, buildName(cfMeta, "coordinator"), FBUtilities.getBroadcastAddress());
- addParameterColumns(cf, parameters);
- addColumn(cf, buildName(cfMeta, bytes("request")), request);
- addColumn(cf, buildName(cfMeta, bytes("started_at")), started_at);
- addParameterColumns(cf, parameters);
- mutateWithCatch(new Mutation(TraceKeyspace.NAME, sessionIdBytes, cf));
- Mutation mutation = new Mutation(TRACE_KS, sessionIdBytes);
- ColumnFamily cells = mutation.addOrGet(CFMetaData.TraceSessionsCf);
-
- CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros());
- adder.add("coordinator", FBUtilities.getBroadcastAddress());
- for (Map.Entry<String, String> entry : parameters.entrySet())
- adder.addMapEntry("parameters", entry.getKey(), entry.getValue());
- adder.add("request", request);
- adder.add("started_at", new Date(started_at));
-
- mutateWithCatch(mutation);
++ mutateWithCatch(TraceKeyspace.toStartSessionMutation(sessionId, parameters, request, startedAt));
}
});
}