You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2017/08/17 19:15:29 UTC

[31/50] [abbrv] hbase git commit: HBASE-17064 Add TaskMonitor#getTasks() variant which accepts type selection

HBASE-17064 Add TaskMonitor#getTasks() variant which accepts type selection

Signed-off-by: tedyu <yu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/effd1093
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/effd1093
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/effd1093

Branch: refs/heads/HBASE-14070.HLC
Commit: effd1093b559aeba2bf66a4cf81cd4a0013de184
Parents: d37266f
Author: Reid Chan <re...@outlook.com>
Authored: Tue Aug 15 15:50:22 2017 +0800
Committer: tedyu <yu...@gmail.com>
Committed: Tue Aug 15 09:45:19 2017 -0700

----------------------------------------------------------------------
 .../hbase/tmpl/common/TaskMonitorTmpl.jamon     | 21 +----
 .../hadoop/hbase/monitoring/TaskMonitor.java    | 97 +++++++++++++++++---
 .../hbase/monitoring/TestTaskMonitor.java       | 48 ++++++++++
 3 files changed, 133 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/effd1093/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon
index b4a5fea..986bc3a 100644
--- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon
+++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/common/TaskMonitorTmpl.jamon
@@ -27,27 +27,8 @@ String filter = "general";
 String format = "html";
 </%args>
 <%java>
-List<? extends MonitoredTask> tasks = taskMonitor.getTasks();
-Iterator<? extends MonitoredTask> iter = tasks.iterator();
 // apply requested filter
-while (iter.hasNext()) {
-  MonitoredTask t = iter.next();
-  if (filter.equals("general")) {
-    if (t instanceof MonitoredRPCHandler)
-      iter.remove();
-  } else if (filter.equals("handler")) {
-    if (!(t instanceof MonitoredRPCHandler))
-      iter.remove();
-  } else if (filter.equals("rpc")) {
-    if (!(t instanceof MonitoredRPCHandler) || 
-        !((MonitoredRPCHandler) t).isRPCRunning())
-      iter.remove();
-  } else if (filter.equals("operation")) {
-    if (!(t instanceof MonitoredRPCHandler) || 
-        !((MonitoredRPCHandler) t).isOperationRunning())
-      iter.remove();
-  }
-}
+List<? extends MonitoredTask> tasks = taskMonitor.getTasks(filter);
 long now = System.currentTimeMillis();
 Collections.reverse(tasks);
 boolean first = true;

