You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by tc...@apache.org on 2023/01/18 04:35:58 UTC

[hive] branch master updated: HIVE-26597: Fix unsetting of db property repl.target.for in ReplicationSemanticAnalyzer (Rakshith Chandraiah, reviewed by Teddy Choi)

This is an automated email from the ASF dual-hosted git repository.

tchoi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new c92a478e514 HIVE-26597: Fix unsetting of db property repl.target.for in ReplicationSemanticAnalyzer (Rakshith Chandraiah, reviewed by Teddy Choi)
c92a478e514 is described below

commit c92a478e514a28a53009fe5fbf08ce6fa35b58b9
Author: Rakshith C <56...@users.noreply.github.com>
AuthorDate: Wed Jan 18 10:05:44 2023 +0530

    HIVE-26597: Fix unsetting of db property repl.target.for in ReplicationSemanticAnalyzer (Rakshith Chandraiah, reviewed by Teddy Choi)
---
 .../parse/TestReplicationScenariosAcidTables.java  | 65 +++++++++++++---------
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java | 20 ++++---
 2 files changed, 49 insertions(+), 36 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 13bb9ad1c64..4c82a42badc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -383,10 +383,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(primary.getDatabase(primaryDbName),
             MetaStoreUtils.FailoverEndpoint.SOURCE));
 
-    primary.run("drop database if exists " + primaryDbName + " cascade");
-
-    assertTrue(primary.getDatabase(primaryDbName) == null);
-
     assertFalse(ReplChangeManager.isSourceOfReplication(replica.getDatabase(replicatedDbName)));
 
     WarehouseInstance.Tuple reverseDumpData = replica.run("create table t3 (id int)")
@@ -398,11 +394,15 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
     dumpPath = new Path(reverseDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
     assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
-    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.BOOTSTRAP);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
     assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
     db = replica.getDatabase(replicatedDbName);
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
-    assertFalse(MetaStoreUtils.isTargetOfReplication(db));
+    assertTrue(MetaStoreUtils.isTargetOfReplication(db));
+    //do a second reverse dump.
+    primary.load(primaryDbName, replicatedDbName);
+    reverseDumpData = replica.dump(replicatedDbName);
+    dumpPath = new Path(reverseDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
 
     primary.load(primaryDbName, replicatedDbName)
             .run("use " + primaryDbName)
@@ -419,7 +419,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
 
     Database primaryDb = primary.getDatabase(primaryDbName);
     assertFalse(primaryDb == null);
-    assertTrue(ReplUtils.isFirstIncPending(primaryDb.getParameters()));
+    assertFalse(ReplUtils.isFirstIncPending(primaryDb.getParameters()));
     assertTrue(MetaStoreUtils.isTargetOfReplication(primaryDb));
     assertFalse(MetaStoreUtils.isDbBeingFailedOver(primaryDb));
     assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
@@ -531,7 +531,9 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
             "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'",
             "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT + "'='1'");
     List<String> retainPrevDumpDir = Arrays.asList("'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'",
-            "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT + "'='1'");
+            "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT + "'='1'",
+            "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='true'"
+    );
     WarehouseInstance.Tuple dumpData = primary.run("use " + primaryDbName)
             .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
                     "tblproperties (\"transactional\"=\"true\")")
@@ -608,8 +610,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
     assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
 
-    primary.run("drop database if exists " + primaryDbName + " cascade");
-
     WarehouseInstance.Tuple reverseDumpData = replica.run("use " + replicatedDbName)
             .run("insert into t2 partition(name='Carl') values(12)")
             .run("create table t3 (id int)")
@@ -622,12 +622,16 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     dumpAckFile = new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString());
     assertTrue(fs.exists(dumpAckFile));
     assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
-    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.BOOTSTRAP);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
     db = replica.getDatabase(replicatedDbName);
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
-    assertFalse(MetaStoreUtils.isTargetOfReplication(db));
+    assertTrue(MetaStoreUtils.isTargetOfReplication(db));
+    primary.load(primaryDbName, replicatedDbName, retainPrevDumpDir);
+    //do a second reverse dump.
+    reverseDumpData = replica.dump(replicatedDbName, retainPrevDumpDir);
+    dumpPath = new Path(reverseDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
 
-    primary.load(primaryDbName, replicatedDbName)
+    primary.load(primaryDbName, replicatedDbName, retainPrevDumpDir)
             .run("use " + primaryDbName)
             .run("show tables")
             .verifyResults(new String[]{"t1", "t2", "t3"})
@@ -642,13 +646,15 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
 
     assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
 
-    reverseDumpData = replica.run("insert into t3 values (15)")
-            .dump(replicatedDbName, retainPrevDumpDir);
+    reverseDumpData = replica.run("use " + replicatedDbName)
+            .run("insert into t3 values (15)")
+            .dump(replicatedDbName);
     dumpPath = new Path(reverseDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
     fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()));
     assertFalse(MetaStoreUtils.isDbBeingFailedOver(replica.getDatabase(replicatedDbName)));
 
     primary.load(primaryDbName, replicatedDbName)
+            .run("use " + primaryDbName)
             .run("select id from t3")
             .verifyResults(new String[]{"10", "15"});;
     assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
