You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2012/02/24 16:51:02 UTC

[2/3] git commit: Add a wrapper to the Runnables executed by DSTPE to prevent exceptions from killing the tasks. Replace ExpiringMap's timer with a DSTPE. patch by jbellis; reviewed by slebresne for CASSANDRA-3537

Add a wrapper to the Runnables executed by DSTPE to prevent exceptions from killing the tasks. Replace ExpiringMap's timer with a DSTPE.
patch by jbellis; reviewed by slebresne for CASSANDRA-3537


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/eac19fee
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/eac19fee
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/eac19fee

Branch: refs/heads/trunk
Commit: eac19fee0384fdab25ae6d4eaa065199ce04fb4d
Parents: 257d36e
Author: Jonathan Ellis <jb...@apache.org>
Authored: Fri Feb 24 09:24:19 2012 -0600
Committer: Jonathan Ellis <jb...@apache.org>
Committed: Fri Feb 24 09:25:39 2012 -0600

----------------------------------------------------------------------
 .../DebuggableScheduledThreadPoolExecutor.java     |   45 ++++++++++++++-
 .../concurrent/DebuggableThreadPoolExecutor.java   |   29 +++++++---
 .../org/apache/cassandra/net/MessagingService.java |    2 +-
 .../cassandra/service/AbstractCassandraDaemon.java |    2 +-
 .../org/apache/cassandra/utils/ExpiringMap.java    |   38 ++++++-------
 5 files changed, 84 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
