You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/06/11 04:55:54 UTC
[08/50] hbase git commit: HBASE-15931 Add log for long-running tasks in AsyncProcess
HBASE-15931 Add log for long-running tasks in AsyncProcess
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/53eb27bb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/53eb27bb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/53eb27bb
Branch: refs/heads/hbase-12439
Commit: 53eb27bb60bc736f0c56c7a2facfb6e6ccb91be5
Parents: cbb95cd
Author: Yu Li <li...@apache.org>
Authored: Thu Jun 2 12:00:42 2016 +0800
Committer: Yu Li <li...@apache.org>
Committed: Thu Jun 2 12:00:42 2016 +0800
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncProcess.java | 53 ++++++++++++++++----
.../hbase/client/BufferedMutatorImpl.java | 3 +-
.../hadoop/hbase/client/TestAsyncProcess.java | 5 +-
3 files changed, 49 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/53eb27bb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 6f7ba59..812e4bf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -120,6 +120,12 @@ class AsyncProcess {
*/
public static final String LOG_DETAILS_FOR_BATCH_ERROR = "hbase.client.log.batcherrors.details";
+ private final int thresholdToLogUndoneTaskDetails;
+ private static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS =
+ "hbase.client.threshold.log.details";
+ private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10;
+ private final int THRESHOLD_TO_LOG_REGION_DETAILS = 2;
+
/**
* The context used to wait for results from one submit call.
* 1) If AsyncProcess is set to track errors globally, and not per call (for HTable puts),
@@ -332,6 +338,10 @@ class AsyncProcess {
this.rpcCallerFactory = rpcCaller;
this.rpcFactory = rpcFactory;
this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);
+
+ this.thresholdToLogUndoneTaskDetails =
+ conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
+ DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
}
/**
@@ -389,7 +399,7 @@ class AsyncProcess {
List<Integer> locationErrorRows = null;
do {
// Wait until there is at least one slot for a new task.
- waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1);
+ waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, tableName.getNameAsString());
// Remember the previous decisions about regions or region servers we put in the
// final multi.
@@ -1765,18 +1775,19 @@ class AsyncProcess {
@VisibleForTesting
/** Waits until all outstanding tasks are done. Used in tests. */
void waitUntilDone() throws InterruptedIOException {
- waitForMaximumCurrentTasks(0);
+ waitForMaximumCurrentTasks(0, null);
}
/** Wait until the async does not have more than max tasks in progress. */
- private void waitForMaximumCurrentTasks(int max) throws InterruptedIOException {
- waitForMaximumCurrentTasks(max, tasksInProgress, id);
+ private void waitForMaximumCurrentTasks(int max, String tableName)
+ throws InterruptedIOException {
+ waitForMaximumCurrentTasks(max, tasksInProgress, id, tableName);
}
// Break out this method so testable
@VisibleForTesting
- static void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id)
- throws InterruptedIOException {
+ void waitForMaximumCurrentTasks(int max, final AtomicLong tasksInProgress, final long id,
+ String tableName) throws InterruptedIOException {
long lastLog = EnvironmentEdgeManager.currentTime();
long currentInProgress, oldInProgress = Long.MAX_VALUE;
while ((currentInProgress = tasksInProgress.get()) > max) {
@@ -1785,7 +1796,11 @@ class AsyncProcess {
if (now > lastLog + 10000) {
lastLog = now;
LOG.info("#" + id + ", waiting for some tasks to finish. Expected max="
- + max + ", tasksInProgress=" + currentInProgress);
+ + max + ", tasksInProgress=" + currentInProgress +
+ " hasError=" + hasError() + tableName == null ? "" : ", tableName=" + tableName);
+ if (currentInProgress <= thresholdToLogUndoneTaskDetails) {
+ logDetailsOfUndoneTasks(currentInProgress);
+ }
}
}
oldInProgress = currentInProgress;
@@ -1802,6 +1817,25 @@ class AsyncProcess {
}
}
+ private void logDetailsOfUndoneTasks(long taskInProgress) {
+ ArrayList<ServerName> servers = new ArrayList<ServerName>();
+ for (Map.Entry<ServerName, AtomicInteger> entry : taskCounterPerServer.entrySet()) {
+ if (entry.getValue().get() > 0) {
+ servers.add(entry.getKey());
+ }
+ }
+ LOG.info("Left over " + taskInProgress + " task(s) are processed on server(s): " + servers);
+ if (taskInProgress <= THRESHOLD_TO_LOG_REGION_DETAILS) {
+ ArrayList<String> regions = new ArrayList<String>();
+ for (Map.Entry<byte[], AtomicInteger> entry : taskCounterPerRegion.entrySet()) {
+ if (entry.getValue().get() > 0) {
+ regions.add(Bytes.toString(entry.getKey()));
+ }
+ }
+ LOG.info("Regions against which left over task(s) are processed: " + regions);
+ }
+ }
+
/**
* Only used w/useGlobalErrors ctor argument, for HTable backward compat.
* @return Whether there were any errors in any request since the last time
@@ -1817,12 +1851,13 @@ class AsyncProcess {
* failed operations themselves.
* @param failedRows an optional list into which the rows that failed since the last time
* {@link #waitForAllPreviousOpsAndReset(List)} was called, or AP was created, are saved.
+ * @param tableName name of the table
* @return all the errors since the last time {@link #waitForAllPreviousOpsAndReset(List)}
* was called, or AP was created.
*/
public RetriesExhaustedWithDetailsException waitForAllPreviousOpsAndReset(
- List<Row> failedRows) throws InterruptedIOException {
- waitForMaximumCurrentTasks(0);
+ List<Row> failedRows, String tableName) throws InterruptedIOException {
+ waitForMaximumCurrentTasks(0, tableName);
if (!globalErrors.hasErrors()) {
return null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/53eb27bb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 2a7effe..e98ad4e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -238,7 +238,8 @@ public class BufferedMutatorImpl implements BufferedMutator {
while (!buffer.isEmpty()) {
ap.submit(tableName, buffer, true, null, false);
}
- RetriesExhaustedWithDetailsException error = ap.waitForAllPreviousOpsAndReset(null);
+ RetriesExhaustedWithDetailsException error =
+ ap.waitForAllPreviousOpsAndReset(null, tableName.getNameAsString());
if (error != null) {
if (listener == null) {
throw error;
http://git-wip-us.apache.org/repos/asf/hbase/blob/53eb27bb/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 839a33a..d943316 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -1135,16 +1135,17 @@ public class TestAsyncProcess {
}
@Test
- public void testWaitForMaximumCurrentTasks() throws InterruptedException, BrokenBarrierException {
+ public void testWaitForMaximumCurrentTasks() throws Exception {
final AtomicLong tasks = new AtomicLong(0);
final AtomicInteger max = new AtomicInteger(0);
final CyclicBarrier barrier = new CyclicBarrier(2);
+ final AsyncProcess ap = new MyAsyncProcess(createHConnection(), conf);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
barrier.await();
- AsyncProcess.waitForMaximumCurrentTasks(max.get(), tasks, 1);
+ ap.waitForMaximumCurrentTasks(max.get(), tasks, 1, null);
} catch (InterruptedIOException e) {
Assert.fail(e.getMessage());
} catch (InterruptedException e) {