You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2014/04/16 00:48:58 UTC

svn commit: r1587743 - /hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java

Author: jdcryans
Date: Tue Apr 15 22:48:57 2014
New Revision: 1587743

URL: http://svn.apache.org/r1587743
Log:
HBASE-10312 Flooding the cluster with administrative actions leads to collapse

Modified:
    hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java

Modified: hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java?rev=1587743&r1=1587742&r2=1587743&view=diff
==============================================================================
--- hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java (original)
+++ hbase/branches/0.98/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java Tue Apr 15 22:48:57 2014
@@ -27,6 +27,7 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -51,8 +52,7 @@ public class TaskMonitor {
   static final int MAX_TASKS = 1000;
   
   private static TaskMonitor instance;
-  private List<TaskAndWeakRefPair> tasks =
-    Lists.newArrayList();
+  private CircularFifoBuffer tasks = new CircularFifoBuffer(MAX_TASKS);
 
   /**
    * Get singleton instance.
@@ -74,9 +74,6 @@ public class TaskMonitor {
         new PassthroughInvocationHandler<MonitoredTask>(stat));
     TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
     tasks.add(pair);
-    if (tasks.size() > MAX_TASKS) {
-      purgeExpiredTasks();
-    }
     return proxy;
   }
 
@@ -89,15 +86,10 @@ public class TaskMonitor {
         new PassthroughInvocationHandler<MonitoredRPCHandler>(stat));
     TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
     tasks.add(pair);
-    if (tasks.size() > MAX_TASKS) {
-      purgeExpiredTasks();
-    }
     return proxy;
   }
 
   private synchronized void purgeExpiredTasks() {
-    int size = 0;
-    
     for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
          it.hasNext();) {
       TaskAndWeakRefPair pair = it.next();
@@ -114,15 +106,8 @@ public class TaskMonitor {
       
       if (canPurge(stat)) {
         it.remove();
-      } else {
-        size++;
       }
     }
-    
-    if (size > MAX_TASKS) {
-      LOG.warn("Too many actions in action monitor! Purging some.");
-      tasks = tasks.subList(size - MAX_TASKS, size);
-    }
   }
 
   /**
@@ -133,7 +118,9 @@ public class TaskMonitor {
   public synchronized List<MonitoredTask> getTasks() {
     purgeExpiredTasks();
     ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size());
-    for (TaskAndWeakRefPair pair : tasks) {
+    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
+         it.hasNext();) {
+      TaskAndWeakRefPair pair = it.next();
       MonitoredTask t = pair.get();
       ret.add(t.clone());
     }