You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bh...@apache.org on 2020/05/31 21:41:32 UTC
[hbase] branch branch-1 updated: HBASE-24069 Provide an ExponentialBackOffPolicy sleep between failed … (#1755)
This is an automated email from the ASF dual-hosted git repository.
bharathv pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 3c13884 HBASE-24069 Provide an ExponentialBackOffPolicy sleep between failed … (#1755)
3c13884 is described below
commit 3c138845d935b7183d02f1312cf8a1e5ce690572
Author: sguggilam <sa...@gmail.com>
AuthorDate: Sun May 31 14:41:18 2020 -0700
HBASE-24069 Provide an ExponentialBackOffPolicy sleep between failed … (#1755)
HBASE-24069: Provide an ExponentialBackOffPolicy sleep between failed region close requests
Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
.../hadoop/hbase/master/AssignmentManager.java | 194 +++++++++++----------
.../master/TestAssignmentManagerOnCluster.java | 77 +++++++-
2 files changed, 175 insertions(+), 96 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
index 5386f6e..78294a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
@@ -37,8 +37,8 @@ import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -95,7 +95,6 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
-import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.ConfigUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -1985,7 +1984,7 @@ public class AssignmentManager extends ZooKeeperListener {
// ClosedRegionhandler can remove the server from this.regions
if (!serverManager.isServerOnline(server)) {
LOG.debug("Offline " + region.getRegionNameAsString()
- + ", no need to unassign since it's on a dead server: " + server);
+ + ", no need to unassign since it's on a dead server: " + server);
if (transitionInZK) {
// delete the node. if no node exists need not bother.
deleteClosingOrClosedNode(region, server);
@@ -1995,42 +1994,39 @@ public class AssignmentManager extends ZooKeeperListener {
}
return;
}
+ long sleepTime = 0;
try {
// Send CLOSE RPC
- if (serverManager.sendRegionClose(server, region,
- versionOfClosingNode, dest, transitionInZK)) {
- LOG.debug("Sent CLOSE to " + server + " for region " +
- region.getRegionNameAsString());
+ if (serverManager.sendRegionClose(server, region, versionOfClosingNode, dest,
+ transitionInZK)) {
+ LOG.debug("Sent CLOSE to " + server + " for region " + region.getRegionNameAsString());
if (useZKForAssignment && !transitionInZK && state != null) {
// Retry to make sure the region is
// closed so as to avoid double assignment.
- unassign(region, state, versionOfClosingNode,
- dest, transitionInZK, src);
+ unassign(region, state, versionOfClosingNode, dest, transitionInZK, src);
}
return;
}
// This never happens. Currently regionserver close always return true.
// Todo; this can now happen (0.96) if there is an exception in a coprocessor
- LOG.warn("Server " + server + " region CLOSE RPC returned false for " +
- region.getRegionNameAsString());
+ LOG.warn("Server " + server + " region CLOSE RPC returned false for "
+ + region.getRegionNameAsString());
} catch (Throwable t) {
- long sleepTime = 0;
Configuration conf = this.server.getConfiguration();
if (t instanceof RemoteException) {
- t = ((RemoteException)t).unwrapRemoteException();
+ t = ((RemoteException) t).unwrapRemoteException();
}
boolean logRetries = true;
- if (t instanceof RegionServerAbortedException
- || t instanceof RegionServerStoppedException
+ if (t instanceof RegionServerStoppedException
|| t instanceof ServerNotRunningYetException) {
// RS is aborting or stopping, we cannot offline the region since the region may need
- // to do WAL recovery. Until we see the RS expiration, we should retry.
+ // to do WAL recovery. Until we see the RS expiration, we should retry.
sleepTime = 1L + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
} else if (t instanceof NotServingRegionException) {
- LOG.debug("Offline " + region.getRegionNameAsString()
- + ", it's not any more on " + server, t);
+ LOG.debug(
+ "Offline " + region.getRegionNameAsString() + ", it's not any more on " + server, t);
if (transitionInZK) {
deleteClosingOrClosedNode(region, server);
}
@@ -2038,25 +2034,24 @@ public class AssignmentManager extends ZooKeeperListener {
regionOffline(region);
}
return;
- } else if ((t instanceof FailedServerException) || (state != null &&
- t instanceof RegionAlreadyInTransitionException)) {
+ } else if ((t instanceof FailedServerException)
+ || (state != null && t instanceof RegionAlreadyInTransitionException)) {
if (t instanceof FailedServerException) {
sleepTime = 1L + conf.getInt(RpcClient.FAILED_SERVER_EXPIRY_KEY,
- RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
+ RpcClient.FAILED_SERVER_EXPIRY_DEFAULT);
} else {
// RS is already processing this region, only need to update the timestamp
LOG.debug("update " + state + " the timestamp.");
state.updateTimestampToNow();
if (maxWaitTime < 0) {
- maxWaitTime =
- EnvironmentEdgeManager.currentTime()
- + conf.getLong(ALREADY_IN_TRANSITION_WAITTIME,
- DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
+ maxWaitTime = EnvironmentEdgeManager.currentTime() + conf.getLong(
+ ALREADY_IN_TRANSITION_WAITTIME, DEFAULT_ALREADY_IN_TRANSITION_WAITTIME);
}
long now = EnvironmentEdgeManager.currentTime();
if (now < maxWaitTime) {
- LOG.debug("Region is already in transition; "
- + "waiting up to " + (maxWaitTime - now) + "ms", t);
+ LOG.debug("Region is already in transition; " + "waiting up to "
+ + (maxWaitTime - now) + "ms",
+ t);
sleepTime = 100;
i--; // reset the try count
logRetries = false;
@@ -2064,28 +2059,40 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
- try {
- if (sleepTime > 0) {
- Thread.sleep(sleepTime);
- }
- } catch (InterruptedException ie) {
- LOG.warn("Failed to unassign "
- + region.getRegionNameAsString() + " since interrupted", ie);
- Thread.currentThread().interrupt();
- if (state != null) {
- regionStates.updateRegionState(region, State.FAILED_CLOSE);
- }
- return;
- }
-
if (logRetries) {
- LOG.info("Server " + server + " returned " + t + " for "
- + region.getRegionNameAsString() + ", try=" + i
- + " of " + this.maximumAttempts, t);
+ LOG.info("Server " + server + " returned " + t + " for " + region.getRegionNameAsString()
+ + ", try=" + i + " of " + this.maximumAttempts,
+ t);
// Presume retry or server will expire.
}
}
+ // sleepTime is set in one of the following cases (reasons commented above):
+ // 1. Region server stopping or aborting
+ // 2. Region already in transition
+ // 3. Connecting to server that is already dead
+ //
+ // If sleepTime is not set by any of the cases, set it to sleep for
+ // configured exponential backoff time
+ if (sleepTime == 0 && i != maximumAttempts) {
+ sleepTime = backoffPolicy.getBackoffTime(retryConfig, i);
+ LOG.info("Waiting for " + sleepTime + "milliseconds exponential backoff time for "
+ + region.getRegionNameAsString() + " before next retry " + (i + 1) + " of "
+ + this.maximumAttempts);
+ }
+ try {
+ if (sleepTime > 0 && i != maximumAttempts) {
+ Thread.sleep(sleepTime);
+ }
+ } catch (InterruptedException ie) {
+ LOG.warn("Failed to unassign " + region.getRegionNameAsString() + " since interrupted", ie);
+ if (state != null) {
+ regionStates.updateRegionState(region, State.FAILED_CLOSE);
+ }
+ Thread.currentThread().interrupt();
+ return;
+ }
}
+
// Run out of attempts
if (state != null) {
regionStates.updateRegionState(region, State.FAILED_CLOSE);
@@ -2108,54 +2115,57 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.debug("Force region state offline " + state);
}
- switch (state.getState()) {
- case OPEN:
- case OPENING:
- case PENDING_OPEN:
- case CLOSING:
- case PENDING_CLOSE:
- if (!forceNewPlan) {
- LOG.debug("Skip assigning " +
- region + ", it is already " + state);
- return null;
- }
- case FAILED_CLOSE:
- case FAILED_OPEN:
- unassign(region, state, -1, null, false, null);
- state = regionStates.getRegionState(region);
- if (state.isFailedClose()) {
- // If we can't close the region, we can't re-assign
- // it so as to avoid possible double assignment/data loss.
- LOG.info("Skip assigning " +
- region + ", we couldn't close it: " + state);
- return null;
- }
- case OFFLINE:
- // This region could have been open on this server
- // for a while. If the server is dead and not processed
- // yet, we can move on only if the meta shows the
- // region is not on this server actually, or on a server
- // not dead, or dead and processed already.
- // In case not using ZK, we don't need this check because
- // we have the latest info in memory, and the caller
- // will do another round checking any way.
- if (useZKForAssignment
- && regionStates.isServerDeadAndNotProcessed(sn)
- && wasRegionOnDeadServerByMeta(region, sn)) {
- if (!regionStates.isRegionInTransition(region)) {
- LOG.info("Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
- regionStates.updateRegionState(region, State.OFFLINE);
- }
- LOG.info("Skip assigning " + region.getRegionNameAsString()
- + ", it is on a dead but not processed yet server: " + sn);
- return null;
+ // We need a lock on the region as we could update it
+ Lock lock = locker.acquireLock(region.getEncodedName());
+ try {
+ switch (state.getState()) {
+ case OPEN:
+ case OPENING:
+ case PENDING_OPEN:
+ case CLOSING:
+ case PENDING_CLOSE:
+ if (!forceNewPlan) {
+ LOG.debug("Skip assigning " + region + ", it is already " + state);
+ return null;
+ }
+ case FAILED_CLOSE:
+ case FAILED_OPEN:
+ unassign(region, state, -1, null, false, null);
+ state = regionStates.getRegionState(region);
+ if (state.isFailedClose()) {
+ // If we can't close the region, we can't re-assign
+ // it so as to avoid possible double assignment/data loss.
+ LOG.info("Skip assigning " + region + ", we couldn't close it: " + state);
+ return null;
+ }
+ case OFFLINE:
+ // This region could have been open on this server
+ // for a while. If the server is dead and not processed
+ // yet, we can move on only if the meta shows the
+ // region is not on this server actually, or on a server
+ // not dead, or dead and processed already.
+ // In case not using ZK, we don't need this check because
+ // we have the latest info in memory, and the caller
+ // will do another round checking any way.
+ if (useZKForAssignment && regionStates.isServerDeadAndNotProcessed(sn)
+ && wasRegionOnDeadServerByMeta(region, sn)) {
+ if (!regionStates.isRegionInTransition(region)) {
+ LOG.info(
+ "Updating the state to " + State.OFFLINE + " to allow to be reassigned by SSH");
+ regionStates.updateRegionState(region, State.OFFLINE);
+ }
+ LOG.info("Skip assigning " + region.getRegionNameAsString()
+ + ", it is on a dead but not processed yet server: " + sn);
+ return null;
+ }
+ case CLOSED:
+ break;
+ default:
+ LOG.error("Trying to assign region " + region + ", which is " + state);
+ return null;
}
- case CLOSED:
- break;
- default:
- LOG.error("Trying to assign region " + region
- + ", which is " + state);
- return null;
+ } finally {
+ lock.unlock();
}
return state;
}
@@ -3623,7 +3633,7 @@ public class AssignmentManager extends ZooKeeperListener {
if (failedOpenTracker.containsKey(regionInfo.getEncodedName())) {
// Sleep before reassigning if this region has failed to open before
long sleepTime = backoffPolicy.getBackoffTime(retryConfig,
- getFailedAttempts(regionInfo.getEncodedName()));
+ getFailedAttempts(regionInfo.getEncodedName()));
invokeAssignLater(regionInfo, forceNewPlan, sleepTime);
} else {
// Immediately reassign if this region has never failed an open before
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
index 69dfa40..28f9e83 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestAssignmentManagerOnCluster.java
@@ -32,11 +32,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
@@ -72,11 +71,13 @@ import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer;
import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConfigUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.TestTableName;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
@@ -84,9 +85,13 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
/**
* This tests AssignmentManager with a testing cluster.
@@ -98,6 +103,8 @@ public class TestAssignmentManagerOnCluster {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
final static Configuration conf = TEST_UTIL.getConfiguration();
private static HBaseAdmin admin;
+ @Rule
+ public TestTableName testTableName = new TestTableName();
static void setupOnce() throws Exception {
// Using the our load balancer to control region plans
@@ -599,6 +606,68 @@ public class TestAssignmentManagerOnCluster {
}
/**
+ * This tests region close with exponential backoff
+ */
+ @Test(timeout = 60000)
+ public void testCloseRegionWithExponentialBackOff() throws Exception {
+ TableName tableName = testTableName.getTableName();
+ // Set the backoff time between each retry for failed close
+ TEST_UTIL.getMiniHBaseCluster().getConf().setLong("hbase.assignment.retry.sleep.initial", 1000);
+ HMaster activeMaster = TEST_UTIL.getHBaseCluster().getMaster();
+ TEST_UTIL.getMiniHBaseCluster().stopMaster(activeMaster.getServerName());
+ TEST_UTIL.getMiniHBaseCluster().startMaster(); // restart the master for the conf to take effect
+
+ try {
+ ScheduledThreadPoolExecutor scheduledThreadPoolExecutor =
+ new ScheduledThreadPoolExecutor(1, Threads.newDaemonThreadFactory("ExponentialBackOff"));
+
+ HTableDescriptor desc = new HTableDescriptor(tableName);
+ desc.addFamily(new HColumnDescriptor(FAMILY));
+ admin.createTable(desc);
+
+ Table meta = new HTable(conf, TableName.META_TABLE_NAME);
+ HRegionInfo hri =
+ new HRegionInfo(desc.getTableName(), Bytes.toBytes("A"), Bytes.toBytes("Z"));
+ MetaTableAccessor.addRegionToMeta(meta, hri);
+
+ HMaster master = TEST_UTIL.getHBaseCluster().getMaster();
+ AssignmentManager am = master.getAssignmentManager();
+ assertTrue(TEST_UTIL.assignRegion(hri));
+ ServerName sn = am.getRegionStates().getRegionServerOfRegion(hri);
+ TEST_UTIL.assertRegionOnServer(hri, sn, 6000);
+
+ MyRegionObserver.preCloseEnabled.set(true);
+ // Clear the preCloseEnabled flag after 1 second so that the next retry succeeds
+ scheduledThreadPoolExecutor.schedule(new Runnable() {
+ @Override
+ public void run() {
+ MyRegionObserver.preCloseEnabled.set(false);
+ }
+ }, 1000, TimeUnit.MILLISECONDS);
+ am.unassign(hri);
+
+ // region may still be assigned now since it's closing,
+ // let's check if it's assigned after it's out of transition
+ am.waitOnRegionToClearRegionsInTransition(hri);
+
+ // region should be closed and re-assigned
+ assertTrue(am.waitForAssignment(hri));
+ ServerName serverName =
+ master.getAssignmentManager().getRegionStates().getRegionServerOfRegion(hri);
+ TEST_UTIL.assertRegionOnServer(hri, serverName, 6000);
+ } finally {
+ MyRegionObserver.preCloseEnabled.set(false);
+ TEST_UTIL.deleteTable(tableName);
+
+ // reset the backoff time to default
+ TEST_UTIL.getMiniHBaseCluster().getConf().unset("hbase.assignment.retry.sleep.initial");
+ activeMaster = TEST_UTIL.getMiniHBaseCluster().getMaster();
+ TEST_UTIL.getMiniHBaseCluster().stopMaster(activeMaster.getServerName());
+ TEST_UTIL.getMiniHBaseCluster().startMaster();
+ }
+ }
+
+ /**
* This tests region open failed
*/
@Test (timeout=60000)
@@ -889,7 +958,7 @@ public class TestAssignmentManagerOnCluster {
/**
* This tests region close racing with open
*/
- @Test (timeout=60000)
+ @Test(timeout = 60000)
public void testOpenCloseRacing() throws Exception {
String table = "testOpenCloseRacing";
try {