You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/05/12 01:16:49 UTC
[1/3] hbase git commit: HBASE-13606 AssignmentManager.assign() is not
sync in both path
Repository: hbase
Updated Branches:
refs/heads/branch-1 33fe79cf6 -> 5d553adac
refs/heads/branch-1.1 ef56f47b5 -> 8c5717743
refs/heads/master c3f83a9ef -> 30ecf990f
HBASE-13606 AssignmentManager.assign() is not sync in both path
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8c571774
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8c571774
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8c571774
Branch: refs/heads/branch-1.1
Commit: 8c57177438a57ab433b60f787f310aa0429501d6
Parents: ef56f47
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Mon May 11 23:49:08 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Tue May 12 00:01:13 2015 +0100
----------------------------------------------------------------------
.../hadoop/hbase/master/AssignmentManager.java | 99 ++++++++++++++++----
.../hbase/master/GeneralBulkAssigner.java | 29 +-----
2 files changed, 85 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c571774/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index bf9b207..34c4963 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -214,6 +214,7 @@ public class AssignmentManager extends ZooKeeperListener {
// bulk assigning may be not as efficient.
private final int bulkAssignThresholdRegions;
private final int bulkAssignThresholdServers;
+ private final int bulkPerRegionOpenTimeGuesstimate;
// Should bulk assignment wait till all regions are assigned,
// or it is timed out? This is useful to measure bulk assignment
@@ -255,7 +256,7 @@ public class AssignmentManager extends ZooKeeperListener {
/** Listeners that are called on assignment events. */
private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
-
+
private RegionStateListener regionStateListener;
/**
@@ -312,6 +313,8 @@ public class AssignmentManager extends ZooKeeperListener {
conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
+ this.bulkPerRegionOpenTimeGuesstimate =
+ conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
@@ -1096,7 +1099,7 @@ public class AssignmentManager extends ZooKeeperListener {
return;
}
// Handle OPENED by removing from transition and deleted zk node
- regionState =
+ regionState =
regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
if (regionState != null) {
failedOpenTracker.remove(encodedName); // reset the count, if any
@@ -1788,6 +1791,18 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
}
+
+ // wait for assignment completion
+ ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
+ for (HRegionInfo region: regions) {
+ if (!region.getTable().isSystemTable()) {
+ userRegionSet.add(region);
+ }
+ }
+ if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+ System.currentTimeMillis())) {
+ LOG.debug("some user regions are still in transition: " + userRegionSet);
+ }
LOG.debug("Bulk assigning done for " + destination);
return true;
} finally {
@@ -2617,22 +2632,62 @@ public class AssignmentManager extends ZooKeeperListener {
* If the region is already assigned, returns immediately. Otherwise, method
* blocks until the region is assigned.
* @param regionInfo region to wait on assignment for
+ * @return true if the region is assigned, false otherwise.
* @throws InterruptedException
*/
public boolean waitForAssignment(HRegionInfo regionInfo)
throws InterruptedException {
- while (!regionStates.isRegionOnline(regionInfo)) {
- if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
- || this.server.isStopped()) {
- return false;
- }
+ ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
+ regionSet.add(regionInfo);
+ return waitForAssignment(regionSet, true, Long.MAX_VALUE);
+ }
- // We should receive a notification, but it's
- // better to have a timeout to recheck the condition here:
- // it lowers the impact of a race condition if any
- regionStates.waitForUpdate(100);
+ /**
+ * Waits until the specified regions have completed assignment, or the deadline is reached.
+ */
+ protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+ final boolean waitTillAllAssigned, final int reassigningRegions,
+ final long minEndTime) throws InterruptedException {
+ long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
+ return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
+ }
+
+ /**
+ * Waits until the specified regions have completed assignment, or the deadline is reached.
+ * @param regionSet set of regions to wait on. The set is modified: assigned regions are removed
+ * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
+ * @param deadline the timestamp after which the wait is aborted
+ * @return true if all the regions are assigned, false otherwise.
+ * @throws InterruptedException
+ */
+ protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+ final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
+ // We're not synchronizing on regionsInTransition now because we don't use any iterator.
+ while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
+ int failedOpenCount = 0;
+ Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
+ while (regionInfoIterator.hasNext()) {
+ HRegionInfo hri = regionInfoIterator.next();
+ if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
+ State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
+ regionInfoIterator.remove();
+ } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
+ failedOpenCount++;
+ }
+ }
+ if (!waitTillAllAssigned) {
+ // No need to wait, let assignment go on asynchronously
+ break;
+ }
+ if (!regionSet.isEmpty()) {
+ if (failedOpenCount == regionSet.size()) {
+ // all the regions we are waiting for had an error on open.
+ break;
+ }
+ regionStates.waitForUpdate(100);
+ }
}
- return true;
+ return regionSet.isEmpty();
}
/**
@@ -2725,15 +2780,27 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.trace("Not using bulk assignment since we are assigning only " + regions +
" region(s) to " + servers + " server(s)");
}
+
+ // invoke assignment (async)
+ ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
if (!assign(plan.getKey(), plan.getValue())) {
for (HRegionInfo region: plan.getValue()) {
if (!regionStates.isRegionOnline(region)) {
invokeAssign(region);
+ if (!region.getTable().isSystemTable()) {
+ userRegionSet.add(region);
+ }
}
}
}
}
+
+ // wait for assignment completion
+ if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+ System.currentTimeMillis())) {
+ LOG.debug("some user regions are still in transition: " + userRegionSet);
+ }
} else {
LOG.info("Bulk assigning " + regions + " region(s) across "
+ totalServers + " server(s), " + message);
@@ -3044,11 +3111,11 @@ public class AssignmentManager extends ZooKeeperListener {
if (serverName != null
&& !serverManager.getOnlineServers().containsKey(serverName)) {
LOG.info("Server " + serverName + " isn't online. SSH will handle this");
- continue;
+ continue;
}
HRegionInfo regionInfo = regionState.getRegion();
State state = regionState.getState();
-
+
switch (state) {
case CLOSED:
invokeAssign(regionInfo);
@@ -3060,7 +3127,7 @@ public class AssignmentManager extends ZooKeeperListener {
retrySendRegionClose(regionState);
break;
case FAILED_CLOSE:
- case FAILED_OPEN:
+ case FAILED_OPEN:
invokeUnAssign(regionInfo);
break;
default:
@@ -4270,7 +4337,7 @@ public class AssignmentManager extends ZooKeeperListener {
getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
return getRegionStates().getRegionAssignments(infos);
}
-
+
void setRegionStateListener(RegionStateListener listener) {
this.regionStateListener = listener;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/8c571774/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
index 356f4af..43ea523 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.RegionState.State;
/**
* Run bulk assign. Does one RCP per regionserver passing a
@@ -118,31 +116,8 @@ public class GeneralBulkAssigner extends BulkAssigner {
if (!failedPlans.isEmpty() && !server.isStopped()) {
reassigningRegions = reassignFailedPlans();
}
-
- Configuration conf = server.getConfiguration();
- long perRegionOpenTimeGuesstimate =
- conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
- long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime)
- + perRegionOpenTimeGuesstimate * (reassigningRegions + 1);
- RegionStates regionStates = assignmentManager.getRegionStates();
- // We're not synchronizing on regionsInTransition now because we don't use any iterator.
- while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) {
- Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
- while (regionInfoIterator.hasNext()) {
- HRegionInfo hri = regionInfoIterator.next();
- if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
- State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
- regionInfoIterator.remove();
- }
- }
- if (!waitTillAllAssigned) {
- // No need to wait, let assignment going on asynchronously
- break;
- }
- if (!regionSet.isEmpty()) {
- regionStates.waitForUpdate(100);
- }
- }
+ assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned,
+ reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime));
if (LOG.isDebugEnabled()) {
long elapsedTime = System.currentTimeMillis() - startTime;
[3/3] hbase git commit: HBASE-13606 AssignmentManager.assign() is not
sync in both path
Posted by mb...@apache.org.
HBASE-13606 AssignmentManager.assign() is not sync in both path
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5d553ada
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5d553ada
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5d553ada
Branch: refs/heads/branch-1
Commit: 5d553adacdd2e1ad06abd9ebea9b954604683b01
Parents: 33fe79c
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Mon May 11 23:49:08 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Tue May 12 00:14:04 2015 +0100
----------------------------------------------------------------------
.../hadoop/hbase/master/AssignmentManager.java | 99 ++++++++++++++++----
.../hbase/master/GeneralBulkAssigner.java | 29 +-----
2 files changed, 85 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5d553ada/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index bf9b207..34c4963 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -214,6 +214,7 @@ public class AssignmentManager extends ZooKeeperListener {
// bulk assigning may be not as efficient.
private final int bulkAssignThresholdRegions;
private final int bulkAssignThresholdServers;
+ private final int bulkPerRegionOpenTimeGuesstimate;
// Should bulk assignment wait till all regions are assigned,
// or it is timed out? This is useful to measure bulk assignment
@@ -255,7 +256,7 @@ public class AssignmentManager extends ZooKeeperListener {
/** Listeners that are called on assignment events. */
private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
-
+
private RegionStateListener regionStateListener;
/**
@@ -312,6 +313,8 @@ public class AssignmentManager extends ZooKeeperListener {
conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
+ this.bulkPerRegionOpenTimeGuesstimate =
+ conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
int workers = conf.getInt("hbase.assignment.zkevent.workers", 20);
ThreadFactory threadFactory = Threads.newDaemonThreadFactory("AM.ZK.Worker");
@@ -1096,7 +1099,7 @@ public class AssignmentManager extends ZooKeeperListener {
return;
}
// Handle OPENED by removing from transition and deleted zk node
- regionState =
+ regionState =
regionStates.transitionOpenFromPendingOpenOrOpeningOnServer(rt,regionState, sn);
if (regionState != null) {
failedOpenTracker.remove(encodedName); // reset the count, if any
@@ -1788,6 +1791,18 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
}
+
+ // wait for assignment completion
+ ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
+ for (HRegionInfo region: regions) {
+ if (!region.getTable().isSystemTable()) {
+ userRegionSet.add(region);
+ }
+ }
+ if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+ System.currentTimeMillis())) {
+ LOG.debug("some user regions are still in transition: " + userRegionSet);
+ }
LOG.debug("Bulk assigning done for " + destination);
return true;
} finally {
@@ -2617,22 +2632,62 @@ public class AssignmentManager extends ZooKeeperListener {
* If the region is already assigned, returns immediately. Otherwise, method
* blocks until the region is assigned.
* @param regionInfo region to wait on assignment for
+ * @return true if the region is assigned, false otherwise.
* @throws InterruptedException
*/
public boolean waitForAssignment(HRegionInfo regionInfo)
throws InterruptedException {
- while (!regionStates.isRegionOnline(regionInfo)) {
- if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
- || this.server.isStopped()) {
- return false;
- }
+ ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
+ regionSet.add(regionInfo);
+ return waitForAssignment(regionSet, true, Long.MAX_VALUE);
+ }
- // We should receive a notification, but it's
- // better to have a timeout to recheck the condition here:
- // it lowers the impact of a race condition if any
- regionStates.waitForUpdate(100);
+ /**
+ * Waits until the specified regions have completed assignment, or the deadline is reached.
+ */
+ protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+ final boolean waitTillAllAssigned, final int reassigningRegions,
+ final long minEndTime) throws InterruptedException {
+ long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
+ return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
+ }
+
+ /**
+ * Waits until the specified regions have completed assignment, or the deadline is reached.
+ * @param regionSet set of regions to wait on. The set is modified: assigned regions are removed
+ * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
+ * @param deadline the timestamp after which the wait is aborted
+ * @return true if all the regions are assigned, false otherwise.
+ * @throws InterruptedException
+ */
+ protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+ final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
+ // We're not synchronizing on regionsInTransition now because we don't use any iterator.
+ while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
+ int failedOpenCount = 0;
+ Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
+ while (regionInfoIterator.hasNext()) {
+ HRegionInfo hri = regionInfoIterator.next();
+ if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
+ State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
+ regionInfoIterator.remove();
+ } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
+ failedOpenCount++;
+ }
+ }
+ if (!waitTillAllAssigned) {
+ // No need to wait, let assignment go on asynchronously
+ break;
+ }
+ if (!regionSet.isEmpty()) {
+ if (failedOpenCount == regionSet.size()) {
+ // all the regions we are waiting for had an error on open.
+ break;
+ }
+ regionStates.waitForUpdate(100);
+ }
}
- return true;
+ return regionSet.isEmpty();
}
/**
@@ -2725,15 +2780,27 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.trace("Not using bulk assignment since we are assigning only " + regions +
" region(s) to " + servers + " server(s)");
}
+
+ // invoke assignment (async)
+ ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
if (!assign(plan.getKey(), plan.getValue())) {
for (HRegionInfo region: plan.getValue()) {
if (!regionStates.isRegionOnline(region)) {
invokeAssign(region);
+ if (!region.getTable().isSystemTable()) {
+ userRegionSet.add(region);
+ }
}
}
}
}
+
+ // wait for assignment completion
+ if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+ System.currentTimeMillis())) {
+ LOG.debug("some user regions are still in transition: " + userRegionSet);
+ }
} else {
LOG.info("Bulk assigning " + regions + " region(s) across "
+ totalServers + " server(s), " + message);
@@ -3044,11 +3111,11 @@ public class AssignmentManager extends ZooKeeperListener {
if (serverName != null
&& !serverManager.getOnlineServers().containsKey(serverName)) {
LOG.info("Server " + serverName + " isn't online. SSH will handle this");
- continue;
+ continue;
}
HRegionInfo regionInfo = regionState.getRegion();
State state = regionState.getState();
-
+
switch (state) {
case CLOSED:
invokeAssign(regionInfo);
@@ -3060,7 +3127,7 @@ public class AssignmentManager extends ZooKeeperListener {
retrySendRegionClose(regionState);
break;
case FAILED_CLOSE:
- case FAILED_OPEN:
+ case FAILED_OPEN:
invokeUnAssign(regionInfo);
break;
default:
@@ -4270,7 +4337,7 @@ public class AssignmentManager extends ZooKeeperListener {
getSnapShotOfAssignment(Collection<HRegionInfo> infos) {
return getRegionStates().getRegionAssignments(infos);
}
-
+
void setRegionStateListener(RegionStateListener listener) {
this.regionStateListener = listener;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5d553ada/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
index 356f4af..43ea523 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.RegionState.State;
/**
* Run bulk assign. Does one RCP per regionserver passing a
@@ -118,31 +116,8 @@ public class GeneralBulkAssigner extends BulkAssigner {
if (!failedPlans.isEmpty() && !server.isStopped()) {
reassigningRegions = reassignFailedPlans();
}
-
- Configuration conf = server.getConfiguration();
- long perRegionOpenTimeGuesstimate =
- conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
- long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime)
- + perRegionOpenTimeGuesstimate * (reassigningRegions + 1);
- RegionStates regionStates = assignmentManager.getRegionStates();
- // We're not synchronizing on regionsInTransition now because we don't use any iterator.
- while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) {
- Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
- while (regionInfoIterator.hasNext()) {
- HRegionInfo hri = regionInfoIterator.next();
- if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
- State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
- regionInfoIterator.remove();
- }
- }
- if (!waitTillAllAssigned) {
- // No need to wait, let assignment going on asynchronously
- break;
- }
- if (!regionSet.isEmpty()) {
- regionStates.waitForUpdate(100);
- }
- }
+ assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned,
+ reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime));
if (LOG.isDebugEnabled()) {
long elapsedTime = System.currentTimeMillis() - startTime;
[2/3] hbase git commit: HBASE-13606 AssignmentManager.assign() is not
sync in both path
Posted by mb...@apache.org.
HBASE-13606 AssignmentManager.assign() is not sync in both path
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/30ecf990
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/30ecf990
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/30ecf990
Branch: refs/heads/master
Commit: 30ecf990fe2a343e418eedcffd1d8d5c94ab1fd3
Parents: c3f83a9
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Mon May 11 23:42:11 2015 +0100
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Tue May 12 00:13:29 2015 +0100
----------------------------------------------------------------------
.../hadoop/hbase/master/AssignmentManager.java | 89 +++++++++++++++++---
.../hbase/master/GeneralBulkAssigner.java | 29 +------
2 files changed, 80 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/30ecf990/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 4a1e71f..eae9999 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -156,6 +156,7 @@ public class AssignmentManager {
// bulk assigning may be not as efficient.
private final int bulkAssignThresholdRegions;
private final int bulkAssignThresholdServers;
+ private final int bulkPerRegionOpenTimeGuesstimate;
// Should bulk assignment wait till all regions are assigned,
// or it is timed out? This is useful to measure bulk assignment
@@ -194,7 +195,7 @@ public class AssignmentManager {
/** Listeners that are called on assignment events. */
private List<AssignmentListener> listeners = new CopyOnWriteArrayList<AssignmentListener>();
-
+
private RegionStateListener regionStateListener;
/**
@@ -244,6 +245,8 @@ public class AssignmentManager {
conf.getBoolean("hbase.bulk.assignment.waittillallassigned", false);
this.bulkAssignThresholdRegions = conf.getInt("hbase.bulk.assignment.threshold.regions", 7);
this.bulkAssignThresholdServers = conf.getInt("hbase.bulk.assignment.threshold.servers", 3);
+ this.bulkPerRegionOpenTimeGuesstimate =
+ conf.getInt("hbase.bulk.assignment.perregion.open.time", 10000);
this.metricsAssignmentManager = new MetricsAssignmentManager();
this.tableLockManager = tableLockManager;
@@ -831,6 +834,18 @@ public class AssignmentManager {
}
}
}
+
+ // wait for assignment completion
+ ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions.size());
+ for (HRegionInfo region: regions) {
+ if (!region.getTable().isSystemTable()) {
+ userRegionSet.add(region);
+ }
+ }
+ if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+ System.currentTimeMillis())) {
+ LOG.debug("some user regions are still in transition: " + userRegionSet);
+ }
LOG.debug("Bulk assigning done for " + destination);
return true;
} finally {
@@ -1349,22 +1364,62 @@ public class AssignmentManager {
* If the region is already assigned, returns immediately. Otherwise, method
* blocks until the region is assigned.
* @param regionInfo region to wait on assignment for
+ * @return true if the region is assigned, false otherwise.
* @throws InterruptedException
*/
public boolean waitForAssignment(HRegionInfo regionInfo)
throws InterruptedException {
- while (!regionStates.isRegionOnline(regionInfo)) {
- if (regionStates.isRegionInState(regionInfo, State.FAILED_OPEN)
- || this.server.isStopped()) {
- return false;
- }
+ ArrayList<HRegionInfo> regionSet = new ArrayList<HRegionInfo>(1);
+ regionSet.add(regionInfo);
+ return waitForAssignment(regionSet, true, Long.MAX_VALUE);
+ }
- // We should receive a notification, but it's
- // better to have a timeout to recheck the condition here:
- // it lowers the impact of a race condition if any
- regionStates.waitForUpdate(100);
+ /**
+ * Waits until the specified regions have completed assignment, or the deadline is reached.
+ */
+ protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+ final boolean waitTillAllAssigned, final int reassigningRegions,
+ final long minEndTime) throws InterruptedException {
+ long deadline = minEndTime + bulkPerRegionOpenTimeGuesstimate * (reassigningRegions + 1);
+ return waitForAssignment(regionSet, waitTillAllAssigned, deadline);
+ }
+
+ /**
+ * Waits until the specified regions have completed assignment, or the deadline is reached.
+ * @param regionSet set of regions to wait on. The set is modified: assigned regions are removed
+ * @param waitTillAllAssigned true if we should wait for all the regions to be assigned
+ * @param deadline the timestamp after which the wait is aborted
+ * @return true if all the regions are assigned, false otherwise.
+ * @throws InterruptedException
+ */
+ protected boolean waitForAssignment(final Collection<HRegionInfo> regionSet,
+ final boolean waitTillAllAssigned, final long deadline) throws InterruptedException {
+ // We're not synchronizing on regionsInTransition now because we don't use any iterator.
+ while (!regionSet.isEmpty() && !server.isStopped() && deadline > System.currentTimeMillis()) {
+ int failedOpenCount = 0;
+ Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
+ while (regionInfoIterator.hasNext()) {
+ HRegionInfo hri = regionInfoIterator.next();
+ if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
+ State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
+ regionInfoIterator.remove();
+ } else if (regionStates.isRegionInState(hri, State.FAILED_OPEN)) {
+ failedOpenCount++;
+ }
+ }
+ if (!waitTillAllAssigned) {
+ // No need to wait, let assignment go on asynchronously
+ break;
+ }
+ if (!regionSet.isEmpty()) {
+ if (failedOpenCount == regionSet.size()) {
+ // all the regions we are waiting for had an error on open.
+ break;
+ }
+ regionStates.waitForUpdate(100);
+ }
}
- return true;
+ return regionSet.isEmpty();
}
/**
@@ -1453,15 +1508,27 @@ public class AssignmentManager {
LOG.trace("Not using bulk assignment since we are assigning only " + regions +
" region(s) to " + servers + " server(s)");
}
+
+ // invoke assignment (async)
+ ArrayList<HRegionInfo> userRegionSet = new ArrayList<HRegionInfo>(regions);
for (Map.Entry<ServerName, List<HRegionInfo>> plan: bulkPlan.entrySet()) {
if (!assign(plan.getKey(), plan.getValue()) && !server.isStopped()) {
for (HRegionInfo region: plan.getValue()) {
if (!regionStates.isRegionOnline(region)) {
invokeAssign(region);
+ if (!region.getTable().isSystemTable()) {
+ userRegionSet.add(region);
+ }
}
}
}
}
+
+ // wait for assignment completion
+ if (!waitForAssignment(userRegionSet, true, userRegionSet.size(),
+ System.currentTimeMillis())) {
+ LOG.debug("some user regions are still in transition: " + userRegionSet);
+ }
} else {
LOG.info("Bulk assigning " + regions + " region(s) across "
+ totalServers + " server(s), " + message);
http://git-wip-us.apache.org/repos/asf/hbase/blob/30ecf990/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
index 356f4af..43ea523 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/GeneralBulkAssigner.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.master;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -35,7 +34,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.master.RegionState.State;
/**
* Run bulk assign. Does one RCP per regionserver passing a
@@ -118,31 +116,8 @@ public class GeneralBulkAssigner extends BulkAssigner {
if (!failedPlans.isEmpty() && !server.isStopped()) {
reassigningRegions = reassignFailedPlans();
}
-
- Configuration conf = server.getConfiguration();
- long perRegionOpenTimeGuesstimate =
- conf.getLong("hbase.bulk.assignment.perregion.open.time", 1000);
- long endTime = Math.max(System.currentTimeMillis(), rpcWaitTime)
- + perRegionOpenTimeGuesstimate * (reassigningRegions + 1);
- RegionStates regionStates = assignmentManager.getRegionStates();
- // We're not synchronizing on regionsInTransition now because we don't use any iterator.
- while (!regionSet.isEmpty() && !server.isStopped() && endTime > System.currentTimeMillis()) {
- Iterator<HRegionInfo> regionInfoIterator = regionSet.iterator();
- while (regionInfoIterator.hasNext()) {
- HRegionInfo hri = regionInfoIterator.next();
- if (regionStates.isRegionOnline(hri) || regionStates.isRegionInState(hri,
- State.SPLITTING, State.SPLIT, State.MERGING, State.MERGED)) {
- regionInfoIterator.remove();
- }
- }
- if (!waitTillAllAssigned) {
- // No need to wait, let assignment going on asynchronously
- break;
- }
- if (!regionSet.isEmpty()) {
- regionStates.waitForUpdate(100);
- }
- }
+ assignmentManager.waitForAssignment(regionSet, waitTillAllAssigned,
+ reassigningRegions, Math.max(System.currentTimeMillis(), rpcWaitTime));
if (LOG.isDebugEnabled()) {
long elapsedTime = System.currentTimeMillis() - startTime;