Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2021/06/21 02:39:08 UTC

[GitHub] [hive] pkumarsinha commented on a change in pull request #2121: HIVE-24918: Handle failover case during repl dump.

pkumarsinha commented on a change in pull request #2121:
URL: https://github.com/apache/hive/pull/2121#discussion_r647177768



##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -56,12 +60,7 @@
 import java.io.IOException;
 import java.io.BufferedReader;
 import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;

Review comment:
       Revert * import
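
       For reference, reverting to the explicit imports removed in this hunk
       (plus whatever java.util classes the new test code additionally needs)
       would look like:

          import java.util.ArrayList;
          import java.util.Arrays;
          import java.util.HashMap;
          import java.util.LinkedList;
          import java.util.List;
          import java.util.Map;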

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,211 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**There are 2 options with respect to open transactions:
+     Case 1) Txns that belong to different db or have not acquired HIVE LOCKS: These txns would be caught in
+     _failovermetadata file.
+     Case 2) Txns that belong to db under replication: These txns would be aborted as part of dump operation.
+     */
+    // Open 3 txns for Secondary Database

Review comment:
       What is "secondary" here, the database that is not under replication? If so, we can modify the comment a bit.
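
       One possible rewording, assuming "secondary" does mean the database that
       is not under replication (the test opens these txns against
       primaryDbName + "_extra"):

          // Open 3 txns against a database that is NOT under replication
          // (primaryDbName + "_extra"); per Case 1 above, these should land in
          // the _failovermetadata file instead of being aborted.
          int numTxnsForSecDb = 3;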

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,211 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**There are 2 options with respect to open transactions:

Review comment:
       I think we should rephrase the comment here. These are not options but rather types/kinds of open transactions.
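
       A possible rephrasing along those lines:

          /** Open transactions fall into 2 categories with respect to failover:
           Case 1) Txns that belong to a different db or have not acquired HIVE LOCKS:
           these are captured in the _failovermetadata file.
           Case 2) Txns that belong to the db under replication:
           these are aborted as part of the dump operation.
           */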

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
##########
@@ -581,7 +617,48 @@ private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive
     // factory per event to decode. For now, however, since all messages have the
     // same factory, restricting by message format is effectively a guard against
     // older leftover data that would cause us problems.
-    work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId);
+
+    String dbName = work.dbNameOrPattern;
+    Database db = hiveDb.getDatabase(dbName);
+    if (!HiveConf.getBoolVar(conf, REPL_DUMP_METADATA_ONLY)) {
+      setReplSourceFor(hiveDb, dbName, db);
+    }
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_FAILOVER_START)) {

Review comment:
       Move this logic to a separate method
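
       A rough sketch of the extraction; the helper name prepareFailoverDump is
       illustrative, and its body is whatever currently follows the
       HIVE_REPL_FAILOVER_START check:

          if (conf.getBoolVar(HiveConf.ConfVars.HIVE_REPL_FAILOVER_START)) {
            prepareFailoverDump(hiveDb, db);
          }

          // Hypothetical helper holding the failover-specific dump logic:
          private void prepareFailoverDump(Hive hiveDb, Database db) throws Exception {
            // ... existing logic that follows the HIVE_REPL_FAILOVER_START check ...
          }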

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
##########
@@ -190,6 +198,64 @@ public void testSuccessBootstrapDumpMetrics() throws Exception {
     actualMetric.setProgress(progress);
     checkSuccessIncremental(actualMetric, expectedMetric, "dump",
       Arrays.asList(ReplUtils.MetricName.EVENTS.name()));
+
+    //Failover Metrics Sink
+    conf.set(Constants.SCHEDULED_QUERY_EXECUTIONID, "3");
+    String stagingDir = "hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"
+            + "/hrepl0/dGVzdGFjaWR0YWJsZXNyZXBsbG9hZGJvb3RzdHJhcGluY3JfMTU5MjIwNTg3NTM4Nw==/0/hive/";
+    ReplicationMetricCollector failoverDumpMetricCollector = new IncrementalDumpMetricCollector(
+            "testAcidTablesReplLoadBootstrapIncr_1592205875387", stagingDir, conf);
+    metricMap = new HashMap<String, Long>(){{put(ReplUtils.MetricName.EVENTS.name(), (long) 10);}};
+
+    failoverDumpMetricCollector.reportFailoverStart("dump", metricMap, fmd);
+    failoverDumpMetricCollector.reportStageProgress("dump", ReplUtils.MetricName.EVENTS.name(), 10);
+    failoverDumpMetricCollector.reportStageEnd("dump", Status.SUCCESS, 10, new SnapshotUtils.ReplSnapshotCount(),
+            new ReplStatsTracker(0));
+    failoverDumpMetricCollector.reportEnd(Status.FAILOVER_READY);
+
+    expectedMetadata = new Metadata("testAcidTablesReplLoadBootstrapIncr_1592205875387",
+            Metadata.ReplicationType.INCREMENTAL, stagingDir);
+    expectedMetadata.setLastReplId(10);
+    expectedMetadata.setFailoverEventId(100);
+    expectedMetadata.setFailoverMDLoc(stagingDir + FailoverMetaData.FAILOVER_METADATA);
+    expectedProgress = new Progress();
+    expectedProgress.setStatus(Status.FAILOVER_READY);

Review comment:
       We need to test the failure case + metrics as well.
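
       A minimal sketch of a failure-path metrics check in the same style,
       assuming the Status enum also exposes a FAILED value accepted by
       reportStageEnd/reportEnd:

          failoverDumpMetricCollector.reportFailoverStart("dump", metricMap, fmd);
          failoverDumpMetricCollector.reportStageEnd("dump", Status.FAILED);
          failoverDumpMetricCollector.reportEnd(Status.FAILED);

          expectedProgress = new Progress();
          expectedProgress.setStatus(Status.FAILED);
          // ...then fetch the metric from the sink and compare, as done above.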

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,211 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**There are 2 options with respect to open transactions:
+     Case 1) Txns that belong to different db or have not acquired HIVE LOCKS: These txns would be caught in
+     _failovermetadata file.
+     Case 2) Txns that belong to db under replication: These txns would be aborted as part of dump operation.
+     */
+    // Open 3 txns for Secondary Database
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of secondary db for 3 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of primary db for 2 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));

Review comment:
       The size check for this is missing.
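
       A hedged one-liner for that, assuming the intent is to pin the count of
       txns opened without locks (exactly one is opened above):

          assertEquals(1, failoverMD.getTxnsWithoutLocks().size());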

##########
File path: ql/src/test/org/apache/hadoop/hive/ql/parse/repl/metric/TestReplicationMetricSink.java
##########
@@ -59,13 +61,19 @@
 
   HiveConf conf;
 
+  @Mock
+  private FailoverMetaData fmd;
+
   @Before
   public void setup() throws Exception {
     conf = new HiveConf();
     conf.set(Constants.SCHEDULED_QUERY_SCHEDULENAME, "repl");
     conf.set(Constants.SCHEDULED_QUERY_EXECUTIONID, "1");
     MetricSink metricSinkSpy = Mockito.spy(MetricSink.getInstance());
     Mockito.doReturn(1L).when(metricSinkSpy).getFrequencyInSecs();
+    Mockito.when(fmd.getEventId()).thenReturn(100L);
+    Mockito.when(fmd.getFilePath()).thenReturn("hdfs://localhost:65158/tmp/org_apache_hadoop_hive_ql_parse_TestReplicationScenarios_245261428230295"

Review comment:
       Hard-coding host, port, etc. might not be flexible.
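
       One hedged alternative, assuming the test only needs a syntactically
       valid staging URI rather than that exact cluster: keep the prefix in one
       overridable place instead of inlining host:port (names here are
       illustrative):

          // Hypothetical: single overridable constant (or derive it from the
          // test's MiniDFS cluster/conf, if one is available).
          private static final String STAGING_URI_PREFIX =
              System.getProperty("repl.test.staging.uri", "hdfs://localhost:65158");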

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,211 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**There are 2 options with respect to open transactions:
+     Case 1) Txns that belong to different db or have not acquired HIVE LOCKS: These txns would be caught in
+     _failovermetadata file.
+     Case 2) Txns that belong to db under replication: These txns would be aborted as part of dump operation.
+     */
+    // Open 3 txns for Secondary Database
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of secondary db for 3 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of primary db for 2 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    assertTrue(failoverMD.getTxnsWithoutLocks().equals(txnsWithNoLocks));
+
+
+    //TxnsForPrimaryDb and txnsWithNoLocks would have been aborted by dump operation.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    try {
+      primary.dump(primaryDbName, failoverConfigs);  //Will skip this dump since previous dump is failover Ready.
+      assertTrue(false);
+    } catch (Exception e) {
+      Assert.assertEquals(IndexOutOfBoundsException.class, e.getClass());
+    }
+
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    dumpData = primary.dump(primaryDbName);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    replica.load(replicatedDbName, primaryDbName);
+
+    fs.create(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString()));
+    fs.create(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+
+    //Since the failover start config is disabled and previous valid dump directory contains _failover_ready marker file
+    //So, this dump iteration will perform bootstrap dump instead of incremental.
+    dumpData = primary.dump(primaryDbName);
+    DumpMetaData dmd = new DumpMetaData(new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), conf);
+    assertTrue(dmd.getDumpType() == DumpType.BOOTSTRAP);
+  }
+
+  @Test
+  public void testFailoverDuringDumpWithPreviousFailed() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+    Long cursorPoint = primary.getCurrentNotificationEventId().getEventId();
+    failoverMD.setCursorPoint(cursorPoint);
+    failoverMD.setOpenTxns(Arrays.asList(1L, 2L, 3L));
+    failoverMD.setAbortedTxns(new ArrayList<>());
+    Long failoverEventId = primary.getCurrentNotificationEventId().getEventId();
+    failoverMD.setEventId(failoverEventId);
+    failoverMD.setTxnsWithoutLocks(new ArrayList<>());
+    failoverMD.write();
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));

Review comment:
       What failure case are you trying out here? Could you please explain?

##########
File path: common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
##########
@@ -623,6 +623,9 @@ private static void populateLlapDaemonVarsSet(Set<String> llapDaemonVarsSetLocal
                     + "with the hive data and metadata replication. Set the configuration "
                     + "hive.repl.include.authorization.metadata to false to disable "
                     + "security policies being replicated "),
+    HIVE_REPL_FAILOVER_START("hive.repl.failover.start",false,
+            "A replication policy level config to indicate if user wants to initiate fail-over" +

Review comment:
       Trailing whitespace at the end of the line?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,211 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**There are 2 options with respect to open transactions:
+     Case 1) Txns that belong to different db or have not acquired HIVE LOCKS: These txns would be caught in
+     _failovermetadata file.
+     Case 2) Txns that belong to db under replication: These txns would be aborted as part of dump operation.
+     */
+    // Open 3 txns for Secondary Database
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of secondary db for 3 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of primary db for 2 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    assertTrue(failoverMD.getTxnsWithoutLocks().equals(txnsWithNoLocks));
+
+
+    //TxnsForPrimaryDb and txnsWithNoLocks would have been aborted by dump operation.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    try {
+      primary.dump(primaryDbName, failoverConfigs);  //Will skip this dump since previous dump is failover Ready.
+      assertTrue(false);
+    } catch (Exception e) {
+      Assert.assertEquals(IndexOutOfBoundsException.class, e.getClass());
+    }
+

Review comment:
       We need to test metrics as well
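
       A rough sketch of what that could look like, assuming the in-memory
       MetricCollector singleton is reachable from this test and buffers the
       last run's metric (exact accessors may differ):

          ReplicationMetric metric = MetricCollector.getInstance().getMetrics().getLast();
          assertEquals(Status.FAILED, metric.getProgress().getStatus());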

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/metric/event/Metadata.java
##########
@@ -32,6 +32,8 @@
   private ReplicationType replicationType;
   private String stagingDir;
   private long lastReplId;
+  private String failoverMDLoc;

Review comment:
       nit: rename failoverMDLoc -> failoverMetadataLoc

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,211 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**There are 2 options with respect to open transactions:
+     Case 1) Txns that belong to different db or have not acquired HIVE LOCKS: These txns would be caught in
+     _failovermetadata file.
+     Case 2) Txns that belong to db under replication: These txns would be aborted as part of dump operation.
+     */
+    // Open 3 txns for Secondary Database
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of secondary db for 3 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of primary db for 2 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    assertTrue(failoverMD.getTxnsWithoutLocks().equals(txnsWithNoLocks));
+
+
+    //TxnsForPrimaryDb and txnsWithNoLocks would have been aborted by dump operation.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    try {
+      primary.dump(primaryDbName, failoverConfigs);  //Will skip this dump since previous dump is failover Ready.
+      assertTrue(false);
+    } catch (Exception e) {
+      Assert.assertEquals(IndexOutOfBoundsException.class, e.getClass());
+    }
+
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    dumpData = primary.dump(primaryDbName);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    replica.load(replicatedDbName, primaryDbName);
+
+    fs.create(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString()));
+    fs.create(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+
+    //Since the failover start config is disabled and previous valid dump directory contains _failover_ready marker file
+    //So, this dump iteration will perform bootstrap dump instead of incremental.
+    dumpData = primary.dump(primaryDbName);
+    DumpMetaData dmd = new DumpMetaData(new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR), conf);
+    assertTrue(dmd.getDumpType() == DumpType.BOOTSTRAP);
+  }
+
+  @Test
+  public void testFailoverDuringDumpWithPreviousFailed() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+    Long cursorPoint = primary.getCurrentNotificationEventId().getEventId();
+    failoverMD.setCursorPoint(cursorPoint);
+    failoverMD.setOpenTxns(Arrays.asList(1L, 2L, 3L));
+    failoverMD.setAbortedTxns(new ArrayList<>());
+    Long failoverEventId = primary.getCurrentNotificationEventId().getEventId();
+    failoverMD.setEventId(failoverEventId);
+    failoverMD.setTxnsWithoutLocks(new ArrayList<>());
+    failoverMD.write();
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+
+    dumpData = primary.run("insert into t1 values(205)")
+            .run("insert into t2 partition(name='Bob') values(205)")
+            .dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    assertTrue(failoverEventId >= Long.parseLong(dumpData.lastReplicationId));
+    failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    assertTrue(cursorPoint.equals(failoverMD.getCursorPoint()));
+    assertTrue(failoverEventId.equals(failoverMD.getEventId()));
+    assertTrue(failoverMD.getAbortedTxns().isEmpty());
+    assertTrue(failoverMD.getOpenTxns().equals(Arrays.asList(1L, 2L, 3L)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+  }
+

Review comment:
       What happens if everything is done but the ACK is not created? Can we include that case as well?
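
       A hedged sketch of that scenario, reusing helpers already in this test
       (simulating the missing ACK by deleting it is an assumption about how the
       partial dump would be reproduced):

          // Simulate "everything done but no ACK": drop the dump acknowledgement
          // from an otherwise complete failover-ready dump directory.
          fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), false);
          assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));

          // The next dump cycle should treat the previous dump as incomplete.
          dumpData = primary.dump(primaryDbName, failoverConfigs);
          dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
          assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));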

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,211 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**There are 2 options with respect to open transactions:
+     Case 1) Txns that belong to different db or have not acquired HIVE LOCKS: These txns would be caught in
+     _failovermetadata file.
+     Case 2) Txns that belong to db under replication: These txns would be aborted as part of dump operation.
+     */
+    // Open 3 txns for Secondary Database
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of secondary db for 3 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of primary db for 2 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    assertTrue(failoverMD.getTxnsWithoutLocks().equals(txnsWithNoLocks));
+
+
+    //TxnsForPrimaryDb and txnsWithNoLocks would have been aborted by dump operation.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    try {
+      primary.dump(primaryDbName, failoverConfigs);  //Will skip this dump since previous dump is failover Ready.
+      assertTrue(false);
+    } catch (Exception e) {
+      Assert.assertEquals(IndexOutOfBoundsException.class, e.getClass());

Review comment:
       Why an IndexOutOfBoundsException for a skip?

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,211 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /**There are 2 options with respect to open transactions:
+     Case 1) Txns that belong to different db or have not acquired HIVE LOCKS: These txns would be caught in
+     _failovermetadata file.
+     Case 2) Txns that belong to db under replication: These txns would be aborted as part of dump operation.
+     */
+    // Open 3 txns for Secondary Database
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of secondary db for 3 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of primary db for 2 txns
+    // t1=5 and t2=5
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    assertTrue(txnsAborted.size() == 2);
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    assertTrue(openTxns.size() == 4);
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    assertTrue(failoverMD.getTxnsWithoutLocks().equals(txnsWithNoLocks));
+
+
+    //TxnsForPrimaryDb and txnsWithNoLocks would have been aborted by dump operation.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    try {
+      primary.dump(primaryDbName, failoverConfigs);  //Will skip this dump since previous dump is failover Ready.
+      assertTrue(false);
+    } catch (Exception e) {
+      Assert.assertEquals(IndexOutOfBoundsException.class, e.getClass());
+    }
+
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    dumpData = primary.dump(primaryDbName);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));

Review comment:
       Also probe that the dump metadata says it is an incremental dump.
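
       Something like the check already used at the end of testFailoverDuringDump
       would fit here:

          DumpMetaData dmd = new DumpMetaData(dumpPath, conf);
          assertTrue(dmd.getDumpType() == DumpType.INCREMENTAL);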

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
##########
@@ -138,6 +141,211 @@ public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
+  @Test
+  public void testFailoverDuringDump() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    WarehouseInstance.Tuple dumpData = null;
+    List<String> failoverConfigs = Arrays.asList("'" + HiveConf.ConfVars.HIVE_REPL_FAILOVER_START + "'='true'");
+    dumpData = primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .dump(primaryDbName, failoverConfigs);
+
+    //This dump is not failover ready as target db can be used for replication only after first incremental load.
+    FileSystem fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertFalse(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertFalse(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId);
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values(1)")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+
+    /** There are two cases with respect to open transactions:
+     Case 1) Txns that belong to a different db or have not acquired HIVE locks: these txns are captured in
+     the _failovermetadata file.
+     Case 2) Txns that belong to the db under replication: these txns are aborted as part of the dump operation.
+     */
+    // Open 3 txns against the secondary db (primaryDbName + "_extra"), which is not under replication
+    int numTxnsForSecDb = 3;
+    List<Long> txnsForSecDb = openTxns(numTxnsForSecDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of the secondary db for the 3 txns:
+    // t1=3 and t2=3
+    Map<String, Long> tablesInSecDb = new HashMap<>();
+    tablesInSecDb.put("t1", (long) numTxnsForSecDb);
+    tablesInSecDb.put("t2", (long) numTxnsForSecDb);
+    List<Long> lockIdsForSecDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName + "_extra",
+            tablesInSecDb, txnHandler, txnsForSecDb, primaryConf);
+
+    //Open 2 txns for Primary Db
+    int numTxnsForPrimaryDb = 2;
+    List<Long> txnsForPrimaryDb = openTxns(numTxnsForPrimaryDb, txnHandler, primaryConf);
+
+    // Allocate write ids for both tables of the primary db:
+    // t1=3 and t2=4
+    Map<String, Long> tablesInPrimaryDb = new HashMap<>();
+    tablesInPrimaryDb.put("t1", (long) numTxnsForPrimaryDb + 1);
+    tablesInPrimaryDb.put("t2", (long) numTxnsForPrimaryDb + 2);
+    List<Long> lockIdsForPrimaryDb = allocateWriteIdsForTablesAndAquireLocks(primaryDbName,
+            tablesInPrimaryDb, txnHandler, txnsForPrimaryDb, primaryConf);
+
+    //Open 1 txn with no hive locks acquired
+    List<Long> txnsWithNoLocks = openTxns(1, txnHandler, primaryConf);
+
+    dumpData = primary.dump(primaryDbName, failoverConfigs);
+
+    fs = new Path(dumpData.dumpLocation).getFileSystem(conf);
+    dumpPath = new Path(dumpData.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, FailoverMetaData.FAILOVER_METADATA)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.FAILOVER_READY_MARKER.toString())));
+    assertTrue(MetaStoreUtils.isDbBeingFailedOver(primary.getDatabase(primaryDbName)));
+    FailoverMetaData failoverMD = new FailoverMetaData(dumpPath, conf);
+
+    List<Long> openTxns = failoverMD.getOpenTxns();
+    List<Long> txnsAborted = failoverMD.getAbortedTxns();
+    Assert.assertEquals(2, txnsAborted.size());
+    assertTrue(txnsAborted.containsAll(txnsForPrimaryDb));
+    Assert.assertEquals(4, openTxns.size());
+    assertTrue(openTxns.containsAll(txnsForSecDb));
+    assertTrue(openTxns.containsAll(txnsWithNoLocks));
+    Assert.assertEquals(txnsWithNoLocks, failoverMD.getTxnsWithoutLocks());
+
+    //Only txnsForPrimaryDb would have been aborted by the dump operation; txnsForSecDb and txnsWithNoLocks remain open.
+    verifyAllOpenTxnsAborted(txnsForPrimaryDb, primaryConf);
+    verifyAllOpenTxnsNotAborted(txnsForSecDb, primaryConf);
+    //Abort the txns
+    txnHandler.abortTxns(new AbortTxnsRequest(txnsForSecDb));
+    verifyAllOpenTxnsAborted(txnsForSecDb, primaryConf);
+    releaseLocks(txnHandler, lockIdsForSecDb);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(dumpData.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[]{"10", "11"});
+
+    assertTrue(fs.exists(new Path(dumpPath, ReplAck.LOAD_ACKNOWLEDGEMENT.toString())));
+
+    try {
+      primary.dump(primaryDbName, failoverConfigs);  //This dump is skipped because the previous dump is failover ready.
+      Assert.fail("Expected the dump to be skipped");
+    } catch (Exception e) {
+      Assert.assertEquals(IndexOutOfBoundsException.class, e.getClass());
+    }
+

Review comment:
       Also test that a subsequent load is skipped as well.
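
       A minimal sketch, reusing only helpers already present in this test
       (that a repeated load is a no-op is an assumption to be verified):

           // Hypothetical check: re-running load on the already-loaded,
           // failover-ready dump should not move the replication id.
           replica.load(replicatedDbName, primaryDbName)
                   .run("repl status " + replicatedDbName)
                   .verifyResult(dumpData.lastReplicationId);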

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/FailoverMetaData.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.List;
+
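+/**
+ * Metadata persisted in the _failovermetadata file of a failover-ready dump:
+ * the open and aborted transactions, the transactions holding no locks,
+ * along with the event id and cursor point recorded at dump time.
+ */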
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class FailoverMetaData {
+    public static final String FAILOVER_METADATA = "_failovermetadata";
+    private static final Logger LOG = LoggerFactory.getLogger(FailoverMetaData.class);
+
+    private static final ObjectMapper JSON_OBJECT_MAPPER = new ObjectMapper();
+
+    @JsonProperty
+    private Long eventId = null;
+    @JsonProperty
+    private Long cursorPoint = null;
+    @JsonProperty
+    private List<Long> abortedTxns;
+    @JsonProperty
+    private List<Long> openTxns;
+    @JsonProperty
+    private List<Long> txnsWithoutLocks;
+
+    @JsonIgnore
+    private boolean initialized = false;
+    @JsonIgnore
+    private final Path dumpFile;
+    @JsonIgnore
+    private final HiveConf hiveConf;
+
+    public FailoverMetaData() {
+        //to be instantiated by JSON ObjectMapper.
+        dumpFile = null;
+        hiveConf = null;
+    }
+
+    public FailoverMetaData(Path dumpDir, HiveConf hiveConf) {
+        this.hiveConf = hiveConf;
+        this.dumpFile = new Path(dumpDir, FAILOVER_METADATA);
+    }
+
+    private void initializeIfNot() throws SemanticException {
+        if (!initialized) {
+            loadMetadataFromFile();
+            initialized = true;
+        }
+    }
+
+    public void setMetaData(FailoverMetaData otherDMD) {
+        this.eventId = otherDMD.eventId;
+        this.abortedTxns = otherDMD.abortedTxns;
+        this.openTxns = otherDMD.openTxns;
+        this.cursorPoint = otherDMD.cursorPoint;
+        this.txnsWithoutLocks = otherDMD.txnsWithoutLocks;
+        this.initialized = true;
+    }
+
+    private void loadMetadataFromFile() throws SemanticException {
+        LOG.info("Reading failover metadata from file: {}", dumpFile);
+        try {
+            FileSystem fs = dumpFile.getFileSystem(hiveConf);
+            try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)))) {
+                String line = br.readLine();
+                if (line != null) {
+                    FailoverMetaData otherDMD = JSON_OBJECT_MAPPER.readValue(line, FailoverMetaData.class);
+                    setMetaData(otherDMD);
+                } else {
+                    throw new IOException("Unable to read valid values from failover metadata file: "
+                            + dumpFile.toUri().toString());
+                }
+            }
+        } catch (IOException ioe) {
+            throw new SemanticException(ioe);
+        }
+    }
+
+    public void setEventId(Long eventId) {
+        this.eventId = eventId;
+        initialized = true;
+    }
+
+    public void setAbortedTxns(List<Long> abortedTxns) {
+        this.abortedTxns = abortedTxns;
+        initialized = true;
+    }
+
+    public void addToAbortedTxns(List<Long> abortedTxns) {
+        if (this.abortedTxns == null) {
+            this.abortedTxns = abortedTxns;
+        } else {
+            this.abortedTxns.addAll(abortedTxns);
+        }
+        initialized = true;
+    }
+
+    public void setCursorPoint(Long cursorPoint) {
+        this.cursorPoint = cursorPoint;
+        initialized = true;
+    }
+
+    public void setOpenTxns(List<Long> openTxns) {
+        this.openTxns = openTxns;
+        initialized = true;
+    }
+
+    public void setTxnsWithoutLocks(List<Long> txnsWithoutLocks) {
+        this.txnsWithoutLocks = txnsWithoutLocks;
+        initialized = true;
+    }
+
+    public Long getCursorPoint() throws SemanticException {
+        initializeIfNot();
+        return cursorPoint;
+    }
+
+    public List<Long> getOpenTxns() throws SemanticException {
+        initializeIfNot();
+        return openTxns;
+    }
+
+    public Long getEventId() throws SemanticException {
+        initializeIfNot();
+        return eventId;
+    }
+
+    public List<Long> getAbortedTxns() throws SemanticException {
+        initializeIfNot();
+        return abortedTxns;
+    }
+
+    public List<Long> getTxnsWithoutLocks() throws SemanticException {
+        initializeIfNot();
+        return txnsWithoutLocks;
+    }
+
+    @JsonIgnore
+    public boolean isValidMetadata() throws SemanticException {
+        initializeIfNot();
+        return openTxns != null && abortedTxns != null && eventId != null

Review comment:
       What would the values of openTxns/abortedTxns be if no txns were open, and hence nothing was aborted?
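
       If they stay null in that case, isValidMetadata() would reject an
       otherwise valid dump. One option (an illustrative sketch, not
       necessarily the intended semantics) would be to default the lists to
       empty:

           // Hypothetical defaults (would need a java.util.ArrayList import);
           // a dump that found no open or aborted txns then still serializes
           // valid, empty lists.
           @JsonProperty
           private List<Long> abortedTxns = new ArrayList<>();
           @JsonProperty
           private List<Long> openTxns = new ArrayList<>();
           @JsonProperty
           private List<Long> txnsWithoutLocks = new ArrayList<>();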




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: gitbox-unsubscribe@hive.apache.org
For additional commands, e-mail: gitbox-help@hive.apache.org