You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/11/03 22:50:09 UTC

[1/4] hbase git commit: HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster

Repository: hbase
Updated Branches:
  refs/heads/branch-1 e61f6ff0e -> 1e227acd6
  refs/heads/branch-1.4 7a9e1dd1b -> 28c7315e0
  refs/heads/branch-2 b9b0f15cd -> 5df965158
  refs/heads/master 3e4b86d4d -> 125f3eace


HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/125f3eac
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/125f3eac
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/125f3eac

Branch: refs/heads/master
Commit: 125f3eace9b35e7947721bba5175ca5dc48921e8
Parents: 3e4b86d
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Nov 3 15:03:27 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Nov 3 15:05:01 2017 -0700

----------------------------------------------------------------------
 .../hbase/rsgroup/RSGroupInfoManager.java       |  4 +
 .../hbase/rsgroup/RSGroupInfoManagerImpl.java   | 89 +++++++++++++++++++-
 2 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/125f3eac/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
index c8fee44..3fb40da 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
@@ -36,6 +36,10 @@ import org.apache.hadoop.hbase.net.Address;
  */
 @InterfaceAudience.Private
 public interface RSGroupInfoManager {
+
+  String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait";
+  long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L;
+
   //Assigned before user tables
   TableName RSGROUP_TABLE_NAME =
       TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");

http://git-wip-us.apache.org/repos/asf/hbase/blob/125f3eac/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
index 9520f5f..7cf04c7 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.Coprocessor;
@@ -65,6 +66,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.ServerListener;
 import org.apache.hadoop.hbase.master.TableStateManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
@@ -144,6 +146,7 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
   private Set<String> prevRSGroups = new HashSet<>();
   private final ServerEventsListenerThread serverEventsListenerThread =
       new ServerEventsListenerThread();
+  private FailedOpenUpdaterThread failedOpenUpdaterThread;
 
   private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
     this.masterServices = masterServices;
@@ -156,6 +159,9 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
     rsGroupStartupWorker.start();
     serverEventsListenerThread.start();
     masterServices.getServerManager().registerListener(serverEventsListenerThread);
+    failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
+    failedOpenUpdaterThread.start();
+    masterServices.getServerManager().registerListener(failedOpenUpdaterThread);
   }
 
   static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
@@ -564,6 +570,26 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
     flushConfig(newGroupMap);
   }
 
