Posted to commits@hbase.apache.org by zh...@apache.org on 2023/05/15 14:54:12 UTC
[hbase] 12/16: HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 650350a74919d60c0d5b701c320fbfc8f85b5302
Author: Liangjun He <he...@apache.org>
AuthorDate: Wed Apr 5 23:37:04 2023 +0800
HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../regionserver/ReplicationSyncUp.java | 46 ++++++++++++++++++++--
.../replication/TestReplicationSyncUpTool.java | 36 +++++++++++++++++
.../replication/TestReplicationSyncUpToolBase.java | 7 +++-
3 files changed, 84 insertions(+), 5 deletions(-)
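For context: ReplicationSyncUp records the time it started in an info file under the cluster root directory. Before this change the file was always created with overwrite disabled, so once a failed run left the file behind, every later run aborted at startup. The new -f option lets the next run overwrite the stale file and start over. Below is a minimal sketch (not part of this patch) of launching the tool in force mode through ToolRunner, mirroring the syncUp(util, args) helper added to the test base class; the configuration setup is illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.util.ToolRunner;

public class ReplicationSyncUpForceExample {
  public static void main(String[] args) throws Exception {
    // Load the cluster configuration; -f asks the tool to overwrite a stale info file.
    Configuration conf = HBaseConfiguration.create();
    int exitCode = ToolRunner.run(conf, new ReplicationSyncUp(), new String[] { "-f" });
    System.exit(exitCode);
  }
}

The same run from a shell would be: hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp -f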
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index f071cf6f1f8..cd6a4d9ac4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
@@ -182,7 +184,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
}
}
- private void writeInfoFile(FileSystem fs) throws IOException {
+ private void writeInfoFile(FileSystem fs, boolean isForce) throws IOException {
// Record the info of this run. Currently only record the time we run the job. We will use this
// timestamp to clean up the data for last sequence ids and hfile refs in replication queue
// storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore.
@@ -190,11 +192,48 @@ public class ReplicationSyncUp extends Configured implements Tool {
new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime());
String json = JsonMapper.writeObjectAsString(info);
Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
- try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), false)) {
+ try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), isForce)) {
out.write(Bytes.toBytes(json));
}
}
+ private static boolean parseOpts(String args[]) {
+ LinkedList<String> argv = new LinkedList<>();
+ argv.addAll(Arrays.asList(args));
+ String cmd = null;
+ while ((cmd = argv.poll()) != null) {
+ if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
+ printUsageAndExit(null, 0);
+ }
+ if (cmd.equals("-f")) {
+ return true;
+ }
+ if (!argv.isEmpty()) {
+ printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
+ }
+ }
+ return false;
+ }
+
+ private static void printUsageAndExit(final String message, final int exitCode) {
+ printUsage(message);
+ System.exit(exitCode);
+ }
+
+ private static void printUsage(final String message) {
+ if (message != null && message.length() > 0) {
+ System.err.println(message);
+ }
+ System.err.println("Usage: hbase " + ReplicationSyncUp.class.getName() + " \\");
+ System.err.println(" <OPTIONS> [-D<property=value>]*");
+ System.err.println();
+ System.err.println("General Options:");
+ System.err.println(" -h|--h|--help Show this help and exit.");
+ System.err
+ .println(" -f Start a new ReplicationSyncUp after the previous ReplicationSyncUp failed. "
+ + "See HBASE-27623 for details.");
+ }
+
@Override
public int run(String[] args) throws Exception {
Abortable abortable = new Abortable() {
@@ -217,6 +256,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
return abort;
}
};
+ boolean isForce = parseOpts(args);
Configuration conf = getConf();
try (ZKWatcher zkw = new ZKWatcher(conf,
"syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, true)) {
@@ -226,7 +266,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
System.out.println("Start Replication Server");
- writeInfoFile(fs);
+ writeInfoFile(fs, isForce);
Replication replication = new Replication();
// use offline table replication queue storage
getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL,
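The force behaviour above comes down to the boolean overwrite flag passed to FileSystem.create: with overwrite disabled, an existing info file makes the create call fail (the new test below expects a FileAlreadyExistsException), which is how a previous failed run is detected; with -f the stale file is simply replaced. A minimal sketch of that contract follows, using a made-up path rather than the tool's real INFO_DIR/INFO_FILE location and a placeholder payload instead of the ReplicationSyncUpToolInfo JSON.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class InfoFileCreateSketch {
  // Illustrative only: demonstrates the FileSystem.create(path, overwrite) contract that
  // writeInfoFile relies on. The path and payload below are invented for the example.
  static void writeInfo(FileSystem fs, Path infoFile, boolean force) throws IOException {
    // force == false: create throws if infoFile already exists (a previous run left it behind).
    // force == true: the stale file is overwritten and the new run proceeds.
    try (FSDataOutputStream out = fs.create(infoFile, force)) {
      out.writeBytes("placeholder info payload");
    }
  }

  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    writeInfo(fs, new Path("/tmp/replication_sync_up_info_example"), false);
  }
}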
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index 38225613b9d..66de933832b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -27,6 +27,8 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -300,4 +302,38 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
rowCountHt2TargetAtPeer1);
}
+
+ /**
+ * test "start a new ReplicationSyncUp after the previous failed". See HBASE-27623 for details.
+ */
+ @Test
+ public void testStartANewSyncUpToolAfterFailed() throws Exception {
+ // Start syncUpTool for the first time in non-force mode.
+ // Assume this run fails while syncing data;
+ // that does not affect our test results.
+ syncUp(UTIL1);
+ Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
+ Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
+ Path replicationInfoPath = new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE);
+ FileSystem fs = UTIL1.getTestFileSystem();
+ assertTrue(fs.exists(replicationInfoPath));
+ FileStatus fileStatus1 = fs.getFileStatus(replicationInfoPath);
+
+ // Start syncUpTool for the second time in non-force mode;
+ // startup will fail because the replication info file already exists.
+ try {
+ syncUp(UTIL1);
+ } catch (Exception e) {
+ assertTrue("e should be a FileAlreadyExistsException",
+ (e instanceof FileAlreadyExistsException));
+ }
+ FileStatus fileStatus2 = fs.getFileStatus(replicationInfoPath);
+ assertEquals(fileStatus1.getModificationTime(), fileStatus2.getModificationTime());
+
+ // Start syncUpTool for the third time in force mode;
+ // startup will succeed and create a new replication info file.
+ syncUp(UTIL1, new String[] { "-f" });
+ FileStatus fileStatus3 = fs.getFileStatus(replicationInfoPath);
+ assertTrue(fileStatus3.getModificationTime() > fileStatus2.getModificationTime());
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
index 8a28db3b185..44258241058 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
@@ -136,8 +136,11 @@ public abstract class TestReplicationSyncUpToolBase {
}
final void syncUp(HBaseTestingUtil util) throws Exception {
- ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(),
- new String[0]);
+ syncUp(util, new String[0]);
+ }
+
+ final void syncUp(HBaseTestingUtil util, String[] args) throws Exception {
+ ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(), args);
}
// Utilities that manage shutdown / restart of source / sink clusters. They take care of