You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/10 01:20:55 UTC
[08/11] hbase git commit: HBASE-18248 Warn if monitored RPC task has
been tied up beyond a configurable threshold
HBASE-18248 Warn if monitored RPC task has been tied up beyond a configurable threshold
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d0941127
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d0941127
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d0941127
Branch: refs/heads/master
Commit: d0941127d424bc76688aec7952388f858d917b14
Parents: 794a3b1
Author: Andrew Purtell <ap...@apache.org>
Authored: Wed Aug 9 18:11:28 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed Aug 9 18:16:38 2017 -0700
----------------------------------------------------------------------
.../monitoring/MonitoredRPCHandlerImpl.java | 8 +-
.../hadoop/hbase/monitoring/MonitoredTask.java | 2 +
.../hbase/monitoring/MonitoredTaskImpl.java | 16 +++-
.../hadoop/hbase/monitoring/TaskMonitor.java | 88 +++++++++++++++++---
.../hbase/monitoring/TestTaskMonitor.java | 44 +++++++---
5 files changed, 130 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0941127/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
index b49df28..3ebe3b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredRPCHandlerImpl.java
@@ -251,6 +251,12 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl
if (getState() != State.RUNNING) {
return super.toString();
}
- return super.toString() + ", rpcMethod=" + getRPC();
+ return super.toString()
+ + ", queuetimems=" + getRPCQueueTime()
+ + ", starttimems=" + getRPCStartTime()
+ + ", clientaddress=" + clientAddress
+ + ", remoteport=" + remotePort
+ + ", packetlength=" + getRPCPacketLength()
+ + ", rpcMethod=" + getRPC();
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0941127/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
index ff3667b..48fba1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTask.java
@@ -39,6 +39,7 @@ public interface MonitoredTask extends Cloneable {
State getState();
long getStateTime();
long getCompletionTimestamp();
+ long getWarnTime();
void markComplete(String msg);
void pause(String msg);
@@ -48,6 +49,7 @@ public interface MonitoredTask extends Cloneable {
void setStatus(String status);
void setDescription(String description);
+ void setWarnTime(final long t);
/**
* Explicitly mark this status as able to be cleaned up,
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0941127/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
index dda77ac..754e3d6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/MonitoredTaskImpl.java
@@ -30,7 +30,8 @@ class MonitoredTaskImpl implements MonitoredTask {
private long startTime;
private long statusTime;
private long stateTime;
-
+ private long warnTime;
+
private volatile String status;
private volatile String description;
@@ -42,6 +43,7 @@ class MonitoredTaskImpl implements MonitoredTask {
startTime = System.currentTimeMillis();
statusTime = startTime;
stateTime = startTime;
+ warnTime = startTime;
}
@Override
@@ -82,7 +84,12 @@ class MonitoredTaskImpl implements MonitoredTask {
public long getStateTime() {
return stateTime;
}
-
+
+ @Override
+ public long getWarnTime() {
+ return warnTime;
+ }
+
@Override
public long getCompletionTimestamp() {
if (state == State.COMPLETE || state == State.ABORTED) {
@@ -132,6 +139,11 @@ class MonitoredTaskImpl implements MonitoredTask {
}
@Override
+ public void setWarnTime(long t) {
+ this.warnTime = t;
+ }
+
+ @Override
public void cleanup() {
if (state == State.RUNNING) {
setState(State.ABORTED);
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0941127/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
index dc96179..780916f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
@@ -30,9 +30,12 @@ import java.util.List;
import org.apache.commons.collections.buffer.CircularFifoBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Threads;
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
/**
@@ -44,16 +47,35 @@ import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
public class TaskMonitor {
private static final Log LOG = LogFactory.getLog(TaskMonitor.class);
- // Don't keep around any tasks that have completed more than
- // 60 seconds ago
- private static final long EXPIRATION_TIME = 60*1000;
+ public static final String MAX_TASKS_KEY = "hbase.taskmonitor.max.tasks";
+ public static final int DEFAULT_MAX_TASKS = 1000;
+ public static final String RPC_WARN_TIME_KEY = "hbase.taskmonitor.rpc.warn.time";
+ public static final long DEFAULT_RPC_WARN_TIME = 0;
+ public static final String EXPIRATION_TIME_KEY = "hbase.taskmonitor.expiration.time";
+ public static final long DEFAULT_EXPIRATION_TIME = 60*1000;
+ public static final String MONITOR_INTERVAL_KEY = "hbase.taskmonitor.monitor.interval";
+ public static final long DEFAULT_MONITOR_INTERVAL = 10*1000;
- @VisibleForTesting
- static final int MAX_TASKS = 1000;
-
private static TaskMonitor instance;
- private CircularFifoBuffer tasks = new CircularFifoBuffer(MAX_TASKS);
- private List<TaskAndWeakRefPair> rpcTasks = Lists.newArrayList();
+
+ private final int maxTasks;
+ private final long rpcWarnTime;
+ private final long expirationTime;
+ private final CircularFifoBuffer tasks;
+ private final List<TaskAndWeakRefPair> rpcTasks;
+ private final long monitorInterval;
+ private Thread monitorThread;
+
+ TaskMonitor(Configuration conf) {
+ maxTasks = conf.getInt(MAX_TASKS_KEY, DEFAULT_MAX_TASKS);
+ expirationTime = conf.getLong(EXPIRATION_TIME_KEY, DEFAULT_EXPIRATION_TIME);
+ rpcWarnTime = conf.getLong(RPC_WARN_TIME_KEY, DEFAULT_RPC_WARN_TIME);
+ tasks = new CircularFifoBuffer(maxTasks);
+ rpcTasks = Lists.newArrayList();
+ monitorInterval = conf.getLong(MONITOR_INTERVAL_KEY, DEFAULT_MONITOR_INTERVAL);
+ monitorThread = new Thread(new MonitorRunnable());
+ Threads.setDaemonThreadRunning(monitorThread, "Monitor thread for TaskMonitor");
+ }
/**
* Get singleton instance.
@@ -61,7 +83,7 @@ public class TaskMonitor {
*/
public static synchronized TaskMonitor get() {
if (instance == null) {
- instance = new TaskMonitor();
+ instance = new TaskMonitor(HBaseConfiguration.create());
}
return instance;
}
@@ -93,6 +115,22 @@ public class TaskMonitor {
return proxy;
}
+ private synchronized void warnStuckTasks() {
+ if (rpcWarnTime > 0) {
+ final long now = EnvironmentEdgeManager.currentTime();
+ for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator();
+ it.hasNext();) {
+ TaskAndWeakRefPair pair = it.next();
+ MonitoredTask stat = pair.get();
+ if ((stat.getState() == MonitoredTaskImpl.State.RUNNING) &&
+ (now >= stat.getWarnTime() + rpcWarnTime)) {
+ LOG.warn("Task may be stuck: " + stat);
+ stat.setWarnTime(now);
+ }
+ }
+ }
+ }
+
private synchronized void purgeExpiredTasks() {
for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
it.hasNext();) {
@@ -139,12 +177,11 @@ public class TaskMonitor {
private boolean canPurge(MonitoredTask stat) {
long cts = stat.getCompletionTimestamp();
- return (cts > 0 && System.currentTimeMillis() - cts > EXPIRATION_TIME);
+ return (cts > 0 && EnvironmentEdgeManager.currentTime() - cts > expirationTime);
}
-
public void dumpAsText(PrintWriter out) {
- long now = System.currentTimeMillis();
+ long now = EnvironmentEdgeManager.currentTime();
List<MonitoredTask> tasks = getTasks();
for (MonitoredTask task : tasks) {
@@ -164,6 +201,12 @@ public class TaskMonitor {
}
}
+ public synchronized void shutdown() {
+ if (this.monitorThread != null) {
+ monitorThread.interrupt();
+ }
+ }
+
/**
* This class encapsulates an object as well as a weak reference to a proxy
* that passes through calls to that object. In art form:
@@ -218,4 +261,23 @@ public class TaskMonitor {
return method.invoke(delegatee, args);
}
}
+
+ private class MonitorRunnable implements Runnable {
+ private boolean running = true;
+
+ @Override
+ public void run() {
+ while (running) {
+ try {
+ Thread.sleep(monitorInterval);
+ if (tasks.isFull()) {
+ purgeExpiredTasks();
+ }
+ warnStuckTasks();
+ } catch (InterruptedException e) {
+ running = false;
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d0941127/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
index 5464d9f..718339a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
@@ -22,8 +22,10 @@ import static org.junit.Assert.*;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -32,7 +34,7 @@ public class TestTaskMonitor {
@Test
public void testTaskMonitorBasics() {
- TaskMonitor tm = new TaskMonitor();
+ TaskMonitor tm = new TaskMonitor(new Configuration());
assertTrue("Task monitor should start empty",
tm.getTasks().isEmpty());
@@ -55,11 +57,13 @@ public class TestTaskMonitor {
// If we mark its completion time back a few minutes, it should get gced
task.expireNow();
assertEquals(0, tm.getTasks().size());
+
+ tm.shutdown();
}
@Test
public void testTasksGetAbortedOnLeak() throws InterruptedException {
- final TaskMonitor tm = new TaskMonitor();
+ final TaskMonitor tm = new TaskMonitor(new Configuration());
assertTrue("Task monitor should start empty",
tm.getTasks().isEmpty());
@@ -86,42 +90,58 @@ public class TestTaskMonitor {
// Now it should be aborted
MonitoredTask taskFromTm = tm.getTasks().get(0);
assertEquals(MonitoredTask.State.ABORTED, taskFromTm.getState());
+
+ tm.shutdown();
}
@Test
public void testTaskLimit() throws Exception {
- TaskMonitor tm = new TaskMonitor();
- for (int i = 0; i < TaskMonitor.MAX_TASKS + 10; i++) {
+ TaskMonitor tm = new TaskMonitor(new Configuration());
+ for (int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS + 10; i++) {
tm.createStatus("task " + i);
}
// Make sure it was limited correctly
- assertEquals(TaskMonitor.MAX_TASKS, tm.getTasks().size());
+ assertEquals(TaskMonitor.DEFAULT_MAX_TASKS, tm.getTasks().size());
// Make sure we culled the earlier tasks, not later
// (i.e. tasks 0 through 9 should have been deleted)
assertEquals("task 10", tm.getTasks().get(0).getDescription());
+ tm.shutdown();
}
@Test
public void testDoNotPurgeRPCTask() throws Exception {
int RPCTaskNums = 10;
+ TaskMonitor tm = TaskMonitor.get();
for(int i = 0; i < RPCTaskNums; i++) {
- TaskMonitor.get().createRPCStatus("PRCTask" + i);
+ tm.createRPCStatus("PRCTask" + i);
}
- for(int i = 0; i < TaskMonitor.MAX_TASKS; i++) {
- TaskMonitor.get().createStatus("otherTask" + i);
+ for(int i = 0; i < TaskMonitor.DEFAULT_MAX_TASKS; i++) {
+ tm.createStatus("otherTask" + i);
}
int remainRPCTask = 0;
- for(MonitoredTask task :TaskMonitor.get().getTasks()) {
+ for(MonitoredTask task: tm.getTasks()) {
if(task instanceof MonitoredRPCHandler) {
remainRPCTask++;
}
}
assertEquals("RPC Tasks have been purged!", RPCTaskNums, remainRPCTask);
-
+ tm.shutdown();
}
-
-
+ @Test
+ public void testWarnStuckTasks() throws Exception {
+ final int INTERVAL = 1000;
+ Configuration conf = new Configuration();
+ conf.setLong(TaskMonitor.RPC_WARN_TIME_KEY, INTERVAL);
+ conf.setLong(TaskMonitor.MONITOR_INTERVAL_KEY, INTERVAL);
+ final TaskMonitor tm = new TaskMonitor(conf);
+ MonitoredRPCHandler t = tm.createRPCStatus("test task");
+ long then = EnvironmentEdgeManager.currentTime();
+ t.setRPC("testMethod", new Object[0], then);
+ Thread.sleep(INTERVAL * 2);
+ assertTrue("We did not warn", t.getWarnTime() > then);
+ tm.shutdown();
+ }
}