You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/02/24 16:51:02 UTC
[2/3] git commit: Add a wrapper to the Runnables executed by DSTPE to
prevent exceptions from killing the tasks. Replace ExpiringMap's timer with a
DSTPE. patch by jbellis; reviewed by slebresne for CASSANDRA-3537
Add a wrapper to the Runnables executed by DSTPE to prevent exceptions from killing the tasks. Replace ExpiringMap's timer with a DSTPE.
patch by jbellis; reviewed by slebresne for CASSANDRA-3537
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eac19fee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eac19fee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eac19fee
Branch: refs/heads/trunk
Commit: eac19fee0384fdab25ae6d4eaa065199ce04fb4d
Parents: 257d36e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Feb 24 09:24:19 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Feb 24 09:25:39 2012 -0600
----------------------------------------------------------------------
.../DebuggableScheduledThreadPoolExecutor.java | 45 ++++++++++++++-
.../concurrent/DebuggableThreadPoolExecutor.java | 29 +++++++---
.../org/apache/cassandra/net/MessagingService.java | 2 +-
.../cassandra/service/AbstractCassandraDaemon.java | 2 +-
.../org/apache/cassandra/utils/ExpiringMap.java | 38 ++++++-------
5 files changed, 84 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
index f5f1565..fcbaf67 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
@@ -26,10 +26,15 @@ import java.util.concurrent.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Like DebuggableThreadPoolExecutor, DebuggableScheduledThreadPoolExecutor always
+ * logs exceptions from the tasks it is given, even if Future.get is never called elsewhere.
+ *
+ * DebuggableScheduledThreadPoolExecutor also catches exceptions during Task execution
+ * so that they don't suppress subsequent invocations of the task.
+ */
public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor
{
- private static Logger logger = LoggerFactory.getLogger(DebuggableScheduledThreadPoolExecutor.class);
-
public DebuggableScheduledThreadPoolExecutor(int corePoolSize, String threadPoolName, int priority)
{
super(corePoolSize, new NamedThreadFactory(threadPoolName, priority));
@@ -40,10 +45,46 @@ public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolEx
this(1, threadPoolName, Thread.NORM_PRIORITY);
}
+ // We need this as well as the wrapper for the benefit of non-repeating tasks
@Override
public void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r,t);
DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
}
+
+ // override scheduling to suppress exceptions that would cancel future executions
+ @Override
+ public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+ {
+ return super.scheduleAtFixedRate(new UncomplainingRunnable(command), initialDelay, period, unit);
+ }
+
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+ {
+ return super.scheduleWithFixedDelay(new UncomplainingRunnable(command), initialDelay, delay, unit);
+ }
+
+ private static class UncomplainingRunnable implements Runnable
+ {
+ private final Runnable runnable;
+
+ public UncomplainingRunnable(Runnable runnable)
+ {
+ this.runnable = runnable;
+ }
+
+ public void run()
+ {
+ try
+ {
+ runnable.run();
+ }
+ catch (Throwable e)
+ {
+ DebuggableThreadPoolExecutor.handleOrLog(e);
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index 60265b1..29a457a 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -114,15 +114,26 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
logExceptionsAfterExecute(r, t);
}
+ /**
+ * Send @param t and any exception wrapped by @param r to the default uncaught exception handler,
+ * or log them if none such is set up
+ */
public static void logExceptionsAfterExecute(Runnable r, Throwable t)
{
- if (t == null)
- t = extractThrowable(r);
-
- if (t != null)
+ Throwable hiddenThrowable = extractThrowable(r);
+ if (hiddenThrowable != null)
+ handleOrLog(hiddenThrowable);
+
+ // ThreadPoolExecutor will re-throw exceptions thrown by its Task (which will be seen by
+ // the default uncaught exception handler) so we only need to do anything if that handler
+ // isn't set up yet.
+ if (t != null && Thread.getDefaultUncaughtExceptionHandler() == null)
handleOrLog(t);
}
+ /**
+ * Send @param t to the default uncaught exception handler, or log it if none such is set up
+ */
public static void handleOrLog(Throwable t)
{
if (Thread.getDefaultUncaughtExceptionHandler() == null)
@@ -131,18 +142,21 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
}
- public static Throwable extractThrowable(Runnable r)
+ /**
+ * @return any exception wrapped by @param runnable, i.e., if it is a FutureTask
+ */
+ public static Throwable extractThrowable(Runnable runnable)
{
// Check for exceptions wrapped by FutureTask. We do this by calling get(), which will
// cause it to throw any saved exception.
//
// Complicating things, calling get() on a ScheduledFutureTask will block until the task
// is cancelled. Hence, the extra isDone check beforehand.
- if ((r instanceof Future<?>) && ((Future<?>) r).isDone())
+ if ((runnable instanceof Future<?>) && ((Future<?>) runnable).isDone())
{
try
{
- ((Future<?>) r).get();
+ ((Future<?>) runnable).get();
}
catch (InterruptedException e)
{
@@ -160,5 +174,4 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
return null;
}
-
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 0bb36fb..b0e7de9 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -543,7 +543,7 @@ public final class MessagingService implements MessagingServiceMBean
assert !StageManager.getStage(Stage.MUTATION).isShutdown();
// the important part
- callbacks.shutdown();
+ callbacks.shutdownBlocking();
// attempt to humor tests that try to stop and restart MS
try
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
index 8cc85ae..f124f9f 100644
--- a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
@@ -131,7 +131,7 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon
public void uncaughtException(Thread t, Throwable e)
{
exceptions.incrementAndGet();
- logger.error("Fatal exception in thread " + t, e);
+ logger.error("Exception in thread " + t, e);
for (Throwable e2 = e; e2 != null; e2 = e2.getCause())
{
// some code, like FileChannel.map, will wrap an OutOfMemoryError in another exception
http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index 000af72..ff9f2da 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -18,13 +18,16 @@
package org.apache.cassandra.utils;
-import java.util.*;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import com.google.common.base.Function;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
public class ExpiringMap<K, V>
@@ -57,9 +60,10 @@ public class ExpiringMap<K, V>
}
}
+ // if we use more ExpiringMaps we may want to add multiple threads to this executor
+ private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER");
+
private final NonBlockingHashMap<K, CacheableObject<V>> cache = new NonBlockingHashMap<K, CacheableObject<V>>();
- private final Timer timer;
- private static int counter = 0;
private final long defaultExpiration;
public ExpiringMap(long defaultExpiration)
@@ -80,8 +84,7 @@ public class ExpiringMap<K, V>
throw new IllegalArgumentException("Argument specified must be a positive number");
}
- timer = new Timer("EXPIRING-MAP-TIMER-" + (++counter), true);
- TimerTask task = new TimerTask()
+ Runnable runnable = new Runnable()
{
public void run()
{
@@ -100,25 +103,20 @@ public class ExpiringMap<K, V>
logger.trace("Expired {} entries", n);
}
};
- timer.schedule(task, defaultExpiration / 2, defaultExpiration / 2);
+ service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
}
- public void shutdown()
+ public void shutdownBlocking()
{
- shutdown = true;
- while (!cache.isEmpty())
+ service.shutdown();
+ try
{
- logger.trace("Waiting for {} entries before shutting down ExpiringMap", cache.size());
- try
- {
- Thread.sleep(100);
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
+ service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
}
- timer.cancel();
}
public void reset()