You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/11/03 22:50:11 UTC
[3/4] hbase git commit: HBASE-19144 [RSgroups] Retry assignments in
FAILED_OPEN state when servers (re)join the cluster
HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5df96515
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5df96515
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5df96515
Branch: refs/heads/branch-2
Commit: 5df9651581f599ba9bcbb0def660870ab0398ccc
Parents: b9b0f15
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Nov 3 15:03:27 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Nov 3 15:08:19 2017 -0700
----------------------------------------------------------------------
.../hbase/rsgroup/RSGroupInfoManager.java | 4 +
.../hbase/rsgroup/RSGroupInfoManagerImpl.java | 89 +++++++++++++++++++-
2 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5df96515/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
index c8fee44..3fb40da 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
@@ -36,6 +36,10 @@ import org.apache.hadoop.hbase.net.Address;
*/
@InterfaceAudience.Private
public interface RSGroupInfoManager {
+
+ String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait";
+ long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L;
+
//Assigned before user tables
TableName RSGROUP_TABLE_NAME =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");
http://git-wip-us.apache.org/repos/asf/hbase/blob/5df96515/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
index 9520f5f..7cf04c7 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
@@ -65,6 +66,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.TableStateManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
@@ -144,6 +146,7 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
private Set<String> prevRSGroups = new HashSet<>();
private final ServerEventsListenerThread serverEventsListenerThread =
new ServerEventsListenerThread();
+ private FailedOpenUpdaterThread failedOpenUpdaterThread;
private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
this.masterServices = masterServices;
@@ -156,6 +159,9 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
rsGroupStartupWorker.start();
serverEventsListenerThread.start();
masterServices.getServerManager().registerListener(serverEventsListenerThread);
+ failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
+ failedOpenUpdaterThread.start();
+ masterServices.getServerManager().registerListener(failedOpenUpdaterThread);
}
static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
@@ -564,6 +570,26 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
flushConfig(newGroupMap);
}
+ // Called by FailedOpenUpdaterThread.run() once its wait interval has elapsed
+ private void updateFailedAssignments() {
+ // Kick all regions in FAILED_OPEN state; candidates are the regions in transition whose isStuck() is true
+ List<RegionInfo> stuckAssignments = Lists.newArrayList();
+ for (RegionStateNode state:
+ masterServices.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
+ if (state.isStuck()) {
+ stuckAssignments.add(state.getRegionInfo());
+ }
+ }
+ for (RegionInfo region: stuckAssignments) {
+ LOG.info("Retrying assignment of " + region);
+ try {
+ masterServices.getAssignmentManager().unassign(region); // NOTE(review): the retry goes through unassign(); presumably the region is reassigned afterwards -- confirm
+ } catch (IOException e) {
+ LOG.warn("Unable to reassign " + region, e); // logged only; the loop moves on to the next region
+ }
+ }
+ }
+
/**
* Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
* servers. Notifications about server changes are received by registering {@link ServerListener}.
@@ -608,7 +634,7 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
try {
synchronized (this) {
- if(!changed) {
+ while (!changed) {
wait();
}
changed = false;
@@ -623,6 +649,67 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
}
}
+ private class FailedOpenUpdaterThread extends Thread implements ServerListener { // on serverAdded: wait, sleep waitInterval, then call updateFailedAssignments()
+ private final long waitInterval; // passed to Thread.sleep(), i.e. milliseconds; read from REASSIGN_WAIT_INTERVAL_KEY
+ private volatile boolean hasChanged = false; // set by serverChanged(), cleared by run(), both inside synchronized (this)
+
+ public FailedOpenUpdaterThread(Configuration conf) {
+ this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY,
+ DEFAULT_REASSIGN_WAIT_INTERVAL);
+ setDaemon(true);
+ }
+
+ @Override
+ public void serverAdded(ServerName serverName) {
+ serverChanged();
+ }
+
+ @Override
+ public void serverRemoved(ServerName serverName) { // empty body: only serverAdded flags a change
+ }
+
+ @Override
+ public void run() {
+ while (isMasterRunning(masterServices)) {
+ boolean interrupted = false;
+ try {
+ synchronized (this) {
+ while (!hasChanged) { // flag is re-checked after every return from wait()
+ wait();
+ }
+ hasChanged = false;
+ }
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted", e);
+ interrupted = true;
+ }
+ if (!isMasterRunning(masterServices) || interrupted) { // skip this round; the outer loop condition decides whether to go on
+ continue;
+ }
+
+ // First, wait a while in case more servers are about to rejoin the cluster
+ try {
+ Thread.sleep(waitInterval);
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted", e); // NOTE(review): the interrupt is only logged here; the retry below still runs if the master is running
+ }
+ if (!isMasterRunning(masterServices)) {
+ continue;
+ }
+
+ // Kick all regions in FAILED_OPEN state
+ updateFailedAssignments();
+ }
+ }
+
+ public void serverChanged() {
+ synchronized (this) {
+ hasChanged = true;
+ this.notify(); // wakes a thread blocked in wait() on this object (see run())
+ }
+ }
+ }
+
private class RSGroupStartupWorker extends Thread {
private final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class);
private volatile boolean online = false;