Posted to commits@hbase.apache.org by bi...@apache.org on 2017/12/04 10:36:55 UTC
hbase git commit: HBASE-19290 Reduce zk request when doing split log
Repository: hbase
Updated Branches:
refs/heads/branch-1 4ec59d018 -> 15d16281a
HBASE-19290 Reduce zk request when doing split log
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/15d16281
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/15d16281
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/15d16281
Branch: refs/heads/branch-1
Commit: 15d16281a1c3d246dc991064c7828b77baaaaf6f
Parents: 4ec59d0
Author: binlijin <bi...@gmail.com>
Authored: Mon Dec 4 18:36:11 2017 +0800
Committer: binlijin <bi...@gmail.com>
Committed: Mon Dec 4 18:36:11 2017 +0800
----------------------------------------------------------------------
.../ZkSplitLogWorkerCoordination.java | 79 +++++++++++++-------
1 file changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/15d16281/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
index 7e6708e..f22a62a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZkSplitLogWorkerCoordination.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.coordination;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
@@ -206,27 +207,28 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
* try to grab a 'lock' on the task zk node to own and execute the task.
* <p>
* @param path zk node for the task
+ * @return true if the task is grabbed successfully, otherwise false
*/
- private void grabTask(String path) {
+ private boolean grabTask(String path) {
Stat stat = new Stat();
byte[] data;
synchronized (grabTaskLock) {
currentTask = path;
workerInGrabTask = true;
if (Thread.interrupted()) {
- return;
+ return false;
}
}
try {
try {
if ((data = ZKUtil.getDataNoWatch(watcher, path, stat)) == null) {
SplitLogCounters.tot_wkr_failed_to_grab_task_no_data.incrementAndGet();
- return;
+ return false;
}
} catch (KeeperException e) {
LOG.warn("Failed to get data for znode " + path, e);
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
- return;
+ return false;
}
SplitLogTask slt;
try {
@@ -234,11 +236,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
} catch (DeserializationException e) {
LOG.warn("Failed parse data for znode " + path, e);
SplitLogCounters.tot_wkr_failed_to_grab_task_exception.incrementAndGet();
- return;
+ return false;
}
if (!slt.isUnassigned()) {
SplitLogCounters.tot_wkr_failed_to_grab_task_owned.incrementAndGet();
- return;
+ return false;
}
currentVersion =
@@ -246,7 +248,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
slt.getMode(), stat.getVersion());
if (currentVersion < 0) {
SplitLogCounters.tot_wkr_failed_to_grab_task_lost_race.incrementAndGet();
- return;
+ return false;
}
if (ZKSplitLog.isRescanNode(watcher, currentTask)) {
@@ -257,7 +259,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
endTask(new SplitLogTask.Done(server.getServerName(), slt.getMode()),
SplitLogCounters.tot_wkr_task_acquired_rescan, splitTaskDetails);
- return;
+ return false;
}
LOG.info("worker " + server.getServerName() + " acquired task " + path);
@@ -274,6 +276,7 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
LOG.warn("Interrupted while yielding for other region servers", e);
Thread.currentThread().interrupt();
}
+ return true;
} finally {
synchronized (grabTaskLock) {
workerInGrabTask = false;
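The hunk above turns grabTask() from void into a predicate: every early exit (missing znode data, parse failure, task already owned, lost race, rescan node) now returns false, and only a real acquisition returns true. A minimal caller-side sketch of what that enables (illustrative only; the actual caller change is in the loop hunks further down):

  boolean grabbedAny = false;
  for (String path : paths) {
    // false on missing data, deserialization errors, owned tasks, lost races and rescan nodes
    grabbedAny |= grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, path));
  }
  if (!grabbedAny) {
    Thread.sleep(1000); // nothing acquired, back off instead of immediately asking ZooKeeper again
  }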
@@ -325,12 +328,13 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
}
/**
- * This function calculates how many splitters it could create based on expected average tasks per
- * RS and the hard limit upper bound(maxConcurrentTasks) set by configuration. <br>
+ * This function calculates how many splitters this RS should create based on expected average
+ * tasks per RS and the hard upper bound (maxConcurrentTasks) set by configuration. <br>
* At any given time, a RS allows spawn MIN(Expected Tasks/RS, Hard Upper Bound)
- * @param numTasks current total number of available tasks
+ * @param numTasks total number of split tasks available
+ * @return number of tasks this RS can grab
*/
- private int calculateAvailableSplitters(int numTasks) {
+ private int getNumExpectedTasksPerRS(int numTasks) {
// at least one RS (itself) available
int availableRSs = 1;
try {
@@ -341,12 +345,17 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
// do nothing
LOG.debug("getAvailableRegionServers got ZooKeeper exception", e);
}
-
int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
- expectedTasksPerRS = Math.max(1, expectedTasksPerRS); // at least be one
- // calculate how many more splitters we could spawn
- return Math.min(expectedTasksPerRS, maxConcurrentTasks)
- - this.tasksInProgress.get();
+ return Math.max(1, expectedTasksPerRS); // at least one
+ }
+
+ /**
+ * @param expectedTasksPerRS Average number of tasks to be handled by each RS
+ * @return true if more splitters are available, otherwise false.
+ */
+ private boolean areSplittersAvailable(int expectedTasksPerRS) {
+ return (Math.min(expectedTasksPerRS, maxConcurrentTasks)
+ - this.tasksInProgress.get()) > 0;
}
/**
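The old calculateAvailableSplitters() recomputed the per-RS quota, including a getAvailableRegionServers() lookup that goes to ZooKeeper, on every call inside the grab loop. Splitting it into getNumExpectedTasksPerRS(), called once per pass, and the purely in-memory areSplittersAvailable() is what removes those repeated ZK reads. A self-contained sketch of the arithmetic with made-up numbers (10 pending tasks, 3 live region servers, maxConcurrentTasks = 2, 1 task already in progress):

  int numTasks = 10, availableRSs = 3;
  int maxConcurrentTasks = 2, tasksInProgress = 1;
  // ceiling division: 10 tasks over 3 servers -> 4 expected tasks per region server
  int expectedTasksPerRS = (numTasks / availableRSs) + ((numTasks % availableRSs == 0) ? 0 : 1);
  expectedTasksPerRS = Math.max(1, expectedTasksPerRS);
  // capacity remains while min(quota, hard limit) exceeds the in-flight count
  boolean splittersAvailable = (Math.min(expectedTasksPerRS, maxConcurrentTasks) - tasksInProgress) > 0;
  // here: min(4, 2) - 1 = 1 > 0, so this worker may grab one more task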
@@ -415,8 +424,11 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
+ " ... worker thread exiting.");
return;
}
+ // shuffle the paths to prevent different split log workers from starting with the same log
+ // file after the meta log (if any)
+ Collections.shuffle(paths);
// pick meta wal firstly
- int offset = (int) (Math.random() * paths.size());
+ int offset = 0;
for (int i = 0; i < paths.size(); i++) {
if (DefaultWALProvider.isMetaFile(paths.get(i))) {
offset = i;
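Before this change every worker scanned the same task ordering from a random offset, so several workers could still line up on the same znodes; shuffling gives each worker its own ordering, while the existing scan still promotes a meta WAL, if present, to the first attempt. Condensed before/after, mirroring the patch:

  // before: shared ordering, random starting offset
  //   int offset = (int) (Math.random() * paths.size());
  // after: per-worker ordering, scan from the head, meta WAL (if any) first
  Collections.shuffle(paths);
  int offset = 0;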
@@ -424,21 +436,34 @@ public class ZkSplitLogWorkerCoordination extends ZooKeeperListener implements
}
}
int numTasks = paths.size();
+ int expectedTasksPerRS = getNumExpectedTasksPerRS(numTasks);
+ boolean taskGrabbed = false;
for (int i = 0; i < numTasks; i++) {
- int idx = (i + offset) % paths.size();
- // don't call ZKSplitLog.getNodeName() because that will lead to
- // double encoding of the path name
- if (this.calculateAvailableSplitters(numTasks) > 0) {
- grabTask(ZKUtil.joinZNode(watcher.splitLogZNode, paths.get(idx)));
- } else {
- LOG.debug("Current region server " + server.getServerName() + " has "
- + this.tasksInProgress.get() + " tasks in progress and can't take more.");
- break;
+ while (!shouldStop) {
+ if (this.areSplittersAvailable(expectedTasksPerRS)) {
+ LOG.debug("Current region server " + server.getServerName()
+ + " is ready to take more tasks, will get task list and try grab tasks again.");
+ int idx = (i + offset) % paths.size();
+ // don't call ZKSplitLog.getNodeName() because that will lead to
+ // double encoding of the path name
+ taskGrabbed |= grabTask(ZKUtil.joinZNode(watcher.splitLogZNode,
+ paths.get(idx)));
+ break;
+ } else {
+ LOG.debug("Current region server " + server.getServerName()
+ + " has " + this.tasksInProgress.get()
+ + " tasks in progress and can't take more.");
+ Thread.sleep(100);
+ }
}
if (shouldStop) {
return;
}
}
+ if (!taskGrabbed && !shouldStop) {
+ // did not grab any task; sleep a little to reduce the number of zk requests.
+ Thread.sleep(1000);
+ }
SplitLogCounters.tot_wkr_task_grabing.incrementAndGet();
synchronized (taskReadyLock) {
while (seq_start == taskReadySeq.get()) {
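Taken together, the reworked loop changes where the worker waits. The splitter budget is computed once per pass instead of per path, saturation is handled with an in-memory 100 ms sleep rather than another quota recalculation, and a pass that grabs nothing sleeps for a second before the worker goes back to ZooKeeper for a fresh task list, which is the request reduction HBASE-19290 is after. A condensed view of the patched control flow (a sketch, not a drop-in replacement; the enclosing method presumably declares InterruptedException, since the patch calls Thread.sleep without a try/catch):

  int expectedTasksPerRS = getNumExpectedTasksPerRS(numTasks); // one quota calculation per pass
  boolean taskGrabbed = false;
  for (int i = 0; i < numTasks && !shouldStop; i++) {
    while (!shouldStop && !areSplittersAvailable(expectedTasksPerRS)) {
      Thread.sleep(100);                    // saturated: wait without touching ZooKeeper
    }
    if (shouldStop) {
      return;
    }
    taskGrabbed |= grabTask(ZKUtil.joinZNode(watcher.splitLogZNode,
        paths.get((i + offset) % paths.size())));
  }
  if (!taskGrabbed && !shouldStop) {
    Thread.sleep(1000);                     // grabbed nothing this pass: back off before re-listing
  }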