@@ -660,7 +666,9 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
             "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'",
             "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT + "'='1'");
     List<String> retainPrevDumpDir = Arrays.asList("'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'",
-            "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT + "'='1'");
+            "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR_COUNT + "'='1'",
+            "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET + "'='true'"
+    );
     WarehouseInstance.Tuple dumpData = primary.run("use " + primaryDbName)
             .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
                     "tblproperties (\"transactional\"=\"true\")")
@@ -711,8 +719,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
     assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
 
-    primary.run("drop database if exists " + primaryDbName + " cascade");
-
     WarehouseInstance.Tuple reverseDumpData = replica.run("use " + replicatedDbName)
             .run("insert into t2 partition(name='Bob') values(20)")
             .run("create table t3 (id int)")
@@ -725,26 +731,29 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     Path dumpAckFile = new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString());
     assertTrue(fs.exists(dumpAckFile));
     assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
-    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.BOOTSTRAP);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
     db = replica.getDatabase(replicatedDbName);
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
-    assertFalse(MetaStoreUtils.isTargetOfReplication(db));
-
+    assertTrue(MetaStoreUtils.isTargetOfReplication(db));
     fs.delete(dumpAckFile, false);
     assertFalse(fs.exists(dumpAckFile));
     WarehouseInstance.Tuple preFailoverDumpData = dumpData;
     dumpData = replica.dump(replicatedDbName, retainPrevDumpDir);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    dumpAckFile = new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString());
     assertNotEquals(dumpData.dumpLocation, preFailoverDumpData.dumpLocation);
     assertTrue(fs.exists(new Path(preFailoverDumpData.dumpLocation)));
-    assertEquals(reverseDumpData.dumpLocation, dumpData.dumpLocation);
+    assertNotEquals(reverseDumpData.dumpLocation, dumpData.dumpLocation);
     assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
-    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.BOOTSTRAP);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
     assertTrue(fs.exists(dumpAckFile));
     db = replica.getDatabase(replicatedDbName);
     assertTrue(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(db, MetaStoreUtils.FailoverEndpoint.TARGET));
-    assertFalse(MetaStoreUtils.isTargetOfReplication(db));
+    assertTrue(MetaStoreUtils.isTargetOfReplication(db));
+    primary.load(primaryDbName, replicatedDbName);
+    dumpData = replica.dump(replicatedDbName, retainPrevDumpDir);
 
-    primary.load(primaryDbName, replicatedDbName)
+    primary.load(primaryDbName, replicatedDbName, retainPrevDumpDir)
             .run("use " + primaryDbName)
             .run("show tables")
             .verifyResults(new String[]{"t1", "t2", "t3"})
@@ -759,7 +768,8 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
 
     assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
 
-    reverseDumpData = replica.run("insert into t3 values (3)")
+    reverseDumpData = replica.run("use " + replicatedDbName)
+            .run("insert into t3 values (3)")
             .run("insert into t2 partition(name='Bob') values(30)")
             .dump(replicatedDbName, retainPrevDumpDir);
     dumpPath = new Path(reverseDumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
@@ -778,7 +788,8 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     assertFalse(MetaStoreUtils.isDbBeingFailedOverAtEndpoint(replica.getDatabase(replicatedDbName),
             MetaStoreUtils.FailoverEndpoint.TARGET));
 
-    primary.load(primaryDbName, replicatedDbName)
+    primary.load(primaryDbName, replicatedDbName, retainPrevDumpDir)
+            .run("use " + primaryDbName)
             .run("select rank from t2 order by rank")
             .verifyResults(new String[]{"11", "20", "30"})
             .run("select id from t3")
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index abd4cf8ac3e..34406ee05e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -51,6 +51,7 @@ import java.io.IOException;
 import java.util.Map;
 import java.util.List;
 import java.util.Collections;
+import java.util.Objects;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
@@ -192,18 +193,19 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     for (String dbName : databases) {
       Database database = db.getDatabase(dbName);
       if (database != null) {
+        Map<String, String> dbParams = database.getParameters();
         if (MetaStoreUtils.isTargetOfReplication(database)) {
-          if (MetaStoreUtils.isDbBeingFailedOverAtEndpoint(database, MetaStoreUtils.FailoverEndpoint.TARGET)) {
-            LOG.info("Proceeding with dump operation as database: {} is target of replication and" +
-                    "{} is set to TARGET.", dbName, ReplConst.REPL_FAILOVER_ENDPOINT);
-            ReplUtils.unsetDbPropIfSet(database, ReplConst.TARGET_OF_REPLICATION, db);
-          } else {
-            LOG.warn("Database " + dbNameOrPattern + " is marked as target of replication (repl.target.for), Will "
-                + "trigger failover.");
-          }
+          LOG.info("Triggering optimized bootstrap for database {} since it " +
+                          "is marked as target of replication (repl.target.for) " +
+                          "with {} set to {}",
+                  dbName, ReplConst.REPL_FAILOVER_ENDPOINT,
+                  Objects.nonNull(dbParams) ?
+                          dbParams.get(ReplConst.REPL_FAILOVER_ENDPOINT)
+                          : null
+          );
         }
       } else {
-        throw new SemanticException("Cannot dump database " + dbNameOrPattern + " as it does not exist");
+        throw new SemanticException("Cannot dump database " + dbName + " as it does not exist");
       }
     }
   }