You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/05/15 14:54:12 UTC

[hbase] 12/16: HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 650350a74919d60c0d5b701c320fbfc8f85b5302
Author: Liangjun He <he...@apache.org>
AuthorDate: Wed Apr 5 23:37:04 2023 +0800

    HBASE-27623 Start a new ReplicationSyncUp after the previous failed (#5150)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../regionserver/ReplicationSyncUp.java            | 46 ++++++++++++++++++++--
 .../replication/TestReplicationSyncUpTool.java     | 36 +++++++++++++++++
 .../replication/TestReplicationSyncUpToolBase.java |  7 +++-
 3 files changed, 84 insertions(+), 5 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
index f071cf6f1f8..cd6a4d9ac4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSyncUp.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
@@ -182,7 +184,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
     }
   }
 
-  private void writeInfoFile(FileSystem fs) throws IOException {
+  private void writeInfoFile(FileSystem fs, boolean isForce) throws IOException {
     // Record the info of this run. Currently only record the time we run the job. We will use this
     // timestamp to clean up the data for last sequence ids and hfile refs in replication queue
     // storage. See ReplicationQueueStorage.removeLastSequenceIdsAndHFileRefsBefore.
@@ -190,11 +192,48 @@ public class ReplicationSyncUp extends Configured implements Tool {
       new ReplicationSyncUpToolInfo(EnvironmentEdgeManager.currentTime());
     String json = JsonMapper.writeObjectAsString(info);
     Path infoDir = new Path(CommonFSUtils.getRootDir(getConf()), INFO_DIR);
-    try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), false)) {
+    try (FSDataOutputStream out = fs.create(new Path(infoDir, INFO_FILE), isForce)) {
       out.write(Bytes.toBytes(json));
     }
   }
 
+  /** Returns true when "-f" (force) is given; prints usage and exits on -h or unknown options. */
+  private static boolean parseOpts(String[] args) {
+    LinkedList<String> argv = new LinkedList<>(Arrays.asList(args));
+    String cmd;
+    while ((cmd = argv.poll()) != null) {
+      if (cmd.equals("-h") || cmd.equals("--h") || cmd.equals("--help")) {
+        printUsageAndExit(null, 0);
+      }
+      if (cmd.equals("-f")) {
+        return true;
+      }
+      // Anything else is unknown. Reject it even when it is the last argument, so that a
+      // mistyped option cannot silently start an unforced run.
+      printUsageAndExit("ERROR: Unrecognized option/command: " + cmd, -1);
+    }
+    return false;
+  }
+
+  private static void printUsageAndExit(final String message, final int exitCode) {
+    printUsage(message); // a null or empty message prints the usage text only
+    System.exit(exitCode); // terminates the JVM with the given exit code
+  }
+
+  /** Prints message (if any) and usage to stderr; generic -D options must precede tool options. */
+  private static void printUsage(final String message) {
+    if (message != null && !message.isEmpty()) {
+      System.err.println(message);
+    }
+    System.err.println("Usage: hbase " + ReplicationSyncUp.class.getName() + " \\");
+    System.err.println("  [-D<property=value>]* <OPTIONS>");
+    System.err.println();
+    System.err.println("General Options:");
+    System.err.println(" -h|--h|--help  Show this help and exit.");
+    System.err.println(" -f Start a new ReplicationSyncUp after the previous ReplicationSyncUp "
+      + "failed. See HBASE-27623 for details.");
+  }
+
   @Override
   public int run(String[] args) throws Exception {
     Abortable abortable = new Abortable() {
@@ -217,6 +256,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
         return abort;
       }
     };
+    boolean isForce = parseOpts(args);
     Configuration conf = getConf();
     try (ZKWatcher zkw = new ZKWatcher(conf,
       "syncupReplication" + EnvironmentEdgeManager.currentTime(), abortable, true)) {
@@ -226,7 +266,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
       Path logDir = new Path(walRootDir, HConstants.HREGION_LOGDIR_NAME);
 
       System.out.println("Start Replication Server");
-      writeInfoFile(fs);
+      writeInfoFile(fs, isForce);
       Replication replication = new Replication();
       // use offline table replication queue storage
       getConf().setClass(ReplicationStorageFactory.REPLICATION_QUEUE_IMPL,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index 38225613b9d..66de933832b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -27,6 +27,8 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -300,4 +302,38 @@ public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
     assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
       rowCountHt2TargetAtPeer1);
   }
+
+  /**
+   * Checks that only "-f" makes ReplicationSyncUp rewrite an existing info file. See HBASE-27623.
+   */
+  @Test
+  public void testStartANewSyncUpToolAfterFailed() throws Exception {
+    // First run, without "-f". It may well fail while syncing data; that does not matter for
+    // this test, which only relies on the replication info file that the run leaves behind
+    // (asserted to exist below).
+    syncUp(UTIL1);
+    Path rootDir = CommonFSUtils.getRootDir(UTIL1.getConfiguration());
+    Path syncUpInfoDir = new Path(rootDir, ReplicationSyncUp.INFO_DIR);
+    Path replicationInfoPath = new Path(syncUpInfoDir, ReplicationSyncUp.INFO_FILE);
+    FileSystem fs = UTIL1.getTestFileSystem();
+    assertTrue(fs.exists(replicationInfoPath));
+    FileStatus fileStatus1 = fs.getFileStatus(replicationInfoPath);

+    // Second run, still without "-f". If it throws, it must be a FileAlreadyExistsException.
+    // NOTE(review): no fail() after syncUp, so a run that does not throw goes unnoticed here.
+    try {
+      syncUp(UTIL1);
+    } catch (Exception e) {
+      assertTrue("e should be a FileAlreadyExistsException",
+        (e instanceof FileAlreadyExistsException));
+    }
+    FileStatus fileStatus2 = fs.getFileStatus(replicationInfoPath);
+    assertEquals(fileStatus1.getModificationTime(), fileStatus2.getModificationTime());

+    // Third run, with "-f": startup is expected to succeed and rewrite the replication info
+    // file, which shows up as a newer modification time.
+    syncUp(UTIL1, new String[] { "-f" });
+    FileStatus fileStatus3 = fs.getFileStatus(replicationInfoPath);
+    assertTrue(fileStatus3.getModificationTime() > fileStatus2.getModificationTime());
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
index 8a28db3b185..44258241058 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
@@ -136,8 +136,11 @@ public abstract class TestReplicationSyncUpToolBase {
   }
 
   final void syncUp(HBaseTestingUtil util) throws Exception {
-    ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(),
-      new String[0]);
+    syncUp(util, new String[0]);
+  }
+
+  final void syncUp(HBaseTestingUtil util, String[] args) throws Exception {
+    ToolRunner.run(new Configuration(util.getConfiguration()), new ReplicationSyncUp(), args);
   }
 
   // Utilities that manager shutdown / restart of source / sink clusters. They take care of