You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by tc...@apache.org on 2023/01/18 04:35:58 UTC
[hive] branch master updated: HIVE-26597: Fix unsetting of db property repl.target.for in ReplicationSemanticAnalyzer (Rakshith Chandraiah, reviewed by Teddy Choi)
This is an automated email from the ASF dual-hosted git repository.
tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c92a478e514 HIVE-26597: Fix unsetting of db property repl.target.for in ReplicationSemanticAnalyzer (Rakshith Chandraiah, reviewed by Teddy Choi)
c92a478e514 is described below
commit c92a478e514a28a53009fe5fbf08ce6fa35b58b9
Author: Rakshith C <56...@users.noreply.github.com>
AuthorDate: Wed Jan 18 10:05:44 2023 +0530
HIVE-26597: Fix unsetting of db property repl.target.for in ReplicationSemanticAnalyzer (Rakshith Chandraiah, reviewed by Teddy Choi)
---
.../parse/TestReplicationScenariosAcidTables.java | 65 +++++++++++++---------
.../hive/ql/parse/ReplicationSemanticAnalyzer.java | 20 ++++---
2 files changed, 49 insertions(+), 36 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 13bb9ad1c64..4c82a42badc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -383,10 +383,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(primary.getDatabase(primaryDbName),
MetaStoreUtils.FailoverEndpoint.SOURCE));
- primary.run("drop database if exists " + primaryDbName + " cascade");
-
- assertTrue(primary.getDatabase(primaryDbName) == null);
-
assertFalse(ReplChangeManager.isSourceOfReplication(replica.getDatabase(replicatedDbName)));
WarehouseInstance.Tuple reverseDumpData = replica.run("create table t3 (id int)")
@@ -398,11 +394,15 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
dumpPath = new Path(reverseDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
- assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.BOOTSTRAP);
+ assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
db = replica.getDatabase(replicatedDbName);
assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
- assertFalse(MetaStoreUtils.isTargetOfReplication(db));
+ assertTrue(MetaStoreUtils.isTargetOfReplication(db));
+ //do a second reverse dump.
+ primary.load(primaryDbName, replicatedDbName);
+ reverseDumpData = replica.dump(replicatedDbName);
+ dumpPath = new Path(reverseDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
primary.load(primaryDbName, replicatedDbName)
.run("use " + primaryDbName)
@@ -419,7 +419,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
Database primaryDb = primary.getDatabase(primaryDbName);
assertFalse(primaryDb == null);
- assertTrue(ReplUtils.isFirstIncPending(primaryDb.getParameters()));
+ assertFalse(ReplUtils.isFirstIncPending(primaryDb.getParameters()));
assertTrue(MetaStoreUtils.isTargetOfReplication(primaryDb));
assertFalse(MetaStoreUtils.isDbBeingFailedOver(primaryDb));
assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
@@ -531,7 +531,9 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
"'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'",
"'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT + "'='1'");
List<String> retainPrevDumpDir = Arrays.asList("'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'",
- "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT + "'='1'");
+ "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT + "'='1'",
+ "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='true'"
+ );
WarehouseInstance.Tuple dumpData = primary.run("use " + primaryDbName)
.run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
"tblproperties (\"transactional\"=\"true\")")
@@ -608,8 +610,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
- primary.run("drop database if exists " + primaryDbName + " cascade");
-
WarehouseInstance.Tuple reverseDumpData = replica.run("use " + replicatedDbName)
.run("insert into t2 partition(name='Carl') values(12)")
.run("create table t3 (id int)")
@@ -622,12 +622,16 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
dumpAckFile = new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString());
assertTrue(fs.exists(dumpAckFile));
assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
- assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.BOOTSTRAP);
+ assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
db = replica.getDatabase(replicatedDbName);
assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
- assertFalse(MetaStoreUtils.isTargetOfReplication(db));
+ assertTrue(MetaStoreUtils.isTargetOfReplication(db));
+ primary.load(primaryDbName, replicatedDbName, retainPrevDumpDir);
+ //do a second reverse dump.
+ reverseDumpData = replica.dump(replicatedDbName, retainPrevDumpDir);
+ dumpPath = new Path(reverseDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
- primary.load(primaryDbName, replicatedDbName)
+ primary.load(primaryDbName, replicatedDbName, retainPrevDumpDir)
.run("use " + primaryDbName)
.run("show tables")
.verifyResults(new String[]{"t1", "t2", "t3"})
@@ -642,13 +646,15 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
- reverseDumpData = replica.run("insert into t3 values (15)")
- .dump(replicatedDbName, retainPrevDumpDir);
+ reverseDumpData = replica.run("use " + replicatedDbName)
+ .run("insert into t3 values (15)")
+ .dump(replicatedDbName);
dumpPath = new Path(reverseDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()));
assertFalse(MetaStoreUtils.isDbBeingFailedOver(replica.getDatabase(replicatedDbName)));
primary.load(primaryDbName, replicatedDbName)
+ .run("use " + primaryDbName)
.run("select id from t3")
.verifyResults(new String[]{"10", "15"});;
assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
@@ -660,7 +666,9 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
"'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'",
"'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT + "'='1'");
List<String> retainPrevDumpDir = Arrays.asList("'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'",
- "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT + "'='1'");
+ "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT + "'='1'",
+ "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='true'"
+ );
WarehouseInstance.Tuple dumpData = primary.run("use " + primaryDbName)
.run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
"tblproperties (\"transactional\"=\"true\")")
@@ -711,8 +719,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
- primary.run("drop database if exists " + primaryDbName + " cascade");
-
WarehouseInstance.Tuple reverseDumpData = replica.run("use " + replicatedDbName)
.run("insert into t2 partition(name='Bob') values(20)")
.run("create table t3 (id int)")
@@ -725,26 +731,29 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
Path dumpAckFile = new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString());
assertTrue(fs.exists(dumpAckFile));
assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
- assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.BOOTSTRAP);
+ assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
db = replica.getDatabase(replicatedDbName);
assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
- assertFalse(MetaStoreUtils.isTargetOfReplication(db));
-
+ assertTrue(MetaStoreUtils.isTargetOfReplication(db));
fs.delete(dumpAckFile, false);
assertFalse(fs.exists(dumpAckFile));
WarehouseInstance.Tuple preFailoverDumpData = dumpData;
dumpData = replica.dump(replicatedDbName, retainPrevDumpDir);
+ dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+ dumpAckFile = new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString());
assertNotEquals(dumpData.dumpLocation, preFailoverDumpData.dumpLocation);
assertTrue(fs.exists(new Path(preFailoverDumpData.dumpLocation)));
- assertEquals(reverseDumpData.dumpLocation, dumpData.dumpLocation);
+ assertNotEquals(reverseDumpData.dumpLocation, dumpData.dumpLocation);
assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
- assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.BOOTSTRAP);
+ assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
assertTrue(fs.exists(dumpAckFile));
db = replica.getDatabase(replicatedDbName);
assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
- assertFalse(MetaStoreUtils.isTargetOfReplication(db));
+ assertTrue(MetaStoreUtils.isTargetOfReplication(db));
+ primary.load(primaryDbName, replicatedDbName);
+ dumpData = replica.dump(replicatedDbName, retainPrevDumpDir);
- primary.load(primaryDbName, replicatedDbName)
+ primary.load(primaryDbName, replicatedDbName, retainPrevDumpDir)
.run("use " + primaryDbName)
.run("show tables")
.verifyResults(new String[]{"t1", "t2", "t3"})
@@ -759,7 +768,8 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
- reverseDumpData = replica.run("insert into t3 values (3)")
+ reverseDumpData = replica.run("use " + replicatedDbName)
+ .run("insert into t3 values (3)")
.run("insert into t2 partition(name='Bob') values(30)")
.dump(replicatedDbName, retainPrevDumpDir);
dumpPath = new Path(reverseDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
@@ -778,7 +788,8 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
assertFalse(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(replica.getDatabase(replicatedDbName),
MetaStoreUtils.FailoverEndpoint.TARGET));
- primary.load(primaryDbName, replicatedDbName)
+ primary.load(primaryDbName, replicatedDbName, retainPrevDumpDir)
+ .run("use " + primaryDbName)
.run("select rank from t2 order by rank")
.verifyResults(new String[]{"11", "20", "30"})
.run("select id from t3")
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index abd4cf8ac3e..34406ee05e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -51,6 +51,7 @@ import java.io.IOException;
import java.util.Map;
import java.util.List;
import java.util.Collections;
+import java.util.Objects;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
@@ -192,18 +193,19 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
for (String dbName : databases) {
Database database = db.getDatabase(dbName);
if (database != null) {
+ Map<String, String> dbParams = database.getParameters();
if (MetaStoreUtils.isTargetOfReplication(database)) {
- if (MetaStoreUtils.isDbBeingFailedOverAtEndpoint(database, MetaStoreUtils.FailoverEndpoint.TARGET)) {
- LOG.info("Proceeding with dump operation as database: {} is target of replication and" +
- "{} is set to TARGET.", dbName, ReplConst.REPL_FAILOVER_ENDPOINT);
- ReplUtils.unsetDbPropIfSet(database, ReplConst.TARGET_OF_REPLICATION, db);
- } else {
- LOG.warn("Database " + dbNameOrPattern + " is marked as target of replication (repl.target.for), Will "
- + "trigger failover.");
- }
+ LOG.info("Triggering optimized bootstrap for database {} since it " +
+ "is marked as target of replication (repl.target.for) " +
+ "with {} set to {}",
+ dbName, ReplConst.REPL_FAILOVER_ENDPOINT,
+ Objects.nonNull(dbParams) ?
+ dbParams.get(ReplConst.REPL_FAILOVER_ENDPOINT)
+ : null
+ );
}
} else {
- throw new SemanticException("Cannot dump database " + dbNameOrPattern + " as it does not exist");
+ throw new SemanticException("Cannot dump database " + dbName + " as it does not exist");
}
}
}