You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2013/06/21 16:27:52 UTC
[2/3] git commit: Fix tracing when operation completesbefore all
responses arrive patch by jbellis; reviewed by slebresne for CASSANDRA-5668
Fix tracing when operation completesbefore all responses arrive
patch by jbellis; reviewed by slebresne for CASSANDRA-5668
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/fbe8a6eb
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/fbe8a6eb
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/fbe8a6eb
Branch: refs/heads/trunk
Commit: fbe8a6eb213ee3558be701c73c31a0da79446657
Parents: 7dc2eb9
Author: Jonathan Ellis <jb...@apache.org>
Authored: Thu Jun 20 10:19:13 2013 -0500
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Jun 21 09:23:39 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../DebuggableThreadPoolExecutor.java | 22 ++++++++---
.../cassandra/concurrent/StageManager.java | 29 +++++++++------
.../concurrent/TracingAwareExecutorService.java | 33 +++++++++++++++++
.../apache/cassandra/net/MessagingService.java | 14 ++++---
.../cassandra/net/OutboundTcpConnection.java | 13 ++++++-
.../cassandra/service/MigrationManager.java | 7 +---
.../cassandra/tracing/ExpiredTraceState.java | 39 ++++++++++++++++++++
.../apache/cassandra/tracing/TraceState.java | 12 ++++--
.../org/apache/cassandra/tracing/Tracing.java | 26 +++++++------
.../service/AntiEntropyServiceTestAbstract.java | 10 ++---
11 files changed, 156 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 3847d6a..593bf7c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
1.2.6
+ * Fix tracing when operation completes before all responses arrive (CASSANDRA-5668)
* Fix cross-DC mutation forwarding (CASSANDRA-5632)
* Scale hinted_handoff_throttle_in_kb to cluster size (CASSANDRA-5272)
* (Hadoop) Add CQL3 input/output formats (CASSANDRA-4421, 5622)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index 25f15ee..26441ec 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -44,7 +44,7 @@ import static org.apache.cassandra.tracing.Tracing.isTracing;
* threads and the queue is full, we want the enqueuer to block. But to allow the number of threads to drop if a
* stage is less busy, core thread timeout is enabled.
*/
-public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
+public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
{
protected static final Logger logger = LoggerFactory.getLogger(DebuggableThreadPoolExecutor.class);
public static final RejectedExecutionHandler blockingExecutionHandler = new RejectedExecutionHandler()
@@ -131,12 +131,17 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
protected void onFinalAccept(Runnable task) {}
protected void onFinalRejection(Runnable task) {}
+ public void execute(Runnable command, TraceState state)
+ {
+ super.execute(state == null ? command : new TraceSessionWrapper<Object>(command, state));
+ }
+
// execute does not call newTaskFor
@Override
public void execute(Runnable command)
{
super.execute(isTracing() && !(command instanceof TraceSessionWrapper)
- ? new TraceSessionWrapper<Object>(command, null)
+ ? new TraceSessionWrapper<Object>(command)
: command);
}
@@ -145,7 +150,7 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
{
if (isTracing() && !(runnable instanceof TraceSessionWrapper))
{
- return new TraceSessionWrapper<T>(runnable, result);
+ return new TraceSessionWrapper<T>(Executors.callable(runnable, result));
}
return super.newTaskFor(runnable, result);
}
@@ -256,10 +261,9 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
{
private final TraceState state;
- public TraceSessionWrapper(Runnable runnable, T result)
+ public TraceSessionWrapper(Runnable command)
{
- super(runnable, result);
- state = Tracing.instance().get();
+ this(command, null);
}
public TraceSessionWrapper(Callable<T> callable)
@@ -268,6 +272,12 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
state = Tracing.instance().get();
}
+ public TraceSessionWrapper(Runnable command, TraceState state)
+ {
+ super(command, null);
+ this.state = state;
+ }
+
private void setupContext()
{
Tracing.instance().set(state);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/StageManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/StageManager.java b/src/java/org/apache/cassandra/concurrent/StageManager.java
index 287b19e..2960f22 100644
--- a/src/java/org/apache/cassandra/concurrent/StageManager.java
+++ b/src/java/org/apache/cassandra/concurrent/StageManager.java
@@ -24,6 +24,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.utils.FBUtilities;
import static org.apache.cassandra.config.DatabaseDescriptor.*;
@@ -38,7 +39,7 @@ public class StageManager
{
private static final Logger logger = LoggerFactory.getLogger(StageManager.class);
- private static final EnumMap<Stage, ThreadPoolExecutor> stages = new EnumMap<Stage, ThreadPoolExecutor>(Stage.class);
+ private static final EnumMap<Stage, TracingAwareExecutorService> stages = new EnumMap<Stage, TracingAwareExecutorService>(Stage.class);
public static final long KEEPALIVE = 60; // seconds to keep "extra" threads alive for when idle
@@ -60,7 +61,7 @@ public class StageManager
stages.put(Stage.TRACING, tracingExecutor());
}
- private static ThreadPoolExecutor tracingExecutor()
+ private static ExecuteOnlyExecutor tracingExecutor()
{
RejectedExecutionHandler reh = new RejectedExecutionHandler()
{
@@ -78,7 +79,7 @@ public class StageManager
reh);
}
- private static ThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
+ private static JMXEnabledThreadPoolExecutor multiThreadedStage(Stage stage, int numThreads)
{
return new JMXEnabledThreadPoolExecutor(numThreads,
KEEPALIVE,
@@ -88,7 +89,7 @@ public class StageManager
stage.getJmxType());
}
- private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
+ private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads)
{
return new JMXConfigurableThreadPoolExecutor(numThreads,
KEEPALIVE,
@@ -98,7 +99,7 @@ public class StageManager
stage.getJmxType());
}
- private static ThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
+ private static JMXConfigurableThreadPoolExecutor multiThreadedConfigurableStage(Stage stage, int numThreads, int maxTasksBeforeBlock)
{
return new JMXConfigurableThreadPoolExecutor(numThreads,
KEEPALIVE,
@@ -111,8 +112,8 @@ public class StageManager
/**
* Retrieve a stage from the StageManager
* @param stage name of the stage to be retrieved.
- */
- public static ThreadPoolExecutor getStage(Stage stage)
+ */
+ public static TracingAwareExecutorService getStage(Stage stage)
{
return stages.get(stage);
}
@@ -132,29 +133,35 @@ public class StageManager
* A TPE that disallows submit so that we don't need to worry about unwrapping exceptions on the
* tracing stage. See CASSANDRA-1123 for background.
*/
- private static class ExecuteOnlyExecutor extends ThreadPoolExecutor
+ private static class ExecuteOnlyExecutor extends ThreadPoolExecutor implements TracingAwareExecutorService
{
public ExecuteOnlyExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
{
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
+ public void execute(Runnable command, TraceState state)
+ {
+ assert state == null;
+ super.execute(command);
+ }
+
@Override
public Future<?> submit(Runnable task)
{
- return super.submit(task);
+ throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(Runnable task, T result)
{
- return super.submit(task, result);
+ throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(Callable<T> task)
{
- return super.submit(task);
+ throw new UnsupportedOperationException();
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
new file mode 100644
index 0000000..e5dcd7e
--- /dev/null
+++ b/src/java/org/apache/cassandra/concurrent/TracingAwareExecutorService.java
@@ -0,0 +1,33 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.cassandra.concurrent;
+
+import java.util.concurrent.ExecutorService;
+
+import org.apache.cassandra.tracing.TraceState;
+
+public interface TracingAwareExecutorService extends ExecutorService
+{
+ // we need a way to inject a TraceState directly into the Executor context without going through
+ // the global Tracing sessions; see CASSANDRA-5668
+ public void execute(Runnable command, TraceState state);
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 5e36f44..2964d35 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -29,7 +29,6 @@ import java.nio.channels.ClosedChannelException;
import java.nio.channels.ServerSocketChannel;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.MBeanServer;
@@ -37,12 +36,14 @@ import javax.management.ObjectName;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
+import org.cliffc.high_scale_lib.NonBlockingHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.concurrent.TracingAwareExecutorService;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.EncryptionOptions.ServerEncryptionOptions;
import org.apache.cassandra.db.*;
@@ -61,9 +62,9 @@ import org.apache.cassandra.security.SSLFactory;
import org.apache.cassandra.service.*;
import org.apache.cassandra.streaming.*;
import org.apache.cassandra.streaming.compress.CompressedFileStreamTask;
+import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.*;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
public final class MessagingService implements MessagingServiceMBean
{
@@ -702,15 +703,16 @@ public final class MessagingService implements MessagingServiceMBean
public void receive(MessageIn message, String id, long timestamp)
{
- Tracing.instance().initializeFromMessage(message);
- Tracing.trace("Message received from {}", message.from);
+ TraceState state = Tracing.instance().initializeFromMessage(message);
+ if (state != null)
+ state.trace("Message received from {}", message.from);
message = SinkManager.processInboundMessage(message, id);
if (message == null)
return;
Runnable runnable = new MessageDeliveryTask(message, id, timestamp);
- ExecutorService stage = StageManager.getStage(message.getMessageType());
+ TracingAwareExecutorService stage = StageManager.getStage(message.getMessageType());
assert stage != null : "No stage for message type " + message.verb;
if (message.verb == Verb.REQUEST_RESPONSE && PBSPredictor.instance().isLoggingEnabled())
@@ -727,7 +729,7 @@ public final class MessagingService implements MessagingServiceMBean
}
}
- stage.execute(runnable);
+ stage.execute(runnable, state);
}
public void setCallbackForTests(String messageId, CallbackInfo callback)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
index 32ce224..ee30d36 100644
--- a/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
+++ b/src/java/org/apache/cassandra/net/OutboundTcpConnection.java
@@ -179,8 +179,17 @@ public class OutboundTcpConnection extends Thread
{
UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
TraceState state = Tracing.instance().get(sessionId);
- state.trace("Sending message to {}", poolReference.endPoint());
- Tracing.instance().stopIfNonLocal(state);
+ String message = String.format("Sending message to %s", poolReference.endPoint());
+ // session may have already finished; see CASSANDRA-5668
+ if (state == null)
+ {
+ TraceState.trace(ByteBuffer.wrap(sessionBytes), message, -1);
+ }
+ else
+ {
+ state.trace(message);
+ Tracing.instance().stopIfNonLocal(state);
+ }
}
write(qm.message, qm.id, qm.timestamp, out, targetVersion);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/service/MigrationManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/MigrationManager.java b/src/java/org/apache/cassandra/service/MigrationManager.java
index 127b2b8..de34785 100644
--- a/src/java/org/apache/cassandra/service/MigrationManager.java
+++ b/src/java/org/apache/cassandra/service/MigrationManager.java
@@ -23,10 +23,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
@@ -160,7 +157,7 @@ public class MigrationManager implements IEndpointStateChangeSubscriber
public static boolean isReadyForBootstrap()
{
- return StageManager.getStage(Stage.MIGRATION).getActiveCount() == 0;
+ return ((ThreadPoolExecutor) StageManager.getStage(Stage.MIGRATION)).getActiveCount() == 0;
}
public void notifyCreateKeyspace(KSMetaData ksm)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
new file mode 100644
index 0000000..6b4f90b
--- /dev/null
+++ b/src/java/org/apache/cassandra/tracing/ExpiredTraceState.java
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.cassandra.tracing;
+
+import java.util.UUID;
+
+import org.apache.cassandra.utils.FBUtilities;
+
+public class ExpiredTraceState extends TraceState
+{
+ public ExpiredTraceState(UUID sessionId)
+ {
+ super(FBUtilities.getBroadcastAddress(), sessionId, true);
+ }
+
+ public int elapsed()
+ {
+ return -1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/TraceState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/TraceState.java b/src/java/org/apache/cassandra/tracing/TraceState.java
index 703c4ee..25599c4 100644
--- a/src/java/org/apache/cassandra/tracing/TraceState.java
+++ b/src/java/org/apache/cassandra/tracing/TraceState.java
@@ -81,11 +81,14 @@ public class TraceState
trace(MessageFormatter.arrayFormat(format, args).getMessage());
}
- public void trace(final String message)
+ public void trace(String message)
{
- final int elapsed = elapsed();
- final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
+ TraceState.trace(sessionIdBytes, message, elapsed());
+ }
+ public static void trace(final ByteBuffer sessionIdBytes, final String message, final int elapsed)
+ {
+ final ByteBuffer eventId = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes());
final String threadName = Thread.currentThread().getName();
StageManager.getStage(Stage.TRACING).execute(new WrappedRunnable()
@@ -96,7 +99,8 @@ public class TraceState
ColumnFamily cf = ColumnFamily.create(cfMeta);
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source")), FBUtilities.getBroadcastAddress());
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("thread")), threadName);
- Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
+ if (elapsed >= 0)
+ Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("source_elapsed")), elapsed);
Tracing.addColumn(cf, Tracing.buildName(cfMeta, eventId, ByteBufferUtil.bytes("activity")), message);
RowMutation mutation = new RowMutation(Tracing.TRACE_KS, sessionIdBytes);
mutation.add(cf);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/src/java/org/apache/cassandra/tracing/Tracing.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tracing/Tracing.java b/src/java/org/apache/cassandra/tracing/Tracing.java
index 17241b9..eb5bad9 100644
--- a/src/java/org/apache/cassandra/tracing/Tracing.java
+++ b/src/java/org/apache/cassandra/tracing/Tracing.java
@@ -39,6 +39,7 @@ import org.apache.cassandra.db.ExpiringColumn;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.marshal.TimeUUIDType;
import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageProxy;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
@@ -229,31 +230,34 @@ public class Tracing
}
/**
- * Updates the threads query context from a message
+ * Determines the tracing context from a message. Does NOT set the threadlocal state.
*
- * @param message
- * The internode message
+ * @param message The internode message
*/
- public void initializeFromMessage(final MessageIn<?> message)
+ public TraceState initializeFromMessage(final MessageIn<?> message)
{
final byte[] sessionBytes = message.parameters.get(Tracing.TRACE_HEADER);
- // if the message has no session context header don't do tracing
if (sessionBytes == null)
- {
- state.set(null);
- return;
- }
+ return null;
assert sessionBytes.length == 16;
UUID sessionId = UUIDGen.getUUID(ByteBuffer.wrap(sessionBytes));
TraceState ts = sessions.get(sessionId);
- if (ts == null)
+ if (ts != null)
+ return ts;
+
+ if (message.verb == MessagingService.Verb.REQUEST_RESPONSE)
+ {
+ // received a message for a session we've already closed out. see CASSANDRA-5668
+ return new ExpiredTraceState(sessionId);
+ }
+ else
{
ts = new TraceState(message.from, sessionId, false);
sessions.put(sessionId, ts);
+ return ts;
}
- state.set(ts);
}
public static void trace(String message)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/fbe8a6eb/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
index fc0b832..b80c272 100644
--- a/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
+++ b/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java
@@ -20,10 +20,11 @@ package org.apache.cassandra.service;
import java.net.InetAddress;
import java.util.*;
-import java.util.concurrent.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -42,13 +43,12 @@ import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.Gossiper;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.TokenMetadata;
-import static org.apache.cassandra.service.AntiEntropyService.*;
-
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
+import static org.apache.cassandra.service.AntiEntropyService.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -305,7 +305,7 @@ public abstract class AntiEntropyServiceTestAbstract extends SchemaLoader
void flushAES() throws Exception
{
- final ThreadPoolExecutor stage = StageManager.getStage(Stage.ANTI_ENTROPY);
+ final ExecutorService stage = StageManager.getStage(Stage.ANTI_ENTROPY);
final Callable noop = new Callable<Object>()
{
public Boolean call()