You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by jb...@apache.org on 2009/09/28 20:45:10 UTC
svn commit: r819665 - in
/incubator/cassandra/trunk/src/java/org/apache/cassandra: concurrent/ db/
service/
Author: jbellis
Date: Mon Sep 28 18:45:10 2009
New Revision: 819665
URL: http://svn.apache.org/viewvc?rev=819665&view=rev
Log:
Replace DebuggableScheduledThreadPoolExecutor with non-Scheduled Executors and Timers. This allows logging exceptions from repeated tasks, which is basically impossible with STPE.
patch by jbellis; reviewed by Jun Rao for CASSANDRA-455
Removed:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableScheduledThreadPoolExecutor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/PeriodicFlushManager.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java?rev=819665&r1=819664&r2=819665&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/concurrent/DebuggableThreadPoolExecutor.java Mon Sep 28 18:45:10 2009
@@ -68,31 +68,16 @@
return getTaskCount() - getCompletedTaskCount();
}
- /*
- *
- * (non-Javadoc)
- * @see java.util.concurrent.ThreadPoolExecutor#afterExecute(java.lang.Runnable, java.lang.Throwable)
- * Helps us in figuring out why sometimes the threads are getting
- * killed and replaced by new ones.
- */
public void afterExecute(Runnable r, Throwable t)
{
super.afterExecute(r,t);
- logFutureExceptions(r);
- if (t != null)
- {
- logger_.error("Error in ThreadPoolExecutor", t);
- }
- }
-
- public static void logFutureExceptions(Runnable r)
- {
+ // exceptions wrapped by FutureTask
if (r instanceof FutureTask)
{
try
{
- ((FutureTask)r).get();
+ ((FutureTask) r).get();
}
catch (InterruptedException e)
{
@@ -103,5 +88,12 @@
logger_.error("Error in executor futuretask", e);
}
}
+
+ // exceptions for non-FutureTask runnables [i.e., added via execute() instead of submit()]
+ if (t != null)
+ {
+ logger_.error("Error in ThreadPoolExecutor", t);
+ }
}
+
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=819665&r1=819664&r2=819665&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Mon Sep 28 18:45:10 2009
@@ -85,6 +85,7 @@
new LinkedBlockingQueue<Runnable>(),
new NamedThreadFactory("FLUSH-WRITER-POOL"));
private static ExecutorService commitLogUpdater_ = new DebuggableThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
+ private static Timer flushTimer_ = new Timer("FLUSH-TIMER");
private final String table_;
public final String columnFamily_;
@@ -218,14 +219,27 @@
// schedule hinted handoff
if (table_.equals(Table.SYSTEM_TABLE) && columnFamily_.equals(HintedHandOffManager.HINTS_CF))
{
- HintedHandOffManager.instance().submit(this);
+ HintedHandOffManager.instance().scheduleHandoffsFor(this);
}
// schedule periodic flusher if required
- int flushPeriod = DatabaseDescriptor.getFlushPeriod(table_, columnFamily_);
- if (flushPeriod > 0)
+ int flushPeriodMS = DatabaseDescriptor.getFlushPeriod(table_, columnFamily_) * 60 * 1000;
+ if (flushPeriodMS > 0)
{
- PeriodicFlushManager.instance().submitPeriodicFlusher(this, flushPeriod);
+ flushTimer_.schedule(new TimerTask()
+ {
+ public void run()
+ {
+ try
+ {
+ forceFlush();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }, flushPeriodMS, flushPeriodMS);
}
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=819665&r1=819664&r2=819665&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Mon Sep 28 18:45:10 2009
@@ -22,20 +22,19 @@
import java.lang.management.ManagementFactory;
import java.util.List;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.management.MBeanServer;
import javax.management.ObjectName;
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.log4j.Logger;
+
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.net.EndPoint;
import org.apache.cassandra.io.SSTableReader;
-
-import org.apache.log4j.Logger;
+import org.apache.cassandra.net.EndPoint;
public class CompactionManager implements CompactionManagerMBean
{
@@ -159,7 +158,7 @@
}
- private ScheduledExecutorService compactor_ = new DebuggableScheduledThreadPoolExecutor(1, new NamedThreadFactory("COMPACTION-POOL"));
+ private ExecutorService compactor_ = new DebuggableThreadPoolExecutor("COMPACTION-POOL");
/**
* Call this whenever a compaction might be needed on the given columnfamily.
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java?rev=819665&r1=819664&r2=819665&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/HintedHandOffManager.java Mon Sep 28 18:45:10 2009
@@ -19,17 +19,18 @@
package org.apache.cassandra.db;
import java.util.Collection;
-import java.util.concurrent.ScheduledExecutorService;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.io.IOException;
import org.apache.log4j.Logger;
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.EndPoint;
@@ -54,8 +55,9 @@
private static HintedHandOffManager instance_;
private static Lock lock_ = new ReentrantLock();
private static Logger logger_ = Logger.getLogger(HintedHandOffManager.class);
- final static long intervalInMins_ = 60;
- private ScheduledExecutorService executor_ = new DebuggableScheduledThreadPoolExecutor(1, new NamedThreadFactory("HINTED-HANDOFF-POOL"));
+ final static long INTERVAL_IN_MS = 3600 * 1000;
+ private ExecutorService executor_ = new DebuggableThreadPoolExecutor("HINTED-HANDOFF-POOL");
+ Timer timer = new Timer("HINTED-HANDOFF-TIMER");
public static final String HINTS_CF = "HintsColumnFamily";
@@ -234,9 +236,9 @@
logger_.debug("Finished hinted handoff for endpoint " + endPoint.getHost());
}
- public void submit(final ColumnFamilyStore columnFamilyStore)
+ public void scheduleHandoffsFor(final ColumnFamilyStore columnFamilyStore)
{
- Runnable r = new Runnable()
+ final Runnable r = new Runnable()
{
public void run()
{
@@ -250,7 +252,13 @@
}
}
};
- executor_.scheduleWithFixedDelay(r, HintedHandOffManager.intervalInMins_, HintedHandOffManager.intervalInMins_, TimeUnit.MINUTES);
+ timer.schedule(new TimerTask()
+ {
+ public void run()
+ {
+ executor_.execute(r);
+ }
+ }, INTERVAL_IN_MS, INTERVAL_IN_MS);
}
/*
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java?rev=819665&r1=819664&r2=819665&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java (original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/service/StorageLoadBalancer.java Mon Sep 28 18:45:10 2009
@@ -22,16 +22,13 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
-import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.SingleThreadedStage;
import org.apache.cassandra.concurrent.StageManager;
-import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.EndPointState;
@@ -184,10 +181,7 @@
/* This map is a clone of the one above and is used for various calculations during LB operation */
private Map<EndPoint, LoadInfo> loadInfo2_ = new HashMap<EndPoint, LoadInfo>();
/* This thread pool is used for initiating load balancing operations */
- private ScheduledThreadPoolExecutor lb_ = new DebuggableScheduledThreadPoolExecutor(
- 1,
- new NamedThreadFactory("LB-OPERATIONS")
- );
+ private ExecutorService lb_ = new DebuggableThreadPoolExecutor("LB-OPERATIONS");
/* This thread pool is used by target node to leave the ring. */
private ExecutorService lbOperations_ = new DebuggableThreadPoolExecutor("LB-TARGET");