+  // Called by FailedOpenUpdaterThread
+  private void updateFailedAssignments() {
+    // Kick all regions in FAILED_OPEN state
+    List<RegionInfo> stuckAssignments = Lists.newArrayList();
+    for (RegionStateNode state:
+        masterServices.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
+      if (state.isStuck()) {
+        stuckAssignments.add(state.getRegionInfo());
+      }
+    }
+    for (RegionInfo region: stuckAssignments) {
+      LOG.info("Retrying assignment of " + region);
+      try {
+        masterServices.getAssignmentManager().unassign(region);
+      } catch (IOException e) {
+        LOG.warn("Unable to reassign " + region, e);
+      }
+    }
+  }
+
   /**
    * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
    * servers. Notifications about server changes are received by registering {@link ServerListener}.
@@ -608,7 +634,7 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
           }
           try {
             synchronized (this) {
-              if(!changed) {
+              while (!changed) {
                 wait();
               }
               changed = false;
@@ -623,6 +649,67 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
     }
   }
 
+  private class FailedOpenUpdaterThread extends Thread implements ServerListener {
+    private final long waitInterval;
+    private volatile boolean hasChanged = false;
+
+    public FailedOpenUpdaterThread(Configuration conf) {
+      this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY,
+        DEFAULT_REASSIGN_WAIT_INTERVAL);
+      setDaemon(true);
+    }
+
+    @Override
+    public void serverAdded(ServerName serverName) {
+      serverChanged();
+    }
+
+    @Override
+    public void serverRemoved(ServerName serverName) {
+    }
+
+    @Override
+    public void run() {
+      while (isMasterRunning(masterServices)) {
+        boolean interrupted = false;
+        try {
+          synchronized (this) {
+            while (!hasChanged) {
+              wait();
+            }
+            hasChanged = false;
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted", e);
+          interrupted = true;
+        }
+        if (!isMasterRunning(masterServices) || interrupted) {
+          continue;
+        }
+
+        // First, wait a while in case more servers are about to rejoin the cluster
+        try {
+          Thread.sleep(waitInterval);
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted", e);
+        }
+        if (!isMasterRunning(masterServices)) {
+          continue;
+        }
+
+        // Kick all regions in FAILED_OPEN state
+        updateFailedAssignments();
+      }
+    }
+
+    public void serverChanged() {
+      synchronized (this) {
+        hasChanged = true;
+        this.notify();
+      }
+    }
+  }
+
   private class RSGroupStartupWorker extends Thread {
     private final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class);
     private volatile boolean online = false;


[2/4] hbase git commit: HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster

Posted by ap...@apache.org.
HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1e227acd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1e227acd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1e227acd

Branch: refs/heads/branch-1
Commit: 1e227acd654bde80e27799ae16e5a806467141c8
Parents: e61f6ff
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Nov 3 15:03:08 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Nov 3 15:05:24 2017 -0700

----------------------------------------------------------------------
 .../hbase/rsgroup/RSGroupInfoManager.java       |  4 +
 .../hbase/rsgroup/RSGroupInfoManagerImpl.java   | 90 ++++++++++++++++++--
 2 files changed, 89 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1e227acd/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
index ab423e9..2330605 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
@@ -36,6 +36,10 @@ import org.apache.hadoop.hbase.net.Address;
  */
 @InterfaceAudience.Private
 public interface RSGroupInfoManager {
+
+  String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait";
+  long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L;
+
   //Assigned before user tables
   TableName RSGROUP_TABLE_NAME =
       TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");

http://git-wip-us.apache.org/repos/asf/hbase/blob/1e227acd/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
index 80eaefb..cfaa632 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.constraint.ConstraintException;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.ServerListener;
 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
@@ -81,6 +82,7 @@ import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
 import org.apache.hadoop.hbase.security.access.AccessControlLists;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -119,6 +121,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
   private volatile Set<String> prevRSGroups;
   private RSGroupSerDe rsGroupSerDe;
   private DefaultServerUpdater defaultServerUpdater;
+  private FailedOpenUpdater failedOpenUpdater;
   private boolean isInit = false;
 
   public RSGroupInfoManagerImpl(MasterServices master) throws IOException {
@@ -136,8 +139,10 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
     refresh();
     rsGroupStartupWorker.start();
     defaultServerUpdater = new DefaultServerUpdater(this);
+    Threads.setDaemonThreadRunning(defaultServerUpdater);
+    failedOpenUpdater = new FailedOpenUpdater(this);
+    Threads.setDaemonThreadRunning(failedOpenUpdater);
     master.getServerManager().registerListener(this);
-    defaultServerUpdater.start();
     isInit = true;
   }
 
@@ -493,6 +498,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
   @Override
   public void serverAdded(ServerName serverName) {
     defaultServerUpdater.serverChanged();
+    failedOpenUpdater.serverChanged();
   }
 
   @Override
@@ -503,18 +509,22 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
   private static class DefaultServerUpdater extends Thread {
     private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class);
     private RSGroupInfoManagerImpl mgr;
-    private boolean hasChanged = false;
+    private volatile boolean hasChanged = false;
 
     public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) {
       this.mgr = mgr;
+      setName(DefaultServerUpdater.class.getName()+"-" + mgr.master.getServerName());
+      setDaemon(true);
     }
 
     @Override
     public void run() {
       List<Address> prevDefaultServers = new LinkedList<Address>();
-      while(!mgr.master.isAborted() || !mgr.master.isStopped()) {
+      while (!mgr.master.isAborted() && !mgr.master.isStopped()) {
         try {
-          LOG.info("Updating default servers.");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Updating default servers");
+          }
           List<Address> servers = mgr.getDefaultServers();
           Collections.sort(servers, new Comparator<Address>() {
             @Override
@@ -533,12 +543,13 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
           }
           try {
             synchronized (this) {
-              if(!hasChanged) {
+              while (!hasChanged) {
                 wait();
               }
               hasChanged = false;
             }
           } catch (InterruptedException e) {
+            LOG.warn("Interrupted", e);
           }
         } catch (IOException e) {
           LOG.warn("Failed to update default servers", e);
@@ -546,6 +557,75 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
       }
     }
 
+    // Called for both server additions and removals
+    public void serverChanged() {
+      synchronized (this) {
+        hasChanged = true;
+        this.notify();
+      }
+    }
+  }
+
+  private static class FailedOpenUpdater extends Thread {
+    private static final Log LOG = LogFactory.getLog(FailedOpenUpdater.class);
+
+    private final RSGroupInfoManagerImpl mgr;
+    private final long waitInterval;
+    private volatile boolean hasChanged = false;
+
+    public FailedOpenUpdater(RSGroupInfoManagerImpl mgr) {
+      this.mgr = mgr;
+      this.waitInterval = mgr.master.getConfiguration().getLong(REASSIGN_WAIT_INTERVAL_KEY,
+        DEFAULT_REASSIGN_WAIT_INTERVAL);
+      setName(FailedOpenUpdater.class.getName()+"-" + mgr.master.getServerName());
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      while (!mgr.master.isAborted() && !mgr.master.isStopped()) {
+        boolean interrupted = false;
+        try {
+          synchronized (this) {
+            while (!hasChanged) {
+              wait();
+            }
+            hasChanged = false;
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted", e);
+          interrupted = true;
+        }
+        if (mgr.master.isAborted() || mgr.master.isStopped() || interrupted) {
+          continue;
+        }
+
+        // First, wait a while in case more servers are about to rejoin the cluster
+        try {
+          Thread.sleep(waitInterval);
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted", e);
+        }
+        if (mgr.master.isAborted() || mgr.master.isStopped()) {
+          continue;
+        }
+
+        // Kick all regions in FAILED_OPEN state
+        List<HRegionInfo> failedAssignments = Lists.newArrayList();
+        for (RegionState state:
+            mgr.master.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
+          if (state.isFailedOpen()) {
+            failedAssignments.add(state.getRegion());
+          }
+        }
+        for (HRegionInfo region: failedAssignments) {
+          LOG.info("Retrying assignment of " + region);
+          mgr.master.getAssignmentManager().unassign(region);
+        }
+      }
+    }
+
+    // Only called for server additions
     public void serverChanged() {
       synchronized (this) {
         hasChanged = true;


[3/4] hbase git commit: HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster

Posted by ap...@apache.org.
HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5df96515
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5df96515
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5df96515

Branch: refs/heads/branch-2
Commit: 5df9651581f599ba9bcbb0def660870ab0398ccc
Parents: b9b0f15
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Nov 3 15:03:27 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Nov 3 15:08:19 2017 -0700

----------------------------------------------------------------------
 .../hbase/rsgroup/RSGroupInfoManager.java       |  4 +
 .../hbase/rsgroup/RSGroupInfoManagerImpl.java   | 89 +++++++++++++++++++-
 2 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5df96515/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
index c8fee44..3fb40da 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
@@ -36,6 +36,10 @@ import org.apache.hadoop.hbase.net.Address;
  */
 @InterfaceAudience.Private
 public interface RSGroupInfoManager {
+
+  String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait";
+  long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L;
+
   //Assigned before user tables
   TableName RSGROUP_TABLE_NAME =
       TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");

http://git-wip-us.apache.org/repos/asf/hbase/blob/5df96515/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
index 9520f5f..7cf04c7 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.Coprocessor;
@@ -65,6 +66,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.ServerListener;
 import org.apache.hadoop.hbase.master.TableStateManager;
+import org.apache.hadoop.hbase.master.assignment.RegionStates.RegionStateNode;
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.procedure2.Procedure;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
@@ -144,6 +146,7 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
   private Set<String> prevRSGroups = new HashSet<>();
   private final ServerEventsListenerThread serverEventsListenerThread =
       new ServerEventsListenerThread();
+  private FailedOpenUpdaterThread failedOpenUpdaterThread;
 
   private RSGroupInfoManagerImpl(MasterServices masterServices) throws IOException {
     this.masterServices = masterServices;
@@ -156,6 +159,9 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
     rsGroupStartupWorker.start();
     serverEventsListenerThread.start();
     masterServices.getServerManager().registerListener(serverEventsListenerThread);
+    failedOpenUpdaterThread = new FailedOpenUpdaterThread(masterServices.getConfiguration());
+    failedOpenUpdaterThread.start();
+    masterServices.getServerManager().registerListener(failedOpenUpdaterThread);
   }
 
   static RSGroupInfoManager getInstance(MasterServices master) throws IOException {
@@ -564,6 +570,26 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
     flushConfig(newGroupMap);
   }
 
+  // Called by FailedOpenUpdaterThread
+  private void updateFailedAssignments() {
+    // Kick all regions in FAILED_OPEN state
+    List<RegionInfo> stuckAssignments = Lists.newArrayList();
+    for (RegionStateNode state:
+        masterServices.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
+      if (state.isStuck()) {
+        stuckAssignments.add(state.getRegionInfo());
+      }
+    }
+    for (RegionInfo region: stuckAssignments) {
+      LOG.info("Retrying assignment of " + region);
+      try {
+        masterServices.getAssignmentManager().unassign(region);
+      } catch (IOException e) {
+        LOG.warn("Unable to reassign " + region, e);
+      }
+    }
+  }
+
   /**
    * Calls {@link RSGroupInfoManagerImpl#updateDefaultServers(SortedSet)} to update list of known
    * servers. Notifications about server changes are received by registering {@link ServerListener}.
@@ -608,7 +634,7 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
           }
           try {
             synchronized (this) {
-              if(!changed) {
+              while (!changed) {
                 wait();
               }
               changed = false;
@@ -623,6 +649,67 @@ class RSGroupInfoManagerImpl implements RSGroupInfoManager {
     }
   }
 
+  private class FailedOpenUpdaterThread extends Thread implements ServerListener {
+    private final long waitInterval;
+    private volatile boolean hasChanged = false;
+
+    public FailedOpenUpdaterThread(Configuration conf) {
+      this.waitInterval = conf.getLong(REASSIGN_WAIT_INTERVAL_KEY,
+        DEFAULT_REASSIGN_WAIT_INTERVAL);
+      setDaemon(true);
+    }
+
+    @Override
+    public void serverAdded(ServerName serverName) {
+      serverChanged();
+    }
+
+    @Override
+    public void serverRemoved(ServerName serverName) {
+    }
+
+    @Override
+    public void run() {
+      while (isMasterRunning(masterServices)) {
+        boolean interrupted = false;
+        try {
+          synchronized (this) {
+            while (!hasChanged) {
+              wait();
+            }
+            hasChanged = false;
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted", e);
+          interrupted = true;
+        }
+        if (!isMasterRunning(masterServices) || interrupted) {
+          continue;
+        }
+
+        // First, wait a while in case more servers are about to rejoin the cluster
+        try {
+          Thread.sleep(waitInterval);
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted", e);
+        }
+        if (!isMasterRunning(masterServices)) {
+          continue;
+        }
+
+        // Kick all regions in FAILED_OPEN state
+        updateFailedAssignments();
+      }
+    }
+
+    public void serverChanged() {
+      synchronized (this) {
+        hasChanged = true;
+        this.notify();
+      }
+    }
+  }
+
   private class RSGroupStartupWorker extends Thread {
     private final Log LOG = LogFactory.getLog(RSGroupStartupWorker.class);
     private volatile boolean online = false;


[4/4] hbase git commit: HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster

Posted by ap...@apache.org.
HBASE-19144 [RSgroups] Retry assignments in FAILED_OPEN state when servers (re)join the cluster


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/28c7315e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/28c7315e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/28c7315e

Branch: refs/heads/branch-1.4
Commit: 28c7315e0b1db7e7fd30ba996b8735ac2d805756
Parents: 7a9e1dd
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Nov 3 15:03:08 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Nov 3 15:08:46 2017 -0700

----------------------------------------------------------------------
 .../hbase/rsgroup/RSGroupInfoManager.java       |  4 +
 .../hbase/rsgroup/RSGroupInfoManagerImpl.java   | 90 ++++++++++++++++++--
 2 files changed, 89 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/28c7315e/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
index ab423e9..2330605 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManager.java
@@ -36,6 +36,10 @@ import org.apache.hadoop.hbase.net.Address;
  */
 @InterfaceAudience.Private
 public interface RSGroupInfoManager {
+
+  String REASSIGN_WAIT_INTERVAL_KEY = "hbase.rsgroup.reassign.wait";
+  long DEFAULT_REASSIGN_WAIT_INTERVAL = 30 * 1000L;
+
   //Assigned before user tables
   TableName RSGROUP_TABLE_NAME =
       TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "rsgroup");

http://git-wip-us.apache.org/repos/asf/hbase/blob/28c7315e/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
index 80eaefb..cfaa632 100644
--- a/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
+++ b/hbase-rsgroup/src/main/java/org/apache/hadoop/hbase/rsgroup/RSGroupInfoManagerImpl.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.hbase.constraint.ConstraintException;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.master.ServerListener;
 import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
@@ -81,6 +82,7 @@ import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
 import org.apache.hadoop.hbase.security.access.AccessControlLists;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ModifyRegionUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -119,6 +121,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
   private volatile Set<String> prevRSGroups;
   private RSGroupSerDe rsGroupSerDe;
   private DefaultServerUpdater defaultServerUpdater;
+  private FailedOpenUpdater failedOpenUpdater;
   private boolean isInit = false;
 
   public RSGroupInfoManagerImpl(MasterServices master) throws IOException {
@@ -136,8 +139,10 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
     refresh();
     rsGroupStartupWorker.start();
     defaultServerUpdater = new DefaultServerUpdater(this);
+    Threads.setDaemonThreadRunning(defaultServerUpdater);
+    failedOpenUpdater = new FailedOpenUpdater(this);
+    Threads.setDaemonThreadRunning(failedOpenUpdater);
     master.getServerManager().registerListener(this);
-    defaultServerUpdater.start();
     isInit = true;
   }
 
@@ -493,6 +498,7 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
   @Override
   public void serverAdded(ServerName serverName) {
     defaultServerUpdater.serverChanged();
+    failedOpenUpdater.serverChanged();
   }
 
   @Override
@@ -503,18 +509,22 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
   private static class DefaultServerUpdater extends Thread {
     private static final Log LOG = LogFactory.getLog(DefaultServerUpdater.class);
     private RSGroupInfoManagerImpl mgr;
-    private boolean hasChanged = false;
+    private volatile boolean hasChanged = false;
 
     public DefaultServerUpdater(RSGroupInfoManagerImpl mgr) {
       this.mgr = mgr;
+      setName(DefaultServerUpdater.class.getName()+"-" + mgr.master.getServerName());
+      setDaemon(true);
     }
 
     @Override
     public void run() {
       List<Address> prevDefaultServers = new LinkedList<Address>();
-      while(!mgr.master.isAborted() || !mgr.master.isStopped()) {
+      while (!mgr.master.isAborted() && !mgr.master.isStopped()) {
         try {
-          LOG.info("Updating default servers.");
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Updating default servers");
+          }
           List<Address> servers = mgr.getDefaultServers();
           Collections.sort(servers, new Comparator<Address>() {
             @Override
@@ -533,12 +543,13 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
           }
           try {
             synchronized (this) {
-              if(!hasChanged) {
+              while (!hasChanged) {
                 wait();
               }
               hasChanged = false;
             }
           } catch (InterruptedException e) {
+            LOG.warn("Interrupted", e);
           }
         } catch (IOException e) {
           LOG.warn("Failed to update default servers", e);
@@ -546,6 +557,75 @@ public class RSGroupInfoManagerImpl implements RSGroupInfoManager, ServerListene
       }
     }
 
+    // Called for both server additions and removals
+    public void serverChanged() {
+      synchronized (this) {
+        hasChanged = true;
+        this.notify();
+      }
+    }
+  }
+
+  private static class FailedOpenUpdater extends Thread {
+    private static final Log LOG = LogFactory.getLog(FailedOpenUpdater.class);
+
+    private final RSGroupInfoManagerImpl mgr;
+    private final long waitInterval;
+    private volatile boolean hasChanged = false;
+
+    public FailedOpenUpdater(RSGroupInfoManagerImpl mgr) {
+      this.mgr = mgr;
+      this.waitInterval = mgr.master.getConfiguration().getLong(REASSIGN_WAIT_INTERVAL_KEY,
+        DEFAULT_REASSIGN_WAIT_INTERVAL);
+      setName(FailedOpenUpdater.class.getName()+"-" + mgr.master.getServerName());
+      setDaemon(true);
+    }
+
+    @Override
+    public void run() {
+      while (!mgr.master.isAborted() && !mgr.master.isStopped()) {
+        boolean interrupted = false;
+        try {
+          synchronized (this) {
+            while (!hasChanged) {
+              wait();
+            }
+            hasChanged = false;
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted", e);
+          interrupted = true;
+        }
+        if (mgr.master.isAborted() || mgr.master.isStopped() || interrupted) {
+          continue;
+        }
+
+        // First, wait a while in case more servers are about to rejoin the cluster
+        try {
+          Thread.sleep(waitInterval);
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted", e);
+        }
+        if (mgr.master.isAborted() || mgr.master.isStopped()) {
+          continue;
+        }
+
+        // Kick all regions in FAILED_OPEN state
+        List<HRegionInfo> failedAssignments = Lists.newArrayList();
+        for (RegionState state:
+            mgr.master.getAssignmentManager().getRegionStates().getRegionsInTransition()) {
+          if (state.isFailedOpen()) {
+            failedAssignments.add(state.getRegion());
+          }
+        }
+        for (HRegionInfo region: failedAssignments) {
+          LOG.info("Retrying assignment of " + region);
+          mgr.master.getAssignmentManager().unassign(region);
+        }
+      }
+    }
+
+    // Only called for server additions
     public void serverChanged() {
       synchronized (this) {
         hasChanged = true;