Posted to commits@hbase.apache.org by bh...@apache.org on 2020/05/31 21:41:32 UTC

[hbase] branch branch-1 updated: HBASE-24069 Provide an ExponentialBackOffPolicy sleep between failed … (#1755)

This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 3c13884  HBASE-24069 Provide an ExponentialBackOffPolicy sleep between failed … (#1755)
3c13884 is described below

commit 3c138845d935b7183d02f1312cf8a1e5ce690572
Author: sguggilam <sa...@gmail.com>
AuthorDate: Sun May 31 14:41:18 2020 -0700

    HBASE-24069 Provide an ExponentialBackOffPolicy sleep between failed … (#1755)
    
    HBASE-24069: Provide an ExponentialBackOffPolicy sleep between failed region close requests
    
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
 .../hadoop/hbase/master/AssignmentManager.java     | 194 +++++++++++----------
 .../master/TestAssignmentManagerOnCluster.java     |  77 +++++++-
 2 files changed, 175 insertions(+), 96 deletions(-)
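
For context on the change below: the unassign() retry loop now falls back to backoffPolicy.getBackoffTime(retryConfig, i) whenever none of the special failure cases (server stopping or aborting, region already in transition, failed/dead server) has set a sleep time, and the new test tunes the starting interval through "hbase.assignment.retry.sleep.initial". The following is a minimal, self-contained sketch of what such an exponential backoff computation looks like; the class name, fields, and the doubling-with-cap formula are illustrative assumptions, not the actual HBase BackoffPolicy implementation.

    // Illustrative sketch only: shows the general shape of an exponential
    // backoff computation like the one backoffPolicy.getBackoffTime(retryConfig, i)
    // performs in the patch below. Names and the exact formula are assumptions.
    public final class ExponentialBackoffSketch {

      /** Initial sleep, e.g. taken from "hbase.assignment.retry.sleep.initial". */
      private final long initialSleepMs;
      /** Upper bound so repeated failures do not sleep unboundedly long. */
      private final long maxSleepMs;

      public ExponentialBackoffSketch(long initialSleepMs, long maxSleepMs) {
        this.initialSleepMs = initialSleepMs;
        this.maxSleepMs = maxSleepMs;
      }

      /** Sleep time for the given retry attempt: initial * 2^attempt, capped. */
      public long getBackoffTime(int attempt) {
        // Clamp the shift to avoid long overflow for very large attempt counts.
        long backoff = initialSleepMs * (1L << Math.min(attempt, 30));
        return Math.min(backoff, maxSleepMs);
      }

      public static void main(String[] args) {
        ExponentialBackoffSketch policy = new ExponentialBackoffSketch(1000L, 60000L);
        for (int attempt = 0; attempt < 8; attempt++) {
          System.out.println("attempt " + attempt + " -> sleep "
              + policy.getBackoffTime(attempt) + " ms");
        }
      }
    }

With an initial sleep of 1000 ms, as the test below configures, successive failed close attempts would wait roughly 1s, 2s, 4s, 8s, and so on up to the cap, rather than retrying the CLOSE RPC in a tight loop.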

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 5386f6e..78294a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -37,8 +37,8 @@ import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -95,7 +95,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
 import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
-import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -1985,7 +1984,7 @@ public class AssignmentManager extends ZooKeeperListener {
       // ClosedRegionhandler can remove the server from this.regions
       if (!serverManager.isServerOnline(server)) {
         LOG.debug("Offline " + region.getRegionNameAsString()
-          + ", no need to unassign since it's on a dead server: " + server);
+            + ", no need to unassign since it's on a dead server: " + server);
         if (transitionInZK) {
           // delete the node. if no node exists need not bother.
           deleteClosingOrClosedNode(region, server);
@@ -1995,42 +1994,39 @@ public class AssignmentManager extends ZooKeeperListener {
         }
         return;
       }
+      long sleepTime = 0;
       try {
         // Send CLOSE RPC
-        if (serverManager.sendRegionClose(server, region,
-          versionOfClosingNode, dest, transitionInZK)) {
-          LOG.debug("Sent CLOSE to " + server + " for region " +
-            region.getRegionNameAsString());
+        if (serverManager.sendRegionClose(server, region, versionOfClosingNode, dest,
+          transitionInZK)) {
+          LOG.debug("Sent CLOSE to " + server + " for region " + region.getRegionNameAsString());
           if (useZKForAssignment && !transitionInZK && state != null) {
             // Retry to make sure the region is
             // closed so as to avoid double assignment.
-            unassign(region, state, versionOfClosingNode,
-              dest, transitionInZK, src);
+            unassign(region, state, versionOfClosingNode, dest, transitionInZK, src);
           }
           return;
         }
         // This never happens. Currently regionserver close always return true.
         // Todo; this can now happen (0.96) if there is an exception in a coprocessor
-        LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
-          region.getRegionNameAsString());
+        LOG.warn("Server " + server + " region CLOSE RPC returned false for "
+            + region.getRegionNameAsString());
       } catch (Throwable t) {
-        long sleepTime = 0;
         Configuration conf = this.server.getConfiguration();
         if (t instanceof RemoteException) {
-          t = ((RemoteException)t).unwrapRemoteException();
+          t = ((RemoteException) t).unwrapRemoteException();
         }
         boolean logRetries = true;
-        if (t instanceof RegionServerAbortedException
-            || t instanceof RegionServerStoppedException
+        if (t instanceof RegionServerStoppedException
             || t instanceof ServerNotRunningYetException) {
           // RS is aborting or stopping, we cannot offline the region since the region may need
-          // to do WAL recovery. Until we see  the RS expiration, we should retry.
+          // to do WAL recovery. Until we see the RS expiration, we should retry.
           sleepTime = 1L + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
             RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
 
         } else if (t instanceof NotServingRegionException) {
-          LOG.debug("Offline " + region.getRegionNameAsString()
-            + ", it's not any more on " + server, t);
+          LOG.debug(
+            "Offline " + region.getRegionNameAsString() + ", it's not any more on " + server, t);
           if (transitionInZK) {
             deleteClosingOrClosedNode(region, server);
           }
@@ -2038,25 +2034,24 @@ public class AssignmentManager extends ZooKeeperListener {
             regionOffline(region);
           }
           return;
-        } else if ((t instanceof FailedServerException) || (state != null &&
-            t instanceof RegionAlreadyInTransitionException)) {
+        } else if ((t instanceof FailedServerException)
+            || (state != null && t instanceof RegionAlreadyInTransitionException)) {
           if (t instanceof FailedServerException) {
             sleepTime = 1L + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
-                  RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+              RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
           } else {
             // RS is already processing this region, only need to update the timestamp
             LOG.debug("update " + state + " the timestamp.");
             state.updateTimestampToNow();
             if (maxWaitTime < 0) {
-              maxWaitTime =
-                  EnvironmentEdgeManager.currentTime()
-                      + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
-                        DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
+              maxWaitTime = EnvironmentEdgeManager.currentTime() + conf.getLong(
+                ALREADY_IN_TRANSITION_WAITTIME, DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
             }
             long now = EnvironmentEdgeManager.currentTime();
             if (now < maxWaitTime) {
-              LOG.debug("Region is already in transition; "
-                + "waiting up to " + (maxWaitTime - now) + "ms", t);
+              LOG.debug("Region is already in transition; " + "waiting up to "
+                  + (maxWaitTime - now) + "ms",
+                t);
               sleepTime = 100;
               i--; // reset the try count
               logRetries = false;
@@ -2064,28 +2059,40 @@ public class AssignmentManager extends ZooKeeperListener {
           }
         }
 
-        try {
-          if (sleepTime > 0) {
-            Thread.sleep(sleepTime);
-          }
-        } catch (InterruptedException ie) {
-          LOG.warn("Failed to unassign "
-            + region.getRegionNameAsString() + " since interrupted", ie);
-          Thread.currentThread().interrupt();
-          if (state != null) {
-            regionStates.updateRegionState(region, State.FAILED_CLOSE);
-          }
-          return;
-        }
-
         if (logRetries) {
-          LOG.info("Server " + server + " returned " + t + " for "
-            + region.getRegionNameAsString() + ", try=" + i
-            + " of " + this.maximumAttempts, t);
+          LOG.info("Server " + server + " returned " + t + " for " + region.getRegionNameAsString()
+              + ", try=" + i + " of " + this.maximumAttempts,
+            t);
           // Presume retry or server will expire.
         }
       }
+      // sleepTime is set in one of the following cases (reasons commented above):
+      // 1. Region server stopping or aborting
+      // 2. Region already in transition
+      // 3. Connecting to server that is already dead
+      //
+      // If sleepTime is not set by any of the cases, set it to sleep for the
+      // configured exponential backoff time
+      if (sleepTime == 0 && i != maximumAttempts) {
+        sleepTime = backoffPolicy.getBackoffTime(retryConfig, i);
+        LOG.info("Waiting for " + sleepTime + " milliseconds of exponential backoff time for "
+            + region.getRegionNameAsString() + " before next retry " + (i + 1) + " of "
+            + this.maximumAttempts);
+      }
+      try {
+        if (sleepTime > 0 && i != maximumAttempts) {
+          Thread.sleep(sleepTime);
+        }
+      } catch (InterruptedException ie) {
+        LOG.warn("Failed to unassign " + region.getRegionNameAsString() + " since interrupted", ie);
+        if (state != null) {
+          regionStates.updateRegionState(region, State.FAILED_CLOSE);
+        }
+        Thread.currentThread().interrupt();
+        return;
+      }
     }
+
     // Run out of attempts
     if (state != null) {
       regionStates.updateRegionState(region, State.FAILED_CLOSE);
@@ -2108,54 +2115,57 @@ public class AssignmentManager extends ZooKeeperListener {
       LOG.debug("Force region state offline " + state);
     }
 
-    switch (state.getState()) {
-    case OPEN:
-    case OPENING:
-    case PENDING_OPEN:
-    case CLOSING:
-    case PENDING_CLOSE:
-      if (!forceNewPlan) {
-        LOG.debug("Skip assigning " +
-          region + ", it is already " + state);
-        return null;
-      }
-    case FAILED_CLOSE:
-    case FAILED_OPEN:
-      unassign(region, state, -1, null, false, null);
-      state = regionStates.getRegionState(region);
-      if (state.isFailedClose()) {
-        // If we can't close the region, we can't re-assign
-        // it so as to avoid possible double assignment/data loss.
-        LOG.info("Skip assigning " +
-          region + ", we couldn't close it: " + state);
-        return null;
-      }
-    case OFFLINE:
-      // This region could have been open on this server
-      // for a while. If the server is dead and not processed
-      // yet, we can move on only if the meta shows the
-      // region is not on this server actually, or on a server
-      // not dead, or dead and processed already.
-      // In case not using ZK, we don't need this check because
-      // we have the latest info in memory, and the caller
-      // will do another round checking any way.
-      if (useZKForAssignment
-          && regionStates.isServerDeadAndNotProcessed(sn)
-          && wasRegionOnDeadServerByMeta(region, sn)) {
-        if (!regionStates.isRegionInTransition(region)) {
-          LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
-          regionStates.updateRegionState(region, State.OFFLINE);
-        }
-        LOG.info("Skip assigning " + region.getRegionNameAsString()
-            + ", it is on a dead but not processed yet server: " + sn);
-        return null;
+    // We need a lock on the region as we could update it
+    Lock lock = locker.acquireLock(region.getEncodedName());
+    try {
+      switch (state.getState()) {
+        case OPEN:
+        case OPENING:
+        case PENDING_OPEN:
+        case CLOSING:
+        case PENDING_CLOSE:
+          if (!forceNewPlan) {
+            LOG.debug("Skip assigning " + region + ", it is already " + state);
+            return null;
+          }
+        case FAILED_CLOSE:
+        case FAILED_OPEN:
+          unassign(region, state, -1, null, false, null);
+          state = regionStates.getRegionState(region);
+          if (state.isFailedClose()) {
+            // If we can't close the region, we can't re-assign
+            // it so as to avoid possible double assignment/data loss.
+            LOG.info("Skip assigning " + region + ", we couldn't close it: " + state);
+            return null;
+          }
+        case OFFLINE:
+          // This region could have been open on this server
+          // for a while. If the server is dead and not processed
+          // yet, we can move on only if the meta shows the
+          // region is not on this server actually, or on a server
+          // not dead, or dead and processed already.
+          // In case not using ZK, we don't need this check because
+          // we have the latest info in memory, and the caller
+          // will do another round checking any way.
+          if (useZKForAssignment && regionStates.isServerDeadAndNotProcessed(sn)
+              && wasRegionOnDeadServerByMeta(region, sn)) {
+            if (!regionStates.isRegionInTransition(region)) {
+              LOG.info(
+                "Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
+              regionStates.updateRegionState(region, State.OFFLINE);
+            }
+            LOG.info("Skip assigning " + region.getRegionNameAsString()
+                + ", it is on a dead but not processed yet server: " + sn);
+            return null;
+          }
+        case CLOSED:
+          break;
+        default:
+          LOG.error("Trying to assign region " + region + ", which is " + state);
+          return null;
       }
-    case CLOSED:
-      break;
-    default:
-      LOG.error("Trying to assign region " + region
-        + ", which is " + state);
-      return null;
+    } finally {
+      lock.unlock();
     }
     return state;
   }
@@ -3623,7 +3633,7 @@ public class AssignmentManager extends ZooKeeperListener {
     if (failedOpenTracker.containsKey(regionInfo.getEncodedName())) {
       // Sleep before reassigning if this region has failed to open before
       long sleepTime = backoffPolicy.getBackoffTime(retryConfig,
-          getFailedAttempts(regionInfo.getEncodedName()));
+        getFailedAttempts(regionInfo.getEncodedName()));
       invokeAssignLater(regionInfo, forceNewPlan, sleepTime);
     } else {
       // Immediately reassign if this region has never failed an open before
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 69dfa40..28f9e83 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -32,11 +32,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.MetaTableAccessor;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
@@ -72,11 +71,13 @@ import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ConfigUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.TestTableName;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -84,9 +85,13 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
 
 /**
  * This tests AssignmentManager with a testing cluster.
@@ -98,6 +103,8 @@ public class TestAssignmentManagerOnCluster {
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   final static Configuration conf = TEST_UTIL.getConfiguration();
   private static HBaseAdmin admin;
+  @Rule
+  public TestTableName testTableName = new TestTableName();
 
   static void setupOnce() throws Exception {
     // Using the our load balancer to control region plans
@@ -599,6 +606,68 @@ public class TestAssignmentManagerOnCluster {
   }
 
   /**
+   * This tests region close with exponential backoff
+   */
+  @Test(timeout = 60000)
+  public void testCloseRegionWithExponentialBackOff() throws Exception {
+    TableName tableName = testTableName.getTableName();
+    // Set the backoff time between each retry for failed close
+    TEST_UTIL.getMiniHBaseCluster().getConf().setLong("hbase.assignment.retry.sleep.initial", 1000);
+    HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+    TEST_UTIL.getMiniHBaseCluster().stopMaster(activeMaster.getServerName());
+    TEST_UTIL.getMiniHBaseCluster().startMaster(); // restart the master for the conf to take effect
+
+    try {
+      ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
+          new ScheduledThreadPoolExecutor(1, Threads.newDaemonThreadFactory("ExponentialBackOff"));
+
+      HTableDescriptor desc = new HTableDescriptor(tableName);
+      desc.addFamily(new HColumnDescriptor(FAMILY));
+      admin.createTable(desc);
+
+      Table meta = new HTable(conf, TableName.META_TABLE_NAME);
+      HRegionInfo hri =
+          new HRegionInfo(desc.getTableName(), Bytes.toBytes("A"), Bytes.toBytes("Z"));
+      MetaTableAccessor.addRegionToMeta(meta, hri);
+
+      HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+      AssignmentManager am = master.getAssignmentManager();
+      assertTrue(TEST_UTIL.assignRegion(hri));
+      ServerName sn = am.getRegionStates().getRegionServerOfRegion(hri);
+      TEST_UTIL.assertRegionOnServer(hri, sn, 6000);
+
+      MyRegionObserver.preCloseEnabled.set(true);
+      // Unset the preCloseEnabled flag after 1 second so the next retry succeeds
+      scheduledThreadPoolExecutor.schedule(new Runnable() {
+        @Override
+        public void run() {
+          MyRegionObserver.preCloseEnabled.set(false);
+        }
+      }, 1000, TimeUnit.MILLISECONDS);
+      am.unassign(hri);
+
+      // region may still be assigned now since it's closing,
+      // let's check if it's assigned after it's out of transition
+      am.waitOnRegionToClearRegionsInTransition(hri);
+
+      // region should be closed and re-assigned
+      assertTrue(am.waitForAssignment(hri));
+      ServerName serverName =
+          master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+      TEST_UTIL.assertRegionOnServer(hri, serverName, 6000);
+    } finally {
+      MyRegionObserver.preCloseEnabled.set(false);
+      TEST_UTIL.deleteTable(tableName);
+
+      // reset the backoff time to default
+      TEST_UTIL.getMiniHBaseCluster().getConf().unset("hbase.assignment.retry.sleep.initial");
+      activeMaster = TEST_UTIL.getMiniHBaseCluster().getMaster();
+      TEST_UTIL.getMiniHBaseCluster().stopMaster(activeMaster.getServerName());
+      TEST_UTIL.getMiniHBaseCluster().startMaster();
+    }
+  }
+
+  /**
    * This tests region open failed
    */
   @Test (timeout=60000)
@@ -889,7 +958,7 @@ public class TestAssignmentManagerOnCluster {
   /**
    * This tests region close racing with open
    */
-  @Test (timeout=60000)
+  @Test(timeout = 60000)
   public void testOpenCloseRacing() throws Exception {
     String table = "testOpenCloseRacing";
     try {