You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/08/17 19:15:29 UTC
[31/50] [abbrv] hbase git commit: HBASE-17064 Add TaskMonitor#getTasks() variant which accepts type selection
HBASE-17064 Add TaskMonitor#getTasks() variant which accepts type selection
Signed-off-by: tedyu <yu...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/effd1093
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/effd1093
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/effd1093
Branch: refs/heads/HBASE-14070.HLC
Commit: effd1093b559aeba2bf66a4cf81cd4a0013de184
Parents: d37266f
Author: Reid Chan <re...@outlook.com>
Authored: Tue Aug 15 15:50:22 2017 +0800
Committer: tedyu <yu...@gmail.com>
Committed: Tue Aug 15 09:45:19 2017 -0700
----------------------------------------------------------------------
.../hbase/tmpl/common/TaskMonitorTmpl.jamon | 21 +----
.../hadoop/hbase/monitoring/TaskMonitor.java | 97 +++++++++++++++++---
.../hbase/monitoring/TestTaskMonitor.java | 48 ++++++++++
3 files changed, 133 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/effd1093/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon
index b4a5fea..986bc3a 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon
@@ -27,27 +27,8 @@ String filter = "general";
String format = "html";
</%args>
<%java>
-List<? extends MonitoredTask> tasks = taskMonitor.getTasks();
-Iterator<? extends MonitoredTask> iter = tasks.iterator();
// apply requested filter
-while (iter.hasNext()) {
- MonitoredTask t = iter.next();
- if (filter.equals("general")) {
- if (t instanceof MonitoredRPCHandler)
- iter.remove();
- } else if (filter.equals("handler")) {
- if (!(t instanceof MonitoredRPCHandler))
- iter.remove();
- } else if (filter.equals("rpc")) {
- if (!(t instanceof MonitoredRPCHandler) ||
- !((MonitoredRPCHandler) t).isRPCRunning())
- iter.remove();
- } else if (filter.equals("operation")) {
- if (!(t instanceof MonitoredRPCHandler) ||
- !((MonitoredRPCHandler) t).isOperationRunning())
- iter.remove();
- }
-}
+List<? extends MonitoredTask> tasks = taskMonitor.getTasks(filter);
long now = System.currentTimeMillis();
Collections.reverse(tasks);
boolean first = true;
http://git-wip-us.apache.org/repos/asf/hbase/blob/effd1093/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
index 780916f..ad9bd02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
@@ -157,22 +157,52 @@ public class TaskMonitor {
* MonitoredTasks handled by this TaskMonitor.
* @return A complete list of MonitoredTasks.
*/
- public synchronized List<MonitoredTask> getTasks() {
+ public List<MonitoredTask> getTasks() {
+ return getTasks(null);
+ }
+
+ /**
+ * Produces a list containing copies of the current state of all non-expired
+ * MonitoredTasks handled by this TaskMonitor.
+ * @param filter type of wanted tasks
+ * @return A filtered list of MonitoredTasks.
+ */
+ public synchronized List<MonitoredTask> getTasks(String filter) {
purgeExpiredTasks();
- ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size());
- for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
- it.hasNext();) {
- TaskAndWeakRefPair pair = it.next();
- MonitoredTask t = pair.get();
- ret.add(t.clone());
+ TaskFilter taskFilter = createTaskFilter(filter);
+ ArrayList<MonitoredTask> results =
+ Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size());
+ processTasks(tasks, taskFilter, results);
+ processTasks(rpcTasks, taskFilter, results);
+ return results;
+ }
+
+ /**
+ * Create a task filter according to a given filter type.
+ * @param filter type of monitored task
+ * @return a task filter
+ */
+ private static TaskFilter createTaskFilter(String filter) {
+ switch (TaskFilter.TaskType.getTaskType(filter)) {
+ case GENERAL: return task -> task instanceof MonitoredRPCHandler;
+ case HANDLER: return task -> !(task instanceof MonitoredRPCHandler);
+ case RPC: return task -> !(task instanceof MonitoredRPCHandler) ||
+ !((MonitoredRPCHandler) task).isRPCRunning();
+ case OPERATION: return task -> !(task instanceof MonitoredRPCHandler) ||
+ !((MonitoredRPCHandler) task).isOperationRunning();
+ default: return task -> false;
}
- for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator();
- it.hasNext();) {
- TaskAndWeakRefPair pair = it.next();
- MonitoredTask t = pair.get();
- ret.add(t.clone());
+ }
+
+ private static void processTasks(Iterable<TaskAndWeakRefPair> tasks,
+ TaskFilter filter,
+ List<MonitoredTask> results) {
+ for (TaskAndWeakRefPair task : tasks) {
+ MonitoredTask t = task.get();
+ if (!filter.filter(t)) {
+ results.add(t.clone());
+ }
}
- return ret;
}
private boolean canPurge(MonitoredTask stat) {
@@ -280,4 +310,45 @@ public class TaskMonitor {
}
}
}
+
+ private interface TaskFilter {
+ enum TaskType {
+ GENERAL("general"),
+ HANDLER("handler"),
+ RPC("rpc"),
+ OPERATION("operation"),
+ ALL("all");
+
+ private String type;
+
+ private TaskType(String type) {
+ this.type = type.toLowerCase();
+ }
+
+ static TaskType getTaskType(String type) {
+ if (type == null || type.isEmpty()) {
+ return ALL;
+ }
+ type = type.toLowerCase();
+ for (TaskType taskType : values()) {
+ if (taskType.toString().equals(type)) {
+ return taskType;
+ }
+ }
+ return ALL;
+ }
+
+ @Override
+ public String toString() {
+ return type;
+ }
+ }
+
+ /**
+ * Decides whether a task is unwanted and must be left out of the result.
+ * @param t monitored task
+ * @return false if a task is accepted, true if it is filtered out
+ */
+ boolean filter(MonitoredTask t);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/effd1093/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
index 718339a..7abcde8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
@@ -20,9 +20,15 @@ package org.apache.hadoop.hbase.monitoring;
import static org.junit.Assert.*;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Query;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -143,5 +149,47 @@ public class TestTaskMonitor {
tm.shutdown();
}
+ @Test
+ public void testGetTasksWithFilter() throws Exception {
+ TaskMonitor tm = new TaskMonitor(new Configuration());
+ assertTrue("Task monitor should start empty", tm.getTasks().isEmpty());
+ // Create 5 general tasks
+ tm.createStatus("General task1");
+ tm.createStatus("General task2");
+ tm.createStatus("General task3");
+ tm.createStatus("General task4");
+ tm.createStatus("General task5");
+ // Create 5 rpc tasks, and mark 1 completed
+ int length = 5;
+ ArrayList<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length);
+ for (int i = 0; i < length; i++) {
+ MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i);
+ rpcHandlers.add(rpcHandler);
+ }
+ // Create rpc operations
+ byte[] row = new byte[] { 0x01 };
+ Mutation m = new Put(row);
+ Query q = new Scan();
+ String notOperation = "for test";
+ rpcHandlers.get(0).setRPC("operations", new Object[]{ m, q }, 3000);
+ rpcHandlers.get(1).setRPC("operations", new Object[]{ m, q }, 3000);
+ rpcHandlers.get(2).setRPC("operations", new Object[]{ m, q }, 3000);
+ rpcHandlers.get(3).setRPC("operations", new Object[]{ notOperation }, 3000);
+ rpcHandlers.get(4).setRPC("operations", new Object[]{ m, q }, 3000);
+ MonitoredRPCHandler completed = rpcHandlers.get(4);
+ completed.markComplete("Completed!");
+ // Test get tasks with filter
+ List<MonitoredTask> generalTasks = tm.getTasks("general");
+ assertEquals(5, generalTasks.size());
+ List<MonitoredTask> handlerTasks = tm.getTasks("handler");
+ assertEquals(5, handlerTasks.size());
+ List<MonitoredTask> rpcTasks = tm.getTasks("rpc");
+ // The last rpc handler is stopped
+ assertEquals(4, rpcTasks.size());
+ List<MonitoredTask> operationTasks = tm.getTasks("operation");
+ // Handler 3 doesn't handle Operation.
+ assertEquals(3, operationTasks.size());
+ tm.shutdown();
+ }
}