Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/07/05 20:38:01 UTC

[GitHub] [hive] pkumarsinha commented on a change in pull request #2121: HIVE-24918: Handle failover case during repl dump.

pkumarsinha commented on a change in pull request #2121:
URL: https://github.com/apache/hive/pull/2121#discussion_r664101074



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,226 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**Open transactions can be of two types:

Review comment:
       nit:
   /* Open transactions can be of two types:
        Case 1) Txns that have not acquired HIVE LOCKS or belong to a different db: these txns are captured in the
        _failovermetadata file inside the dump directory.
        Case 2) Txns that have acquired HIVE LOCKS and belong to the db under replication: these txns are aborted by Hive
        as part of the dump operation.
        */

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,226 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**Open transactions can be of two types:
+     Case 1) Txns that belong to different db or have not acquired HIVE LOCKS: These txns would be caught in
+     _failovermetadata file.
+     Case 2) Txns that belong to db under replication: These txns would be aborted as part of dump operation.
+     */
+    // Open 3 txns for Database which is not under replication
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of secondary db for 3 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of primary db for 2 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    assertTrue(failoverMD.getTxnsWithoutLock().equals(txnsWithNoLocks));
+
+
+    //TxnsForPrimaryDb and txnsWithNoLocks would have been aborted by dump operation.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsAborted(txnsWithNoLocks, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    Path dbRootDir = new Path(dumpData.dumpLocation).getParent();
+    long prevDumpDirModifTime = getLatestDumpDirModifTime(dbRootDir);
+    primary.run("REPL DUMP " + primaryDbName + " with ('" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "' = 'true')");
+    assert dumpData.dumpLocation.equals(ReplUtils.getLatestDumpPath(dbRootDir, conf).toString());
+    assert prevDumpDirModifTime == getLatestDumpDirModifTime(dbRootDir);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    dumpData = primary.dump(primaryDbName);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    replica.load(replicatedDbName, primaryDbName);
+
+    fs.create(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString()));
+    fs.create(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+
+    //Since the failover start config is disabled and previous valid dump directory contains _failover_ready marker file
+    //So, this dump iteration will perform bootstrap dump instead of incremental.
+    dumpData = primary.dump(primaryDbName);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.BOOTSTRAP);
+  }
+
+  private long getLatestDumpDirModifTime(Path dumpRoot) throws Exception {
+    FileSystem fs = dumpRoot.getFileSystem(conf);
+    long latestModifTime = -1;
+    if (fs.exists(dumpRoot)) {
+      for (FileStatus status : fs.listStatus(dumpRoot)) {
+        if (status.getModificationTime() > latestModifTime) {
+          latestModifTime = status.getModificationTime();
+        }
+      }
+    }
+    return latestModifTime;
+  }
+
+  @Test
+  public void testFailoverDuringDumpWithPreviousFailed() throws Throwable {
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));

Review comment:
       nit: Why not create a Path object for each of these and reuse it? The code would read better.
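
   For illustration, a minimal sketch of the reuse the reviewer seems to have in mind (the variable names below are hypothetical; since dumpPath is reassigned after each dump, the Path objects would need to be rebuilt per dump iteration):

    Path failoverReadyMarker = new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString());
    Path failoverMetadataFile = new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA);
    assertFalse(fs.exists(failoverMetadataFile));
    assertFalse(fs.exists(failoverReadyMarker));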

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,226 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**Open transactions can be of two types:
+     Case 1) Txns that belong to different db or have not acquired HIVE LOCKS: These txns would be caught in
+     _failovermetadata file.
+     Case 2) Txns that belong to db under replication: These txns would be aborted as part of dump operation.
+     */
+    // Open 3 txns for Database which is not under replication
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of secondary db for 3 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of primary db for 2 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    assertTrue(failoverMD.getTxnsWithoutLock().equals(txnsWithNoLocks));
+
+
+    //TxnsForPrimaryDb and txnsWithNoLocks would have been aborted by dump operation.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsAborted(txnsWithNoLocks, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    Path dbRootDir = new Path(dumpData.dumpLocation).getParent();
+    long prevDumpDirModifTime = getLatestDumpDirModifTime(dbRootDir);
+    primary.run("REPL DUMP " + primaryDbName + " with ('" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "' = 'true')");

Review comment:
       Why not use this format: primary.dump(primaryDbName, failoverConfigs)?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -1090,6 +1172,26 @@ Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
     }
   }
 
+  private void unsetReplFailoverEnabled(Hive hiveDb, Database db) throws HiveException {

Review comment:
       Just unsetReplFailoverEnabled(Hive hiveDb) should be enough, no?
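
   A rough sketch of that simplification, assuming the method can look the database up itself via work.dbNameOrPattern (the property key below is hypothetical, not taken from the patch):

    private void unsetReplFailoverEnabled(Hive hiveDb) throws HiveException {
      // fetch the database inside instead of passing it in
      Database db = hiveDb.getDatabase(work.dbNameOrPattern);
      db.getParameters().remove("repl.failover.enabled"); // hypothetical property key
      hiveDb.alterDatabase(db.getName(), db);
    }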

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,226 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**Open transactions can be of two types:
+     Case 1) Txns that belong to different db or have not acquired HIVE LOCKS: These txns would be caught in
+     _failovermetadata file.
+     Case 2) Txns that belong to db under replication: These txns would be aborted as part of dump operation.
+     */
+    // Open 3 txns for Database which is not under replication
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of secondary db for 3 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of primary db for 2 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    assertTrue(failoverMD.getTxnsWithoutLock().equals(txnsWithNoLocks));
+
+
+    //TxnsForPrimaryDb and txnsWithNoLocks would have been aborted by dump operation.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsAborted(txnsWithNoLocks, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)

Review comment:
       Why does load not require the failoverConfigs?
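
   If the with-clause were needed on the load side as well, it would presumably be passed the same way as in dump, assuming WarehouseInstance exposes a matching load overload:

    replica.load(replicatedDbName, primaryDbName, failoverConfigs);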

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,226 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**Open transactions can be of two types:
+     Case 1) Txns that belong to different db or have not acquired HIVE LOCKS: These txns would be caught in
+     _failovermetadata file.
+     Case 2) Txns that belong to db under replication: These txns would be aborted as part of dump operation.
+     */
+    // Open 3 txns for Database which is not under replication
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of secondary db for 3 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of primary db for 2 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    assertTrue(failoverMD.getTxnsWithoutLock().equals(txnsWithNoLocks));
+
+
+    //TxnsForPrimaryDb and txnsWithNoLocks would have been aborted by dump operation.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsAborted(txnsWithNoLocks, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    Path dbRootDir = new Path(dumpData.dumpLocation).getParent();
+    long prevDumpDirModifTime = getLatestDumpDirModifTime(dbRootDir);
+    primary.run("REPL DUMP " + primaryDbName + " with ('" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "' = 'true')");
+    assert dumpData.dumpLocation.equals(ReplUtils.getLatestDumpPath(dbRootDir, conf).toString());
+    assert prevDumpDirModifTime == getLatestDumpDirModifTime(dbRootDir);

Review comment:
       Also assert on the dump dir name.
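
   A sketch of the extra assertion, reusing helpers that already appear in this test:

    assertEquals(new Path(dumpData.dumpLocation).getName(),
        ReplUtils.getLatestDumpPath(dbRootDir, conf).getName());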

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,226 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**Open transactions can be of two types:
+     Case 1) Txns that belong to different db or have not acquired HIVE LOCKS: These txns would be caught in
+     _failovermetadata file.
+     Case 2) Txns that belong to db under replication: These txns would be aborted as part of dump operation.
+     */
+    // Open 3 txns for Database which is not under replication
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of secondary db for 3 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of primary db for 2 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    assertTrue(failoverMD.getTxnsWithoutLock().equals(txnsWithNoLocks));
+
+
+    //TxnsForPrimaryDb and txnsWithNoLocks would have been aborted by dump operation.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsAborted(txnsWithNoLocks, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    Path dbRootDir = new Path(dumpData.dumpLocation).getParent();
+    long prevDumpDirModifTime = getLatestDumpDirModifTime(dbRootDir);
+    primary.run("REPL DUMP " + primaryDbName + " with ('" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "' = 'true')");
+    assert dumpData.dumpLocation.equals(ReplUtils.getLatestDumpPath(dbRootDir, conf).toString());
+    assert prevDumpDirModifTime == getLatestDumpDirModifTime(dbRootDir);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    dumpData = primary.dump(primaryDbName);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    replica.load(replicatedDbName, primaryDbName);
+
+    fs.create(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString()));
+    fs.create(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+
+    //Since the failover start config is disabled and previous valid dump directory contains _failover_ready marker file
+    //So, this dump iteration will perform bootstrap dump instead of incremental.
+    dumpData = primary.dump(primaryDbName);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.BOOTSTRAP);
+  }
+
+  private long getLatestDumpDirModifTime(Path dumpRoot) throws Exception {
+    FileSystem fs = dumpRoot.getFileSystem(conf);
+    long latestModifTime = -1;
+    if (fs.exists(dumpRoot)) {
+      for (FileStatus status : fs.listStatus(dumpRoot)) {
+        if (status.getModificationTime() > latestModifTime) {
+          latestModifTime = status.getModificationTime();
+        }
+      }
+    }
+    return latestModifTime;
+  }
+
+  @Test
+  public void testFailoverDuringDumpWithPreviousFailed() throws Throwable {
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    dumpData = primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData previousFmd = new FailoverMetaData(dumpPath, conf);
+    Long failoverEventId = previousFmd.getFailoverEventId();
+    assertTrue(failoverEventId >= Long.parseLong(dumpData.lastReplicationId));
+
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()));
+
+    dumpData = primary.run("use " + primaryDbName)
+            .run("insert into t2 partition(name='Carl') values(10)")
+            .dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    assertTrue(failoverEventId >= Long.parseLong(dumpData.lastReplicationId));

Review comment:
       You are doing an insert here. Why would it ever be equal (==)?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,226 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**Open transactions can be of two types:
+     Case 1) Txns that belong to different db or have not acquired HIVE LOCKS: These txns would be caught in
+     _failovermetadata file.
+     Case 2) Txns that belong to db under replication: These txns would be aborted as part of dump operation.
+     */
+    // Open 3 txns for Database which is not under replication
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of secondary db for 3 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of primary db for 2 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    assertTrue(failoverMD.getTxnsWithoutLock().equals(txnsWithNoLocks));
+
+
+    //TxnsForPrimaryDb and txnsWithNoLocks would have been aborted by dump operation.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsAborted(txnsWithNoLocks, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    Path dbRootDir = new Path(dumpData.dumpLocation).getParent();
+    long prevDumpDirModifTime = getLatestDumpDirModifTime(dbRootDir);
+    primary.run("REPL DUMP " + primaryDbName + " with ('" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "' = 'true')");
+    assert dumpData.dumpLocation.equals(ReplUtils.getLatestDumpPath(dbRootDir, conf).toString());
+    assert prevDumpDirModifTime == getLatestDumpDirModifTime(dbRootDir);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    dumpData = primary.dump(primaryDbName);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);

Review comment:
       nit: Use assertEquals for the equality check?
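
   I.e., presumably something like:

    assertEquals(DumpType.INCREMENTAL, new DumpMetaData(dumpPath, conf).getDumpType());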

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -180,6 +184,20 @@ public int execute() {
         }
         //If no previous dump is present or previous dump is already loaded, proceed with the dump operation.
         if (shouldDump(previousValidHiveDumpPath)) {
+          if (previousValidHiveDumpPath != null) {
+            FileSystem fs = previousValidHiveDumpPath.getFileSystem(conf);
+            if (fs.exists(new Path(previousValidHiveDumpPath, ReplAck.FAILOVER_READY_MARKER.toString()))) {
+              Database db = getHive().getDatabase(work.dbNameOrPattern);
+              if (MetaStoreUtils.isDbBeingFailedOver(db)) {
+                unsetReplFailoverEnabled(getHive(), db);
+                fs.delete(new Path(previousValidHiveDumpPath, ReplAck.FAILOVER_READY_MARKER.toString()));
+              } else {
+                LOG.info("Switching to bootstrap dump as this is the first dump execution after failover.");

Review comment:
       Shouldn't we retain the prev dump directory in this case, for debugging?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/FailoverMetaData.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FailoverMetaData {
+    public static final String FAILOVER_METADATA = "_failovermetadata";
+    private static final Logger LOG = LoggerFactory.getLogger(FailoverMetaData.class);
+
+    private static ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
+
+    @JsonProperty
+    private Long failoverEventId = null;
+    @JsonProperty
+    private Long cursorPoint = null;
+    @JsonProperty
+    private List<Long> abortedTxns;
+    @JsonProperty
+    private List<Long> openTxns;
+    @JsonProperty
+    private List<Long> txnsWithoutLock;
+
+    @JsonIgnore
+    private boolean initialized = false;
+    @JsonIgnore
+    private final Path dumpFile;
+    @JsonIgnore
+    private final HiveConf hiveConf;
+
+    public FailoverMetaData() {
+        //to be instantiated by JSON ObjectMapper.
+        dumpFile = null;
+        hiveConf = null;
+    }
+
+    public FailoverMetaData(Path dumpDir, HiveConf hiveConf) {
+        this.hiveConf = hiveConf;
+        this.dumpFile = new Path(dumpDir, FAILOVER_METADATA);
+    }
+
+    private void initializeIfNot() throws SemanticException {
+        if (!initialized) {
+            loadMetadataFromFile();
+            initialized = true;
+        }
+    }
+
+    public void setMetaData(FailoverMetaData otherDMD) throws SemanticException {
+        otherDMD.initializeIfNot();
+        this.failoverEventId = otherDMD.failoverEventId;
+        this.abortedTxns = otherDMD.abortedTxns;
+        this.openTxns = otherDMD.openTxns;
+        this.cursorPoint = otherDMD.cursorPoint;
+        this.txnsWithoutLock = otherDMD.txnsWithoutLock;
+        this.initialized = true;
+    }
+
+    private void loadMetadataFromFile() throws SemanticException {
+        LOG.info("Reading failover metadata from file: ", dumpFile);
+        BufferedReader br = null;
+        try {
+            FileSystem fs = dumpFile.getFileSystem(hiveConf);
+            br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
+            String line;
+            if ((line = br.readLine()) != null) {
+                FailoverMetaData otherDMD = JSON_OBJECT_MAPPER.readValue(line, FailoverMetaData.class);
+                setMetaData(otherDMD);
+            } else {
+                throw new IOException("Unable to read valid values from failover Metadata file:"
+                        + dumpFile.toUri().toString());
+            }
+
+        } catch (IOException ioe) {
+            throw new SemanticException(ioe);
+        } finally {
+            if (br != null) {
+                try {
+                    br.close();
+                } catch (IOException e) {
+                    throw new SemanticException(e);
+                }
+            }
+        }
+    }
+
+    public void setFailoverEventId(Long failoverEventId) {
+        this.failoverEventId = failoverEventId;
+        initialized = true;
+    }
+
+    public void setAbortedTxns(List<Long> abortedTxns) {
+        this.abortedTxns = abortedTxns;
+        initialized = true;
+    }
+
+    public void addToAbortedTxns(List<Long> abortedTxns) {
+        if (this.abortedTxns == null) {
+            this.abortedTxns = abortedTxns;
+        } else {
+            this.abortedTxns.addAll(abortedTxns);
+        }
+    }
+
+    public void setCursorPoint(Long cursorPoint) {
+        this.cursorPoint = cursorPoint;
+        initialized = true;
+    }
+
+    public void setOpenTxns(List<Long> openTxns) {
+        this.openTxns = openTxns;
+        initialized = true;
+    }
+
+    public void setTxnsWithoutLock(List<Long> txnsWithoutLock) {
+        this.txnsWithoutLock = txnsWithoutLock;
+        initialized = true;
+    }
+
+    public Long getCursorPoint() throws SemanticException {
+        initializeIfNot();
+        return cursorPoint;
+    }
+
+    public List<Long> getOpenTxns() throws SemanticException {
+        initializeIfNot();
+        return openTxns;
+    }
+
+    public Long getFailoverEventId() throws SemanticException {
+        initializeIfNot();
+        return failoverEventId;
+    }
+
+    public List<Long> getAbortedTxns() throws SemanticException {
+        initializeIfNot();
+        return abortedTxns;
+    }
+
+    public List<Long> getTxnsWithoutLock() throws SemanticException {
+        initializeIfNot();
+        return txnsWithoutLock;
+    }
+
+    @JsonIgnore
+    public boolean isValidMetadata() throws SemanticException {
+        initializeIfNot();
+        return openTxns != null && abortedTxns != null && failoverEventId != null
+                && cursorPoint != null && txnsWithoutLock != null;
+    }
+
+    public boolean equals(FailoverMetaData otherFmd) throws SemanticException {
+        otherFmd.initializeIfNot();
+        this.initializeIfNot();
+        return this.failoverEventId.equals(otherFmd.getFailoverEventId()) &&
+                this.openTxns.equals(otherFmd.getOpenTxns()) &&
+                this.abortedTxns.equals(otherFmd.getAbortedTxns()) &&
+                this.txnsWithoutLock.equals(otherFmd.getTxnsWithoutLock()) &&
+                this.cursorPoint.equals(otherFmd.getCursorPoint());
+    }
+
+    @JsonIgnore
+    public String getFilePath() {
+        return (dumpFile == null) ? null : dumpFile.toString();
+    }
+
+    public void write() throws SemanticException {
+        String failoverContentAsJSON = null;
+        try {
+            failoverContentAsJSON = JSON_OBJECT_MAPPER.writeValueAsString(this);
+        } catch (JsonProcessingException e) {
+            new SemanticException(e);

Review comment:
       Unused object? The SemanticException is constructed here but never thrown; is a throw missing?
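       If the failure is meant to propagate (as the other methods of this class do by throwing SemanticException), a minimal sketch of the catch block could be:

       } catch (JsonProcessingException e) {
           // propagate the serialization failure instead of constructing and dropping the exception
           throw new SemanticException(e);
       }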

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,226 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**Open transactions can be of two types:
+     Case 1) Txns that belong to different db or have not acquired HIVE LOCKS: These txns would be caught in
+     _failovermetadata file.
+     Case 2) Txns that belong to db under replication: These txns would be aborted as part of dump operation.
+     */
+    // Open 3 txns for Database which is not under replication
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of secondary db for 3 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of primary db for 2 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    assertTrue(failoverMD.getTxnsWithoutLock().equals(txnsWithNoLocks));
+
+
+    //TxnsForPrimaryDb and txnsWithNoLocks would have been aborted by dump operation.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsWithNoLocks, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsWithNoLocks));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    verifyAllOpenTxnsAborted(txnsWithNoLocks, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    Path dbRootDir = new Path(dumpData.dumpLocation).getParent();
+    long prevDumpDirModifTime = getLatestDumpDirModifTime(dbRootDir);
+    primary.run("REPL DUMP " + primaryDbName + " with ('" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "' = 'true')");
+    assert dumpData.dumpLocation.equals(ReplUtils.getLatestDumpPath(dbRootDir, conf).toString());
+    assert prevDumpDirModifTime == getLatestDumpDirModifTime(dbRootDir);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    dumpData = primary.dump(primaryDbName);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.INCREMENTAL);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    replica.load(replicatedDbName, primaryDbName);
+
+    fs.create(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString()));
+    fs.create(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+
+    //Since the failover start config is disabled and previous valid dump directory contains _failover_ready marker file
+    //So, this dump iteration will perform bootstrap dump instead of incremental.
+    dumpData = primary.dump(primaryDbName);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(new DumpMetaData(dumpPath, conf).getDumpType() == DumpType.BOOTSTRAP);
+  }
+
+  private long getLatestDumpDirModifTime(Path dumpRoot) throws Exception {
+    FileSystem fs = dumpRoot.getFileSystem(conf);
+    long latestModifTime = -1;
+    if (fs.exists(dumpRoot)) {
+      for (FileStatus status : fs.listStatus(dumpRoot)) {
+        if (status.getModificationTime() > latestModifTime) {
+          latestModifTime = status.getModificationTime();
+        }
+      }
+    }
+    return latestModifTime;
+  }
+
+  @Test
+  public void testFailoverDuringDumpWithPreviousFailed() throws Throwable {
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_RETAIN_PREV_DUMP_DIR + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    dumpData = primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData previousFmd = new FailoverMetaData(dumpPath, conf);
+    Long failoverEventId = previousFmd.getFailoverEventId();
+    assertTrue(failoverEventId >= Long.parseLong(dumpData.lastReplicationId));
+
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()));
+
+    dumpData = primary.run("use " + primaryDbName)
+            .run("insert into t2 partition(name='Carl') values(10)")
+            .dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    assertTrue(failoverEventId >= Long.parseLong(dumpData.lastReplicationId));
+    FailoverMetaData currentFmd = new FailoverMetaData(dumpPath, conf);
+    assertTrue(currentFmd.equals(previousFmd));

Review comment:
       How is this true? An extra insert into t2 happens between the failed dump and the retried one, so it isn't obvious why the current failover metadata should still equal the previous one.
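       If the retried dump is really expected to carry over the failover metadata written by the failed attempt, asserting the fields individually might document that intent more clearly than equals(). Just a sketch, reusing only getters already present on FailoverMetaData and assuming org.junit.Assert.assertEquals is available in this test:

       // Sketch only: make explicit which values are expected to be reused from the failed attempt.
       assertEquals(previousFmd.getFailoverEventId(), currentFmd.getFailoverEventId());
       assertEquals(previousFmd.getOpenTxns(), currentFmd.getOpenTxns());
       assertEquals(previousFmd.getAbortedTxns(), currentFmd.getAbortedTxns());
       assertEquals(previousFmd.getTxnsWithoutLock(), currentFmd.getTxnsWithoutLock());
       assertEquals(previousFmd.getCursorPoint(), currentFmd.getCursorPoint());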




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


