You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/05/12 01:16:50 UTC
[2/3] hbase git commit: HBASE-13606 AssignmentManager.assign() is not
sync in both path
HBASE-13606 AssignmentManager.assign() is not sync in both path
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/30ecf990
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/30ecf990
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/30ecf990
Branch: refs/heads/master
Commit: 30ecf990fe2a343e418eedcffd1d8d5c94ab1fd3
Parents: c3f83a9
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Mon May 11 23:42:11 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Tue May 12 00:13:29 2015 +0100
----------------------------------------------------------------------
.../hadoop/hbase/master/AssignmentManager.java | 89 +++++++++++++++++---
.../hbase/master/GeneralBulkAssigner.java | 29 +------
2 files changed, 80 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/30ecf990/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 4a1e71f..eae9999 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -156,6 +156,7 @@ public class AssignmentManager {
// bulk assigning may be not as efficient.
private final int bulkAssignThresholdRegions;
private final int bulkAssignThresholdServers;
+ private final int bulkPerRegionOpenTimeGuesstimate;
// Should bulk assignment wait till all regions are assigned,
// or it is timed out? This is useful to measure bulk assignment
@@ -194,7 +195,7 @@ public class AssignmentManager {
/** Listeners that are called on assignment events. */
private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
-
+
private RegionStateListener regionStateListener;
/**
@@ -244,6 +245,8 @@ public class AssignmentManager {
conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
+ this.bulkPerRegionOpenTimeGuesstimate =
+ conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
this.metricsAssignmentManager = new MetricsAssignmentManager();
this.tableLockManager = tableLockManager;
@@ -831,6 +834,18 @@ public class AssignmentManager {
}
}
}
+
+ // wait for assignment completion
+ ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
+ for (HRegionInfo region: regions) {
+ if (!region.getTable().isSystemTable()) {
+ userRegionSet.add(region);
+ }
+ }
+ if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+ System.currentTimeMillis())) {
+ LOG.debug("some user regions are still in transition: " + userRegionSet);
+ }
LOG.debug("Bulk assigning done for " + destination);
return true;
} finally {
@@ -1349,22 +1364,62 @@ public class AssignmentManager {
* If the region is already assigned, returns immediately. Otherwise, method
* blocks until the region is assigned.
* @param regionInfo region to wait on assignment for
+ * @return true if the region is assigned false otherwise.
* @throws InterruptedException
*/
public boolean waitForAssignment(HRegionInfo regionInfo)
throws InterruptedException {
- while (!regionStates.isRegionOnline(regionInfo)) {
- if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
- || this.server.isStopped()) {
- return false;
- }
+ ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
+ regionSet.add(regionInfo);
+ return waitForAssignment(regionSet, true, Long.MAX_VALUE);
+ }
- // We should receive a notification, but it's
- // better to have a timeout to recheck the condition here:
- // it lowers the impact of a race condition if any
- regionStates.waitForUpdate(100);
+ /**
+ * Waits for the regions until minEndTime + perRegionOpenTime * (reassigningRegions + 1).
+ */
+ protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+ final boolean waitTillAllAssigned, final int reassigningRegions,
+ final long minEndTime) throws InterruptedException {
+ long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1L);
+ return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
+ }
+
+ /**
+ * Waits until the specified regions have completed assignment, or the deadline is reached.
+ * @param regionSet set of regions to wait on. The set is modified: assigned regions are removed
+ * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
+ * @param deadline the timestamp after which the wait is aborted
+ * @return true if all the regions are assigned, false otherwise.
+ * @throws InterruptedException
+ */
+ protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+ final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
+ // Not synchronizing on regionsInTransition: the iterator below only walks regionSet.
+ while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
+ int failedOpenCount = 0;
+ Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
+ while (regionInfoIterator.hasNext()) {
+ HRegionInfo hri = regionInfoIterator.next();
+ if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
+ State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
+ regionInfoIterator.remove();
+ } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
+ failedOpenCount++;
+ }
+ }
+ if (!waitTillAllAssigned) {
+ // No need to wait, let assignment proceed asynchronously
+ break;
+ }
+ if (!regionSet.isEmpty()) {
+ if (failedOpenCount == regionSet.size()) {
+ // all the regions we are waiting had an error on open.
+ break;
+ }
+ regionStates.waitForUpdate(100);
+ }
}
- return true;
+ return regionSet.isEmpty();
}
/**
@@ -1453,15 +1508,27 @@ public class AssignmentManager {
LOG.trace("Not using bulk assignment since we are assigning only " + regions +
" region(s) to " + servers + " server(s)");
}
+
+ // invoke assignment (async)
+ ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
for (HRegionInfo region: plan.getValue()) {
if (!regionStates.isRegionOnline(region)) {
invokeAssign(region);
+ if (!region.getTable().isSystemTable()) {
+ userRegionSet.add(region);
+ }
}
}
}
}
+
+ // wait for assignment completion
+ if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+ System.currentTimeMillis())) {
+ LOG.debug("some user regions are still in transition: " + userRegionSet);
+ }
} else {
LOG.info("Bulk assigning " + regions + " region(s) across "
+ totalServers + " server(s), " + message);
http://git-wip-us.apache.org/repos/asf/hbase/blob/30ecf990/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
index 356f4af..43ea523 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.RegionState.State;
/**
* Run bulk assign. Does one RCP per regionserver passing a
@@ -118,31 +116,8 @@ public class GeneralBulkAssigner extends BulkAssigner {
if (!failedPlans.isEmpty() && !server.isStopped()) {
reassigningRegions = reassignFailedPlans();
}
-
- Configuration conf = server.getConfiguration();
- long perRegionOpenTimeGuesstimate =
- conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
- long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime)
- + perRegionOpenTimeGuesstimate * (reassigningRegions + 1);
- RegionStates regionStates = assignmentManager.getRegionStates();
- // We're not synchronizing on regionsInTransition now because we don't use any iterator.
- while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) {
- Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
- while (regionInfoIterator.hasNext()) {
- HRegionInfo hri = regionInfoIterator.next();
- if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
- State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
- regionInfoIterator.remove();
- }
- }
- if (!waitTillAllAssigned) {
- // No need to wait, let assignment going on asynchronously
- break;
- }
- if (!regionSet.isEmpty()) {
- regionStates.waitForUpdate(100);
- }
- }
+ assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned,
+ reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime));
if (LOG.isDebugEnabled()) {
long elapsedTime = System.currentTimeMillis() - startTime;