You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by tc...@apache.org on 2023/02/03 04:35:27 UTC
[hive] branch master updated: HIVE-26963: Unset repl.failover.endpoint during second cycle of optimized bootstrap (#4006) (Rakshith Chandraiah, reviewed by Teddy Choi)
This is an automated email from the ASF dual-hosted git repository.
tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a8151681965 HIVE-26963: Unset repl.failover.endpoint during second cycle of optimized bootstrap (#4006) (Rakshith Chandraiah, reviewed by Teddy Choi)
a8151681965 is described below
commit a8151681965ceab430b3d778ad996dd0af560934
Author: Rakshith C <56...@users.noreply.github.com>
AuthorDate: Fri Feb 3 10:04:56 2023 +0530
HIVE-26963: Unset repl.failover.endpoint during second cycle of optimized bootstrap (#4006) (Rakshith Chandraiah, reviewed by Teddy Choi)
---
.../parse/TestReplicationOptimisedBootstrap.java | 63 ++++++++++++++++++++++
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 5 +-
.../hadoop/hive/ql/exec/repl/ReplLoadTask.java | 4 ++
3 files changed, 71 insertions(+), 1 deletion(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
index 182cb966dfc..4959bacf5ad 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOptimisedBootstrap.java
@@ -66,6 +66,7 @@ import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_DONT_SET;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.QUOTA_RESET;
import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_ENABLE_BACKGROUND_THREAD;
import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_TARGET_DB_PROPERTY;
+import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_FAILOVER_ENDPOINT;
import static org.apache.hadoop.hive.common.repl.ReplConst.TARGET_OF_REPLICATION;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
import static org.apache.hadoop.hive.ql.exec.repl.OptimisedBootstrapUtils.EVENT_ACK_FILE;
@@ -1330,4 +1331,66 @@ public class TestReplicationOptimisedBootstrap extends BaseReplicationScenariosA
assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(replica.getDatabase(replicatedDbName),
MetaStoreUtils.FailoverEndpoint.TARGET));
}
+ @Test
+ public void testOptimizedBootstrapWithControlledFailover() throws Throwable {
+ primary.run("use " + primaryDbName)
+ .run("create table t1 (id string)")
+ .run("insert into table t1 values ('A')")
+ .dump(primaryDbName);
+ replica.load(replicatedDbName, primaryDbName);
+
+ primary.dump(primaryDbName);
+ replica.load(replicatedDbName, primaryDbName);
+ //initiate a controlled failover from primary to replica.
+ List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+ primary.dump(primaryDbName, failoverConfigs);
+ replica.load(replicatedDbName, primaryDbName, failoverConfigs);
+
+ primary.run("use " + primaryDbName)
+ .run("create table t3 (id int)")
+ .run("insert into t3 values(1),(2),(3)")
+ .run("insert into t1 values('B')"); //modify primary after failover.
+
+ // initiate first cycle of optimized bootstrap
+ WarehouseInstance.Tuple reverseDump = replica.run("use " + replicatedDbName)
+ .run("create table t2 (col int)")
+ .run("insert into t2 values(1),(2)")
+ .dump(replicatedDbName);
+
+ FileSystem fs = new Path(reverseDump.dumpLocation).getFileSystem(conf);
+ assertTrue(fs.exists(new Path(reverseDump.dumpLocation, EVENT_ACK_FILE)));
+
+ primary.load(primaryDbName, replicatedDbName);
+
+ assertEquals(MetaStoreUtils.FailoverEndpoint.SOURCE.toString(),
+ primary.getDatabase(primaryDbName).getParameters().get(REPL_FAILOVER_ENDPOINT));
+
+ assertEquals(MetaStoreUtils.FailoverEndpoint.TARGET.toString(),
+ replica.getDatabase(replicatedDbName).getParameters().get(REPL_FAILOVER_ENDPOINT));
+
+ assertTrue(fs.exists(new Path(reverseDump.dumpLocation, TABLE_DIFF_COMPLETE_DIRECTORY)));
+ HashSet<String> tableDiffEntries = getTablesFromTableDiffFile(new Path(reverseDump.dumpLocation), conf);
+ assertTrue(!tableDiffEntries.isEmpty());
+
+ assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(primary.getDatabase(primaryDbName),
+ MetaStoreUtils.FailoverEndpoint.SOURCE));
+ assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(replica.getDatabase(replicatedDbName),
+ MetaStoreUtils.FailoverEndpoint.TARGET));
+
+ // second cycle of optimized bootstrap
+ reverseDump = replica.dump(replicatedDbName);
+ assertTrue(fs.exists(new Path(reverseDump.dumpLocation, OptimisedBootstrapUtils.BOOTSTRAP_TABLES_LIST)));
+
+ primary.load(primaryDbName, replicatedDbName);
+ //ensure optimized bootstrap was successful
+ primary.run(String.format("select * from %s.t1", primaryDbName))
+ .verifyResults(new String[]{"A"})
+ .run(String.format("select * from %s.t2", primaryDbName))
+ .verifyResults(new String[]{"1", "2"})
+ .run("show tables in " + primaryDbName)
+ .verifyResults(new String[]{"t1", "t2"});
+
+ assertFalse(primary.getDatabase(primaryDbName).getParameters().containsKey(REPL_FAILOVER_ENDPOINT));
+ assertFalse(replica.getDatabase(replicatedDbName).getParameters().containsKey(REPL_FAILOVER_ENDPOINT));
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index aa02b05f618..a2b1a900ae9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -493,7 +493,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
dbParams.remove(REPL_TARGET_DB_PROPERTY);
dbParams.remove(ReplConst.REPL_ENABLE_BACKGROUND_THREAD);
dbParams.remove(REPL_RESUME_STARTED_AFTER_FAILOVER);
-
+ if (!isFailoverInProgress) {
+ // if we have failover endpoint from controlled failover remove it.
+ dbParams.remove(ReplConst.REPL_FAILOVER_ENDPOINT);
+ }
database.setParameters(dbParams);
LOG.info("Removing {} property from the database {} after successful optimised bootstrap dump", String.join(",",
new String[] { TARGET_OF_REPLICATION, CURR_STATE_ID_TARGET.toString(), CURR_STATE_ID_SOURCE.toString(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 3e7961e9a68..83fda3a22b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -102,6 +102,7 @@ import java.util.Set;
import static org.apache.hadoop.hive.ql.hooks.EnforceReadOnlyDatabaseHook.READONLY;
import static org.apache.hadoop.hive.common.repl.ReplConst.READ_ONLY_HOOK;
import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_RESUME_STARTED_AFTER_FAILOVER;
+import static org.apache.hadoop.hive.common.repl.ReplConst.REPL_FAILOVER_ENDPOINT;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_SNAPSHOT_DIFF_FOR_EXTERNAL_TABLE_COPY;
import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
@@ -643,6 +644,9 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
LOG.debug("Database {} properties before removal {}", work.getTargetDatabase(), params);
params.remove(REPL_RESUME_STARTED_AFTER_FAILOVER);
params.remove(SOURCE_OF_REPLICATION);
+ if (!work.shouldFailover()) {
+ params.remove(REPL_FAILOVER_ENDPOINT);
+ }
db.setParameters(params);
LOG.info("Removed {} property from database {} after successful optimised bootstrap load.",
SOURCE_OF_REPLICATION, work.getTargetDatabase());