You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/09/29 05:34:25 UTC
hbase git commit: HBASE-21248 Implement exponential backoff when
retrying for ModifyPeerProcedure
Repository: hbase
Updated Branches:
refs/heads/master ab6ec1f9e -> fdbaa4c3f
HBASE-21248 Implement exponential backoff when retrying for ModifyPeerProcedure
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fdbaa4c3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fdbaa4c3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fdbaa4c3
Branch: refs/heads/master
Commit: fdbaa4c3f0253700d12b6bd61cacf29ca7fbc19f
Parents: ab6ec1f
Author: zhangduo <zh...@apache.org>
Authored: Sat Sep 29 09:55:24 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Sep 29 13:19:36 2018 +0800
----------------------------------------------------------------------
.../master/replication/ModifyPeerProcedure.java | 82 ++++++---
.../apache/hadoop/hbase/ProcedureTestUtil.java | 85 ++++++++++
.../assignment/TestCloseRegionWhileRSCrash.java | 53 +-----
.../TestModifyPeerProcedureRetryBackoff.java | 166 +++++++++++++++++++
4 files changed, 317 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fdbaa4c3/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index ad4df61..7690c96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
-import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@@ -42,7 +42,10 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
/**
* The base class for all replication peer related procedure except sync replication state
@@ -58,6 +61,8 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
// The sleep interval when waiting table to be enabled or disabled.
protected static final int SLEEP_INTERVAL_MS = 1000;
+ private int attemps;
+
protected ModifyPeerProcedure() {
}
@@ -143,7 +148,9 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
}
}
- private void reopenRegions(MasterProcedureEnv env) throws IOException {
+ // Overridden in tests to simulate errors
+ @VisibleForTesting
+ protected void reopenRegions(MasterProcedureEnv env) throws IOException {
ReplicationPeerConfig peerConfig = getNewPeerConfig();
ReplicationPeerConfig oldPeerConfig = getOldPeerConfig();
TableStateManager tsm = env.getMasterServices().getTableStateManager();
@@ -165,6 +172,12 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
}
}
+ /**
+ * Enables the replication peer {@code peerId} via the master's ReplicationPeerManager.
+ * <p>
+ * Used by the SERIAL_PEER_SET_PEER_ENABLED step; a {@link ReplicationException} thrown from here
+ * makes that step suspend with an exponential backoff and retry. The method is protected (and
+ * {@code @VisibleForTesting}) only so tests can override it to inject such failures.
+ * @throws ReplicationException if enabling the peer fails
+ */
+ @VisibleForTesting
+ protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
+ env.getReplicationPeerManager().enablePeer(peerId);
+ }
+
private void addToMap(Map<String, Long> lastSeqIds, String encodedRegionName, long barrier,
ReplicationQueueStorage queueStorage) throws ReplicationException {
if (barrier >= 0) {
@@ -236,8 +249,23 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
}
+ /**
+ * Hook for the timeout armed by {@link #suspend(long)}. Rather than failing the procedure, it
+ * moves the procedure back to RUNNABLE and re-queues it at the front of the procedure scheduler,
+ * so the step that failed is executed again.
+ * NOTE(review): presumably invoked by the procedure framework once the WAITING_TIMEOUT interval
+ * elapses, with {@code false} meaning "do not treat this as a failure" — confirm against
+ * Procedure#setTimeoutFailure.
+ * @return always {@code false}
+ */
@Override
+ protected synchronized boolean setTimeoutFailure(MasterProcedureEnv env) {
+ // state must be RUNNABLE again before the procedure is handed back to the scheduler
+ setState(ProcedureProtos.ProcedureState.RUNNABLE);
+ env.getProcedureScheduler().addFront(this);
+ return false;
+ }
+
+ /**
+ * Suspends this procedure for {@code backoff} milliseconds so the current step can be retried
+ * later. It increments the attempt counter {@code attemps} (sic); callers compute the next
+ * backoff from it via ProcedureUtil.getBackoffTimeMs and reset it to 0 once a step succeeds. It
+ * then arms the timeout, moves to WAITING_TIMEOUT and calls skipPersistence().
+ * NOTE(review): skipPersistence() presumably means this wait is not written to the procedure
+ * store, so the in-memory attempt counter would restart after a master failover — confirm.
+ * <p>
+ * This method never returns normally. The return type only exists so callers can write
+ * {@code throw suspend(backoff);}, which tells the compiler the branch ends there.
+ * @param backoff sleep time in milliseconds; must fit in an int (Math.toIntExact throws
+ * ArithmeticException otherwise)
+ * @throws ProcedureSuspendedException always
+ */
+ private ProcedureSuspendedException suspend(long backoff) throws ProcedureSuspendedException {
+ attemps++;
+ setTimeout(Math.toIntExact(backoff));
+ setState(ProcedureProtos.ProcedureState.WAITING_TIMEOUT);
+ skipPersistence();
+ throw new ProcedureSuspendedException();
+ }
+
+ @Override
protected Flow executeFromState(MasterProcedureEnv env, PeerModificationState state)
- throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+ throws ProcedureSuspendedException {
switch (state) {
case PRE_PEER_MODIFICATION:
try {
@@ -249,20 +277,24 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
releaseLatch();
return Flow.NO_MORE_STATE;
} catch (ReplicationException e) {
- LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(),
- peerId, e);
- throw new ProcedureYieldException();
+ long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+ LOG.warn("{} failed to call prePeerModification for peer {}, sleep {} secs",
+ getClass().getName(), peerId, backoff / 1000, e);
+ throw suspend(backoff);
}
+ attemps = 0;
setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
return Flow.HAS_MORE_STATE;
case UPDATE_PEER_STORAGE:
try {
updatePeerStorage(env);
} catch (ReplicationException e) {
- LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId,
- e);
- throw new ProcedureYieldException();
+ long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+ LOG.warn("{} update peer storage for peer {} failed, sleep {} secs", getClass().getName(),
+ peerId, backoff / 1000, e);
+ throw suspend(backoff);
}
+ attemps = 0;
setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case REFRESH_PEER_ON_RS:
@@ -273,30 +305,37 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
try {
reopenRegions(env);
} catch (Exception e) {
- LOG.warn("{} reopen regions for peer {} failed, retry", getClass().getName(), peerId, e);
- throw new ProcedureYieldException();
+ long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+ LOG.warn("{} reopen regions for peer {} failed, sleep {} secs", getClass().getName(),
+ peerId, backoff / 1000, e);
+ throw suspend(backoff);
}
+ attemps = 0;
setNextState(PeerModificationState.SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID:
try {
updateLastPushedSequenceIdForSerialPeer(env);
} catch (Exception e) {
- LOG.warn("{} set last sequence id for peer {} failed, retry", getClass().getName(),
- peerId, e);
- throw new ProcedureYieldException();
+ long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+ LOG.warn("{} set last sequence id for peer {} failed, sleep {} secs",
+ getClass().getName(), peerId, backoff / 1000, e);
+ throw suspend(backoff);
}
+ attemps = 0;
setNextState(enablePeerBeforeFinish() ? PeerModificationState.SERIAL_PEER_SET_PEER_ENABLED
: PeerModificationState.POST_PEER_MODIFICATION);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_SET_PEER_ENABLED:
try {
- env.getReplicationPeerManager().enablePeer(peerId);
+ enablePeer(env);
} catch (ReplicationException e) {
- LOG.warn("{} enable peer before finish for peer {} failed, retry", getClass().getName(),
- peerId, e);
- throw new ProcedureYieldException();
+ long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+ LOG.warn("{} enable peer before finish for peer {} failed, sleep {} secs",
+ getClass().getName(), peerId, backoff / 1000, e);
+ throw suspend(backoff);
}
+ attemps = 0;
setNextState(PeerModificationState.SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS);
return Flow.HAS_MORE_STATE;
case SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS:
@@ -307,9 +346,10 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
try {
postPeerModification(env);
} catch (ReplicationException e) {
- LOG.warn("{} failed to call postPeerModification for peer {}, retry",
- getClass().getName(), peerId, e);
- throw new ProcedureYieldException();
+ long backoff = ProcedureUtil.getBackoffTimeMs(attemps);
+ LOG.warn("{} failed to call postPeerModification for peer {}, sleep {} secs",
+ getClass().getName(), peerId, backoff / 1000, e);
+ throw suspend(backoff);
} catch (IOException e) {
LOG.warn("{} failed to call post CP hook for peer {}, " +
"ignore since the procedure has already done", getClass().getName(), peerId, e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/fdbaa4c3/hbase-server/src/test/java/org/apache/hadoop/hbase/ProcedureTestUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ProcedureTestUtil.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ProcedureTestUtil.java
new file mode 100644
index 0000000..ff23d85
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ProcedureTestUtil.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Optional;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
+import org.apache.hbase.thirdparty.com.google.gson.JsonElement;
+import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
+import org.apache.hbase.thirdparty.com.google.gson.JsonParser;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+
+/**
+ * Helpers for tests that need to observe a procedure's state or timeout through the JSON returned
+ * by {@code Admin#getProcedures()}.
+ */
+public final class ProcedureTestUtil {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ProcedureTestUtil.class);
+
+  private ProcedureTestUtil() {
+  }
+
+  /**
+   * Finds the first procedure listed by {@code Admin#getProcedures()} whose {@code className}
+   * field equals {@code clazz.getName()}.
+   * @return that procedure's JSON object, or empty if no such procedure is currently listed
+   */
+  private static Optional<JsonObject> getProcedure(HBaseTestingUtility util,
+      Class<? extends Procedure<?>> clazz, JsonParser parser) throws IOException {
+    JsonArray array = parser.parse(util.getAdmin().getProcedures()).getAsJsonArray();
+    Iterator<JsonElement> iterator = array.iterator();
+    while (iterator.hasNext()) {
+      JsonElement element = iterator.next();
+      JsonObject obj = element.getAsJsonObject();
+      String className = obj.get("className").getAsString();
+      if (className.equals(clazz.getName())) {
+        return Optional.of(obj);
+      }
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Waits until a procedure of type {@code clazz} is reported in the
+   * {@link ProcedureState#WAITING_TIMEOUT} state.
+   * @param timeout how long to wait; passed unchanged to {@code HBaseTestingUtility#waitFor}
+   *          (callers pass milliseconds, e.g. 30000)
+   */
+  public static void waitUntilProcedureWaitingTimeout(HBaseTestingUtility util,
+      Class<? extends Procedure<?>> clazz, long timeout) throws IOException {
+    JsonParser parser = new JsonParser();
+    util.waitFor(timeout,
+      () -> getProcedure(util, clazz, parser)
+        .filter(o -> ProcedureState.WAITING_TIMEOUT.name().equals(o.get("state").getAsString()))
+        .isPresent());
+  }
+
+  /**
+   * Blocks until the {@code timeout} value reported for a procedure of type {@code clazz} has been
+   * seen to grow {@code times + 1} times, polling once per second.
+   * <p>
+   * The last seen value starts at 0, so the first counted change is the procedure reporting a
+   * timeout at all. The remaining {@code times} changes are real increases, i.e. the retry backoff
+   * grew {@code times} times. A missing procedure or a missing {@code timeout} field reads as -1
+   * and is never counted. There is no internal deadline: if the timeout stops growing, this
+   * method does not return, so callers rely on the enclosing test's timeout.
+   */
+  public static void waitUntilProcedureTimeoutIncrease(HBaseTestingUtility util,
+      Class<? extends Procedure<?>> clazz, int times) throws IOException, InterruptedException {
+    JsonParser parser = new JsonParser();
+    long oldTimeout = 0;
+    int timeoutIncrements = 0;
+    for (;;) {
+      long timeout = getProcedure(util, clazz, parser).filter(o -> o.has("timeout"))
+        .map(o -> o.get("timeout").getAsLong()).orElse(-1L);
+      if (timeout > oldTimeout) {
+        // "was" is the previously observed value, "now" the newly observed one.
+        LOG.info("Timeout incremented, was {}, now is {}, increments={}", oldTimeout, timeout,
+          timeoutIncrements);
+        oldTimeout = timeout;
+        timeoutIncrements++;
+        if (timeoutIncrements > times) {
+          break;
+        }
+      }
+      Thread.sleep(1000);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fdbaa4c3/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java
index 3573bd6..d34bfbb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/assignment/TestCloseRegionWhileRSCrash.java
@@ -18,10 +18,10 @@
package org.apache.hadoop.hbase.master.assignment;
import java.io.IOException;
-import java.util.Iterator;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ProcedureTestUtil;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
@@ -45,13 +45,6 @@ import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.gson.JsonArray;
-import org.apache.hbase.thirdparty.com.google.gson.JsonElement;
-import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
-import org.apache.hbase.thirdparty.com.google.gson.JsonParser;
/**
* Confirm that we will do backoff when retrying on closing a region, to avoid consuming all the
@@ -64,8 +57,6 @@ public class TestCloseRegionWhileRSCrash {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCloseRegionWhileRSCrash.class);
- private static final Logger LOG = LoggerFactory.getLogger(TestCloseRegionWhileRSCrash.class);
-
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("Backoff");
@@ -189,25 +180,11 @@ public class TestCloseRegionWhileRSCrash {
}
});
t.start();
- JsonParser parser = new JsonParser();
- long oldTimeout = 0;
- int timeoutIncrements = 0;
// wait until we enter the WAITING_TIMEOUT state
- UTIL.waitFor(30000, () -> getTimeout(parser, UTIL.getAdmin().getProcedures()) > 0);
- while (true) {
- long timeout = getTimeout(parser, UTIL.getAdmin().getProcedures());
- if (timeout > oldTimeout) {
- LOG.info("Timeout incremented, was {}, now is {}, increments={}", timeout, oldTimeout,
- timeoutIncrements);
- oldTimeout = timeout;
- timeoutIncrements++;
- if (timeoutIncrements > 3) {
- // If we incremented at least twice, break; the backoff is working.
- break;
- }
- }
- Thread.sleep(1000);
- }
+ ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, TransitRegionStateProcedure.class,
+ 30000);
+ // wait until the timeout value increases three times
+ ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TransitRegionStateProcedure.class, 3);
// let's close the connection to make sure that the SCP can not update meta successfully
UTIL.getMiniHBaseCluster().getMaster().getConnection().close();
RESUME.countDown();
@@ -223,24 +200,4 @@ public class TestCloseRegionWhileRSCrash {
table.put(new Put(Bytes.toBytes(1)).addColumn(CF, Bytes.toBytes("cq"), Bytes.toBytes(1)));
}
}
-
- /**
- * @param proceduresAsJSON This is String returned by admin.getProcedures call... an array of
- * Procedures as JSON.
- * @return The Procedure timeout value parsed from the TRSP.
- */
- private long getTimeout(JsonParser parser, String proceduresAsJSON) {
- JsonArray array = parser.parse(proceduresAsJSON).getAsJsonArray();
- Iterator<JsonElement> iterator = array.iterator();
- while (iterator.hasNext()) {
- JsonElement element = iterator.next();
- JsonObject obj = element.getAsJsonObject();
- String className = obj.get("className").getAsString();
- String actualClassName = TransitRegionStateProcedure.class.getName();
- if (className.equals(actualClassName) && obj.has("timeout")) {
- return obj.get("timeout").getAsLong();
- }
- }
- return -1L;
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fdbaa4c3/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestModifyPeerProcedureRetryBackoff.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestModifyPeerProcedureRetryBackoff.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestModifyPeerProcedureRetryBackoff.java
new file mode 100644
index 0000000..7566d28
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/replication/TestModifyPeerProcedureRetryBackoff.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.replication;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ProcedureTestUtil;
+import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationState;
+
+/**
+ * Verifies that {@link ModifyPeerProcedure} retries failed steps with an increasing backoff.
+ * <p>
+ * While {@link #FAIL} is {@code true}, every retried step of {@link TestModifyPeerProcedure}
+ * throws an injected {@link ReplicationException}. For each such step the test:
+ * <ol>
+ * <li>waits until the procedure is in WAITING_TIMEOUT with a growing timeout;</li>
+ * <li>clears {@code FAIL} so that exactly one step succeeds;</li>
+ * <li>waits until that step re-arms {@code FAIL} for the next step.</li>
+ * </ol>
+ */
+@Category({ MasterTests.class, LargeTests.class })
+public class TestModifyPeerProcedureRetryBackoff {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestModifyPeerProcedureRetryBackoff.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  // Written under the class lock (in tryFail and assertBackoffIncrease) but read without the lock
+  // by the UTIL.waitFor predicate in assertBackoffIncrease; volatile makes those reads see the
+  // latest write.
+  private static volatile boolean FAIL = true;
+
+  /**
+   * A ModifyPeerProcedure whose retryable steps all go through {@link #tryFail()}. Child
+   * procedures are suppressed so the REFRESH_PEER_ON_RS states do nothing.
+   */
+  public static class TestModifyPeerProcedure extends ModifyPeerProcedure {
+
+    public TestModifyPeerProcedure() {
+    }
+
+    public TestModifyPeerProcedure(String peerId) {
+      super(peerId);
+    }
+
+    @Override
+    public PeerOperationType getPeerOperationType() {
+      return PeerOperationType.ADD;
+    }
+
+    /**
+     * Throws while {@code FAIL} is set. Otherwise it lets the current step succeed once and
+     * re-arms {@code FAIL}, so the next step fails again.
+     */
+    private void tryFail() throws ReplicationException {
+      synchronized (TestModifyPeerProcedureRetryBackoff.class) {
+        if (FAIL) {
+          throw new ReplicationException("Inject error");
+        }
+        FAIL = true;
+      }
+    }
+
+    @Override
+    protected <T extends Procedure<MasterProcedureEnv>> void addChildProcedure(
+        @SuppressWarnings("unchecked") T... subProcedure) {
+      // Make it a no-op
+    }
+
+    @Override
+    protected PeerModificationState nextStateAfterRefresh() {
+      return PeerModificationState.SERIAL_PEER_REOPEN_REGIONS;
+    }
+
+    @Override
+    protected boolean enablePeerBeforeFinish() {
+      return true;
+    }
+
+    @Override
+    protected void updateLastPushedSequenceIdForSerialPeer(MasterProcedureEnv env)
+        throws IOException, ReplicationException {
+      tryFail();
+    }
+
+    @Override
+    protected void reopenRegions(MasterProcedureEnv env) throws IOException {
+      try {
+        tryFail();
+      } catch (ReplicationException e) {
+        throw new IOException(e);
+      }
+    }
+
+    @Override
+    protected void enablePeer(MasterProcedureEnv env) throws ReplicationException {
+      tryFail();
+    }
+
+    @Override
+    protected void prePeerModification(MasterProcedureEnv env)
+        throws IOException, ReplicationException {
+      tryFail();
+    }
+
+    @Override
+    protected void updatePeerStorage(MasterProcedureEnv env) throws ReplicationException {
+      tryFail();
+    }
+
+    @Override
+    protected void postPeerModification(MasterProcedureEnv env)
+        throws IOException, ReplicationException {
+      tryFail();
+    }
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Waits for the backoff of the currently failing step to grow. It then lets that step succeed
+   * and waits until the success has re-armed {@code FAIL}.
+   */
+  private void assertBackoffIncrease() throws IOException, InterruptedException {
+    ProcedureTestUtil.waitUntilProcedureWaitingTimeout(UTIL, TestModifyPeerProcedure.class, 30000);
+    ProcedureTestUtil.waitUntilProcedureTimeoutIncrease(UTIL, TestModifyPeerProcedure.class, 2);
+    synchronized (TestModifyPeerProcedureRetryBackoff.class) {
+      FAIL = false;
+    }
+    UTIL.waitFor(30000, () -> FAIL);
+  }
+
+  @Test
+  public void test() throws IOException, InterruptedException {
+    ProcedureExecutor<MasterProcedureEnv> procExec =
+      UTIL.getMiniHBaseCluster().getMaster().getMasterProcedureExecutor();
+    long procId = procExec.submitProcedure(new TestModifyPeerProcedure("1"));
+    // PRE_PEER_MODIFICATION
+    assertBackoffIncrease();
+    // UPDATE_PEER_STORAGE
+    assertBackoffIncrease();
+    // No retry for REFRESH_PEER_ON_RS
+    // SERIAL_PEER_REOPEN_REGIONS
+    assertBackoffIncrease();
+    // SERIAL_PEER_UPDATE_LAST_PUSHED_SEQ_ID
+    assertBackoffIncrease();
+    // SERIAL_PEER_SET_PEER_ENABLED
+    assertBackoffIncrease();
+    // No retry for SERIAL_PEER_ENABLE_PEER_REFRESH_PEER_ON_RS
+    // POST_PEER_MODIFICATION
+    assertBackoffIncrease();
+    UTIL.waitFor(30000, () -> procExec.isFinished(procId));
+  }
+}