You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2012/08/29 20:18:20 UTC
[2/3] add request tracing
patch by David Alves; reviewed by jbellis for CASSANDRA-1123
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 0fea181..9d95fad 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -37,6 +37,9 @@ import javax.management.ObjectName;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
+
+import org.apache.cassandra.tracing.Tracing;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,7 +80,9 @@ public final class MessagingService implements MessagingServiceMBean
public static final int VERSION_12 = 5;
public static final int current_version = VERSION_12;
- /** we preface every message with this number so the recipient can validate the sender is sane */
+ /**
+ * we preface every message with this number so the recipient can validate the sender is sane
+ */
static final int PROTOCOL_MAGIC = 0xCA552DFA;
/* All verb handler identifiers */
@@ -113,6 +118,7 @@ public final class MessagingService implements MessagingServiceMBean
SNAPSHOT, // Similar to nt snapshot
MIGRATION_REQUEST,
GOSSIP_SHUTDOWN,
+ _TRACE, // dummy verb so we can use MS.droppedMessages
// use as padding for backwards compatability where a previous version needs to validate a verb from the future.
UNUSED_1,
UNUSED_2,
@@ -120,6 +126,7 @@ public final class MessagingService implements MessagingServiceMBean
;
// remember to add new verbs at the end, since we serialize by ordinal
}
+
public static final Verb[] VERBS = Verb.values();
public static final EnumMap<MessagingService.Verb, Stage> verbStages = new EnumMap<MessagingService.Verb, Stage>(MessagingService.Verb.class)
@@ -239,14 +246,15 @@ public final class MessagingService implements MessagingServiceMBean
/* Lookup table for registering message handlers based on the verb. */
private final Map<Verb, IVerbHandler> verbHandlers;
- /** One executor per destination InetAddress for streaming.
- *
+ /**
+ * One executor per destination InetAddress for streaming.
+ * <p/>
* See CASSANDRA-3494 for the background. We have streaming in place so we do not want to limit ourselves to
* one stream at a time for throttling reasons. But, we also do not want to just arbitrarily stream an unlimited
* amount of files at once because a single destination might have hundreds of files pending and it would cause a
* seek storm. So, transfer exactly one file per destination host. That puts a very natural rate limit on it, in
* addition to mapping well to the expected behavior in many cases.
- *
+ * <p/>
* We will create our stream executors with a core size of 0 so that they time out and do not consume threads. This
* means the overhead in the degenerate case of having streamed to everyone in the ring over time as a ring changes,
* is not going to be a thread per node - but rather an instance per node. That's totally fine.
@@ -268,11 +276,11 @@ public final class MessagingService implements MessagingServiceMBean
* drop internal messages like bootstrap or repair notifications.
*/
public static final EnumSet<Verb> DROPPABLE_VERBS = EnumSet.of(Verb.BINARY,
- Verb.MUTATION,
- Verb.READ_REPAIR,
- Verb.READ,
- Verb.RANGE_SLICE,
- Verb.REQUEST_RESPONSE);
+ Verb.MUTATION,
+ Verb.READ_REPAIR,
+ Verb.READ,
+ Verb.RANGE_SLICE,
+ Verb.REQUEST_RESPONSE);
// total dropped message counts for server lifetime
private final Map<Verb, AtomicInteger> droppedMessages = new EnumMap<Verb, AtomicInteger>(Verb.class);
@@ -364,7 +372,8 @@ public final class MessagingService implements MessagingServiceMBean
/**
* Track latency information for the dynamic snitch
- * @param cb the callback associated with this message -- this lets us know if it's a message type we're interested in
+ *
+ * @param cb the callback associated with this message -- this lets us know if it's a message type we're interested in
* @param address the host that replied to the message
* @param latency
*/
@@ -380,7 +389,9 @@ public final class MessagingService implements MessagingServiceMBean
subscriber.receiveTiming(address, latency);
}
- /** called from gossiper when it notices a node is not responding. */
+ /**
+ * called from gossiper when it notices a node is not responding.
+ */
public void convict(InetAddress ep)
{
logger.debug("Resetting pool for " + ep);
@@ -389,12 +400,13 @@ public final class MessagingService implements MessagingServiceMBean
/**
* Listen on the specified port.
+ *
* @param localEp InetAddress whose port to listen on.
*/
public void listen(InetAddress localEp) throws IOException, ConfigurationException
{
callbacks.reset(); // hack to allow tests to stop/restart MS
- for (ServerSocket ss: getServerSocket(localEp))
+ for (ServerSocket ss : getServerSocket(localEp))
{
SocketThread th = new SocketThread(ss, "ACCEPT-" + localEp);
th.start();
@@ -405,7 +417,7 @@ public final class MessagingService implements MessagingServiceMBean
private List<ServerSocket> getServerSocket(InetAddress localEp) throws IOException, ConfigurationException
{
- final List<ServerSocket> ss = new ArrayList<ServerSocket>(2);
+ final List<ServerSocket> ss = new ArrayList<ServerSocket>(2);
if (DatabaseDescriptor.getEncryptionOptions().internode_encryption != EncryptionOptions.InternodeEncryption.none)
{
ss.add(SSLFactory.getServerSocket(DatabaseDescriptor.getEncryptionOptions(), localEp, DatabaseDescriptor.getSSLStoragePort()));
@@ -427,7 +439,7 @@ public final class MessagingService implements MessagingServiceMBean
throw new ConfigurationException(address + " is in use by another process. Change listen_address:storage_port in cassandra.yaml to values that do not conflict with other services");
else if (e.getMessage().contains("Cannot assign requested address"))
throw new ConfigurationException("Unable to bind to address " + address
- + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
+ + ". Set listen_address in cassandra.yaml to an interface you can bind to, e.g., your private IP address on EC2");
else
throw e;
}
@@ -467,6 +479,7 @@ public final class MessagingService implements MessagingServiceMBean
/**
* Register a verb and the corresponding verb handler with the
* Messaging Service.
+ *
* @param verb
* @param verbHandler handler for the specified verb
*/
@@ -479,6 +492,7 @@ public final class MessagingService implements MessagingServiceMBean
/**
* This method returns the verb handler associated with the registered
* verb. If no handler has been registered then null is returned.
+ *
* @param type for which the verb handler is sought
* @return a reference to IVerbHandler which is the handler for the specified verb
*/
@@ -503,6 +517,7 @@ public final class MessagingService implements MessagingServiceMBean
}
private static final AtomicInteger idGen = new AtomicInteger(0);
+
// TODO make these integers to avoid unnecessary int -> string -> int conversions
private static String nextId()
{
@@ -522,11 +537,12 @@ public final class MessagingService implements MessagingServiceMBean
* which is invoked with the actual response.
* Also holds the message (only mutation messages) to determine if it
* needs to trigger a hint (uses StorageProxy for that).
+ *
* @param message message to be sent.
- * @param to endpoint to which the message needs to be sent
- * @param cb callback interface which is used to pass the responses or
- * suggest that a timeout occurred to the invoker of the send().
- * suggest that a timeout occurred to the invoker of the send().
+ * @param to endpoint to which the message needs to be sent
+ * @param cb callback interface which is used to pass the responses or
+ * suggest that a timeout occurred to the invoker of the send().
+ * suggest that a timeout occurred to the invoker of the send().
* @param timeout the timeout used for expiration
* @return an reference to message id used to match with the result
*/
@@ -550,8 +566,9 @@ public final class MessagingService implements MessagingServiceMBean
/**
* Send a message to a given endpoint. This method adheres to the fire and forget
* style messaging.
+ *
* @param message messages to be sent.
- * @param to endpoint to which the message needs to be sent
+ * @param to endpoint to which the message needs to be sent
*/
public void sendOneWay(MessageOut message, String id, InetAddress to)
{
@@ -585,9 +602,10 @@ public final class MessagingService implements MessagingServiceMBean
/**
* Stream a file from source to destination. This is highly optimized
* to not hold any of the contents of the file in memory.
+ *
* @param header Header contains file to stream and other metadata.
- * @param to endpoint to which we need to stream the file.
- */
+ * @param to endpoint to which we need to stream the file.
+ */
public void stream(StreamHeader header, InetAddress to)
{
@@ -605,8 +623,8 @@ public final class MessagingService implements MessagingServiceMBean
}
executor.execute(header.file == null || header.file.compressionInfo == null
- ? new FileStreamTask(header, to)
- : new CompressedFileStreamTask(header, to));
+ ? new FileStreamTask(header, to)
+ : new CompressedFileStreamTask(header, to));
}
public void incrementActiveStreamsOutbound()
@@ -619,7 +637,9 @@ public final class MessagingService implements MessagingServiceMBean
activeStreamsOutbound.decrementAndGet();
}
- /** The count of active outbound stream tasks. */
+ /**
+ * The count of active outbound stream tasks.
+ */
public int getActiveStreamsOutbound()
{
return activeStreamsOutbound.get();
@@ -675,9 +695,10 @@ public final class MessagingService implements MessagingServiceMBean
public void receive(MessageIn message, String id)
{
+ Tracing.instance().initializeFromMessage(message);
+
if (logger.isTraceEnabled())
- logger.trace(FBUtilities.getBroadcastAddress() + " received " + message.verb
- + " from " + id + "@" + message.from);
+ logger.trace(FBUtilities.getBroadcastAddress() + " received " + message.verb + " from " + id + "@" + message.from);
message = SinkManager.processInboundMessage(message, id);
if (message == null)
@@ -815,7 +836,7 @@ public final class MessagingService implements MessagingServiceMBean
{
logTpstats = true;
logger.info("{} {} messages dropped in last {}ms",
- new Object[] {recent, verb, LOG_DROPPED_INTERVAL_IN_MS});
+ new Object[]{ recent, verb, LOG_DROPPED_INTERVAL_IN_MS });
lastDroppedInternal.put(verb, dropped.get());
}
}
@@ -875,7 +896,7 @@ public final class MessagingService implements MessagingServiceMBean
OutboundTcpConnectionPool connection = connectionManagers.get(address);
return connection == null ? 0 : connection.cmdCon.getPendingMessages();
}
-
+
public Map<String, Long> getCommandCompletedTasks()
{
Map<String, Long> completedTasks = new HashMap<String, Long>();
@@ -945,7 +966,7 @@ public final class MessagingService implements MessagingServiceMBean
public Map<String, Long> getTimeoutsPerHost()
{
Map<String, Long> result = new HashMap<String, Long>();
- for (Map.Entry<String, AtomicLong> entry: timeoutsPerHost.entrySet())
+ for (Map.Entry<String, AtomicLong> entry : timeoutsPerHost.entrySet())
{
result.put(entry.getKey(), entry.getValue().get());
}
@@ -955,7 +976,7 @@ public final class MessagingService implements MessagingServiceMBean
public Map<String, Long> getRecentTimeoutsPerHost()
{
Map<String, Long> result = new HashMap<String, Long>();
- for (Map.Entry<String, AtomicLong> entry: recentTimeoutsPerHost.entrySet())
+ for (Map.Entry<String, AtomicLong> entry : recentTimeoutsPerHost.entrySet())
{
String ip = entry.getKey();
AtomicLong recent = entry.getValue();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 22a2066..efe6564 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -194,8 +194,8 @@ public class CassandraDaemon
// check the system table to keep user from shooting self in foot by changing partitioner, cluster name, etc.
// we do a one-off scrub of the system table first; we can't load the list of the rest of the tables,
// until system table is opened.
- for (CFMetaData cfm : Schema.instance.getTableMetaData(Table.SYSTEM_TABLE).values())
- ColumnFamilyStore.scrubDataDirectories(Table.SYSTEM_TABLE, cfm.cfName);
+ for (CFMetaData cfm : Schema.instance.getTableMetaData(Table.SYSTEM_KS).values())
+ ColumnFamilyStore.scrubDataDirectories(Table.SYSTEM_KS, cfm.cfName);
try
{
SystemTable.checkHealth();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/service/ClientState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java
index 89b3715..a04f3f1 100644
--- a/src/java/org/apache/cassandra/service/ClientState.java
+++ b/src/java/org/apache/cassandra/service/ClientState.java
@@ -32,8 +32,11 @@ import org.apache.cassandra.cql.CQLStatement;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.thrift.AuthenticationException;
import org.apache.cassandra.thrift.InvalidRequestException;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.SemanticVersion;
+import static org.apache.cassandra.tracing.Tracing.instance;
+
/**
* A container for per-client, thread-local state that Avro/Thrift threads must hold.
* TODO: Kill thrift exceptions
@@ -47,6 +50,8 @@ public class ClientState
// Current user for the session
private AuthenticatedUser user;
private String keyspace;
+ private UUID preparedTracingSession;
+
// Reusable array for authorization
private final List<Object> resource = new ArrayList<Object>();
private SemanticVersion cqlVersion = DEFAULT_CQL_VERSION;
@@ -103,6 +108,36 @@ public class ClientState
keyspace = ks;
}
+ public boolean traceNextQuery()
+ {
+ if (preparedTracingSession != null)
+ {
+ return true;
+ }
+
+ double tracingProbability = StorageService.instance.getTracingProbability();
+ return tracingProbability != 0 && FBUtilities.threadLocalRandom().nextDouble() < tracingProbability;
+ }
+
+ public void prepareTracingSession(UUID sessionId)
+ {
+ this.preparedTracingSession = sessionId;
+ }
+
+ public void createSession()
+ {
+ if (this.preparedTracingSession == null)
+ {
+ instance().newSession();
+ }
+ else
+ {
+ UUID session = this.preparedTracingSession;
+ this.preparedTracingSession = null;
+ instance().newSession(session);
+ }
+ }
+
public String getSchedulingValue()
{
switch(DatabaseDescriptor.getRequestSchedulerId())
@@ -141,6 +176,7 @@ public class ClientState
{
user = DatabaseDescriptor.getAuthenticator().defaultUser();
keyspace = null;
+ preparedTracingSession = null;
resourceClear();
prepared.clear();
cql3Prepared.clear();
@@ -174,7 +210,7 @@ public class ClientState
validateKeyspace(keyspace);
// hardcode disallowing messing with system keyspace
- if (keyspace.equalsIgnoreCase(Table.SYSTEM_TABLE) && perm == Permission.WRITE)
+ if (keyspace.equalsIgnoreCase(Table.SYSTEM_KS) && perm == Permission.WRITE)
throw new InvalidRequestException("system keyspace is not user-modifiable");
resourceClear();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index dd6391c..a31cbbc 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -297,7 +297,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
public static UUID getLastMigrationId()
{
DecoratedKey dkey = StorageService.getPartitioner().decorateKey(LAST_MIGRATION_KEY);
- Table defs = Table.open(Table.SYSTEM_TABLE);
+ Table defs = Table.open(Table.SYSTEM_KS);
ColumnFamilyStore cfStore = defs.getColumnFamilyStore(DefsTable.OLD_SCHEMA_CF);
QueryFilter filter = QueryFilter.getNamesFilter(dkey, new QueryPath(DefsTable.OLD_SCHEMA_CF), LAST_MIGRATION_KEY);
ColumnFamily cf = cfStore.getColumnFamily(filter);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index f308405..5b85a84 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -588,7 +588,7 @@ public class StorageProxy implements StorageProxyMBean
private static boolean systemTableQuery(List<ReadCommand> cmds)
{
for (ReadCommand cmd : cmds)
- if (!cmd.table.equals(Table.SYSTEM_TABLE))
+ if (!cmd.table.equals(Table.SYSTEM_KS))
return false;
return true;
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 8abd3e8..7f0714a 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.service.AntiEntropyService.RepairFuture;
import org.apache.cassandra.service.AntiEntropyService.TreeRequestVerbHandler;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.thrift.*;
+import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
/**
@@ -152,6 +153,9 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
private boolean initialized;
private volatile boolean joined = false;
+ /* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */
+ private double tracingProbability = 0.0;
+
private static enum Mode { NORMAL, CLIENT, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED }
private Mode operationMode;
@@ -429,7 +433,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
@Override
public void runMayThrow() throws ExecutionException, InterruptedException, IOException
{
- ThreadPoolExecutor mutationStage = StageManager.getStage(Stage.MUTATION);
+ ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
if (mutationStage.isShutdown())
return; // drained already
@@ -821,6 +825,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
// Dont set any state for the node which is bootstrapping the existing token...
tokenMetadata.updateNormalTokens(tokens, FBUtilities.getBroadcastAddress());
}
+ Tracing.instance();
setMode(Mode.JOINING, "Starting to bootstrap...", true);
new BootStrapper(FBUtilities.getBroadcastAddress(), tokens, tokenMetadata).bootstrap(); // handles token update
}
@@ -1892,7 +1897,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
public void forceTableCleanup(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException
{
- if (tableName.equals(Table.SYSTEM_TABLE))
+ if (tableName.equals(Table.SYSTEM_KS))
throw new RuntimeException("Cleanup of the system table is neither necessary nor wise");
NodeId.OneShotRenewer nodeIdRenewer = new NodeId.OneShotRenewer();
@@ -2067,7 +2072,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
*/
public void forceTableRepair(final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
{
- if (Table.SYSTEM_TABLE.equals(tableName))
+ if (Table.SYSTEM_KS.equals(tableName))
return;
Collection<Range<Token>> ranges = getLocalRanges(tableName);
@@ -2116,7 +2121,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
public void forceTableRepairPrimaryRange(final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
{
- if (Table.SYSTEM_TABLE.equals(tableName))
+ if (Table.SYSTEM_KS.equals(tableName))
return;
List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>();
@@ -2134,7 +2139,7 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
public void forceTableRepairRange(String beginToken, String endToken, final String tableName, boolean isSequential, final String... columnFamilies) throws IOException
{
- if (Table.SYSTEM_TABLE.equals(tableName))
+ if (Table.SYSTEM_KS.equals(tableName))
return;
Token parsedBeginToken = getPartitioner().getTokenFactory().fromString(beginToken);
@@ -3309,4 +3314,14 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
{
MigrationManager.resetLocalSchema();
}
+
+ public void setTraceProbability(double probability)
+ {
+ this.tracingProbability = probability;
+ }
+
+ public double getTracingProbability()
+ {
+ return tracingProbability;
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 0c4bfa3..350eff5 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -434,4 +434,18 @@ public interface StorageServiceMBean
public void rebuildSecondaryIndex(String ksName, String cfName, String... idxNames);
public void resetLocalSchema() throws IOException;
+
+ /**
+ * Enables/Disables tracing for the whole system. Only thrift requests can start tracing currently.
+ *
+ * @param probability
+ * ]0,1[ will enable tracing on a partial number of requests with the provided probability. 0 will
+ * disable tracing and 1 will enable tracing for all requests (which might severely cripple the system)
+ */
+ public void setTraceProbability(double probability);
+
+ /**
+ * Returns the configured tracing probability.
+ */
+ public double getTracingProbability();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/thrift/CassandraServer.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/CassandraServer.java b/src/java/org/apache/cassandra/thrift/CassandraServer.java
index 1437cb8..5eedcd7 100644
--- a/src/java/org/apache/cassandra/thrift/CassandraServer.java
+++ b/src/java/org/apache/cassandra/thrift/CassandraServer.java
@@ -27,7 +27,10 @@ import java.util.concurrent.TimeoutException;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
-import com.google.common.base.Predicates;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,12 +44,16 @@ import org.apache.cassandra.db.context.CounterContext;
import org.apache.cassandra.db.filter.IFilter;
import org.apache.cassandra.db.filter.QueryPath;
import org.apache.cassandra.db.marshal.MarshalException;
+import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.DynamicEndpointSnitch;
import org.apache.cassandra.scheduler.IRequestScheduler;
import org.apache.cassandra.service.*;
+import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.UUIDGen;
import org.apache.thrift.TException;
public class CassandraServer implements Cassandra.Iface
@@ -301,21 +308,61 @@ public class CassandraServer implements Cassandra.Iface
public List<ColumnOrSuperColumn> get_slice(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException
{
- logger.debug("get_slice");
+ if (startSessionIfRequested())
+ {
+ Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
+ "column_parent", column_parent.toString(),
+ "predicate", predicate.toString(),
+ "consistency_level", consistency_level.name());
+ Tracing.instance().begin("get_slice", traceParameters);
+ }
+ else
+ {
+ logger.debug("get_slice");
+ }
- ClientState cState = state();
- cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
- return multigetSliceInternal(cState.getKeyspace(), Collections.singletonList(key), column_parent, predicate, consistency_level).get(key);
+ try
+ {
+ state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+ return multigetSliceInternal(state().getKeyspace(), Collections.singletonList(key), column_parent,
+ predicate, consistency_level).get(key);
+ }
+ finally
+ {
+ Tracing.instance().stopSession();
+ }
}
public Map<ByteBuffer, List<ColumnOrSuperColumn>> multiget_slice(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException
{
- logger.debug("multiget_slice");
+ if (startSessionIfRequested())
+ {
+ List<String> keysList = Lists.newArrayList();
+ for (ByteBuffer key : keys)
+ {
+ keysList.add(ByteBufferUtil.bytesToHex(key));
+ }
+ Map<String, String> traceParameters = ImmutableMap.of("keys", keysList.toString(),
+ "column_parent", column_parent.toString(),
+ "predicate", predicate.toString(),
+ "consistency_level", consistency_level.name());
+ Tracing.instance().begin("multiget_slice", traceParameters);
+ }
+ else
+ {
+ logger.debug("multiget_slice");
+ }
- ClientState cState = state();
- cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
- return multigetSliceInternal(cState.getKeyspace(), keys, column_parent, predicate, consistency_level);
+ try
+ {
+ state().hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+ return multigetSliceInternal(state().getKeyspace(), keys, column_parent, predicate, consistency_level);
+ }
+ finally
+ {
+ Tracing.instance().stopSession();
+ }
}
private Map<ByteBuffer, List<ColumnOrSuperColumn>> multigetSliceInternal(String keyspace, List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
@@ -380,74 +427,112 @@ public class CassandraServer implements Cassandra.Iface
public ColumnOrSuperColumn get(ByteBuffer key, ColumnPath column_path, ConsistencyLevel consistency_level)
throws InvalidRequestException, NotFoundException, UnavailableException, TimedOutException
{
- logger.debug("get");
- return internal_get(key, column_path, consistency_level);
+ if (startSessionIfRequested())
+ {
+ Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
+ "column_path", column_path.toString(),
+ "consistency_level", consistency_level.name());
+ Tracing.instance().begin("get", traceParameters);
+ }
+ else
+ {
+ logger.debug("get");
+ }
+
+ try
+ {
+ return internal_get(key, column_path, consistency_level);
+ }
+ finally
+ {
+ Tracing.instance().stopSession();
+ }
}
public int get_count(ByteBuffer key, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException
{
- logger.debug("get_count");
-
- ClientState cState = state();
- cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
- Table table = Table.open(cState.getKeyspace());
- ColumnFamilyStore cfs = table.getColumnFamilyStore(column_parent.column_family);
-
- if (predicate.column_names != null)
- return get_slice(key, column_parent, predicate, consistency_level).size();
-
- int pageSize;
- // request by page if this is a large row
- if (cfs.getMeanColumns() > 0)
+ if (startSessionIfRequested())
{
- int averageColumnSize = (int) (cfs.getMeanRowSize() / cfs.getMeanColumns());
- pageSize = Math.min(COUNT_PAGE_SIZE,
- DatabaseDescriptor.getInMemoryCompactionLimit() / averageColumnSize);
- pageSize = Math.max(2, pageSize);
- logger.debug("average row column size is {}; using pageSize of {}", averageColumnSize, pageSize);
+ Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
+ "column_parent", column_parent.toString(),
+ "predicate", predicate.toString(),
+ "consistency_level", consistency_level.name());
+ Tracing.instance().begin("get_count", traceParameters);
}
else
{
- pageSize = COUNT_PAGE_SIZE;
+ logger.debug("get_count");
}
- int totalCount = 0;
- List<ColumnOrSuperColumn> columns;
-
- if (predicate.slice_range == null)
+ try
{
- predicate.slice_range = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
- ByteBufferUtil.EMPTY_BYTE_BUFFER,
- false,
- Integer.MAX_VALUE);
- }
+ ClientState cState = state();
+ cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+ Table table = Table.open(cState.getKeyspace());
+ ColumnFamilyStore cfs = table.getColumnFamilyStore(column_parent.column_family);
- int requestedCount = predicate.slice_range.count;
- int pages = 0;
- while (true)
- {
- predicate.slice_range.count = Math.min(pageSize, requestedCount);
- columns = get_slice(key, column_parent, predicate, consistency_level);
- if (columns.isEmpty())
- break;
+ if (predicate.column_names != null)
+ return get_slice(key, column_parent, predicate, consistency_level).size();
- ByteBuffer firstName = getName(columns.get(0));
- int newColumns = pages == 0 || !firstName.equals(predicate.slice_range.start) ? columns.size() : columns.size() - 1;
- totalCount += newColumns;
- requestedCount -= newColumns;
- pages++;
- // We're done if either:
- // - We've querying the number of columns requested by the user
- // - The last page wasn't full
- if (requestedCount == 0 || columns.size() < predicate.slice_range.count)
- break;
+ int pageSize;
+ // request by page if this is a large row
+ if (cfs.getMeanColumns() > 0)
+ {
+ int averageColumnSize = (int) (cfs.getMeanRowSize() / cfs.getMeanColumns());
+ pageSize = Math.min(COUNT_PAGE_SIZE,
+ DatabaseDescriptor.getInMemoryCompactionLimit() / averageColumnSize);
+ pageSize = Math.max(2, pageSize);
+ logger.debug("average row column size is {}; using pageSize of {}", averageColumnSize, pageSize);
+ }
else
- predicate.slice_range.start = getName(columns.get(columns.size() - 1));
- }
+ {
+ pageSize = COUNT_PAGE_SIZE;
+ }
+
+ int totalCount = 0;
+ List<ColumnOrSuperColumn> columns;
+
+ if (predicate.slice_range == null)
+ {
+ predicate.slice_range = new SliceRange(ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER,
+ false,
+ Integer.MAX_VALUE);
+ }
+
+ int requestedCount = predicate.slice_range.count;
+ int pages = 0;
+ while (true)
+ {
+ predicate.slice_range.count = Math.min(pageSize, requestedCount);
+ columns = get_slice(key, column_parent, predicate, consistency_level);
+ if (columns.isEmpty())
+ break;
+
+ ByteBuffer firstName = getName(columns.get(0));
+ int newColumns = pages == 0 || !firstName.equals(predicate.slice_range.start) ? columns.size()
+ : columns.size() - 1;
+
+ totalCount += newColumns;
+ requestedCount -= newColumns;
+ pages++;
+ // We're done if either:
+ // - We've queried the number of columns requested by the user
+ // - The last page wasn't full
+ if (requestedCount == 0 || columns.size() < predicate.slice_range.count)
+ break;
+ else
+ predicate.slice_range.start = getName(columns.get(columns.size() - 1));
+ }
- return totalCount;
+ return totalCount;
+ }
+ finally
+ {
+ Tracing.instance().stopSession();
+ }
}
private static ByteBuffer getName(ColumnOrSuperColumn cosc)
@@ -460,19 +545,44 @@ public class CassandraServer implements Cassandra.Iface
public Map<ByteBuffer, Integer> multiget_count(List<ByteBuffer> keys, ColumnParent column_parent, SlicePredicate predicate, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException
{
- logger.debug("multiget_count");
+ if (startSessionIfRequested())
+ {
+ List<String> keysList = Lists.newArrayList();
+ for (ByteBuffer key : keys)
+ {
+ keysList.add(ByteBufferUtil.bytesToHex(key));
+ }
+ Map<String, String> traceParameters = ImmutableMap.of("keys", keysList.toString(),
+ "column_parent", column_parent.toString(),
+ "predicate", predicate.toString(),
+ "consistency_level", consistency_level.name());
+ Tracing.instance().begin("multiget_count", traceParameters);
+ }
+ else
+ {
+ logger.debug("multiget_count");
+ }
- ClientState cState = state();
- cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
- String keyspace = cState.getKeyspace();
+ try
+ {
+ ClientState cState = state();
+ cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+ String keyspace = cState.getKeyspace();
- Map<ByteBuffer, Integer> counts = new HashMap<ByteBuffer, Integer>();
- Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = multigetSliceInternal(keyspace, keys, column_parent, predicate, consistency_level);
+ Map<ByteBuffer, Integer> counts = new HashMap<ByteBuffer, Integer>();
+ Map<ByteBuffer, List<ColumnOrSuperColumn>> columnFamiliesMap = multigetSliceInternal(keyspace, keys,
+ column_parent, predicate, consistency_level);
- for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> cf : columnFamiliesMap.entrySet()) {
- counts.put(cf.getKey(), cf.getValue().size());
+ for (Map.Entry<ByteBuffer, List<ColumnOrSuperColumn>> cf : columnFamiliesMap.entrySet())
+ {
+ counts.put(cf.getKey(), cf.getValue().size());
+ }
+ return counts;
+ }
+ finally
+ {
+ Tracing.instance().stopSession();
}
- return counts;
}
private void internal_insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
@@ -507,9 +617,27 @@ public class CassandraServer implements Cassandra.Iface
public void insert(ByteBuffer key, ColumnParent column_parent, Column column, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException
{
- logger.debug("insert");
+ if (startSessionIfRequested())
+ {
+ Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
+ "column_parent", column_parent.toString(),
+ "column", column.toString(),
+ "consistency_level", consistency_level.name());
+ Tracing.instance().begin("insert", traceParameters);
+ }
+ else
+ {
+ logger.debug("insert");
+ }
- internal_insert(key, column_parent, column, consistency_level);
+ try
+ {
+ internal_insert(key, column_parent, column, consistency_level);
+ }
+ finally
+ {
+ Tracing.instance().stopSession();
+ }
}
private void internal_batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
@@ -583,9 +711,30 @@ public class CassandraServer implements Cassandra.Iface
public void batch_mutate(Map<ByteBuffer,Map<String,List<Mutation>>> mutation_map, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException
{
- logger.debug("batch_mutate");
+ if (startSessionIfRequested())
+ {
+ Map<String, String> traceParameters = Maps.newLinkedHashMap();
+ for (Map.Entry<ByteBuffer, Map<String, List<Mutation>>> mutationEntry : mutation_map.entrySet())
+ {
+ traceParameters.put(ByteBufferUtil.bytesToHex(mutationEntry.getKey()),
+ Joiner.on(";").withKeyValueSeparator(":").join(mutationEntry.getValue()));
+ }
+ traceParameters.put("consistency_level", consistency_level.name());
+ Tracing.instance().begin("batch_mutate", traceParameters);
+ }
+ else
+ {
+ logger.debug("batch_mutate");
+ }
- internal_batch_mutate(mutation_map, consistency_level);
+ try
+ {
+ internal_batch_mutate(mutation_map, consistency_level);
+ }
+ finally
+ {
+ Tracing.instance().stopSession();
+ }
}
private void internal_remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level, boolean isCommutativeOp)
@@ -612,9 +761,27 @@ public class CassandraServer implements Cassandra.Iface
public void remove(ByteBuffer key, ColumnPath column_path, long timestamp, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException
{
- logger.debug("remove");
+ if (startSessionIfRequested())
+ {
+ Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
+ "column_path", column_path.toString(),
+ "timestamp", timestamp+"",
+ "consistency_level", consistency_level.name());
+ Tracing.instance().begin("remove", traceParameters);
+ }
+ else
+ {
+ logger.debug("remove");
+ }
- internal_remove(key, column_path, timestamp, consistency_level, false);
+ try
+ {
+ internal_remove(key, column_path, timestamp, consistency_level, false);
+ }
+ finally
+ {
+ Tracing.instance().stopSession();
+ }
}
private void doInsert(ConsistencyLevel consistency_level, List<? extends IMutation> mutations) throws UnavailableException, TimedOutException, InvalidRequestException
@@ -648,117 +815,167 @@ public class CassandraServer implements Cassandra.Iface
public List<KeySlice> get_range_slices(ColumnParent column_parent, SlicePredicate predicate, KeyRange range, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TException, TimedOutException
{
- logger.debug("range_slice");
-
- ClientState cState = state();
- String keyspace = cState.getKeyspace();
- cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
- CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
- ThriftValidation.validateColumnParent(metadata, column_parent);
- ThriftValidation.validatePredicate(metadata, column_parent, predicate);
- ThriftValidation.validateKeyRange(metadata, column_parent.super_column, range);
- ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+ if (startSessionIfRequested())
+ {
+ Map<String, String> traceParameters = ImmutableMap.of(
+ "column_parent", column_parent.toString(),
+ "predicate", predicate.toString(),
+ "range", range.toString(),
+ "consistency_level", consistency_level.name());
+ Tracing.instance().begin("get_range_slices", traceParameters);
+ }
+ else
+ {
+ logger.debug("range_slice");
+ }
- List<Row> rows;
try
{
- IPartitioner p = StorageService.getPartitioner();
- AbstractBounds<RowPosition> bounds;
- if (range.start_key == null)
- {
- Token.TokenFactory tokenFactory = p.getTokenFactory();
- Token left = tokenFactory.fromString(range.start_token);
- Token right = tokenFactory.fromString(range.end_token);
- bounds = Range.makeRowRange(left, right, p);
- }
- else
+
+ String keyspace = null;
+ CFMetaData metadata = null;
+
+ ClientState cState = state();
+ keyspace = cState.getKeyspace();
+ cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+
+ metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family);
+ ThriftValidation.validateColumnParent(metadata, column_parent);
+ ThriftValidation.validatePredicate(metadata, column_parent, predicate);
+ ThriftValidation.validateKeyRange(metadata, column_parent.super_column, range);
+ ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+
+ List<Row> rows = null;
+ try
{
- bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), RowPosition.forKey(range.end_key, p));
+ IPartitioner p = StorageService.getPartitioner();
+ AbstractBounds<RowPosition> bounds;
+ if (range.start_key == null)
+ {
+ Token.TokenFactory tokenFactory = p.getTokenFactory();
+ Token left = tokenFactory.fromString(range.start_token);
+ Token right = tokenFactory.fromString(range.end_token);
+ bounds = Range.makeRowRange(left, right, p);
+ }
+ else
+ {
+ bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), RowPosition.forKey(
+ range.end_key, p));
+ }
+ schedule(DatabaseDescriptor.getRangeRpcTimeout());
+ try
+ {
+ IFilter filter = ThriftValidation.asIFilter(predicate,
+ metadata.getComparatorFor(column_parent.super_column));
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, filter, bounds,
+ range.row_filter, range.count), consistency_level);
+ }
+ finally
+ {
+ release();
+ }
+ assert rows != null;
}
- schedule(DatabaseDescriptor.getRangeRpcTimeout());
- try
+ catch (TimeoutException e)
{
- IFilter filter = ThriftValidation.asIFilter(predicate, metadata.getComparatorFor(column_parent.super_column));
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_parent, filter, bounds, range.row_filter, range.count), consistency_level);
+ logger.debug("... timed out");
+ throw new TimedOutException();
}
- finally
+ catch (IOException e)
{
- release();
+ throw new RuntimeException(e);
}
- assert rows != null;
- }
- catch (TimeoutException e)
- {
- logger.debug("... timed out");
- throw new TimedOutException();
+
+ return thriftifyKeySlices(rows, column_parent, predicate);
}
- catch (IOException e)
+ finally
{
- throw new RuntimeException(e);
+ Tracing.instance().stopSession();
}
-
- return thriftifyKeySlices(rows, column_parent, predicate);
}
public List<KeySlice> get_paged_slice(String column_family, KeyRange range, ByteBuffer start_column, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException
{
- logger.debug("get_paged_slice");
-
- ClientState cState = state();
- String keyspace = cState.getKeyspace();
- cState.hasColumnFamilyAccess(column_family, Permission.READ);
-
- CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_family);
- ThriftValidation.validateKeyRange(metadata, null, range);
- ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
-
- SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(start_column, ByteBufferUtil.EMPTY_BYTE_BUFFER, false, -1));
-
- IPartitioner p = StorageService.getPartitioner();
- AbstractBounds<RowPosition> bounds;
- if (range.start_key == null)
+ if (startSessionIfRequested())
{
- // (token, key) is unsupported, assume (token, token)
- Token.TokenFactory tokenFactory = p.getTokenFactory();
- Token left = tokenFactory.fromString(range.start_token);
- Token right = tokenFactory.fromString(range.end_token);
- bounds = Range.makeRowRange(left, right, p);
+ Map<String, String> traceParameters = ImmutableMap.of(
+ "column_family", column_family,
+ "range", range.toString(),
+ "start_column", ByteBufferUtil.bytesToHex(start_column),
+ "consistency_level", consistency_level.name());
+ Tracing.instance().begin("get_paged_slice", traceParameters);
}
else
{
- RowPosition end = range.end_key == null ? p.getTokenFactory().fromString(range.end_token).maxKeyBound(p)
- : RowPosition.forKey(range.end_key, p);
- bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), end);
+ logger.debug("get_paged_slice");
}
- List<Row> rows;
try
{
- schedule(DatabaseDescriptor.getRangeRpcTimeout());
+
+ ClientState cState = state();
+ String keyspace = cState.getKeyspace();
+ cState.hasColumnFamilyAccess(column_family, Permission.READ);
+
+ CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_family);
+ ThriftValidation.validateKeyRange(metadata, null, range);
+ ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+
+ SlicePredicate predicate = new SlicePredicate().setSlice_range(new SliceRange(start_column,
+ ByteBufferUtil.EMPTY_BYTE_BUFFER, false, -1));
+
+ IPartitioner p = StorageService.getPartitioner();
+ AbstractBounds<RowPosition> bounds;
+ if (range.start_key == null)
+ {
+ // (token, key) is unsupported, assume (token, token)
+ Token.TokenFactory tokenFactory = p.getTokenFactory();
+ Token left = tokenFactory.fromString(range.start_token);
+ Token right = tokenFactory.fromString(range.end_token);
+ bounds = Range.makeRowRange(left, right, p);
+ }
+ else
+ {
+ RowPosition end = range.end_key == null ? p.getTokenFactory().fromString(range.end_token)
+ .maxKeyBound(p)
+ : RowPosition.forKey(range.end_key, p);
+ bounds = new Bounds<RowPosition>(RowPosition.forKey(range.start_key, p), end);
+ }
+
+ List<Row> rows;
try
{
- IFilter filter = ThriftValidation.asIFilter(predicate, metadata.comparator);
- rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, null, filter, bounds, range.row_filter, range.count, true, true), consistency_level);
+ schedule(DatabaseDescriptor.getRangeRpcTimeout());
+ try
+ {
+ IFilter filter = ThriftValidation.asIFilter(predicate, metadata.comparator);
+ rows = StorageProxy.getRangeSlice(new RangeSliceCommand(keyspace, column_family, null, filter,
+ bounds, range.row_filter, range.count, true, true), consistency_level);
+ }
+ finally
+ {
+ release();
+ }
+ assert rows != null;
}
- finally
+ catch (TimeoutException e)
{
- release();
+ logger.debug("... timed out");
+ throw new TimedOutException();
}
- assert rows != null;
- }
- catch (TimeoutException e)
- {
- logger.debug("... timed out");
- throw new TimedOutException();
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ return thriftifyKeySlices(rows, new ColumnParent(column_family), predicate);
}
- catch (IOException e)
+ finally
{
- throw new RuntimeException(e);
+ Tracing.instance().stopSession();
}
-
- return thriftifyKeySlices(rows, new ColumnParent(column_family), predicate);
}
private List<KeySlice> thriftifyKeySlices(List<Row> rows, ColumnParent column_parent, SlicePredicate predicate)
@@ -776,46 +993,68 @@ public class CassandraServer implements Cassandra.Iface
public List<KeySlice> get_indexed_slices(ColumnParent column_parent, IndexClause index_clause, SlicePredicate column_predicate, ConsistencyLevel consistency_level) throws InvalidRequestException, UnavailableException, TimedOutException, TException
{
- logger.debug("scan");
+ if (startSessionIfRequested())
+ {
+ Map<String, String> traceParameters = ImmutableMap.of(
+ "column_parent", column_parent.toString(),
+ "index_clause", index_clause.toString(),
+ "slice_predicate", column_predicate.toString(),
+ "consistency_level", consistency_level.name());
+ Tracing.instance().begin("get_indexed_slices", traceParameters);
+ }
+ else
+ {
+ logger.debug("scan");
+ }
- ClientState cState = state();
- cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
- String keyspace = cState.getKeyspace();
- CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, false);
- ThriftValidation.validateColumnParent(metadata, column_parent);
- ThriftValidation.validatePredicate(metadata, column_parent, column_predicate);
- ThriftValidation.validateIndexClauses(metadata, index_clause);
- ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
+ try
+ {
- IPartitioner p = StorageService.getPartitioner();
- AbstractBounds<RowPosition> bounds = new Bounds<RowPosition>(RowPosition.forKey(index_clause.start_key, p),
- p.getMinimumToken().minKeyBound());
+ ClientState cState = state();
+ cState.hasColumnFamilyAccess(column_parent.column_family, Permission.READ);
+ String keyspace = cState.getKeyspace();
+ CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, false);
+ ThriftValidation.validateColumnParent(metadata, column_parent);
+ ThriftValidation.validatePredicate(metadata, column_parent, column_predicate);
+ ThriftValidation.validateIndexClauses(metadata, index_clause);
+ ThriftValidation.validateConsistencyLevel(keyspace, consistency_level, RequestType.READ);
- IFilter filter = ThriftValidation.asIFilter(column_predicate, metadata.getComparatorFor(column_parent.super_column));
- RangeSliceCommand command = new RangeSliceCommand(keyspace,
- column_parent.column_family,
- null,
- filter,
- bounds,
- index_clause.expressions,
- index_clause.count);
+ IPartitioner p = StorageService.getPartitioner();
+ AbstractBounds<RowPosition> bounds = new Bounds<RowPosition>(RowPosition.forKey(index_clause.start_key, p),
+ p.getMinimumToken().minKeyBound());
+
+ IFilter filter = ThriftValidation.asIFilter(column_predicate,
+ metadata.getComparatorFor(column_parent.super_column));
+ RangeSliceCommand command = new RangeSliceCommand(keyspace,
+ column_parent.column_family,
+ null,
+ filter,
+ bounds,
+ index_clause.expressions,
+ index_clause.count);
+
+ List<Row> rows;
+ try
+ {
+ rows = StorageProxy.getRangeSlice(command, consistency_level);
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (TimeoutException e)
+ {
+ logger.debug("... timed out");
+ throw new TimedOutException();
+ }
+
+ return thriftifyKeySlices(rows, column_parent, column_predicate);
- List<Row> rows;
- try
- {
- rows = StorageProxy.getRangeSlice(command, consistency_level);
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
}
- catch (TimeoutException e)
+ finally
{
- logger.debug("... timed out");
- throw new TimedOutException();
+ Tracing.instance().stopSession();
}
-
- return thriftifyKeySlices(rows, column_parent, column_predicate);
}
public List<KsDef> describe_keyspaces() throws TException, InvalidRequestException
@@ -1071,8 +1310,17 @@ public class CassandraServer implements Cassandra.Iface
public void truncate(String cfname) throws InvalidRequestException, UnavailableException, TimedOutException, TException
{
ClientState cState = state();
- logger.debug("truncating {} in {}", cfname, cState.getKeyspace());
cState.hasColumnFamilyAccess(cfname, Permission.WRITE);
+
+ if (startSessionIfRequested())
+ {
+ Tracing.instance().begin("truncate", ImmutableMap.of("cf", cfname, "ks", cState.getKeyspace()));
+ }
+ else
+ {
+ logger.debug("truncating {}.{}", cState.getKeyspace(), cfname);
+ }
+
try
{
schedule(DatabaseDescriptor.getTruncateRpcTimeout());
@@ -1094,6 +1342,10 @@ public class CassandraServer implements Cassandra.Iface
{
throw (UnavailableException) new UnavailableException().initCause(e);
}
+ finally
+ {
+ Tracing.instance().stopSession();
+ }
}
public void set_keyspace(String keyspace) throws InvalidRequestException, TException
@@ -1114,41 +1366,78 @@ public class CassandraServer implements Cassandra.Iface
public void add(ByteBuffer key, ColumnParent column_parent, CounterColumn column, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException
{
- logger.debug("add");
-
- ClientState cState = state();
- cState.hasColumnFamilyAccess(column_parent.column_family, Permission.WRITE);
- String keyspace = cState.getKeyspace();
-
- CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, true);
- ThriftValidation.validateKey(metadata, key);
- ThriftValidation.validateCommutativeForWrite(metadata, consistency_level);
- ThriftValidation.validateColumnParent(metadata, column_parent);
- // SuperColumn field is usually optional, but not when we're adding
- if (metadata.cfType == ColumnFamilyType.Super && column_parent.super_column == null)
+ if (startSessionIfRequested())
{
- throw new InvalidRequestException("missing mandatory super column name for super CF " + column_parent.column_family);
+ Map<String, String> traceParameters = ImmutableMap.of(
+ "column_parent", column_parent.toString(),
+ "column", column.toString(),
+ "consistency_level", consistency_level.name());
+ Tracing.instance().begin("add", traceParameters);
+ }
+ else
+ {
+ logger.debug("add");
}
- ThriftValidation.validateColumnNames(metadata, column_parent, Arrays.asList(column.name));
- RowMutation rm = new RowMutation(keyspace, key);
try
{
- rm.addCounter(new QueryPath(column_parent.column_family, column_parent.super_column, column.name), column.value);
+ ClientState cState = state();
+ cState.hasColumnFamilyAccess(column_parent.column_family, Permission.WRITE);
+ String keyspace = cState.getKeyspace();
+
+ CFMetaData metadata = ThriftValidation.validateColumnFamily(keyspace, column_parent.column_family, true);
+ ThriftValidation.validateKey(metadata, key);
+ ThriftValidation.validateCommutativeForWrite(metadata, consistency_level);
+ ThriftValidation.validateColumnParent(metadata, column_parent);
+ // SuperColumn field is usually optional, but not when we're adding
+ if (metadata.cfType == ColumnFamilyType.Super && column_parent.super_column == null)
+ {
+ throw new InvalidRequestException("missing mandatory super column name for super CF "
+ + column_parent.column_family);
+ }
+ ThriftValidation.validateColumnNames(metadata, column_parent, Arrays.asList(column.name));
+
+ RowMutation rm = new RowMutation(keyspace, key);
+ try
+ {
+ rm.addCounter(new QueryPath(column_parent.column_family, column_parent.super_column, column.name),
+ column.value);
+ }
+ catch (MarshalException e)
+ {
+ throw new InvalidRequestException(e.getMessage());
+ }
+ doInsert(consistency_level, Arrays.asList(new CounterMutation(rm, consistency_level)));
}
- catch (MarshalException e)
+ finally
{
- throw new InvalidRequestException(e.getMessage());
+ Tracing.instance().stopSession();
}
- doInsert(consistency_level, Arrays.asList(new CounterMutation(rm, consistency_level)));
}
public void remove_counter(ByteBuffer key, ColumnPath path, ConsistencyLevel consistency_level)
throws InvalidRequestException, UnavailableException, TimedOutException, TException
{
- logger.debug("remove_counter");
+ if (startSessionIfRequested())
+ {
+ Map<String, String> traceParameters = ImmutableMap.of("key", ByteBufferUtil.bytesToHex(key),
+ "column_path", path.toString(),
+ "consistency_level", consistency_level.name());
+ Tracing.instance().begin("remove_counter", traceParameters);
+ }
+ else
+ {
+ logger.debug("remove_counter");
+ }
- internal_remove(key, path, System.currentTimeMillis(), consistency_level, true);
+ try
+ {
+ internal_remove(key, path, System.currentTimeMillis(), consistency_level, true);
+ }
+ finally
+ {
+ Tracing.instance().stopSession();
+ }
}
private static String uncompress(ByteBuffer query, Compression compression) throws InvalidRequestException
@@ -1212,21 +1501,36 @@ public class CassandraServer implements Cassandra.Iface
public CqlResult execute_cql_query(ByteBuffer query, Compression compression)
throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
{
- if (logger.isDebugEnabled()) logger.debug("execute_cql_query");
-
- String queryString = uncompress(query,compression);
+ try
+ {
+ String queryString = uncompress(query, compression);
+ if (startSessionIfRequested())
+ {
+ Tracing.instance().begin("execute_cql_query",
+ ImmutableMap.of("query", queryString));
+ }
+ else
+ {
+ logger.debug("execute_cql_query");
+ }
- ClientState cState = state();
- if (cState.getCQLVersion().major == 2)
- return QueryProcessor.process(queryString, state());
- else
- return org.apache.cassandra.cql3.QueryProcessor.process(queryString, cState).toThriftResult();
+ ClientState cState = state();
+ if (cState.getCQLVersion().major == 2)
+ return QueryProcessor.process(queryString, state());
+ else
+ return org.apache.cassandra.cql3.QueryProcessor.process(queryString, cState).toThriftResult();
+ }
+ finally
+ {
+ Tracing.instance().stopSession();
+ }
}
public CqlPreparedResult prepare_cql_query(ByteBuffer query, Compression compression)
throws InvalidRequestException, TException
{
- if (logger.isDebugEnabled()) logger.debug("prepare_cql_query");
+ if (logger.isDebugEnabled())
+ logger.debug("prepare_cql_query");
String queryString = uncompress(query,compression);
@@ -1240,28 +1544,45 @@ public class CassandraServer implements Cassandra.Iface
public CqlResult execute_prepared_cql_query(int itemId, List<ByteBuffer> bindVariables)
throws InvalidRequestException, UnavailableException, TimedOutException, SchemaDisagreementException, TException
{
- if (logger.isDebugEnabled()) logger.debug("execute_prepared_cql_query");
-
- ClientState cState = state();
- if (cState.getCQLVersion().major == 2)
+ if (startSessionIfRequested())
{
- CQLStatement statement = cState.getPrepared().get(itemId);
-
- if (statement == null)
- throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId));
- logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.boundTerms);
-
- return QueryProcessor.processPrepared(statement, cState, bindVariables);
+ // TODO we don't have [typed] access to CQL bind variables here. CASSANDRA-4560 is open to add support.
+ Tracing.instance().begin("execute_prepared_cql_query", Collections.<String, String>emptyMap());
}
else
{
- org.apache.cassandra.cql3.CQLStatement statement = cState.getCQL3Prepared().get(itemId);
+ logger.debug("execute_prepared_cql_query");
+ }
- if (statement == null)
- throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId));
- logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.getBoundsTerms());
+ try
+ {
+ ClientState cState = state();
+ if (cState.getCQLVersion().major == 2)
+ {
+ CQLStatement statement = cState.getPrepared().get(itemId);
+
+ if (statement == null)
+ throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId));
+ logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId, statement.boundTerms);
+
+ return QueryProcessor.processPrepared(statement, cState, bindVariables);
+ }
+ else
+ {
+ org.apache.cassandra.cql3.CQLStatement statement = cState.getCQL3Prepared().get(itemId);
+
+ if (statement == null)
+ throw new InvalidRequestException(String.format("Prepared query with ID %d not found", itemId));
+ logger.trace("Retrieved prepared statement #{} with {} bind markers", itemId,
+ statement.getBoundsTerms());
- return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, cState, bindVariables).toThriftResult();
+ return org.apache.cassandra.cql3.QueryProcessor.processPrepared(statement, cState, bindVariables)
+ .toThriftResult();
+ }
+ }
+ finally
+ {
+ Tracing.instance().stopSession();
}
}
@@ -1272,5 +1593,22 @@ public class CassandraServer implements Cassandra.Iface
state().setCQLVersion(version);
}
+ public ByteBuffer trace_next_query() throws TException
+ {
+ UUID sessionId = UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress());
+ state().prepareTracingSession(sessionId);
+ return TimeUUIDType.instance.decompose(sessionId);
+ }
+
+ private boolean startSessionIfRequested()
+ {
+ if (state().traceNextQuery())
+ {
+ state().createSession();
+ return true;
+ }
+ return false;
+ }
+
// main method moved to CassandraDaemon
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/thrift/ThriftValidation.java b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
index cf42f9e..d31515e 100644
--- a/src/java/org/apache/cassandra/thrift/ThriftValidation.java
+++ b/src/java/org/apache/cassandra/thrift/ThriftValidation.java
@@ -605,7 +605,7 @@ public class ThriftValidation
public static void validateKeyspaceNotSystem(String modifiedKeyspace) throws InvalidRequestException
{
- if (modifiedKeyspace.equalsIgnoreCase(Table.SYSTEM_TABLE))
+ if (modifiedKeyspace.equalsIgnoreCase(Table.SYSTEM_KS))
throw new InvalidRequestException("system keyspace is not user-modifiable");
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index 6bede1b..8cb7dbe 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -36,6 +36,7 @@ import org.apache.commons.cli.*;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
import org.apache.cassandra.config.ConfigurationException;
import org.apache.cassandra.db.ColumnFamilyStoreMBean;
+import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.compaction.CompactionManagerMBean;
import org.apache.cassandra.db.compaction.OperationType;
import org.apache.cassandra.locator.EndpointSnitchInfoMBean;
@@ -46,7 +47,7 @@ import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.utils.EstimatedHistogram;
import org.apache.cassandra.utils.Pair;
-public class NodeCmd
+public class NodeCmd
{
private static final Pair<String, String> SNAPSHOT_COLUMNFAMILY_OPT = new Pair<String, String>("cf", "column-family");
private static final Pair<String, String> HOST_OPT = new Pair<String, String>("h", "host");
@@ -120,6 +121,7 @@ public class NodeCmd
SETCOMPACTIONTHRESHOLD,
SETCOMPACTIONTHROUGHPUT,
SETSTREAMTHROUGHPUT,
+ SETTRACEPROBABILITY,
SNAPSHOT,
STATUS,
STATUSTHRIFT,
@@ -145,7 +147,7 @@ public class NodeCmd
// No args
addCmdHelp(header, "ring", "Print information about the token ring");
addCmdHelp(header, "join", "Join the ring");
- addCmdHelp(header, "info [-T/--tokens]", "Print node information (uptime, load, ...)");
+ addCmdHelp(header, "info [-T/--tokens]", "Print node information (uptime, load, ...)");
addCmdHelp(header, "status", "Print cluster information (state, load, IDs, ...)");
addCmdHelp(header, "cfstats", "Print statistics on column families");
addCmdHelp(header, "version", "Print cassandra version");
@@ -173,6 +175,7 @@ public class NodeCmd
addCmdHelp(header, "describering [keyspace]", "Shows the token ranges info of a given keyspace.");
addCmdHelp(header, "rangekeysample", "Shows the sampled keys held across all keyspaces.");
addCmdHelp(header, "rebuild [src-dc-name]", "Rebuild data by streaming from other nodes (similarly to bootstrap)");
+ addCmdHelp(header, "settraceprobability [value]", "Sets the probability for tracing any given request to value. 0 disables, 1 enables for all requests, 0 is the default");
// Two args
addCmdHelp(header, "snapshot [keyspaces...] -cf [columnfamilyName] -t [snapshotName]", "Take a snapshot of the optionally specified column family of the specified keyspaces using optional name snapshotName");
@@ -1033,6 +1036,11 @@ public class NodeCmd
probe.setStreamThroughput(Integer.valueOf(arguments[0]));
break;
+ case SETTRACEPROBABILITY :
+ if (arguments.length != 1) { badUse("Missing value argument."); }
+ probe.setTraceProbability(Double.valueOf(arguments[0]));
+ break;
+
case REBUILD :
if (arguments.length > 1) { badUse("Too many arguments."); }
probe.rebuild(arguments.length == 1 ? arguments[0] : null);
@@ -1284,7 +1292,7 @@ public class NodeCmd
catch (ExecutionException ee) { err(ee, "Error occured during compaction"); }
break;
case CLEANUP :
- if (keyspace.equals("system")) { break; } // Skip cleanup on system cfs.
+ if (keyspace.equals(Table.SYSTEM_KS)) { break; } // Skip cleanup on system cfs.
try { probe.forceTableCleanup(keyspace, columnFamilies); }
catch (ExecutionException ee) { err(ee, "Error occured during cleanup"); }
break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 494fcda..2d57911 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -701,6 +701,11 @@ public class NodeProbe
ssProxy.setStreamThroughputMbPerSec(value);
}
+ public void setTraceProbability(double value) // hands the requested trace probability to ssProxy
+ {
+ ssProxy.setTraceProbability(value); // forwarded unchanged; this method performs no validation of value
+ }
+
public String getSchemaVersion()
{
return ssProxy.getSchemaVersion();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/5c94432b/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
new file mode 100644
index 0000000..7fca842
--- /dev/null
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.tracing;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.net.InetAddress;
+import java.nio.ByteBuffer;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.base.Stopwatch;
+
+import org.apache.cassandra.thrift.ColumnParent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.UUIDGen;
+
+/**
+ * ThreadLocal state for a tracing session. The presence of an instance of this class as a ThreadLocal denotes that an
+ * operation is being traced.
+ */
+public class TraceState
+{
+    public final UUID sessionId;            // identifier of the traced session
+    public final InetAddress coordinator;   // coordinator address supplied at construction
+    public final Stopwatch watch;           // started by the constructor
+    public final ByteBuffer sessionIdBytes; // sessionId converted once with ByteBufferUtil.bytes
+
+    /** Captures the session identity and starts the stopwatch; both arguments are asserted non-null. */
+    public TraceState(InetAddress coordinator, UUID sessionId)
+    {
+        assert coordinator != null;
+        assert sessionId != null;
+        this.sessionId = sessionId;
+        this.sessionIdBytes = ByteBufferUtil.bytes(sessionId);
+        this.coordinator = coordinator;
+        this.watch = new Stopwatch().start();
+    }
+
+    /** @return the time reported by {@link #watch} in microseconds, capped at Integer.MAX_VALUE */
+    public int elapsed()
+    {
+        // Take the minimum as a long so an oversized reading saturates instead of wrapping in the cast.
+        return (int) Math.min(watch.elapsedTime(TimeUnit.MICROSECONDS), Integer.MAX_VALUE);
+    }
+}