http://git-wip-us.apache.org/repos/asf/hbase/blob/effd1093/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
index 780916f..ad9bd02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/monitoring/TaskMonitor.java
@@ -157,22 +157,52 @@ public class TaskMonitor {
    * MonitoredTasks handled by this TaskMonitor.
    * @return A complete list of MonitoredTasks.
    */
-  public synchronized List<MonitoredTask> getTasks() {
+  public List<MonitoredTask> getTasks() {
+    return getTasks(null);
+  }
+
+  /**
+   * Produces a list containing copies of the current state of all non-expired 
+   * MonitoredTasks handled by this TaskMonitor.
+   * @param filter type of wanted tasks; null, empty, or unrecognized values select all tasks
+   * @return A filtered list of MonitoredTasks.
+   */
+  public synchronized List<MonitoredTask> getTasks(String filter) {
     purgeExpiredTasks();
-    ArrayList<MonitoredTask> ret = Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size());
-    for (Iterator<TaskAndWeakRefPair> it = tasks.iterator();
-         it.hasNext();) {
-      TaskAndWeakRefPair pair = it.next();
-      MonitoredTask t = pair.get();
-      ret.add(t.clone());
+    TaskFilter taskFilter = createTaskFilter(filter);
+    ArrayList<MonitoredTask> results =
+        Lists.newArrayListWithCapacity(tasks.size() + rpcTasks.size());
+    processTasks(tasks, taskFilter, results);
+    processTasks(rpcTasks, taskFilter, results);
+    return results;
+  }
+
+  /**
+   * Create a task filter according to a given filter type.
+   * @param filter type of monitored task
+   * @return a task filter
+   */
+  private static TaskFilter createTaskFilter(String filter) {
+    switch (TaskFilter.TaskType.getTaskType(filter)) {
+      case GENERAL: return task -> task instanceof MonitoredRPCHandler;
+      case HANDLER: return task -> !(task instanceof MonitoredRPCHandler);
+      case RPC: return task -> !(task instanceof MonitoredRPCHandler) ||
+                               !((MonitoredRPCHandler) task).isRPCRunning();
+      case OPERATION: return task -> !(task instanceof MonitoredRPCHandler) ||
+                                     !((MonitoredRPCHandler) task).isOperationRunning();
+      default: return task -> false;
     }
-    for (Iterator<TaskAndWeakRefPair> it = rpcTasks.iterator();
-         it.hasNext();) {
-      TaskAndWeakRefPair pair = it.next();
-      MonitoredTask t = pair.get();
-      ret.add(t.clone());
+  }
+
+  private static void processTasks(Iterable<TaskAndWeakRefPair> tasks,
+                                   TaskFilter filter,
+                                   List<MonitoredTask> results) {
+    for (TaskAndWeakRefPair task : tasks) {
+      MonitoredTask t = task.get();
+      if (!filter.filter(t)) {
+        results.add(t.clone());
+      }
     }
-    return ret;
   }
 
   private boolean canPurge(MonitoredTask stat) {
@@ -280,4 +310,45 @@ public class TaskMonitor {
       }
     }
   }
+
+  private interface TaskFilter {
+    enum TaskType {
+      GENERAL("general"),
+      HANDLER("handler"),
+      RPC("rpc"),
+      OPERATION("operation"),
+      ALL("all");
+
+      private String type;
+
+      private TaskType(String type) {
+        this.type = type.toLowerCase();
+      }
+
+      static TaskType getTaskType(String type) {
+        if (type == null || type.isEmpty()) {
+          return ALL;
+        }
+        type = type.toLowerCase();
+        for (TaskType taskType : values()) {
+          if (taskType.toString().equals(type)) {
+            return taskType;
+          }
+        }
+        return ALL;
+      }
+
+      @Override
+      public String toString() {
+        return type;
+      }
+    }
+
+    /**
+     * Filters out unwanted tasks.
+     * @param t monitored task
+     * @return false if the task is accepted, true if it is filtered out
+     */
+    boolean filter(MonitoredTask t);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/effd1093/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
index 718339a..7abcde8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/monitoring/TestTaskMonitor.java
@@ -20,9 +20,15 @@ package org.apache.hadoop.hbase.monitoring;
 
 import static org.junit.Assert.*;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Query;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -143,5 +149,47 @@ public class TestTaskMonitor {
     tm.shutdown();
   }
 
+  @Test
+  public void testGetTasksWithFilter() throws Exception {
+    TaskMonitor tm = new TaskMonitor(new Configuration());
+    assertTrue("Task monitor should start empty", tm.getTasks().isEmpty());
+    // Create 5 general tasks
+    tm.createStatus("General task1");
+    tm.createStatus("General task2");
+    tm.createStatus("General task3");
+    tm.createStatus("General task4");
+    tm.createStatus("General task5");
+    // Create 5 rpc tasks, and mark 1 completed
+    int length = 5;
+    ArrayList<MonitoredRPCHandler> rpcHandlers = new ArrayList<>(length);
+    for (int i = 0; i < length; i++) {
+      MonitoredRPCHandler rpcHandler = tm.createRPCStatus("Rpc task" + i);
+      rpcHandlers.add(rpcHandler);
+    }
+    // Create rpc operations
+    byte[] row = new byte[] { 0x01 };
+    Mutation m = new Put(row);
+    Query q = new Scan();
+    String notOperation = "for test";
+    rpcHandlers.get(0).setRPC("operations", new Object[]{ m, q }, 3000);
+    rpcHandlers.get(1).setRPC("operations", new Object[]{ m, q }, 3000);
+    rpcHandlers.get(2).setRPC("operations", new Object[]{ m, q }, 3000);
+    rpcHandlers.get(3).setRPC("operations", new Object[]{ notOperation }, 3000);
+    rpcHandlers.get(4).setRPC("operations", new Object[]{ m, q }, 3000);
+    MonitoredRPCHandler completed = rpcHandlers.get(4);
+    completed.markComplete("Completed!");
+    // Test get tasks with filter
+    List<MonitoredTask> generalTasks = tm.getTasks("general");
+    assertEquals(5, generalTasks.size());
+    List<MonitoredTask> handlerTasks = tm.getTasks("handler");
+    assertEquals(5, handlerTasks.size());
+    List<MonitoredTask> rpcTasks = tm.getTasks("rpc");
+    // The last rpc handler is stopped
+    assertEquals(4, rpcTasks.size());
+    List<MonitoredTask> operationTasks = tm.getTasks("operation");
+    // Handler 3 doesn't handle an Operation, and the completed handler 4 is excluded too.
+    assertEquals(3, operationTasks.size());
+    tm.shutdown();
+  }
 }