You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/28 20:45:10 UTC

svn commit: r819665 - in /incubator/cassandra/trunk/src/java/org/apache/cassandra: concurrent/ db/ service/

Author: jbellis
Date: Mon Sep 28 18:45:10 2009
New Revision: 819665

URL: http://svn.apache.org/viewvc?rev=819665&view=rev
Log:
Replace DebuggableScheduledThreadPoolExecutor with non-Scheduled Executors and Timers.  This allows logging exceptions from repeated tasks, which is basically impossible with STPE.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-455

Removed:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
Modified:
    incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=819665&r1=819664&r2=819665&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Mon Sep 28 18:45:10 2009
@@ -68,31 +68,16 @@
         return getTaskCount() - getCompletedTaskCount();
     }
 
-    /*
-     * 
-     *  (non-Javadoc)
-     * @see java.util.concurrent.ThreadPoolExecutor#afterExecute(java.lang.Runnable, java.lang.Throwable)
-     * Helps us in figuring out why sometimes the threads are getting 
-     * killed and replaced by new ones.
-     */
     public void afterExecute(Runnable r, Throwable t)
     {
         super.afterExecute(r,t);
 
-        logFutureExceptions(r);
-        if (t != null)
-        {
-            logger_.error("Error in ThreadPoolExecutor", t);
-        }
-    }
-
-    public static void logFutureExceptions(Runnable r)
-    {
+        // exceptions wrapped by FutureTask
         if (r instanceof FutureTask)
         {
             try
             {
-                ((FutureTask)r).get();
+                ((FutureTask) r).get();
             }
             catch (InterruptedException e)
             {
@@ -103,5 +88,12 @@
                 logger_.error("Error in executor futuretask", e);
             }
         }
+
+        // exceptions for non-FutureTask runnables [i.e., added via execute() instead of submit()]
+        if (t != null)
+        {
+            logger_.error("Error in ThreadPoolExecutor", t);
+        }
     }
+
 }

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=819665&r1=819664&r2=819665&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Sep 28 18:45:10 2009
@@ -85,6 +85,7 @@
                                                new LinkedBlockingQueue<Runnable>(),
                                                new NamedThreadFactory("FLUSH-WRITER-POOL"));
     private static ExecutorService commitLogUpdater_ = new DebuggableThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
+    private static Timer flushTimer_ = new Timer("FLUSH-TIMER");
 
     private final String table_;
     public final String columnFamily_;
@@ -218,14 +219,27 @@
         // schedule hinted handoff
         if (table_.equals(Table.SYSTEM_TABLE) && columnFamily_.equals(HintedHandOffManager.HINTS_CF))
         {
-            HintedHandOffManager.instance().submit(this);
+            HintedHandOffManager.instance().scheduleHandoffsFor(this);
         }
 
         // schedule periodic flusher if required
-        int flushPeriod = DatabaseDescriptor.getFlushPeriod(table_, columnFamily_);
-        if (flushPeriod > 0)
+        int flushPeriodMS = DatabaseDescriptor.getFlushPeriod(table_, columnFamily_) * 60 * 1000;
+        if (flushPeriodMS > 0)
         {
-            PeriodicFlushManager.instance().submitPeriodicFlusher(this, flushPeriod);
+            flushTimer_.schedule(new TimerTask()
+            {
+                public void run()
+                {
+                    try
+                    {
+                        forceFlush();
+                    }
+                    catch (IOException e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }, flushPeriodMS, flushPeriodMS);
         }
     }
 

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=819665&r1=819664&r2=819665&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Mon Sep 28 18:45:10 2009
@@ -22,20 +22,19 @@
 import java.lang.management.ManagementFactory;
 import java.util.List;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.net.EndPoint;
 import org.apache.cassandra.io.SSTableReader;
-
-import org.apache.log4j.Logger;
+import org.apache.cassandra.net.EndPoint;
 
 public class CompactionManager implements CompactionManagerMBean
 {
@@ -159,7 +158,7 @@
     }
     
     
-    private ScheduledExecutorService compactor_ = new DebuggableScheduledThreadPoolExecutor(1, new NamedThreadFactory("COMPACTION-POOL"));
+    private ExecutorService compactor_ = new DebuggableThreadPoolExecutor("COMPACTION-POOL");
 
     /**
      * Call this whenever a compaction might be needed on the given columnfamily.

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=819665&r1=819664&r2=819665&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Mon Sep 28 18:45:10 2009
@@ -19,17 +19,18 @@
 package org.apache.cassandra.db;
 
 import java.util.Collection;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.io.IOException;
 
 import org.apache.log4j.Logger;
 
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.EndPoint;
@@ -54,8 +55,9 @@
     private static HintedHandOffManager instance_;
     private static Lock lock_ = new ReentrantLock();
     private static Logger logger_ = Logger.getLogger(HintedHandOffManager.class);
-    final static long intervalInMins_ = 60;
-    private ScheduledExecutorService executor_ = new DebuggableScheduledThreadPoolExecutor(1, new NamedThreadFactory("HINTED-HANDOFF-POOL"));
+    final static long INTERVAL_IN_MS = 3600 * 1000;
+    private ExecutorService executor_ = new DebuggableThreadPoolExecutor("HINTED-HANDOFF-POOL");
+    Timer timer = new Timer("HINTED-HANDOFF-TIMER");
     public static final String HINTS_CF = "HintsColumnFamily";
 
 
@@ -234,9 +236,9 @@
           logger_.debug("Finished hinted handoff for endpoint " + endPoint.getHost());
     }
 
-    public void submit(final ColumnFamilyStore columnFamilyStore)
+    public void scheduleHandoffsFor(final ColumnFamilyStore columnFamilyStore)
     {
-        Runnable r = new Runnable()
+        final Runnable r = new Runnable()
         {
             public void run()
             {
@@ -250,7 +252,13 @@
                 }
             }
         };
-    	executor_.scheduleWithFixedDelay(r, HintedHandOffManager.intervalInMins_, HintedHandOffManager.intervalInMins_, TimeUnit.MINUTES);
+        timer.schedule(new TimerTask()
+        {
+            public void run()
+            {
+                executor_.execute(r);
+            }
+        }, INTERVAL_IN_MS, INTERVAL_IN_MS);
     }
 
     /*

Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=819665&r1=819664&r2=819665&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Mon Sep 28 18:45:10 2009
@@ -22,16 +22,13 @@
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.log4j.Logger;
 
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
 import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.SingleThreadedStage;
 import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.ApplicationState;
 import org.apache.cassandra.gms.EndPointState;
@@ -184,10 +181,7 @@
     /* This map is a clone of the one above and is used for various calculations during LB operation */
     private Map<EndPoint, LoadInfo> loadInfo2_ = new HashMap<EndPoint, LoadInfo>();
     /* This thread pool is used for initiating load balancing operations */
-    private ScheduledThreadPoolExecutor lb_ = new DebuggableScheduledThreadPoolExecutor(
-            1,
-            new NamedThreadFactory("LB-OPERATIONS")
-            );
+    private ExecutorService lb_ = new DebuggableThreadPoolExecutor("LB-OPERATIONS");
     /* This thread pool is used by target node to leave the ring. */
     private ExecutorService lbOperations_ = new DebuggableThreadPoolExecutor("LB-TARGET");