index f5f1565..fcbaf67 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
@@ -26,10 +26,15 @@ import java.util.concurrent.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Like DebuggableThreadPoolExecutor, DebuggableScheduledThreadPoolExecutor always
+ * logs exceptions from the tasks it is given, even if Future.get is never called elsewhere.
+ *
+ * DebuggableScheduledThreadPoolExecutor also catches exceptions during Task execution
+ * so that they don't suppress subsequent invocations of the task.
+ */
 public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor
 {
-    private static Logger logger = LoggerFactory.getLogger(DebuggableScheduledThreadPoolExecutor.class);
-
     public DebuggableScheduledThreadPoolExecutor(int corePoolSize, String threadPoolName, int priority)
     {
         super(corePoolSize, new NamedThreadFactory(threadPoolName, priority));
@@ -40,10 +45,46 @@ public class DebuggableScheduledThreadPoolExecutor extends ScheduledThreadPoolEx
         this(1, threadPoolName, Thread.NORM_PRIORITY);
     }
 
+    // We need this as well as the wrapper for the benefit of non-repeating tasks
     @Override
     public void afterExecute(Runnable r, Throwable t)
     {
         super.afterExecute(r,t);
         DebuggableThreadPoolExecutor.logExceptionsAfterExecute(r, t);
     }
+
+    // Override scheduling to suppress exceptions that would cancel future executions
+    @Override
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
+    {
+        return super.scheduleAtFixedRate(new UncomplainingRunnable(command), initialDelay, period, unit);
+    }
+
+    @Override
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
+    {
+        return super.scheduleWithFixedDelay(new UncomplainingRunnable(command), initialDelay, delay, unit);
+    }
+
+    private static class UncomplainingRunnable implements Runnable
+    {
+        private final Runnable runnable;
+
+        public UncomplainingRunnable(Runnable runnable)
+        {
+            this.runnable = runnable;
+        }
+
+        public void run()
+        {
+            try
+            {
+                runnable.run();
+            }
+            catch (Throwable e)
+            {
+                DebuggableThreadPoolExecutor.handleOrLog(e);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
index 60265b1..29a457a 100644
--- a/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
+++ b/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
@@ -114,15 +114,26 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
         logExceptionsAfterExecute(r, t);
     }
 
+    /**
+     * Send {@code t} and any exception wrapped by {@code r} to the default uncaught exception handler,
+     * or log them if no such handler is set up
+     */
     public static void logExceptionsAfterExecute(Runnable r, Throwable t)
     {
-        if (t == null)
-            t = extractThrowable(r);
-
-        if (t != null)
+        Throwable hiddenThrowable = extractThrowable(r);
+        if (hiddenThrowable != null)
+            handleOrLog(hiddenThrowable);
+
+        // ThreadPoolExecutor will re-throw exceptions thrown by its Task (which will be seen by
+        // the default uncaught exception handler) so we only need to do anything if that handler
+        // isn't set up yet.
+        if (t != null && Thread.getDefaultUncaughtExceptionHandler() == null)
             handleOrLog(t);
     }
 
+    /**
+     * Send @param t to the default uncaught exception handler, or log it if none such is set up
+     */
     public static void handleOrLog(Throwable t)
     {
         if (Thread.getDefaultUncaughtExceptionHandler() == null)
@@ -131,18 +142,21 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
             Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
     }
 
-    public static Throwable extractThrowable(Runnable r)
+    /**
+     * @return any exception wrapped by {@code runnable} (i.e., if it is a completed Future), or null if there is none
+     */
+    public static Throwable extractThrowable(Runnable runnable)
     {
         // Check for exceptions wrapped by FutureTask.  We do this by calling get(), which will
         // cause it to throw any saved exception.
         //
         // Complicating things, calling get() on a ScheduledFutureTask will block until the task
         // is cancelled.  Hence, the extra isDone check beforehand.
-        if ((r instanceof Future<?>) && ((Future<?>) r).isDone())
+        if ((runnable instanceof Future<?>) && ((Future<?>) runnable).isDone())
         {
             try
             {
-                ((Future<?>) r).get();
+                ((Future<?>) runnable).get();
             }
             catch (InterruptedException e)
             {
@@ -160,5 +174,4 @@ public class DebuggableThreadPoolExecutor extends ThreadPoolExecutor
 
         return null;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 0bb36fb..b0e7de9 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -543,7 +543,7 @@ public final class MessagingService implements MessagingServiceMBean
         assert !StageManager.getStage(Stage.MUTATION).isShutdown();
 
         // the important part
-        callbacks.shutdown();
+        callbacks.shutdownBlocking();
 
         // attempt to humor tests that try to stop and restart MS
         try

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
index 8cc85ae..f124f9f 100644
--- a/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/AbstractCassandraDaemon.java
@@ -131,7 +131,7 @@ public abstract class AbstractCassandraDaemon implements CassandraDaemon
             public void uncaughtException(Thread t, Throwable e)
             {
                 exceptions.incrementAndGet();
-                logger.error("Fatal exception in thread " + t, e);
+                logger.error("Exception in thread " + t, e);
                 for (Throwable e2 = e; e2 != null; e2 = e2.getCause())
                 {
                     // some code, like FileChannel.map, will wrap an OutOfMemoryError in another exception

http://git-wip-us.apache.org/repos/asf/cassandra/blob/eac19fee/src/java/org/apache/cassandra/utils/ExpiringMap.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/ExpiringMap.java b/src/java/org/apache/cassandra/utils/ExpiringMap.java
index 000af72..ff9f2da 100644
--- a/src/java/org/apache/cassandra/utils/ExpiringMap.java
+++ b/src/java/org/apache/cassandra/utils/ExpiringMap.java
@@ -18,13 +18,16 @@
 
 package org.apache.cassandra.utils;
 
-import java.util.*;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.base.Function;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
 
 public class ExpiringMap<K, V>
@@ -57,9 +60,10 @@ public class ExpiringMap<K, V>
         }
     }
 
+    // if we use more ExpiringMaps we may want to add multiple threads to this executor
+    private static final ScheduledExecutorService service = new DebuggableScheduledThreadPoolExecutor("EXPIRING-MAP-REAPER");
+
     private final NonBlockingHashMap<K, CacheableObject<V>> cache = new NonBlockingHashMap<K, CacheableObject<V>>();
-    private final Timer timer;
-    private static int counter = 0;
     private final long defaultExpiration;
 
     public ExpiringMap(long defaultExpiration)
@@ -80,8 +84,7 @@ public class ExpiringMap<K, V>
             throw new IllegalArgumentException("Argument specified must be a positive number");
         }
 
-        timer = new Timer("EXPIRING-MAP-TIMER-" + (++counter), true);
-        TimerTask task = new TimerTask()
+        Runnable runnable = new Runnable()
         {
             public void run()
             {
@@ -100,25 +103,20 @@ public class ExpiringMap<K, V>
                 logger.trace("Expired {} entries", n);
             }
         };
-        timer.schedule(task, defaultExpiration / 2, defaultExpiration / 2);
+        service.scheduleWithFixedDelay(runnable, defaultExpiration / 2, defaultExpiration / 2, TimeUnit.MILLISECONDS);
     }
 
-    public void shutdown()
+    public void shutdownBlocking()
     {
-        shutdown = true;
-        while (!cache.isEmpty())
+        service.shutdown();
+        try
         {
-            logger.trace("Waiting for {} entries before shutting down ExpiringMap", cache.size());
-            try
-            {
-                Thread.sleep(100);
-            }
-            catch (InterruptedException e)
-            {
-                throw new AssertionError(e);
-            }
+            service.awaitTermination(defaultExpiration * 2, TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            throw new AssertionError(e);
         }
-        timer.cancel();
     }
 
     public void reset()