You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2014/04/16 00:50:28 UTC
svn commit: r1587744 -
/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
Author: jdcryans
Date: Tue Apr 15 22:50:27 2014
New Revision: 1587744
URL: http://svn.apache.org/r1587744
Log:
HBASE-10312 Flooding the cluster with administrative actions leads to collapse
Modified:
hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java?rev=1587744&r1=1587743&r2=1587744&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java Tue Apr 15 22:50:27 2014
@@ -27,6 +27,7 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -51,8 +52,7 @@ public class TaskMonitor {
static final int MAX_TASKS = 1000;
private static TaskMonitor instance;
- private List<TaskAndWeakRefPair> tasks =
- Lists.newArrayList();
+ private CircularFifoBuffer tasks = new CircularFifoBuffer(MAX_TASKS);
/**
* Get singleton instance.
@@ -74,9 +74,6 @@ public class TaskMonitor {
new PassthroughInvocationHandler<MonitoredTask>(stat));
TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
tasks.add(pair);
- if (tasks.size() > MAX_TASKS) {
- purgeExpiredTasks();
- }
return proxy;
}
@@ -89,15 +86,10 @@ public class TaskMonitor {
new PassthroughInvocationHandler<MonitoredRPCHandler>(stat));
TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
tasks.add(pair);
- if (tasks.size() > MAX_TASKS) {
- purgeExpiredTasks();
- }
return proxy;
}
private synchronized void purgeExpiredTasks() {
- int size = 0;
-
for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
it.hasNext();) {
TaskAndWeakRefPair pair = it.next();
@@ -114,15 +106,8 @@ public class TaskMonitor {
if (canPurge(stat)) {
it.remove();
- } else {
- size++;
}
}
-
- if (size > MAX_TASKS) {
- LOG.warn("Too many actions in action monitor! Purging some.");
- tasks = tasks.subList(size - MAX_TASKS, size);
- }
}
/**
@@ -133,7 +118,9 @@ public class TaskMonitor {
public synchronized List<MonitoredTask> getTasks() {
purgeExpiredTasks();
ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size());
- for (TaskAndWeakRefPair pair : tasks) {
+ for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
+ it.hasNext();) {
+ TaskAndWeakRefPair pair = it.next();
MonitoredTask t = pair.get();
ret.add(t.clone());
}