You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/11/03 22:50:10 UTC
[2/4] hbase git commit: HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster
HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1e227acd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1e227acd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1e227acd
Branch: refs/heads/branch-1
Commit: 1e227acd654bde80e27799ae16e5a806467141c8
Parents: e61f6ff
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Nov 3 15:03:08 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Nov 3 15:05:24 2017 -0700
----------------------------------------------------------------------
.../hbase/rsgroup/RSGroupInfoManager.java | 4 +
.../hbase/rsgroup/RSGroupInfoManagerImpl.java | 90 ++++++++++++++++++--
2 files changed, 89 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1e227acd/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
index ab423e9..2330605 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
@@ -36,6 +36,10 @@ import org.apache.hadoop.hbase.net.Address;
*/
@InterfaceAudience.Private
public interface RSGroupInfoManager {
+
+ String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait";
+ long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L;
+
//Assigned before user tables
TableName RSGROUP_TABLE_NAME =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");
http://git-wip-us.apache.org/repos/asf/hbase/blob/1e227acd/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
index 80eaefb..cfaa632 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.constraint.ConstraintException;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.master.ServerListener;
import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
@@ -81,6 +82,7 @@ import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@@ -119,6 +121,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
private volatile Set<String> prevRSGroups;
private RSGroupSerDe rsGroupSerDe;
private DefaultServerUpdater defaultServerUpdater;
+ private FailedOpenUpdater failedOpenUpdater;
private boolean isInit = false;
public RSGroupInfoManagerImpl(MasterServices master) throws IOException {
@@ -136,8 +139,10 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
refresh();
rsGroupStartupWorker.start();
defaultServerUpdater = new DefaultServerUpdater(this);
+ Threads.setDaemonThreadRunning(defaultServerUpdater);
+ failedOpenUpdater = new FailedOpenUpdater(this);
+ Threads.setDaemonThreadRunning(failedOpenUpdater);
master.getServerManager().registerListener(this);
- defaultServerUpdater.start();
isInit = true;
}
@@ -493,6 +498,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
+ /**
+ * ServerListener callback for a region server joining the cluster. Wakes both the
+ * default-server updater and the FAILED_OPEN retry updater so each can react to the
+ * new server.
+ */
@Override
public void serverAdded(ServerName serverName) {
defaultServerUpdater.serverChanged();
+ failedOpenUpdater.serverChanged();
}
@Override
@@ -503,18 +509,22 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
private static class DefaultServerUpdater extends Thread {
private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class);
private RSGroupInfoManagerImpl mgr;
- private boolean hasChanged = false;
+ private volatile boolean hasChanged = false;
+ /**
+ * @param mgr owning manager; its master's server name is appended to this thread's name
+ */
public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) {
this.mgr = mgr;
+ setName(DefaultServerUpdater.class.getName()+"-" + mgr.master.getServerName());
+ // Daemon thread: it must not keep the JVM alive once the master shuts down.
+ setDaemon(true);
}
@Override
public void run() {
List<Address> prevDefaultServers = new LinkedList<Address>();
- while(!mgr.master.isAborted() || !mgr.master.isStopped()) {
+ while (!mgr.master.isAborted() && !mgr.master.isStopped()) {
try {
- LOG.info("Updating default servers.");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Updating default servers");
+ }
List<Address> servers = mgr.getDefaultServers();
Collections.sort(servers, new Comparator<Address>() {
@Override
@@ -533,12 +543,13 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
}
try {
synchronized (this) {
- if(!hasChanged) {
+ while (!hasChanged) {
wait();
}
hasChanged = false;
}
} catch (InterruptedException e) {
+ LOG.warn("Interrupted", e);
}
} catch (IOException e) {
LOG.warn("Failed to update default servers", e);
@@ -546,6 +557,75 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
}
}
+    /**
+     * Flags that the set of servers changed and wakes the updater thread if it is waiting.
+     * Called for both server additions and removals.
+     */
+    public synchronized void serverChanged() {
+      hasChanged = true;
+      notify();
+    }
+ }
+
+ private static class FailedOpenUpdater extends Thread {
+ private static final Log LOG = LogFactory.getLog(FailedOpenUpdater.class);
+
+ private final RSGroupInfoManagerImpl mgr;
+ private final long waitInterval;
+ private volatile boolean hasChanged = false;
+
+    /**
+     * Creates the FAILED_OPEN retry thread. The grace period before retrying is read from
+     * {@code REASSIGN_WAIT_INTERVAL_KEY}, falling back to {@code DEFAULT_REASSIGN_WAIT_INTERVAL}.
+     *
+     * @param mgr owning manager; supplies the master configuration and server name
+     */
+    public FailedOpenUpdater(RSGroupInfoManagerImpl mgr) {
+      setName(FailedOpenUpdater.class.getName() + "-" + mgr.master.getServerName());
+      setDaemon(true);
+      this.mgr = mgr;
+      this.waitInterval = mgr.master.getConfiguration()
+          .getLong(REASSIGN_WAIT_INTERVAL_KEY, DEFAULT_REASSIGN_WAIT_INTERVAL);
+    }
+
+    /**
+     * Worker loop: waits for a server-added notification, sleeps {@code waitInterval} ms so
+     * that other servers about to rejoin can do so, then retries every region currently in
+     * FAILED_OPEN state. Runs until the master is aborted or stopped.
+     */
+    @Override
+    public void run() {
+      while (!mgr.master.isAborted() && !mgr.master.isStopped()) {
+        boolean interrupted = false;
+        try {
+          synchronized (this) {
+            // Loop rather than if: guards against spurious wakeups.
+            while (!hasChanged) {
+              wait();
+            }
+            hasChanged = false;
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted", e);
+          interrupted = true;
+        }
+        if (mgr.master.isAborted() || mgr.master.isStopped() || interrupted) {
+          continue;
+        }
+
+        // First, wait a while in case more servers are about to rejoin the cluster
+        try {
+          Thread.sleep(waitInterval);
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted", e);
+        }
+        if (mgr.master.isAborted() || mgr.master.isStopped()) {
+          continue;
+        }
+
+        // Kick all regions in FAILED_OPEN state. An unchecked exception escaping run() would
+        // terminate this daemon thread silently, and no later server addition would ever
+        // trigger a retry again, so log it and keep looping instead.
+        try {
+          retryFailedOpenAssignments();
+        } catch (RuntimeException e) {
+          LOG.warn("Failed to retry assignments of regions in FAILED_OPEN state", e);
+        }
+      }
+    }
+
+    /**
+     * Snapshots the regions currently in FAILED_OPEN state, then calls
+     * {@code unassign} on each through the assignment manager to have it retry them.
+     * A failure on one region is logged and does not stop the remaining regions from being
+     * retried. NOTE(review): relies on unassign of a FAILED_OPEN region leading to a fresh
+     * assignment, as the original change intended; confirm against AssignmentManager.
+     */
+    private void retryFailedOpenAssignments() {
+      List<HRegionInfo> failedAssignments = Lists.newArrayList();
+      for (RegionState state :
+          mgr.master.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
+        if (state.isFailedOpen()) {
+          failedAssignments.add(state.getRegion());
+        }
+      }
+      for (HRegionInfo region : failedAssignments) {
+        LOG.info("Retrying assignment of " + region);
+        try {
+          mgr.master.getAssignmentManager().unassign(region);
+        } catch (RuntimeException e) {
+          LOG.warn("Failed to retry assignment of " + region, e);
+        }
+      }
+    }
+
+ // Only called for server additions
public void serverChanged() {
synchronized (this) {
hasChanged = true;