You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/21 16:27:51 UTC
[1/3] git commit: Fix tracing when operation completes before all
responses arrive patch by jbellis; reviewed by slebresne for CASSANDRA-5668
Updated Branches:
refs/heads/cassandra-1.2 7dc2eb95c -> fbe8a6eb2
refs/heads/trunk b73f9d423 -> 140b0311d
Fix tracing when operation completes before all responses arrive
patch by jbellis; reviewed by slebresne for CASSANDRA-5668
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fbe8a6eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fbe8a6eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fbe8a6eb
Branch: refs/heads/cassandra-1.2
Commit: fbe8a6eb213ee3558be701c73c31a0da79446657
Parents: 7dc2eb9
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Jun 20 10:19:13 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Jun 21 09:23:39 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../DebuggableThreadPoolExecutor.java | 22 ++++++++---
.../cassandra/concurrent/StageManager.java | 29 +++++++++------
.../concurrent/TracingAwareExecutorService.java | 33 +++++++++++++++++
.../apache/cassandra/net/MessagingService.java | 14 ++++---
.../cassandra/net/OutboundTcpConnection.java | 13 ++++++-
.../cassandra/service/MigrationManager.java | 7 +---
.../cassandra/tracing/ExpiredTraceState.java | 39 ++++++++++++++++++++
.../apache/cassandra/tracing/TraceState.java | 12 ++++--
.../org/apache/cassandra/tracing/Tracing.java | 26 +++++++------
.../service/AntiEntropyServiceTestAbstract.java | 10 ++---
11 files changed, 156 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3847d6a..593bf7c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.6
+ * Fix tracing when operation completes before all responses arrive (CASSANDRA-5668)
* Fix cross-DC mutation forwarding (CASSANDRA-5632)
* Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
* (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index 25f15ee..26441ec 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -44,7 +44,7 @@ import static org.apache.cassandra.tracing.Tracing.isTracing;
* threads and the queue is full, we want the enqueuer to block. But to allow the number of threads to drop if a
* stage is less busy, core thread timeout is enabled.
*/
-public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
{
protected static final Logger logger = LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
public static final RejectedExecutionHandler blockingExecutionHandler = new RejectedExecutionHandler()
@@ -131,12 +131,17 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
protected void onFinalAccept(Runnable task) {}
protected void onFinalRejection(Runnable task) {}
+ public void execute(Runnable command, TraceState state)
+ {
+ super.execute(state == null ? command : new TraceSessionWrapper<Object>(command, state));
+ }
+
// execute does not call newTaskFor
@Override
public void execute(Runnable command)
{
super.execute(isTracing() && !(command instanceof TraceSessionWrapper)
- ? new TraceSessionWrapper<Object>(command, null)
+ ? new TraceSessionWrapper<Object>(command)
: command);
}
@@ -145,7 +150,7 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
{
if (isTracing() && !(runnable instanceof TraceSessionWrapper))
{
- return new TraceSessionWrapper<T>(runnable, result);
+ return new TraceSessionWrapper<T>(Executors.callable(runnable, result));
}
return super.newTaskFor(runnable, result);
}
@@ -256,10 +261,9 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
{
private final TraceState state;
- public TraceSessionWrapper(Runnable runnable, T result)
+ public TraceSessionWrapper(Runnable command)
{
- super(runnable, result);
- state = Tracing.instance().get();
+ this(command, null);
}
public TraceSessionWrapper(Callable<T> callable)
@@ -268,6 +272,12 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
state = Tracing.instance().get();
}
+ public TraceSessionWrapper(Runnable command, TraceState state)
+ {
+ super(command, null);
+ this.state = state;
+ }
+
private void setupContext()
{
Tracing.instance().set(state);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 287b19e..2960f22 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.config.DatabaseDescriptor.*;
@@ -38,7 +39,7 @@ public class StageManager
{
private static final Logger logger = LoggerFactory.getLogger(StageManager.class);
- private static final EnumMap<Stage, ThreadPoolExecutor> stages = new EnumMap<Stage, ThreadPoolExecutor>(Stage.class);
+ private static final EnumMap<Stage, TracingAwareExecutorService> stages = new EnumMap<Stage, TracingAwareExecutorService>(Stage.class);
public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle
@@ -60,7 +61,7 @@ public class StageManager
stages.put(Stage.TRACING, tracingExecutor());
}
- private static ThreadPoolExecutor tracingExecutor()
+ private static ExecuteOnlyExecutor tracingExecutor()
{
RejectedExecutionHandler reh = new RejectedExecutionHandler()
{
@@ -78,7 +79,7 @@ public class StageManager
reh);
}
- private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
+ private static JMXEnabledThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
{
return new JMXEnabledThreadPoolExecutor(numThreads,
KEEPALIVE,
@@ -88,7 +89,7 @@ public class StageManager
stage.getJmxType());
}
- private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
+ private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
{
return new JMXConfigurableThreadPoolExecutor(numThreads,
KEEPALIVE,
@@ -98,7 +99,7 @@ public class StageManager
stage.getJmxType());
}
- private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
+ private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
{
return new JMXConfigurableThreadPoolExecutor(numThreads,
KEEPALIVE,
@@ -111,8 +112,8 @@ public class StageManager
/**
* Retrieve a stage from the StageManager
* @param stage name of the stage to be retrieved.
- */
- public static ThreadPoolExecutor getStage(Stage stage)
+ */
+ public static TracingAwareExecutorService getStage(Stage stage)
{
return stages.get(stage);
}
@@ -132,29 +133,35 @@ public class StageManager
* A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the
* tracing stage. See CASSANDRA-1123 for background.
*/
- private static class ExecuteOnlyExecutor extends ThreadPoolExecutor
+ private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
{
public ExecuteOnlyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
+ public void execute(Runnable command, TraceState state)
+ {
+ assert state == null;
+ super.execute(command);
+ }
+
@Override
public Future<?> submit(Runnable task)
{
- return super.submit(task);
+ throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(Runnable task, T result)
{
- return super.submit(task, result);
+ throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(Callable<T> task)
{
- return super.submit(task);
+ throw new UnsupportedOperationException();
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
new file mode 100644
index 0000000..e5dcd7e
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.cassandra.tracing.TraceState;
+
+public interface TracingAwareExecutorService extends ExecutorService
+{
+ // we need a way to inject a TraceState directly into the Executor context without going through
+ // the global Tracing sessions; see CASSANDRA-5668
+ public void execute(Runnable command, TraceState state);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 5e36f44..2964d35 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -29,7 +29,6 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
@@ -37,12 +36,14 @@ import javax.management.ObjectName;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.TracingAwareExecutorService;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.db.*;
@@ -61,9 +62,9 @@ import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.*;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
+import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
public final class MessagingService implements MessagingServiceMBean
{
@@ -702,15 +703,16 @@ public final class MessagingService implements MessagingServiceMBean
public void receive(MessageIn message, String id, long timestamp)
{
- Tracing.instance().initializeFromMessage(message);
- Tracing.trace("Message received from {}", message.from);
+ TraceState state = Tracing.instance().initializeFromMessage(message);
+ if (state != null)
+ state.trace("Message received from {}", message.from);
message = SinkManager.processInboundMessage(message, id);
if (message == null)
return;
Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
- ExecutorService stage = StageManager.getStage(message.getMessageType());
+ TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
assert stage != null : "No stage for message type " + message.verb;
if (message.verb == Verb.REQUEST_RESPONSE && PBSPredictor.instance().isLoggingEnabled())
@@ -727,7 +729,7 @@ public final class MessagingService implements MessagingServiceMBean
}
}
- stage.execute(runnable);
+ stage.execute(runnable, state);
}
public void setCallbackForTests(String messageId, CallbackInfo callback)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 32ce224..ee30d36 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -179,8 +179,17 @@ public class OutboundTcpConnection extends Thread
{
UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
TraceState state = Tracing.instance().get(sessionId);
- state.trace("Sending message to {}", poolReference.endPoint());
- Tracing.instance().stopIfNonLocal(state);
+ String message = String.format("Sending message to %s", poolReference.endPoint());
+ // session may have already finished; see CASSANDRA-5668
+ if (state == null)
+ {
+ TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1);
+ }
+ else
+ {
+ state.trace(message);
+ Tracing.instance().stopIfNonLocal(state);
+ }
}
write(qm.message, qm.id, qm.timestamp, out, targetVersion);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 127b2b8..de34785 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -23,10 +23,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
@@ -160,7 +157,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
public static boolean isReadyForBootstrap()
{
- return StageManager.getStage(Stage.MIGRATION).getActiveCount() == 0;
+ return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0;
}
public void notifyCreateKeyspace(KSMetaData ksm)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
new file mode 100644
index 0000000..6b4f90b
--- /dev/null
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.cassandra.tracing;
+
+import java.util.UUID;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ExpiredTraceState extends TraceState
+{
+ public ExpiredTraceState(UUID sessionId)
+ {
+ super(FBUtilities.getBroadcastAddress(), sessionId, true);
+ }
+
+ public int elapsed()
+ {
+ return -1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index 703c4ee..25599c4 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -81,11 +81,14 @@ public class TraceState
trace(MessageFormatter.arrayFormat(format, args).getMessage());
}
- public void trace(final String message)
+ public void trace(String message)
{
- final int elapsed = elapsed();
- final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
+ TraceState.trace(sessionIdBytes, message, elapsed());
+ }
+ public static void trace(final ByteBuffer sessionIdBytes, final String message, final int elapsed)
+ {
+ final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
final String threadName = Thread.currentThread().getName();
StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
@@ -96,7 +99,8 @@ public class TraceState
ColumnFamily cf = ColumnFamily.create(cfMeta);
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress());
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
+ if (elapsed >= 0)
+ Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes);
mutation.add(cf);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index 17241b9..eb5bad9 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.ExpiringColumn;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -229,31 +230,34 @@ public class Tracing
}
/**
- * Updates the threads query context from a message
+ * Determines the tracing context from a message. Does NOT set the threadlocal state.
*
- * @param message
- * The internode message
+ * @param message The internode message
*/
- public void initializeFromMessage(final MessageIn<?> message)
+ public TraceState initializeFromMessage(final MessageIn<?> message)
{
final byte[] sessionBytes = message.parameters.get(Tracing.TRACE_HEADER);
- // if the message has no session context header don't do tracing
if (sessionBytes == null)
- {
- state.set(null);
- return;
- }
+ return null;
assert sessionBytes.length == 16;
UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
TraceState ts = sessions.get(sessionId);
- if (ts == null)
+ if (ts != null)
+ return ts;
+
+ if (message.verb == MessagingService.Verb.REQUEST_RESPONSE)
+ {
+ // received a message for a session we've already closed out. see CASSANDRA-5668
+ return new ExpiredTraceState(sessionId);
+ }
+ else
{
ts = new TraceState(message.from, sessionId, false);
sessions.put(sessionId, ts);
+ return ts;
}
- state.set(ts);
}
public static void trace(String message)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index fc0b832..b80c272 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -20,10 +20,11 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -42,13 +43,12 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
-import static org.apache.cassandra.service.AntiEntropyService.*;
-
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
+import static org.apache.cassandra.service.AntiEntropyService.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -305,7 +305,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
void flushAES() throws Exception
{
- final ThreadPoolExecutor stage = StageManager.getStage(Stage.ANTI_ENTROPY);
+ final ExecutorService stage = StageManager.getStage(Stage.ANTI_ENTROPY);
final Callable noop = new Callable<Object>()
{
public Boolean call()
[3/3] git commit: merge from 1.2
Posted by jb...@apache.org.
merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/140b0311
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/140b0311
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/140b0311
Branch: refs/heads/trunk
Commit: 140b0311df890d2258f19f3df98f1a996b6f2a6e
Parents: b73f9d4 fbe8a6e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Jun 21 09:27:21 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Jun 21 09:27:21 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../DebuggableThreadPoolExecutor.java | 22 ++++++++---
.../cassandra/concurrent/StageManager.java | 29 +++++++++------
.../concurrent/TracingAwareExecutorService.java | 33 +++++++++++++++++
.../apache/cassandra/net/MessagingService.java | 14 ++++---
.../cassandra/net/OutboundTcpConnection.java | 13 ++++++-
.../cassandra/service/MigrationManager.java | 3 +-
.../cassandra/tracing/ExpiredTraceState.java | 39 ++++++++++++++++++++
.../apache/cassandra/tracing/TraceState.java | 12 ++++--
.../org/apache/cassandra/tracing/Tracing.java | 26 +++++++------
.../service/AntiEntropyServiceTestAbstract.java | 9 +++--
11 files changed, 156 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 044add4,593bf7c..29c6fd2
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,69 -1,5 +1,70 @@@
+2.0
+ * Removed on-heap row cache (CASSANDRA-5348)
+ * use nanotime consistently for node-local timeouts (CASSANDRA-5581)
+ * Avoid unnecessary second pass on name-based queries (CASSANDRA-5577)
+ * Experimental triggers (CASSANDRA-1311)
+ * JEMalloc support for off-heap allocation (CASSANDRA-3997)
+ * Single-pass compaction (CASSANDRA-4180)
+ * Removed token range bisection (CASSANDRA-5518)
+ * Removed compatibility with pre-1.2.5 sstables and network messages
+ (CASSANDRA-5511)
+ * removed PBSPredictor (CASSANDRA-5455)
+ * CAS support (CASSANDRA-5062, 5441, 5442, 5443, 5619)
+ * Leveled compaction performs size-tiered compactions in L0
+ (CASSANDRA-5371, 5439)
+ * Add yaml network topology snitch for mixed ec2/other envs (CASSANDRA-5339)
+ * Log when a node is down longer than the hint window (CASSANDRA-4554)
+ * Optimize tombstone creation for ExpiringColumns (CASSANDRA-4917)
+ * Improve LeveledScanner work estimation (CASSANDRA-5250, 5407)
+ * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
+ * Change Message IDs to ints (CASSANDRA-5307)
+ * Move sstable level information into the Stats component, removing the
+ need for a separate Manifest file (CASSANDRA-4872)
+ * avoid serializing to byte[] on commitlog append (CASSANDRA-5199)
+ * make index_interval configurable per columnfamily (CASSANDRA-3961, CASSANDRA-5650)
+ * add default_time_to_live (CASSANDRA-3974)
+ * add memtable_flush_period_in_ms (CASSANDRA-4237)
+ * replace supercolumns internally by composites (CASSANDRA-3237, 5123)
+ * upgrade thrift to 0.9.0 (CASSANDRA-3719)
+ * drop unnecessary keyspace parameter from user-defined compaction API
+ (CASSANDRA-5139)
+ * more robust solution to incomplete compactions + counters (CASSANDRA-5151)
+ * Change order of directory searching for c*.in.sh (CASSANDRA-3983)
+ * Add tool to reset SSTable compaction level for LCS (CASSANDRA-5271)
+ * Allow custom configuration loader (CASSANDRA-5045)
+ * Remove memory emergency pressure valve logic (CASSANDRA-3534)
+ * Reduce request latency with eager retry (CASSANDRA-4705)
+ * cqlsh: Remove ASSUME command (CASSANDRA-5331)
+ * Rebuild BF when loading sstables if bloom_filter_fp_chance
+ has changed since compaction (CASSANDRA-5015)
+ * remove row-level bloom filters (CASSANDRA-4885)
+ * Change Kernel Page Cache skipping into row preheating (disabled by default)
+ (CASSANDRA-4937)
+ * Improve repair by deciding on a gcBefore before sending
+ out TreeRequests (CASSANDRA-4932)
+ * Add an official way to disable compactions (CASSANDRA-5074)
+ * Reenable ALTER TABLE DROP with new semantics (CASSANDRA-3919)
+ * Add binary protocol versioning (CASSANDRA-5436)
+ * Swap THshaServer for TThreadedSelectorServer (CASSANDRA-5530)
+ * Add alias support to SELECT statement (CASSANDRA-5075)
+ * Don't create empty RowMutations in CommitLogReplayer (CASSANDRA-5541)
+ * Use range tombstones when dropping cfs/columns from schema (CASSANDRA-5579)
+ * cqlsh: drop CQL2/CQL3-beta support (CASSANDRA-5585)
+ * Track max/min column names in sstables to be able to optimize slice
+ queries (CASSANDRA-5514, CASSANDRA-5595, CASSANDRA-5600)
+ * Binary protocol: allow batching already prepared statements (CASSANDRA-4693)
+ * Allow preparing timestamp, ttl and limit in CQL3 queries (CASSANDRA-4450)
+ * Support native link w/o JNA in Java7 (CASSANDRA-3734)
+ * Use SASL authentication in binary protocol v2 (CASSANDRA-5545)
+ * Replace Thrift HsHa with LMAX Disruptor based implementation (CASSANDRA-5582)
+ * cqlsh: Add row count to SELECT output (CASSANDRA-5636)
+ * Include a timestamp with all read commands to determine column expiration
+ (CASSANDRA-5149)
+ * Streaming 2.0 (CASSANDRA-5286)
+ * Conditional create/drop ks/table/index statements in CQL3 (CASSANDRA-2737)
+
1.2.6
+ * Fix tracing when operation completes before all responses arrive (CASSANDRA-5668)
* Fix cross-DC mutation forwarding (CASSANDRA-5632)
* Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
* (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/MessagingService.java
index 8e94f8e,2964d35..e01183c
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@@ -55,12 -60,11 +56,12 @@@ import org.apache.cassandra.metrics.Dro
import org.apache.cassandra.net.sink.SinkManager;
import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.*;
+import org.apache.cassandra.service.paxos.Commit;
+import org.apache.cassandra.service.paxos.PrepareResponse;
import org.apache.cassandra.streaming.*;
-import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
+ import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
- import org.cliffc.high_scale_lib.NonBlockingHashMap;
public final class MessagingService implements MessagingServiceMBean
{
@@@ -677,23 -701,38 +678,24 @@@
}
}
- public void receive(MessageIn message, String id, long timestamp)
+ public void receive(MessageIn message, int id, long timestamp)
{
- Tracing.instance().initializeFromMessage(message);
- Tracing.trace("Message received from {}", message.from);
+ TraceState state = Tracing.instance().initializeFromMessage(message);
+ if (state != null)
+ state.trace("Message received from {}", message.from);
message = SinkManager.processInboundMessage(message, id);
if (message == null)
return;
Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
- ExecutorService stage = StageManager.getStage(message.getMessageType());
+ TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
assert stage != null : "No stage for message type " + message.verb;
- stage.execute(runnable);
- if (message.verb == Verb.REQUEST_RESPONSE && PBSPredictor.instance().isLoggingEnabled())
- {
- IMessageCallback cb = MessagingService.instance().getRegisteredCallback(id).callback;
-
- if (cb instanceof AbstractWriteResponseHandler)
- {
- PBSPredictor.instance().logWriteResponse(id, timestamp);
- }
- else if (cb instanceof ReadCallback)
- {
- PBSPredictor.instance().logReadResponse(id, timestamp);
- }
- }
-
+ stage.execute(runnable, state);
}
- public void setCallbackForTests(String messageId, CallbackInfo callback)
+ public void setCallbackForTests(int messageId, CallbackInfo callback)
{
callbacks.put(messageId, callback);
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 52a415c,ee30d36..eaab3ad
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@@ -182,15 -179,25 +182,24 @@@ public class OutboundTcpConnection exte
{
UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
TraceState state = Tracing.instance().get(sessionId);
- state.trace("Sending message to {}", poolReference.endPoint());
- Tracing.instance().stopIfNonLocal(state);
+ String message = String.format("Sending message to %s", poolReference.endPoint());
+ // session may have already finished; see CASSANDRA-5668
+ if (state == null)
+ {
+ TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1);
+ }
+ else
+ {
+ state.trace(message);
+ Tracing.instance().stopIfNonLocal(state);
+ }
}
- write(qm.message, qm.id, qm.timestamp, out, targetVersion);
+ writeInternal(qm.message, qm.id, qm.timestamp);
+
completed++;
if (active.peek() == null)
- {
out.flush();
- }
}
catch (Exception e)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/MigrationManager.java
index d602d25,de34785..cce674f
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@@ -23,9 -23,7 +23,10 @@@ import java.io.IOException
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.*;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/TraceState.java
index 52e6d04,25599c4..b4cff93
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@@ -91,12 -96,14 +94,13 @@@ public class TraceStat
public void runMayThrow() throws Exception
{
CFMetaData cfMeta = CFMetaData.TraceEventsCf;
- ColumnFamily cf = ColumnFamily.create(cfMeta);
+ ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfMeta);
+ Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress());
- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
+ if (elapsed >= 0)
+ Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
- RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes);
- mutation.add(cf);
+ Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
+ RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes, cf);
StorageProxy.mutate(Arrays.asList(mutation), ConsistencyLevel.ANY);
}
});
http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tracing/Tracing.java
index 5637f48,eb5bad9..8782ee5
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@@ -33,9 -33,13 +33,10 @@@ import org.apache.cassandra.concurrent.
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.cql3.ColumnNameBuilder;
-import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.ExpiringColumn;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.net.MessageIn;
+ import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/140b0311/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index 79aed42,b80c272..c930cc3
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@@ -48,6 -48,7 +48,7 @@@ import org.apache.cassandra.utils.ByteB
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
-import static org.apache.cassandra.service.AntiEntropyService.*;
++import static org.apache.cassandra.service.ActiveRepairService.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
[2/3] git commit: Fix tracing when operation completes before all
responses arrive patch by jbellis; reviewed by slebresne for CASSANDRA-5668
Posted by jb...@apache.org.
Fix tracing when operation completes before all responses arrive
patch by jbellis; reviewed by slebresne for CASSANDRA-5668
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fbe8a6eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fbe8a6eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fbe8a6eb
Branch: refs/heads/trunk
Commit: fbe8a6eb213ee3558be701c73c31a0da79446657
Parents: 7dc2eb9
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Jun 20 10:19:13 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Jun 21 09:23:39 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../DebuggableThreadPoolExecutor.java | 22 ++++++++---
.../cassandra/concurrent/StageManager.java | 29 +++++++++------
.../concurrent/TracingAwareExecutorService.java | 33 +++++++++++++++++
.../apache/cassandra/net/MessagingService.java | 14 ++++---
.../cassandra/net/OutboundTcpConnection.java | 13 ++++++-
.../cassandra/service/MigrationManager.java | 7 +---
.../cassandra/tracing/ExpiredTraceState.java | 39 ++++++++++++++++++++
.../apache/cassandra/tracing/TraceState.java | 12 ++++--
.../org/apache/cassandra/tracing/Tracing.java | 26 +++++++------
.../service/AntiEntropyServiceTestAbstract.java | 10 ++---
11 files changed, 156 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3847d6a..593bf7c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.6
+ * Fix tracing when operation completes before all responses arrive (CASSANDRA-5668)
* Fix cross-DC mutation forwarding (CASSANDRA-5632)
* Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
* (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index 25f15ee..26441ec 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -44,7 +44,7 @@ import static org.apache.cassandra.tracing.Tracing.isTracing;
* threads and the queue is full, we want the enqueuer to block. But to allow the number of threads to drop if a
* stage is less busy, core thread timeout is enabled.
*/
-public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
{
protected static final Logger logger = LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
public static final RejectedExecutionHandler blockingExecutionHandler = new RejectedExecutionHandler()
@@ -131,12 +131,17 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
protected void onFinalAccept(Runnable task) {}
protected void onFinalRejection(Runnable task) {}
+ public void execute(Runnable command, TraceState state)
+ {
+ super.execute(state == null ? command : new TraceSessionWrapper<Object>(command, state));
+ }
+
// execute does not call newTaskFor
@Override
public void execute(Runnable command)
{
super.execute(isTracing() && !(command instanceof TraceSessionWrapper)
- ? new TraceSessionWrapper<Object>(command, null)
+ ? new TraceSessionWrapper<Object>(command)
: command);
}
@@ -145,7 +150,7 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
{
if (isTracing() && !(runnable instanceof TraceSessionWrapper))
{
- return new TraceSessionWrapper<T>(runnable, result);
+ return new TraceSessionWrapper<T>(Executors.callable(runnable, result));
}
return super.newTaskFor(runnable, result);
}
@@ -256,10 +261,9 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
{
private final TraceState state;
- public TraceSessionWrapper(Runnable runnable, T result)
+ public TraceSessionWrapper(Runnable command)
{
- super(runnable, result);
- state = Tracing.instance().get();
+ this(command, null);
}
public TraceSessionWrapper(Callable<T> callable)
@@ -268,6 +272,12 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
state = Tracing.instance().get();
}
+ public TraceSessionWrapper(Runnable command, TraceState state)
+ {
+ super(command, null);
+ this.state = state;
+ }
+
private void setupContext()
{
Tracing.instance().set(state);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 287b19e..2960f22 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.config.DatabaseDescriptor.*;
@@ -38,7 +39,7 @@ public class StageManager
{
private static final Logger logger = LoggerFactory.getLogger(StageManager.class);
- private static final EnumMap<Stage, ThreadPoolExecutor> stages = new EnumMap<Stage, ThreadPoolExecutor>(Stage.class);
+ private static final EnumMap<Stage, TracingAwareExecutorService> stages = new EnumMap<Stage, TracingAwareExecutorService>(Stage.class);
public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle
@@ -60,7 +61,7 @@ public class StageManager
stages.put(Stage.TRACING, tracingExecutor());
}
- private static ThreadPoolExecutor tracingExecutor()
+ private static ExecuteOnlyExecutor tracingExecutor()
{
RejectedExecutionHandler reh = new RejectedExecutionHandler()
{
@@ -78,7 +79,7 @@ public class StageManager
reh);
}
- private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
+ private static JMXEnabledThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
{
return new JMXEnabledThreadPoolExecutor(numThreads,
KEEPALIVE,
@@ -88,7 +89,7 @@ public class StageManager
stage.getJmxType());
}
- private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
+ private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
{
return new JMXConfigurableThreadPoolExecutor(numThreads,
KEEPALIVE,
@@ -98,7 +99,7 @@ public class StageManager
stage.getJmxType());
}
- private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
+ private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
{
return new JMXConfigurableThreadPoolExecutor(numThreads,
KEEPALIVE,
@@ -111,8 +112,8 @@ public class StageManager
/**
* Retrieve a stage from the StageManager
* @param stage name of the stage to be retrieved.
- */
- public static ThreadPoolExecutor getStage(Stage stage)
+ */
+ public static TracingAwareExecutorService getStage(Stage stage)
{
return stages.get(stage);
}
@@ -132,29 +133,35 @@ public class StageManager
* A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the
* tracing stage. See CASSANDRA-1123 for background.
*/
- private static class ExecuteOnlyExecutor extends ThreadPoolExecutor
+ private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
{
public ExecuteOnlyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
+ public void execute(Runnable command, TraceState state)
+ {
+ assert state == null;
+ super.execute(command);
+ }
+
@Override
public Future<?> submit(Runnable task)
{
- return super.submit(task);
+ throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(Runnable task, T result)
{
- return super.submit(task, result);
+ throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(Callable<T> task)
{
- return super.submit(task);
+ throw new UnsupportedOperationException();
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
new file mode 100644
index 0000000..e5dcd7e
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.cassandra.tracing.TraceState;
+
+public interface TracingAwareExecutorService extends ExecutorService
+{
+ // we need a way to inject a TraceState directly into the Executor context without going through
+ // the global Tracing sessions; see CASSANDRA-5668
+ public void execute(Runnable command, TraceState state);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 5e36f44..2964d35 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -29,7 +29,6 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
@@ -37,12 +36,14 @@ import javax.management.ObjectName;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.TracingAwareExecutorService;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.db.*;
@@ -61,9 +62,9 @@ import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.*;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
+import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
public final class MessagingService implements MessagingServiceMBean
{
@@ -702,15 +703,16 @@ public final class MessagingService implements MessagingServiceMBean
public void receive(MessageIn message, String id, long timestamp)
{
- Tracing.instance().initializeFromMessage(message);
- Tracing.trace("Message received from {}", message.from);
+ TraceState state = Tracing.instance().initializeFromMessage(message);
+ if (state != null)
+ state.trace("Message received from {}", message.from);
message = SinkManager.processInboundMessage(message, id);
if (message == null)
return;
Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
- ExecutorService stage = StageManager.getStage(message.getMessageType());
+ TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
assert stage != null : "No stage for message type " + message.verb;
if (message.verb == Verb.REQUEST_RESPONSE && PBSPredictor.instance().isLoggingEnabled())
@@ -727,7 +729,7 @@ public final class MessagingService implements MessagingServiceMBean
}
}
- stage.execute(runnable);
+ stage.execute(runnable, state);
}
public void setCallbackForTests(String messageId, CallbackInfo callback)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 32ce224..ee30d36 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -179,8 +179,17 @@ public class OutboundTcpConnection extends Thread
{
UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
TraceState state = Tracing.instance().get(sessionId);
- state.trace("Sending message to {}", poolReference.endPoint());
- Tracing.instance().stopIfNonLocal(state);
+ String message = String.format("Sending message to %s", poolReference.endPoint());
+ // session may have already finished; see CASSANDRA-5668
+ if (state == null)
+ {
+ TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1);
+ }
+ else
+ {
+ state.trace(message);
+ Tracing.instance().stopIfNonLocal(state);
+ }
}
write(qm.message, qm.id, qm.timestamp, out, targetVersion);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 127b2b8..de34785 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -23,10 +23,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
@@ -160,7 +157,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
public static boolean isReadyForBootstrap()
{
- return StageManager.getStage(Stage.MIGRATION).getActiveCount() == 0;
+ return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0;
}
public void notifyCreateKeyspace(KSMetaData ksm)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
new file mode 100644
index 0000000..6b4f90b
--- /dev/null
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.cassandra.tracing;
+
+import java.util.UUID;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ExpiredTraceState extends TraceState
+{
+ public ExpiredTraceState(UUID sessionId)
+ {
+ super(FBUtilities.getBroadcastAddress(), sessionId, true);
+ }
+
+ public int elapsed()
+ {
+ return -1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index 703c4ee..25599c4 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -81,11 +81,14 @@ public class TraceState
trace(MessageFormatter.arrayFormat(format, args).getMessage());
}
- public void trace(final String message)
+ public void trace(String message)
{
- final int elapsed = elapsed();
- final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
+ TraceState.trace(sessionIdBytes, message, elapsed());
+ }
+ public static void trace(final ByteBuffer sessionIdBytes, final String message, final int elapsed)
+ {
+ final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
final String threadName = Thread.currentThread().getName();
StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
@@ -96,7 +99,8 @@ public class TraceState
ColumnFamily cf = ColumnFamily.create(cfMeta);
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress());
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
+ if (elapsed >= 0)
+ Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes);
mutation.add(cf);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index 17241b9..eb5bad9 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.ExpiringColumn;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -229,31 +230,34 @@ public class Tracing
}
/**
- * Updates the threads query context from a message
+ * Determines the tracing context from a message. Does NOT set the threadlocal state.
*
- * @param message
- * The internode message
+ * @param message The internode message
*/
- public void initializeFromMessage(final MessageIn<?> message)
+ public TraceState initializeFromMessage(final MessageIn<?> message)
{
final byte[] sessionBytes = message.parameters.get(Tracing.TRACE_HEADER);
- // if the message has no session context header don't do tracing
if (sessionBytes == null)
- {
- state.set(null);
- return;
- }
+ return null;
assert sessionBytes.length == 16;
UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
TraceState ts = sessions.get(sessionId);
- if (ts == null)
+ if (ts != null)
+ return ts;
+
+ if (message.verb == MessagingService.Verb.REQUEST_RESPONSE)
+ {
+ // received a message for a session we've already closed out. see CASSANDRA-5668
+ return new ExpiredTraceState(sessionId);
+ }
+ else
{
ts = new TraceState(message.from, sessionId, false);
sessions.put(sessionId, ts);
+ return ts;
}
- state.set(ts);
}
public static void trace(String message)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index fc0b832..b80c272 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -20,10 +20,11 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -42,13 +43,12 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
-import static org.apache.cassandra.service.AntiEntropyService.*;
-
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
+import static org.apache.cassandra.service.AntiEntropyService.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -305,7 +305,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
void flushAES() throws Exception
{
- final ThreadPoolExecutor stage = StageManager.getStage(Stage.ANTI_ENTROPY);
+ final ExecutorService stage = StageManager.getStage(Stage.ANTI_ENTROPY);
final Callable noop = new Callable<Object>()
{
public Boolean call()