You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jm...@apache.org on 2014/12/03 20:17:37 UTC
cassandra git commit: Add repair tracing
Repository: cassandra
Updated Branches:
refs/heads/trunk 3916e4867 -> f5866ca2b
Add repair tracing
Patch by Ben Chan; reviewed by jmckenzie for CASSANDRA-5483
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f5866ca2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f5866ca2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f5866ca2
Branch: refs/heads/trunk
Commit: f5866ca2bac8ce530bb4e20832ff61e622206815
Parents: 3916e48
Author: Ben Chan <us...@yahoo.com>
Authored: Wed Dec 3 13:15:10 2014 -0600
Committer: Joshua McKenzie <jm...@apache.org>
Committed: Wed Dec 3 13:15:10 2014 -0600
----------------------------------------------------------------------
conf/cassandra.yaml | 4 +
.../DebuggableThreadPoolExecutor.java | 15 +++
.../org/apache/cassandra/config/Config.java | 4 +
.../cassandra/config/DatabaseDescriptor.java | 10 ++
.../org/apache/cassandra/db/CFRowAdder.java | 13 +-
.../org/apache/cassandra/net/MessageOut.java | 7 +-
.../cassandra/net/OutboundTcpConnection.java | 4 +-
.../apache/cassandra/repair/LocalSyncTask.java | 39 +++++-
.../apache/cassandra/repair/RemoteSyncTask.java | 5 +-
.../org/apache/cassandra/repair/RepairJob.java | 4 +
.../apache/cassandra/repair/RepairSession.java | 23 +++-
.../org/apache/cassandra/repair/SyncTask.java | 3 +
.../org/apache/cassandra/repair/Validator.java | 6 +-
.../cassandra/repair/messages/RepairOption.java | 18 ++-
.../cassandra/service/ActiveRepairService.java | 2 +-
.../cassandra/service/StorageService.java | 128 ++++++++++++++++++-
.../cassandra/service/StorageServiceMBean.java | 2 +
.../org/apache/cassandra/tools/NodeProbe.java | 5 +
.../org/apache/cassandra/tools/NodeTool.java | 11 +-
.../cassandra/tracing/ExpiredTraceState.java | 4 +-
.../apache/cassandra/tracing/TraceKeyspace.java | 21 +--
.../apache/cassandra/tracing/TraceState.java | 87 ++++++++++++-
.../org/apache/cassandra/tracing/Tracing.java | 98 ++++++++++++--
23 files changed, 465 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index 9c4f0b5..f458ed8 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -765,3 +765,7 @@ internode_compression: all
# reducing overhead from the TCP protocol itself, at the cost of increasing
# latency if you block for cross-datacenter responses.
inter_dc_tcp_nodelay: false
+
+# TTL (in seconds) of trace events, per trace type: ordinary query tracing and repair tracing.
+tracetype_query_ttl: 86400
+tracetype_repair_ttl: 604800
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index ea04af3..fe6cade 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -100,6 +100,21 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements
}
/**
+ * Creates an unbounded cached thread pool: threads are created on demand (tasks are
+ * handed off through a SynchronousQueue), idle threads are reused and are released after
+ * 60 seconds. NOTE(review): despite its name, this method takes no maximum-size argument.
+ * @param threadPoolName the name of the threads created by this executor
+ * @return The new DebuggableThreadPoolExecutor
+ */
+ public static DebuggableThreadPoolExecutor createCachedThreadpoolWithMaxSize(String threadPoolName)
+ {
+ return new DebuggableThreadPoolExecutor(0, Integer.MAX_VALUE,
+ 60L, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>(),
+ new NamedThreadFactory(threadPoolName));
+ }
+
+ /**
* Returns a ThreadPoolExecutor with a fixed number of threads.
* When all threads are actively executing tasks, new tasks are queued.
* If (most) threads are expected to be idle most of the time, prefer createWithMaxSize() instead.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index ca6276c..8c3021d 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -214,6 +214,10 @@ public class Config
private static final CsvPreference STANDARD_SURROUNDING_SPACES_NEED_QUOTES = new CsvPreference.Builder(CsvPreference.STANDARD_PREFERENCE)
.surroundingSpacesNeedQuotes(true).build();
+ // TTL, in seconds, for each type of trace event (query tracing and repair tracing).
+ public Integer tracetype_query_ttl = 60 * 60 * 24;
+ public Integer tracetype_repair_ttl = 60 * 60 * 24 * 7;
+
public static boolean getOutboundBindAny()
{
return outboundBindAny;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index a359cce..c34c6ea 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1601,4 +1601,14 @@ public class DatabaseDescriptor
String arch = System.getProperty("os.arch");
return arch.contains("64") || arch.contains("sparcv9");
}
+
+ public static int getTracetypeRepairTTL() // tracetype_repair_ttl from cassandra.yaml; NOTE(review): unboxes Config's Integer field, so a null value would throw NullPointerException
+ {
+ return conf.tracetype_repair_ttl;
+ }
+
+ public static int getTracetypeQueryTTL() // tracetype_query_ttl from cassandra.yaml; NOTE(review): unboxes Config's Integer field, so a null value would throw NullPointerException
+ {
+ return conf.tracetype_query_ttl;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/db/CFRowAdder.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/CFRowAdder.java b/src/java/org/apache/cassandra/db/CFRowAdder.java
index 3ff9171..6fab8d5 100644
--- a/src/java/org/apache/cassandra/db/CFRowAdder.java
+++ b/src/java/org/apache/cassandra/db/CFRowAdder.java
@@ -41,13 +41,20 @@ public class CFRowAdder
public final ColumnFamily cf;
public final Composite prefix;
public final long timestamp;
+ public final int ttl;
private final int ldt;
public CFRowAdder(ColumnFamily cf, Composite prefix, long timestamp)
{
+ this(cf, prefix, timestamp, 0); // ttl 0: add() stores plain BufferCells (no expiration)
+ }
+
+ public CFRowAdder(ColumnFamily cf, Composite prefix, long timestamp, int ttl)
+ {
this.cf = cf;
this.prefix = prefix;
this.timestamp = timestamp;
+ this.ttl = ttl;
this.ldt = (int) (System.currentTimeMillis() / 1000);
// If a CQL3 table, add the row marker
@@ -103,7 +110,11 @@ public class CFRowAdder
AbstractType valueType = def.type.isCollection()
? ((CollectionType) def.type).valueComparator()
: def.type;
- cf.addColumn(new BufferCell(name, value instanceof ByteBuffer ? (ByteBuffer)value : valueType.decompose(value), timestamp));
+ ByteBuffer valueBytes = value instanceof ByteBuffer ? (ByteBuffer)value : valueType.decompose(value);
+ if (ttl == 0)
+ cf.addColumn(new BufferCell(name, valueBytes, timestamp));
+ else
+ cf.addColumn(new BufferExpiringCell(name, valueBytes, timestamp, ttl));
}
return this;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/net/MessageOut.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessageOut.java b/src/java/org/apache/cassandra/net/MessageOut.java
index 70c4f5c..5193c2b 100644
--- a/src/java/org/apache/cassandra/net/MessageOut.java
+++ b/src/java/org/apache/cassandra/net/MessageOut.java
@@ -36,6 +36,7 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import static org.apache.cassandra.tracing.Tracing.TRACE_HEADER;
+import static org.apache.cassandra.tracing.Tracing.TRACE_TYPE;
import static org.apache.cassandra.tracing.Tracing.isTracing;
public class MessageOut<T>
@@ -57,8 +58,10 @@ public class MessageOut<T>
this(verb,
payload,
serializer,
- isTracing() ? ImmutableMap.of(TRACE_HEADER, UUIDGen.decompose(Tracing.instance.getSessionId()))
- : Collections.<String, byte[]>emptyMap());
+ isTracing()
+ ? ImmutableMap.of(TRACE_HEADER, UUIDGen.decompose(Tracing.instance.getSessionId()),
+ TRACE_TYPE, new byte[] { Tracing.TraceType.serialize(Tracing.instance.getTraceType()) })
+ : Collections.<String, byte[]>emptyMap());
}
private MessageOut(MessagingService.Verb verb, T payload, IVersionedSerializer<T> serializer, Map<String, byte[]> parameters)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index a0ad011..cddce07 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -215,7 +215,9 @@ public class OutboundTcpConnection extends Thread
// session may have already finished; see CASSANDRA-5668
if (state == null)
{
- TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1);
+ byte[] traceTypeBytes = qm.message.parameters.get(Tracing.TRACE_TYPE);
+ Tracing.TraceType traceType = traceTypeBytes == null ? Tracing.TraceType.QUERY : Tracing.TraceType.deserialize(traceTypeBytes[0]);
+ TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1, traceType.getTTL(), null);
}
else
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/LocalSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
index a43d326..bbb6362 100644
--- a/src/java/org/apache/cassandra/repair/LocalSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java
@@ -26,10 +26,13 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.streaming.ProgressInfo;
import org.apache.cassandra.streaming.StreamEvent;
import org.apache.cassandra.streaming.StreamEventHandler;
import org.apache.cassandra.streaming.StreamPlan;
import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -37,6 +40,8 @@ import org.apache.cassandra.utils.FBUtilities;
*/
public class LocalSyncTask extends SyncTask implements StreamEventHandler
{
+ private final TraceState state = Tracing.instance.get();
+
private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class);
private final long repairedAt;
@@ -58,7 +63,9 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint;
InetAddress preferred = SystemKeyspace.getPreferredIP(dst);
- logger.info(String.format("[repair #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, differences.size(), dst));
+ String message = String.format("Performing streaming repair of %d ranges with %s", differences.size(), dst);
+ logger.info("[repair #{}] {}", desc.sessionId, message);
+ Tracing.traceRepair(message);
new StreamPlan("Repair", repairedAt, 1, false).listeners(this)
.flushBeforeTransfer(true)
// request ranges from the remote node
@@ -68,11 +75,37 @@ public class LocalSyncTask extends SyncTask implements StreamEventHandler
.execute();
}
- public void handleStreamEvent(StreamEvent event) { /* noop */ }
+ public void handleStreamEvent(StreamEvent event) // mirrors streaming lifecycle/progress events into the repair trace session, if any
+ {
+ if (state == null) // no trace state was captured when this task was constructed
+ return;
+ switch (event.eventType)
+ {
+ case STREAM_PREPARED:
+ StreamEvent.SessionPreparedEvent spe = (StreamEvent.SessionPreparedEvent) event;
+ state.trace("Streaming session with {} prepared", spe.session.peer);
+ break;
+ case STREAM_COMPLETE:
+ StreamEvent.SessionCompleteEvent sce = (StreamEvent.SessionCompleteEvent) event;
+ state.trace("Streaming session with {} {}", sce.peer, sce.success ? "completed successfully" : "failed");
+ break;
+ case FILE_PROGRESS:
+ ProgressInfo pi = ((StreamEvent.ProgressEvent) event).progress;
+ state.trace("{}/{} bytes ({}%) {} idx:{}{}",
+ new Object[] { pi.currentBytes,
+ pi.totalBytes,
+ pi.totalBytes == 0 ? 100 : pi.currentBytes * 100 / pi.totalBytes, // guard: a zero-length transfer would otherwise throw ArithmeticException
+ pi.direction == ProgressInfo.Direction.OUT ? "sent to" : "received from",
+ pi.sessionIndex,
+ pi.peer });
+ }
+ }
public void onSuccess(StreamState result)
{
- logger.info(String.format("[repair #%s] Sync complete between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily));
+ String message = String.format("Sync complete between %s and %s on %s", r1.endpoint, r2.endpoint, desc.columnFamily); // session id is supplied by the log prefix below, not by this format
+ logger.info("[repair #{}] {}", desc.sessionId, message);
+ Tracing.traceRepair(message);
set(stat);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
index ca5c998..ededc40 100644
--- a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
+++ b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java
@@ -28,6 +28,7 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.RepairException;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.SyncRequest;
+import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
/**
@@ -49,7 +50,9 @@ public class RemoteSyncTask extends SyncTask
{
InetAddress local = FBUtilities.getBroadcastAddress();
SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences);
- logger.info(String.format("[repair #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", desc.sessionId, request.ranges.size(), request.src, request.dst));
+ String message = String.format("Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", request.ranges.size(), request.src, request.dst);
+ logger.info("[repair #{}] {}", desc.sessionId, message);
+ Tracing.traceRepair(message);
MessagingService.instance().sendOneWay(request.createMessage(), request.src);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/RepairJob.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 708ee70..5c649af 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.Pair;
@@ -180,6 +181,9 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
*/
private ListenableFuture<List<TreeResponse>> sendValidationRequest(Collection<InetAddress> endpoints)
{
+ String message = String.format("Requesting merkle trees for %s (to %s)", desc.columnFamily, endpoints);
+ logger.info("[repair #{}] {}", desc.sessionId, message);
+ Tracing.traceRepair(message);
int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis());
List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size());
for (InetAddress endpoint : endpoints)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index d9787e2..9a8f645 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -30,11 +30,12 @@ import com.google.common.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.*;
+import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
import org.apache.cassandra.utils.Pair;
@@ -99,7 +100,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> syncingTasks = new ConcurrentHashMap<>();
// Tasks(snapshot, validate request, differencing, ...) are run on taskExecutor
- private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory("RepairJobTask")));
+ private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(DebuggableThreadPoolExecutor.createCachedThreadpoolWithMaxSize("RepairJobTask"));
private volatile boolean terminated = false;
@@ -172,7 +173,9 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
return;
}
- logger.info(String.format("[repair #%s] Received merkle tree for %s from %s", getId(), desc.columnFamily, endpoint));
+ String message = String.format("Received merkle tree for %s from %s", desc.columnFamily, endpoint);
+ logger.info("[repair #{}] {}", getId(), message);
+ Tracing.traceRepair(message);
task.treeReceived(tree);
}
@@ -215,14 +218,17 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
*/
public void start(ListeningExecutorService executor)
{
+ String message;
if (terminated)
return;
logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getId(), repairedNodes(), range, keyspace, Arrays.toString(cfnames)));
+ Tracing.traceRepair("Syncing range {}", range);
if (endpoints.isEmpty())
{
- logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getId(), range));
+ logger.info("[repair #{}] {}", getId(), message = String.format("No neighbors to repair with on range %s: session completed", range));
+ Tracing.traceRepair(message);
set(new RepairSessionResult(id, keyspace, range, Lists.<RepairResult>newArrayList()));
return;
}
@@ -232,7 +238,7 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
{
if (!FailureDetector.instance.isAlive(endpoint))
{
- String message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);
+ message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);
logger.error("[repair #{}] {}", getId(), message);
setException(new IOException(message));
return;
@@ -254,8 +260,10 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
public void onSuccess(List<RepairResult> results)
{
// this repair session is completed
- logger.info(String.format("[repair #%s] session completed successfully", getId()));
+ logger.info("[repair #{}] {}", getId(), "Session completed successfully");
+ Tracing.traceRepair("Completed sync of range {}", range);
set(new RepairSessionResult(id, keyspace, range, results));
+
taskExecutor.shutdown();
// mark this session as terminated
terminate();
@@ -263,7 +271,8 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
public void onFailure(Throwable t)
{
- logger.error("Repair job failed", t);
+ logger.error(String.format("[repair #%s] Session completed with the following error", getId()), t);
+ Tracing.traceRepair("Session completed with the following error: {}", t);
setException(t);
}
});
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/SyncTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java
index 3ce5532..7350a66 100644
--- a/src/java/org/apache/cassandra/repair/SyncTask.java
+++ b/src/java/org/apache/cassandra/repair/SyncTask.java
@@ -26,6 +26,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.MerkleTree;
/**
@@ -65,12 +66,14 @@ public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runna
if (differences.isEmpty())
{
logger.info(String.format(format, "are consistent"));
+ Tracing.traceRepair("Endpoint {} is consistent with {} for {}", r1.endpoint, r2.endpoint, desc.columnFamily);
set(stat);
return;
}
// non-0 difference: perform streaming repair
logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync"));
+ Tracing.traceRepair("Endpoint {} has {} range(s) out of sync with {} for {}", r1.endpoint, differences.size(), r2.endpoint, desc.columnFamily);
startSync(differences);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/Validator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/Validator.java b/src/java/org/apache/cassandra/repair/Validator.java
index 641717e..972afd6 100644
--- a/src/java/org/apache/cassandra/repair/Validator.java
+++ b/src/java/org/apache/cassandra/repair/Validator.java
@@ -34,6 +34,7 @@ import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.ValidationComplete;
+import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
@@ -241,7 +242,10 @@ public class Validator implements Runnable
{
// respond to the request that triggered this validation
if (!initiator.equals(FBUtilities.getBroadcastAddress()))
- logger.info(String.format("[repair #%s] Sending completed merkle tree to %s for %s/%s", desc.sessionId, initiator, desc.keyspace, desc.columnFamily));
+ {
+ logger.info(String.format("[repair #%s] Sending completed merkle tree to %s for %s.%s", desc.sessionId, initiator, desc.keyspace, desc.columnFamily));
+ Tracing.traceRepair("Sending completed merkle tree to {} for {}.{}", initiator, desc.keyspace, desc.columnFamily);
+ }
MessagingService.instance().sendOneWay(new ValidationComplete(desc, tree).createMessage(), initiator);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
index d11f94f..5987aed 100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@ -40,6 +40,7 @@ public class RepairOption
public static final String COLUMNFAMILIES_KEY = "columnFamilies";
public static final String DATACENTERS_KEY = "dataCenters";
public static final String HOSTS_KEY = "hosts";
+ public static final String TRACE_KEY = "trace";
// we don't want to push nodes too much for repair
public static final int MAX_JOB_THREADS = 4;
@@ -76,6 +77,11 @@ public class RepairOption
* <td>false</td>
* </tr>
* <tr>
+ * <td>trace</td>
+ * <td>"true" if repair is traced.</td>
+ * <td>false</td>
+ * </tr>
+ * <tr>
* <td>jobThreads</td>
* <td>Number of threads to use to run repair job.</td>
* <td>1</td>
@@ -117,6 +123,7 @@ public class RepairOption
RepairParallelism parallelism = RepairParallelism.fromName(options.get(PARALLELISM_KEY));
boolean primaryRange = Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY));
boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY));
+ boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY));
int jobThreads = 1;
if (options.containsKey(JOB_THREADS_KEY))
@@ -146,7 +153,7 @@ public class RepairOption
}
}
- RepairOption option = new RepairOption(parallelism, primaryRange, incremental, jobThreads, ranges);
+ RepairOption option = new RepairOption(parallelism, primaryRange, incremental, trace, jobThreads, ranges);
// data centers
String dataCentersStr = options.get(DATACENTERS_KEY);
@@ -203,6 +210,7 @@ public class RepairOption
private final RepairParallelism parallelism;
private final boolean primaryRange;
private final boolean incremental;
+ private final boolean trace;
private final int jobThreads;
private final Collection<String> columnFamilies = new HashSet<>();
@@ -210,11 +218,12 @@ public class RepairOption
private final Collection<String> hosts = new HashSet<>();
private final Collection<Range<Token>> ranges = new HashSet<>();
- public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, int jobThreads, Collection<Range<Token>> ranges)
+ public RepairOption(RepairParallelism parallelism, boolean primaryRange, boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> ranges) // NOTE(review): 'trace' is inserted before jobThreads, so every positional caller must be updated
{
this.parallelism = parallelism;
this.primaryRange = primaryRange;
this.incremental = incremental;
+ this.trace = trace; // read back through isTraced()
this.jobThreads = jobThreads;
this.ranges.addAll(ranges);
}
@@ -234,6 +243,11 @@ public class RepairOption
return incremental;
}
+ public boolean isTraced() // true when the "trace" option was parsed (Boolean.parseBoolean) as true
+ {
+ return trace;
+ }
+
public int getJobThreads()
{
return jobThreads;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 252bcd1..e4b7fff 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -79,7 +79,7 @@ public class ActiveRepairService
public static enum Status
{
- STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED
+ STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED, RUNNING
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index cf2152b..4ec23a6 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -55,6 +55,10 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.auth.Auth;
import org.apache.cassandra.concurrent.*;
import org.apache.cassandra.config.*;
+import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.cql3.QueryOptions;
+import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.statements.SelectStatement;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.commitlog.CommitLog;
import org.apache.cassandra.db.compaction.CompactionManager;
@@ -87,6 +91,9 @@ import org.apache.cassandra.thrift.EndpointDetails;
import org.apache.cassandra.thrift.TokenRange;
import org.apache.cassandra.thrift.cassandraConstants;
import org.apache.cassandra.tracing.TraceKeyspace;
+import org.apache.cassandra.tracing.TraceState;
+import org.apache.cassandra.tracing.Tracing;
+import org.apache.cassandra.transport.messages.ResultMessage;
import org.apache.cassandra.utils.*;
import static java.nio.charset.StandardCharsets.ISO_8859_1;
@@ -2474,7 +2481,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
parallelismDegree = RepairParallelism.PARALLEL;
}
- RepairOption options = new RepairOption(parallelismDegree, primaryRange, !fullRepair, 1, Collections.<Range<Token>>emptyList());
+ RepairOption options = new RepairOption(parallelismDegree, primaryRange, !fullRepair, false, 1, Collections.<Range<Token>>emptyList());
if (dataCenters != null)
{
options.getDataCenters().addAll(dataCenters);
@@ -2536,7 +2543,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
}
Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken);
- RepairOption options = new RepairOption(parallelismDegree, false, !fullRepair, 1, repairingRange);
+ RepairOption options = new RepairOption(parallelismDegree, false, !fullRepair, false, 1, repairingRange);
options.getDataCenters().addAll(dataCenters);
if (hosts != null)
{
@@ -2620,6 +2627,75 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return cmd;
}
+ private Thread createQueryThread(final int cmd, final UUID sessionId) // returns (does not start) a thread that forwards non-local trace events of sessionId as repair notifications
+ {
+ return new Thread(new WrappedRunnable()
+ {
+ // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces.
+ // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts.
+ public void runMayThrow() throws Exception
+ {
+ TraceState state = Tracing.instance.get(sessionId);
+ if (state == null)
+ throw new Exception("no tracestate");
+
+ String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? and event_id < ?;";
+ String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS_TABLE);
+ SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement;
+
+ ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
+ InetAddress source = FBUtilities.getBroadcastAddress();
+
+ HashSet<UUID>[] seen = new HashSet[] { new HashSet<UUID>(), new HashSet<UUID>() };
+ int si = 0;
+ UUID uuid;
+
+ long tlast = System.currentTimeMillis(), tcur;
+
+ TraceState.Status status;
+ long minWaitMillis = 125;
+ long maxWaitMillis = 1000 * 1024L;
+ long timeout = minWaitMillis;
+ boolean shouldDouble = false;
+
+ while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED)
+ {
+ if (status == TraceState.Status.IDLE)
+ {
+ timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout;
+ shouldDouble = !shouldDouble;
+ }
+ else
+ {
+ timeout = minWaitMillis;
+ shouldDouble = false;
+ }
+ ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000));
+ ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis()));
+ QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes, tminBytes, tmaxBytes));
+ ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options);
+ UntypedResultSet result = UntypedResultSet.create(rows.result);
+
+ for (UntypedResultSet.Row r : result)
+ {
+ if (source.equals(r.getInetAddress("source")))
+ continue;
+ if (UUIDGen.unixTimestamp(uuid = r.getUUID("event_id")) >= tcur - 1000) // UUID.timestamp() counts 100ns since 1582, so compare in unix millis; only events in the 1s overlap re-queried next round need remembering
+ seen[si].add(uuid);
+ if (seen[si == 0 ? 1 : 0].contains(uuid))
+ continue;
+ String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity"));
+ sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.RUNNING.ordinal()});
+ }
+ tlast = tcur;
+
+ si = si == 0 ? 1 : 0;
+ seen[si].clear();
+ }
+ }
+ });
+ }
+
private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final RepairOption options)
{
if (!options.getDataCenters().isEmpty() && options.getDataCenters().contains(DatabaseDescriptor.getLocalDataCenter()))
@@ -2631,10 +2707,34 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
protected void runMayThrow() throws Exception
{
+ final TraceState traceState;
+
+ String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
+ Iterable<ColumnFamilyStore> validColumnFamilies = getValidColumnFamilies(false, false, keyspace, columnFamilies);
+
final long startTime = System.currentTimeMillis();
String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace, options);
logger.info(message);
sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()});
+ if (options.isTraced())
+ {
+ StringBuilder cfsb = new StringBuilder();
+ for (ColumnFamilyStore cfs : validColumnFamilies)
+ cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name);
+
+ UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR);
+ traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies", cfsb.substring(2)));
+ Tracing.traceRepair(message);
+ traceState.enableActivityNotification();
+ traceState.setNotificationHandle(new int[]{ cmd, ActiveRepairService.Status.RUNNING.ordinal() });
+ Thread queryThread = createQueryThread(cmd, sessionId);
+ queryThread.setName("RepairTracePolling");
+ queryThread.start();
+ }
+ else
+ {
+ traceState = null;
+ }
final Set<InetAddress> allNeighbors = new HashSet<>();
Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
@@ -2656,10 +2756,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
// Validate columnfamilies
List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
- String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]);
try
{
- Iterables.addAll(columnFamilyStores, getValidColumnFamilies(false, false, keyspace, columnFamilies));
+ Iterables.addAll(columnFamilyStores, validColumnFamilies);
}
catch (IllegalArgumentException e)
{
@@ -2760,12 +2859,23 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
+ // Reports completion: sends the FINISHED notification and logs the elapsed time; for
+ // traced repairs also records the final trace event and stops the trace session,
+ // then shuts down the executor.
private void repairComplete()
{
- String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
- true, true);
+ String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, true, true);
String message = String.format("Repair command #%d finished in %s", cmd, duration);
sendNotification("repair", message,
new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()});
logger.info(message);
+ if (options.isTraced())
+ {
+ // Detach the handle first so the final trace event below is not also
+ // forwarded as a RUNNING "repair" notification after FINISHED was sent.
+ traceState.setNotificationHandle(null);
+ // DebuggableThreadPoolExecutor#afterExecute and this callback run in a
+ // nondeterministic order on the same thread, so the current TraceState may
+ // already have been cleared here. Re-install traceState unconditionally
+ // instead of checking whether it was cleared.
+ Tracing.instance.set(traceState);
+ Tracing.traceRepair(message);
+ Tracing.instance.stopSession();
+ }
executor.shutdownNow();
}
});
@@ -3774,6 +3884,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
return Collections.unmodifiableList(keyspaceNamesList);
}
+ /**
+ * Lists the names of all keyspaces the schema reports as non-system.
+ *
+ * @return an unmodifiable snapshot of those keyspace names
+ */
+ public List<String> getNonSystemKeyspaces()
+ {
+ return Collections.unmodifiableList(new ArrayList<>(Schema.instance.getNonSystemKeyspaces()));
+ }
+
public void updateSnitch(String epSnitchClassName, Boolean dynamic, Integer dynamicUpdateInterval, Integer dynamicResetInterval, Double dynamicBadnessThreshold) throws ClassNotFoundException
{
IEndpointSnitch oldSnitch = DatabaseDescriptor.getEndpointSnitch();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 1865f7f..70b2b81 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -386,6 +386,8 @@ public interface StorageServiceMBean extends NotificationEmitter
public List<String> getKeyspaces();
+ /**
+ * @return the names of all keyspaces except the system keyspaces
+ */
+ public List<String> getNonSystemKeyspaces();
+
/**
* Change endpointsnitch class and dynamic-ness (and dynamic attributes) at runtime
* @param epSnitchClassName the canonical path name for a class implementing IEndpointSnitch
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index d749481..ea24530 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -737,6 +737,11 @@ public class NodeProbe implements AutoCloseable
return ssProxy.getKeyspaces();
}
+ /**
+ * Fetches the non-system keyspace names through the StorageService MBean proxy.
+ */
+ public List<String> getNonSystemKeyspaces()
+ {
+ List<String> names = ssProxy.getNonSystemKeyspaces();
+ return names;
+ }
+
public String getClusterName()
{
return ssProxy.getClusterName();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tools/NodeTool.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java
index 612af8a..9314225 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -318,10 +318,15 @@ public class NodeTool
protected List<String> parseOptionalKeyspace(List<String> cmdArgs, NodeProbe nodeProbe)
{
+ // Excludes system keyspaces when no keyspace argument is given.
+ // NOTE(review): the no-argument default used to be getKeyspaces() (system keyspaces
+ // included); confirm every command calling this overload wants the new default.
+ return parseOptionalKeyspace(cmdArgs, nodeProbe, false);
+ }
+
+ protected List<String> parseOptionalKeyspace(List<String> cmdArgs, NodeProbe nodeProbe, boolean includeSystemKS)
+ {
List<String> keyspaces = new ArrayList<>();
if (cmdArgs == null || cmdArgs.isEmpty())
- keyspaces.addAll(nodeProbe.getKeyspaces());
+ keyspaces.addAll(includeSystemKS ? nodeProbe.getKeyspaces() : nodeProbe.getNonSystemKeyspaces());
else
keyspaces.add(cmdArgs.get(0));
@@ -1757,6 +1762,9 @@ public class NodeTool
"WARNING: increasing this puts more load on repairing nodes, so be careful. (default: 1, max: 4)")
private int numJobThreads = 1;
+ // Sent to the server as the RepairOption.TRACE_KEY repair option.
+ @Option(title = "trace_repair", name = {"-tr", "--trace"}, description = "Use -tr to trace the repair. Traces are logged to system_traces.events.")
+ private boolean trace = false;
+
@Override
public void execute(NodeProbe probe)
{
@@ -1778,6 +1786,7 @@ public class NodeTool
options.put(RepairOption.PRIMARY_RANGE_KEY, Boolean.toString(primaryRange));
options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!fullRepair));
options.put(RepairOption.JOB_THREADS_KEY, Integer.toString(numJobThreads));
+ options.put(RepairOption.TRACE_KEY, Boolean.toString(trace));
options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(cfnames, ","));
if (!startToken.isEmpty() || !endToken.isEmpty())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
index 37a013b..5cc3c21 100644
--- a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@ -27,9 +27,9 @@ import org.apache.cassandra.utils.FBUtilities;
public class ExpiredTraceState extends TraceState
{
- public ExpiredTraceState(UUID sessionId)
+ /**
+ * Creates a trace state, coordinated by this node's broadcast address, for a
+ * session that has already been closed out locally (see CASSANDRA-5668).
+ * @param traceType type of the expired session; determines the TTL of its rows
+ */
+ public ExpiredTraceState(UUID sessionId, Tracing.TraceType traceType)
{
- super(FBUtilities.getBroadcastAddress(), sessionId);
+ super(FBUtilities.getBroadcastAddress(), sessionId, traceType);
}
public int elapsed()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
index 4d234bd..72a7c47 100644
--- a/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
+++ b/src/java/org/apache/cassandra/tracing/TraceKeyspace.java
@@ -39,8 +39,8 @@ public final class TraceKeyspace
{
public static final String NAME = "system_traces";
- private static final String SESSIONS_TABLE = "sessions";
- private static final String EVENTS_TABLE = "events";
+ // Public so other classes can name the trace tables, e.g. when building CQL
+ // queries against the system_traces keyspace.
+ public static final String SESSIONS_TABLE = "sessions";
+ public static final String EVENTS_TABLE = "events";
private static final int DAY = (int) TimeUnit.DAYS.toSeconds(1);
@@ -48,6 +48,7 @@ public final class TraceKeyspace
compile(SESSIONS_TABLE, "tracing sessions",
"CREATE TABLE %s ("
+ "session_id uuid,"
+ + "command text,"
+ "coordinator inet,"
+ "duration int,"
+ "parameters map<text, text>,"
@@ -79,38 +80,42 @@ public final class TraceKeyspace
return new KSMetaData(NAME, SimpleStrategy.class, ImmutableMap.of("replication_factor", "2"), true, tables);
}
- static Mutation toStopSessionMutation(ByteBuffer sessionId, int elapsed)
+ // Builds the sessions-table mutation recording the session's total duration.
+ // A ttl equal to DAY is passed to CFRowAdder as 0; any other value is passed as-is.
+ static Mutation toStopSessionMutation(ByteBuffer sessionId, int elapsed, int ttl)
{
Mutation mutation = new Mutation(NAME, sessionId);
ColumnFamily cells = mutation.addOrGet(SessionsTable);
- CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros());
+ // NOTE(review): presumably 0 makes CFRowAdder defer to the table's default TTL
+ // (assumed to be DAY), avoiding a redundant per-cell TTL — confirm.
+ ttl = ttl == DAY ? 0 : ttl;
+ CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros(), ttl);
adder.add("duration", elapsed);
return mutation;
}
- static Mutation toStartSessionMutation(ByteBuffer sessionId, Map<String, String> parameters, String request, long startedAt)
+ // Builds the sessions-table mutation for a new session: coordinator, parameters,
+ // request, start time and the command (the trace type's name) that started it.
+ // A ttl equal to DAY is passed to CFRowAdder as 0; any other value is passed as-is.
+ static Mutation toStartSessionMutation(ByteBuffer sessionId, Map<String, String> parameters, String request, long startedAt, String command, int ttl)
{
Mutation mutation = new Mutation(NAME, sessionId);
ColumnFamily cells = mutation.addOrGet(TraceKeyspace.SessionsTable);
- CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros());
+ // NOTE(review): presumably 0 makes CFRowAdder defer to the table's default TTL
+ // (assumed to be DAY) — confirm.
+ ttl = ttl == DAY ? 0 : ttl;
+ CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.builder().build(), FBUtilities.timestampMicros(), ttl);
adder.add("coordinator", FBUtilities.getBroadcastAddress());
for (Map.Entry<String, String> entry : parameters.entrySet())
adder.addMapEntry("parameters", entry.getKey(), entry.getValue());
adder.add("request", request);
adder.add("started_at", new Date(startedAt));
+ adder.add("command", command);
return mutation;
}
- static Mutation toEventMutation(ByteBuffer sessionId, String message, int elapsed, String threadName)
+ static Mutation toEventMutation(ByteBuffer sessionId, String message, int elapsed, String threadName, int ttl)
{
Mutation mutation = new Mutation(NAME, sessionId);
ColumnFamily cells = mutation.addOrGet(EventsTable);
- CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.make(UUIDGen.getTimeUUID()), FBUtilities.timestampMicros());
+ ttl = ttl == DAY ? 0 : ttl;
+ CFRowAdder adder = new CFRowAdder(cells, cells.metadata().comparator.make(UUIDGen.getTimeUUID()), FBUtilities.timestampMicros(), ttl);
adder.add("activity", message);
adder.add("source", FBUtilities.getBroadcastAddress());
if (elapsed >= 0)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index 04abce3..f7d2741 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -28,6 +28,7 @@ import org.slf4j.helpers.MessageFormatter;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.WrappedRunnable;
@@ -41,6 +42,20 @@ public class TraceState
public final InetAddress coordinator;
public final Stopwatch watch;
public final ByteBuffer sessionIdBytes;
+ public final Tracing.TraceType traceType;
+ // TTL for the rows this trace writes; taken from traceType.getTTL() at construction.
+ public final int ttl;
+
+ // Set by the thread running the traced repair (e.g. setNotificationHandle(null) on
+ // completion) while trace() may run on other threads sharing this TraceState, so
+ // both fields are volatile to guarantee the writes are visible to those threads.
+ private volatile boolean notify;
+ private volatile Object notificationHandle;
+
+ /** Activity state reported by waitActivity(long). */
+ public enum Status
+ {
+ IDLE,
+ ACTIVE,
+ STOPPED;
+ }
+
+ // Accessed under this object's monitor (see stop/waitActivity/notifyActivity).
+ private Status status;
// Multiple requests can use the same TraceState at a time, so we need to reference count.
// See CASSANDRA-7626 for more details.
@@ -48,13 +63,33 @@ public class TraceState
public TraceState(InetAddress coordinator, UUID sessionId)
{
+ // Trace states created without an explicit type are QUERY traces.
+ this(coordinator, sessionId, Tracing.TraceType.QUERY);
+ }
+
+ /**
+ * @param traceType kind of traced operation; determines the TTL of the rows this
+ * trace writes. Must not be null (it is dereferenced below).
+ */
+ public TraceState(InetAddress coordinator, UUID sessionId, Tracing.TraceType traceType)
+ {
assert coordinator != null;
assert sessionId != null;
this.coordinator = coordinator;
this.sessionId = sessionId;
sessionIdBytes = ByteBufferUtil.bytes(sessionId);
+ this.traceType = traceType;
+ this.ttl = traceType.getTTL();
watch = Stopwatch.createStarted();
+ // No activity observed yet; see waitActivity(long).
+ this.status = Status.IDLE;
+ }
+
+ /**
+ * Makes every subsequent trace() call signal activity to threads blocked in
+ * waitActivity(long). Only meant for REPAIR traces (checked by assert).
+ */
+ public void enableActivityNotification()
+ {
+ assert traceType == Tracing.TraceType.REPAIR;
+ notify = true;
+ }
+
+ /**
+ * Sets the handle sent with each trace message forwarded as a "repair"
+ * notification; null stops forwarding. Only meant for REPAIR traces (checked by assert).
+ */
+ public void setNotificationHandle(Object handle)
+ {
+ assert traceType == Tracing.TraceType.REPAIR;
+ notificationHandle = handle;
}
public int elapsed()
@@ -63,6 +98,46 @@ public class TraceState
return elapsed < Integer.MAX_VALUE ? (int) elapsed : Integer.MAX_VALUE;
}
+ /**
+ * Marks this trace session as stopped and wakes any thread blocked in
+ * {@link #waitActivity(long)}.
+ */
+ public synchronized void stop()
+ {
+ status = Status.STOPPED;
+ notifyAll();
+ }
+
+ /**
+ * Returns immediately if there has been trace activity since the last
+ * call, otherwise waits until there is trace activity, the session is
+ * stopped, or the timeout expires.
+ *
+ * @param timeout timeout in milliseconds
+ * @return ACTIVE if activity was seen (the status is then reset to IDLE),
+ * otherwise the current status (IDLE or STOPPED)
+ * @throws RuntimeException if the waiting thread is interrupted; the thread's
+ * interrupt flag is restored and the InterruptedException is kept as the cause
+ */
+ public synchronized Status waitActivity(long timeout)
+ {
+ if (status == Status.IDLE)
+ {
+ try
+ {
+ wait(timeout);
+ }
+ catch (InterruptedException e)
+ {
+ // Preserve the interrupt for the caller and keep the cause for diagnostics.
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ if (status == Status.ACTIVE)
+ {
+ status = Status.IDLE;
+ return Status.ACTIVE;
+ }
+ return status;
+ }
+
+ /**
+ * Records trace activity and wakes any thread blocked in {@link #waitActivity(long)}.
+ */
+ private synchronized void notifyActivity()
+ {
+ // A late trace event must not turn a STOPPED session back into ACTIVE: the
+ // waiter would then never observe STOPPED and would keep polling forever.
+ if (status != Status.STOPPED)
+ status = Status.ACTIVE;
+ notifyAll();
+ }
+
public void trace(String format, Object arg)
{
trace(MessageFormatter.format(format, arg).getMessage());
@@ -80,18 +155,24 @@ public class TraceState
public void trace(String message)
{
- TraceState.trace(sessionIdBytes, message, elapsed());
+ // Wake waitActivity() callers, but only if activity notification was enabled.
+ if (notify)
+ notifyActivity();
+
+ TraceState.trace(sessionIdBytes, message, elapsed(), ttl, notificationHandle);
}
- public static void trace(final ByteBuffer sessionId, final String message, final int elapsed)
- public static void trace(final ByteBuffer sessionId, final String message, final int elapsed)
+ /**
+ * Writes a trace event row for the session asynchronously on the TRACING stage.
+ * When notificationHandle is non-null, the message is also sent, synchronously on
+ * the calling thread, as a "repair" notification carrying that handle.
+ */
+ public static void trace(final ByteBuffer sessionId, final String message, final int elapsed, final int ttl, final Object notificationHandle)
{
final String threadName = Thread.currentThread().getName();
+ if (notificationHandle != null)
+ StorageService.instance.sendNotification("repair", message, notificationHandle);
+
StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
{
public void runMayThrow()
{
- Tracing.mutateWithCatch(TraceKeyspace.toEventMutation(sessionId, message, elapsed, threadName));
+ Tracing.mutateWithCatch(TraceKeyspace.toEventMutation(sessionId, message, elapsed, threadName, ttl));
}
});
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f5866ca2/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index 773ccd4..5e76957 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.exceptions.OverloadedException;
@@ -43,6 +44,7 @@ import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
+
/**
* A trace session context. Able to track and store trace sessions. A session is usually a user initiated query, and may
* have multiple local and remote events before it is completed. All events and sessions are stored at keyspace.
@@ -50,6 +52,38 @@ import org.apache.cassandra.utils.UUIDGen;
public class Tracing
{
public static final String TRACE_HEADER = "TraceSession";
+ public static final String TRACE_TYPE = "TraceType";
+ public static final String TRACE_TTL = "TraceTTL";
+
+ /**
+ * Kind of traced operation. The ordinal is the wire encoding carried in the
+ * TRACE_TYPE message parameter (see serialize/deserialize), so constants must
+ * never be reordered; new ones must be appended.
+ */
+ public enum TraceType
+ {
+ NONE(DatabaseDescriptor.getTracetypeQueryTTL()),
+ QUERY(DatabaseDescriptor.getTracetypeQueryTTL()),
+ REPAIR(DatabaseDescriptor.getTracetypeRepairTTL());
+
+ private static final TraceType[] ALL_VALUES = values();
+
+ // Bound per constant at class initialization, so a newly added constant cannot be
+ // left without a TTL (an ordinal-indexed table could silently fall out of step).
+ private final int ttl;
+
+ TraceType(int ttl)
+ {
+ this.ttl = ttl;
+ }
+
+ /** Decodes a wire byte; out-of-range values map to NONE rather than failing. */
+ public static TraceType deserialize(byte b)
+ {
+ if (b < 0 || ALL_VALUES.length <= b)
+ return NONE;
+ return ALL_VALUES[b];
+ }
+
+ /** Encodes a trace type as its ordinal for the wire. */
+ public static byte serialize(TraceType value)
+ {
+ return (byte) value.ordinal();
+ }
+
+ /** @return the TTL applied to trace rows written for this trace type */
+ public int getTTL()
+ {
+ return ttl;
+ }
+ }
private static final Logger logger = LoggerFactory.getLogger(Tracing.class);
@@ -67,6 +101,18 @@ public class Tracing
return state.get().sessionId;
}
+ /**
+ * @return the trace type of the session the current thread is tracing;
+ * only valid while tracing (checked by assert)
+ */
+ public TraceType getTraceType()
+ {
+ assert isTracing();
+ TraceState current = state.get();
+ return current.traceType;
+ }
+
+ /**
+ * @return the TTL of the session the current thread is tracing;
+ * only valid while tracing (checked by assert)
+ */
+ public int getTTL()
+ {
+ assert isTracing();
+ TraceState current = state.get();
+ return current.ttl;
+ }
+
/**
* Indicates if the current thread's execution is being traced.
*/
@@ -77,14 +123,24 @@ public class Tracing
public UUID newSession()
{
- return newSession(TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())));
+ // Sessions started without an explicit type are QUERY traces.
+ return newSession(TraceType.QUERY);
+ }
+
+ // Starts a session of the given type under a freshly generated time-based UUID.
+ public UUID newSession(TraceType traceType)
+ {
+ return newSession(TimeUUIDType.instance.compose(ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes())), traceType);
}
+ // Starts a QUERY session under the caller-supplied session id.
public UUID newSession(UUID sessionId)
{
+ return newSession(sessionId, TraceType.QUERY);
+ }
+
+ public UUID newSession(UUID sessionId, TraceType traceType)
+ {
assert state.get() == null;
- TraceState ts = new TraceState(localAddress, sessionId);
+ TraceState ts = new TraceState(localAddress, sessionId, traceType);
state.set(ts);
sessions.put(sessionId, ts);
@@ -111,15 +167,17 @@ public class Tracing
{
final int elapsed = state.elapsed();
final ByteBuffer sessionId = state.sessionIdBytes;
+ final int ttl = state.ttl;
StageManager.getStage(Stage.TRACING).execute(new Runnable()
{
public void run()
{
- mutateWithCatch(TraceKeyspace.toStopSessionMutation(sessionId, elapsed));
+ mutateWithCatch(TraceKeyspace.toStopSessionMutation(sessionId, elapsed, ttl));
}
});
+ state.stop();
sessions.remove(state.sessionId);
this.state.set(null);
}
@@ -140,20 +198,25 @@ public class Tracing
state.set(tls);
}
- public void begin(final String request, final Map<String, String> parameters)
+ /**
+ * Records the start of the current trace session (request, parameters, start time,
+ * and the trace type's name as the "command") asynchronously on the TRACING stage.
+ * @return the current TraceState, so callers can keep a handle to it
+ */
+ public TraceState begin(final String request, final Map<String, String> parameters)
{
assert isTracing();
+ // Read the current state once; note this local shadows the 'state' field.
+ final TraceState state = this.state.get();
final long startedAt = System.currentTimeMillis();
- final ByteBuffer sessionId = state.get().sessionIdBytes;
+ final ByteBuffer sessionId = state.sessionIdBytes;
+ final String command = state.traceType.toString();
+ final int ttl = state.ttl;
StageManager.getStage(Stage.TRACING).execute(new Runnable()
{
public void run()
{
- mutateWithCatch(TraceKeyspace.toStartSessionMutation(sessionId, parameters, request, startedAt));
+ mutateWithCatch(TraceKeyspace.toStartSessionMutation(sessionId, parameters, request, startedAt, command, ttl));
}
});
+
+ return state;
}
/**
@@ -163,7 +226,7 @@ public class Tracing
*/
public TraceState initializeFromMessage(final MessageIn<?> message)
{
- final byte[] sessionBytes = message.parameters.get(Tracing.TRACE_HEADER);
+ final byte[] sessionBytes = message.parameters.get(TRACE_HEADER);
if (sessionBytes == null)
return null;
@@ -174,19 +237,36 @@ public class Tracing
if (ts != null && ts.acquireReference())
return ts;
+ byte[] tmpBytes;
+ TraceType traceType = TraceType.QUERY;
+ if ((tmpBytes = message.parameters.get(TRACE_TYPE)) != null)
+ traceType = TraceType.deserialize(tmpBytes[0]);
+
if (message.verb == MessagingService.Verb.REQUEST_RESPONSE)
{
// received a message for a session we've already closed out. see CASSANDRA-5668
- return new ExpiredTraceState(sessionId);
+ return new ExpiredTraceState(sessionId, traceType);
}
else
{
- ts = new TraceState(message.from, sessionId);
+ ts = new TraceState(message.from, sessionId, traceType);
sessions.put(sessionId, ts);
return ts;
}
}
+
+ /**
+ * Varargs trace entry point used by repair; repair is heavyweight enough that
+ * allocating the varargs array does not matter. No-op when the current thread
+ * is not tracing.
+ */
+ public static void traceRepair(String format, Object... args)
+ {
+ // Fetch the state once instead of calling isTracing(), which would fetch it a second time.
+ final TraceState current = instance.get();
+ if (current != null)
+ current.trace(format, args);
+ }
+
+ // normal traces get zero-, one-, and two-argument overloads so common case doesn't need to create varargs array
public static void trace(String message)
{
final TraceState state = instance.get();