You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2016/07/29 03:54:06 UTC

hbase git commit: HBASE-15931 Add log for long-running tasks in AsyncProcess

Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 909f06147 -> 6f91b1200


HBASE-15931 Add log for long-running tasks in AsyncProcess


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6f91b120
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6f91b120
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6f91b120

Branch: refs/heads/branch-1.3
Commit: 6f91b1200403f729eac64de8bcb334d7fc46ddf8
Parents: 909f061
Author: Yu Li <li...@apache.org>
Authored: Thu Jun 2 12:00:42 2016 +0800
Committer: Yu Li <li...@apache.org>
Committed: Fri Jul 29 11:39:09 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AsyncProcess.java       | 58 +++++++++++++++++---
 .../hbase/client/BufferedMutatorImpl.java       |  3 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   |  5 +-
 3 files changed, 54 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6f91b120/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 3513387..c3ce708 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -115,6 +115,17 @@ class AsyncProcess {
   public static final int DEFAULT_START_LOG_ERRORS_AFTER_COUNT = 9;
 
   /**
+   * Configuration to decide whether to log details for batch error
+   */
+  public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
+
+  private final int thresholdToLogUndoneTaskDetails;
+  private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
+      "hbase.client.threshold.log.details";
+  private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
+  private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
+
+  /**
    * The context used to wait for results from one submit call.
    * 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
    *    then errors and failed operations in this object will reflect global errors.
@@ -319,6 +330,10 @@ class AsyncProcess {
 
     this.rpcCallerFactory = rpcCaller;
     this.rpcFactory = rpcFactory;
+
+    this.thresholdToLogUndoneTaskDetails =
+        conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
+          DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
   }
 
   /**
@@ -376,7 +391,7 @@ class AsyncProcess {
     List<Integer> locationErrorRows = null;
     do {
       // Wait until there is at least one slot for a new task.
-      waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
+      waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
 
       // Remember the previous decisions about regions or region servers we put in the
       //  final multi.
@@ -1754,18 +1769,19 @@ class AsyncProcess {
   @VisibleForTesting
   /** Waits until all outstanding tasks are done. Used in tests. */
   void waitUntilDone() throws InterruptedIOException {
-    waitForMaximumCurrentTasks(0);
+    waitForMaximumCurrentTasks(0, null);
   }
 
   /** Wait until the async does not have more than max tasks in progress. */
-  private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
-    waitForMaximumCurrentTasks(max, tasksInProgress, id);
+  private void waitForMaximumCurrentTasks(int max, String tableName)
+      throws InterruptedIOException {
+    waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
   }
 
   // Break out this method so testable
   @VisibleForTesting
-  static void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id)
-  throws InterruptedIOException {
+  void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
+      String tableName) throws InterruptedIOException {
     long lastLog = EnvironmentEdgeManager.currentTime();
     long currentInProgress, oldInProgress = Long.MAX_VALUE;
     while ((currentInProgress = tasksInProgress.get()) > max) {
@@ -1774,7 +1790,11 @@ class AsyncProcess {
         if (now > lastLog + 10000) {
           lastLog = now;
           LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
-              + max + ", tasksInProgress=" + currentInProgress);
+              + max + ", tasksInProgress=" + currentInProgress +
+              " hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName);
+          if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
+            logDetailsOfUndoneTasks(currentInProgress);
+          }
         }
       }
       oldInProgress = currentInProgress;
@@ -1791,6 +1811,25 @@ class AsyncProcess {
     }
   }
 
+  private void logDetailsOfUndoneTasks(long taskInProgress) {
+    ArrayList<ServerName> servers = new ArrayList<ServerName>();
+    for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
+      if (entry.getValue().get() > 0) {
+        servers.add(entry.getKey());
+      }
+    }
+    LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
+    if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
+      ArrayList<String> regions = new ArrayList<String>();
+      for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
+        if (entry.getValue().get() > 0) {
+          regions.add(Bytes.toString(entry.getKey()));
+        }
+      }
+      LOG.info("Regions against which left over task(s) are processed: " + regions);
+    }
+  }
+
   /**
    * Only used w/useGlobalErrors ctor argument, for HTable backward compat.
    * @return Whether there were any errors in any request since the last time
@@ -1806,12 +1845,13 @@ class AsyncProcess {
    * failed operations themselves.
    * @param failedRows an optional list into which the rows that failed since the last time
    *        {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved.
+   * @param tableName name of the table
    * @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)}
    *          was called, or AP was created.
    */
   public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
-      List<Row> failedRows) throws InterruptedIOException {
-    waitForMaximumCurrentTasks(0);
+      List<Row> failedRows, String tableName) throws InterruptedIOException {
+    waitForMaximumCurrentTasks(0, tableName);
     if (!globalErrors.hasErrors()) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f91b120/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 6220cd6..273f2e4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -237,7 +237,8 @@ public class BufferedMutatorImpl implements BufferedMutator {
         while (!buffer.isEmpty()) {
           ap.submit(tableName, buffer, true, null, false);
         }
-        RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
+        RetriesExhaustedWithDetailsException error =
+            ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
         if (error != null) {
           if (listener == null) {
             throw error;

http://git-wip-us.apache.org/repos/asf/hbase/blob/6f91b120/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 9809c28..3a0c08d 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -1139,16 +1139,17 @@ public class TestAsyncProcess {
   }
 
   @Test
-  public void testWaitForMaximumCurrentTasks() throws InterruptedException, BrokenBarrierException {
+  public void testWaitForMaximumCurrentTasks() throws Exception {
     final AtomicLong tasks = new AtomicLong(0);
     final AtomicInteger max = new AtomicInteger(0);
     final CyclicBarrier barrier = new CyclicBarrier(2);
+    final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf);
     Runnable runnable = new Runnable() {
       @Override
       public void run() {
         try {
           barrier.await();
-          AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1);
+          ap.waitForMaximumCurrentTasks(max.get(), tasks, 1, null);
         } catch (InterruptedIOException e) {
           Assert.fail(e.getMessage());
         } catch (InterruptedException e) {