Posted to commits@hive.apache.org by sa...@apache.org on 2019/04/16 13:19:22 UTC

[hive] branch master updated: HIVE-21529: Hive support bootstrap of ACID/MM tables on an existing policy (Ashutosh Bapat, reviewed by Sankar Hariappan)

This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c35779  HIVE-21529: Hive support bootstrap of ACID/MM tables on an existing policy (Ashutosh Bapat, reviewed by Sankar Hariappan)
7c35779 is described below

commit 7c35779cceeea72d00611a4b25808eeb2f3fdc31
Author: Ashutosh Bapat <ab...@cloudera.com>
AuthorDate: Tue Apr 16 18:48:58 2019 +0530

    HIVE-21529: Hive support bootstrap of ACID/MM tables on an existing policy (Ashutosh Bapat, reviewed by Sankar Hariappan)
    
    Signed-off-by: Sankar Hariappan <sa...@apache.org>
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  12 +-
 .../parse/TestReplicationScenariosAcidTables.java  | 639 ++++++++++++++++++---
 .../hadoop/hive/ql/parse/WarehouseInstance.java    |   2 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     | 113 +++-
 .../hadoop/hive/ql/exec/repl/ReplDumpWork.java     |  17 +-
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |  13 +
 .../hadoop/hive/ql/parse/repl/dump/Utils.java      |  12 +
 .../repl/dump/events/AllocWriteIdHandler.java      |  14 +
 .../parse/repl/dump/events/CommitTxnHandler.java   |  25 +-
 .../hadoop/hive/ql/exec/repl/TestReplDumpTask.java |   4 +-
 .../hadoop/hive/metastore/MetaStoreTestUtils.java  |  23 +-
 11 files changed, 749 insertions(+), 125 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f74485b..9d9fdbf 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -470,12 +470,12 @@ public class HiveConf extends Configuration {
     REPL_DUMP_METADATA_ONLY("hive.repl.dump.metadata.only", false,
         "Indicates whether replication dump only metadata information or data + metadata. \n"
           + "This config makes hive.repl.include.external.tables config ineffective."),
-    REPL_DUMP_INCLUDE_ACID_TABLES("hive.repl.dump.include.acid.tables", false,
-        "Indicates if repl dump should include information about ACID tables. It should be \n"
-            + "used in conjunction with 'hive.repl.dump.metadata.only' to enable copying of \n"
-            + "metadata for acid tables which do not require the corresponding transaction \n"
-            + "semantics to be applied on target. This can be removed when ACID table \n"
-            + "replication is supported."),
+    REPL_BOOTSTRAP_ACID_TABLES("hive.repl.bootstrap.acid.tables", false,
+        "Indicates if repl dump should bootstrap the information about ACID tables along with \n"
+            + "incremental dump for replication. It is recommended to keep this config parameter \n"
+            + "as false always and should be set to true only via WITH clause of REPL DUMP \n"
+            + "command. It should be set to true only once for incremental repl dump on \n"
+            + "each of the existing replication policies after enabling acid tables replication."),
     REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT("hive.repl.bootstrap.dump.open.txn.timeout", "1h",
         new TimeValidator(TimeUnit.HOURS),
         "Indicates the timeout for all transactions which are opened before triggering bootstrap REPL DUMP. "
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 342985e..aba97ec 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.Behaviour
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.Utils;
@@ -55,6 +57,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Collections;
 import java.util.Map;
@@ -80,6 +83,14 @@ public class TestReplicationScenariosAcidTables {
     REPL_TEST_ACID_INSERT_OVERWRITE, REPL_TEST_ACID_INSERT_IMPORT, REPL_TEST_ACID_INSERT_LOADLOCAL,
     REPL_TEST_ACID_INSERT_UNION
   }
+  private static List<String> dumpWithoutAcidClause = Collections.singletonList(
+          "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='false'");
+  private static List<String> dumpWithAcidBootstrapClause = Arrays.asList(
+          "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'",
+          "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES + "'='true'");
+  private List<String> acidTableNames = new LinkedList<>();
+  private List<String> nonAcidTableNames = new LinkedList<>();
+
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
@@ -109,6 +120,7 @@ public class TestReplicationScenariosAcidTables {
       put("hive.mapred.mode", "nonstrict");
       put("mapred.input.dir.recursive", "true");
       put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("hive.in.repl.test", "true");
     }};
 
     acidEnableConf.putAll(overrides);
@@ -149,10 +161,10 @@ public class TestReplicationScenariosAcidTables {
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
-  private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId) throws Throwable {
-    return primary.run("use " + primaryDbName)
+  private void prepareAcidData(String primaryDbName) throws Throwable {
+    primary.run("use " + primaryDbName)
             .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
-                    "tblproperties (\"transactional\"=\"true\")")
+            "tblproperties (\"transactional\"=\"true\")")
             .run("insert into t1 values(1)")
             .run("insert into t1 values(2)")
             .run("create table t2 (place string) partitioned by (country string) clustered by(place) " +
@@ -165,46 +177,344 @@ public class TestReplicationScenariosAcidTables {
                     "\"transactional_properties\"=\"insert_only\")")
             .run("insert into t3 values(11)")
             .run("insert into t3 values(22)")
-            .run("create table t4 (id int)")
-            .run("insert into t4 values(111), (222)")
             .run("create table t5 (id int) stored as orc ")
             .run("insert into t5 values(1111), (2222)")
             .run("alter table t5 set tblproperties (\"transactional\"=\"true\")")
-            .run("insert into t5 values(3333)")
-            .dump(primaryDbName, fromReplId);
+            .run("insert into t5 values(3333)");
+    acidTableNames.add("t1");
+    acidTableNames.add("t2");
+    acidTableNames.add("t3");
+    acidTableNames.add("t5");
   }
 
-  private void verifyLoadExecution(String replicatedDbName, String lastReplId) throws Throwable {
+  private void prepareNonAcidData(String primaryDbName) throws Throwable {
+    primary.run("use " + primaryDbName)
+            .run("create table t4 (id int)")
+            .run("insert into t4 values(111), (222)");
+    nonAcidTableNames.add("t4");
+  }
+  private WarehouseInstance.Tuple prepareDataAndDump(String primaryDbName, String fromReplId,
+                                                     List<String> withClause) throws Throwable {
+    prepareAcidData(primaryDbName);
+    prepareNonAcidData(primaryDbName);
+    return primary.run("use " + primaryDbName)
+            .dump(primaryDbName, fromReplId, withClause != null ?
+                    withClause : Collections.emptyList());
+  }
+
+  private void verifyNonAcidTableLoad(String replicatedDbName) throws Throwable {
+    replica.run("use " + replicatedDbName)
+            .run("select id from t4 order by id")
+            .verifyResults(new String[] {"111", "222"});
+  }
+
+  private void verifyAcidTableLoad(String replicatedDbName) throws Throwable {
     replica.run("use " + replicatedDbName)
-            .run("show tables")
-            .verifyResults(new String[] {"t1", "t2", "t3", "t4", "t5"})
-            .run("repl status " + replicatedDbName)
-            .verifyResult(lastReplId)
             .run("select id from t1 order by id")
             .verifyResults(new String[]{"1", "2"})
             .run("select country from t2 order by country")
             .verifyResults(new String[] {"france", "india", "us"})
             .run("select rank from t3 order by rank")
             .verifyResults(new String[] {"11", "22"})
-            .run("select id from t4 order by id")
-            .verifyResults(new String[] {"111", "222"})
             .run("select id from t5 order by id")
             .verifyResults(new String[] {"1111", "2222", "3333"});
   }
 
+  private void verifyLoadExecution(String replicatedDbName, String lastReplId, boolean includeAcid)
+          throws Throwable {
+    List<String> tableNames = new LinkedList<>(nonAcidTableNames);
+    if (includeAcid) {
+      tableNames.addAll(acidTableNames);
+    }
+    replica.run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(tableNames)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(lastReplId);
+    verifyNonAcidTableLoad(replicatedDbName);
+    if (includeAcid) {
+      verifyAcidTableLoad(replicatedDbName);
+    }
+  }
+
   @Test
   public void testAcidTablesBootstrap() throws Throwable {
-    WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null);
+    // Bootstrap
+    WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null, null);
     replica.load(replicatedDbName, bootstrapDump.dumpLocation);
-    verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId);
+    verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true);
+
+    // First incremental, after bootstrap
+    prepareIncNonAcidData(primaryDbName);
+    prepareIncAcidData(primaryDbName);
+    LOG.info(testName.getMethodName() + ": first incremental dump and load.");
+    WarehouseInstance.Tuple incDump = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, bootstrapDump.lastReplicationId);
+    replica.load(replicatedDbName, incDump.dumpLocation);
+    verifyIncLoad(replicatedDbName, incDump.lastReplicationId);
+
+    // Second incremental, after bootstrap
+    prepareInc2NonAcidData(primaryDbName, primary.hiveConf);
+    prepareInc2AcidData(primaryDbName, primary.hiveConf);
+    LOG.info(testName.getMethodName() + ": second incremental dump and load.");
+    WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, incDump.lastReplicationId);
+    replica.load(replicatedDbName, inc2Dump.dumpLocation);
+    verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
   }
 
   @Test
   public void testAcidTablesMoveOptimizationBootStrap() throws Throwable {
-    WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null);
+    WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null, null);
     replica.load(replicatedDbName, bootstrapDump.dumpLocation,
             Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
-    verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId);
+    verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true);
+  }
+
+  private void prepareIncAcidData(String dbName) throws Throwable {
+    primary.run("use " + dbName)
+            .run("create table t6 (str string) stored as orc tblproperties " +
+                    "(\"transactional\"=\"true\")")
+            .run("insert into t6 values ('aaa'), ('bbb')")
+            .run("alter table t2 add columns (placetype string)")
+            .run("update t2 set placetype = 'city'");
+    acidTableNames.add("t6");
+  }
+
+  private void verifyIncAcidLoad(String dbName) throws Throwable {
+    replica.run("use " + dbName)
+            .run("select str from t6 order by str")
+            .verifyResults(new String[]{"aaa", "bbb"})
+            .run("select country from t2 order by country")
+            .verifyResults(new String[] {"france", "india", "us"})
+            .run("select distinct placetype from t2")
+            .verifyResult("city")
+            .run("select id from t1 order by id")
+            .verifyResults(new String[]{"1", "2"})
+            .run("select rank from t3 order by rank")
+            .verifyResults(new String[] {"11", "22"})
+            .run("select id from t5 order by id")
+            .verifyResults(new String[] {"1111", "2222", "3333"});
+  }
+
+  private void runUsingDriver(IDriver driver, String command) throws Throwable {
+    CommandProcessorResponse ret = driver.run(command);
+    if (ret.getException() != null) {
+      throw ret.getException();
+    }
+  }
+
+  private void prepareInc2AcidData(String dbName, HiveConf hiveConf) throws Throwable {
+    IDriver driver = DriverFactory.newDriver(hiveConf);
+    SessionState.start(new CliSessionState(hiveConf));
+    runUsingDriver(driver, "use " + dbName);
+    runUsingDriver(driver, "insert into t1 values (3)");
+    runUsingDriver(driver, "insert into t5 values (4444)");
+  }
+
+  private void verifyInc2AcidLoad(String dbName) throws Throwable {
+    replica.run("use " + dbName)
+            .run("select str from t6 order by str")
+            .verifyResults(new String[]{"aaa", "bbb"})
+            .run("select country from t2 order by country")
+            .verifyResults(new String[] {"france", "india", "us"})
+            .run("select distinct placetype from t2")
+            .verifyResult("city")
+            .run("select id from t1 order by id")
+            .verifyResults(new String[]{"1", "2", "3"})
+            .run("select rank from t3 order by rank")
+            .verifyResults(new String[] {"11", "22"})
+            .run("select id from t5 order by id")
+            .verifyResults(new String[] {"1111", "2222", "3333", "4444"});
+  }
+
+  private void prepareIncNonAcidData(String dbName) throws Throwable {
+    primary.run("use " + dbName)
+            .run("insert into t4 values (333)")
+            .run("create table t7 (str string)")
+            .run("insert into t7 values ('aaa')");
+    nonAcidTableNames.add("t7");
+  }
+
+  private void verifyIncNonAcidLoad(String dbName) throws Throwable {
+    replica.run("use " + dbName)
+            .run("select * from t4 order by id")
+            .verifyResults(new String[] {"111", "222", "333"})
+            .run("select * from t7")
+            .verifyResult("aaa");
+  }
+
+  private void prepareInc2NonAcidData(String dbName, HiveConf hiveConf) throws Throwable {
+    IDriver driver = DriverFactory.newDriver(hiveConf);
+    SessionState.start(new CliSessionState(hiveConf));
+    runUsingDriver(driver, "use " + dbName);
+    runUsingDriver(driver, "insert into t4 values (444)");
+    runUsingDriver(driver, "insert into t7 values ('bbb')");
+  }
+
+  private void verifyInc2NonAcidLoad(String dbName) throws Throwable {
+    replica.run("use " + dbName)
+            .run("select * from t4 order by id")
+            .verifyResults(new String[] {"111", "222", "333", "444"})
+            .run("select * from t7")
+            .verifyResults(new String[] {"aaa", "bbb"});
+  }
+
+  private void verifyIncLoad(String dbName, String lastReplId)
+          throws Throwable {
+    List<String> tableNames = new LinkedList<>(nonAcidTableNames);
+    tableNames.addAll(acidTableNames);
+    replica.run("use " + dbName)
+            .run("show tables")
+            .verifyResults(tableNames)
+            .run("repl status " + dbName)
+            .verifyResult(lastReplId);
+    verifyIncNonAcidLoad(dbName);
+    verifyIncAcidLoad(dbName);
+  }
+
+  private void verifyInc2Load(String dbName, String lastReplId)
+          throws Throwable {
+    List<String> tableNames = new LinkedList<>(nonAcidTableNames);
+    tableNames.addAll(acidTableNames);
+    replica.run("use " + dbName)
+            .run("show tables")
+            .verifyResults(tableNames)
+            .run("repl status " + dbName)
+            .verifyResult(lastReplId);
+    verifyInc2NonAcidLoad(dbName);
+    verifyInc2AcidLoad(dbName);
+  }
+
+  @Test
+  public void testAcidTablesBootstrapDuringIncremental() throws Throwable {
+    // Take a bootstrap dump without acid tables
+    WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null,
+            dumpWithoutAcidClause);
+    LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
+    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+    verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, false);
+
+    // Take an incremental dump with acid table bootstrap
+    prepareIncAcidData(primaryDbName);
+    prepareIncNonAcidData(primaryDbName);
+    LOG.info(testName.getMethodName() + ": incremental dump and load dump with acid table bootstrap.");
+    WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, bootstrapDump.lastReplicationId, dumpWithAcidBootstrapClause);
+    replica.load(replicatedDbName, incrementalDump.dumpLocation);
+    verifyIncLoad(replicatedDbName, incrementalDump.lastReplicationId);
+    // Ckpt should be set on bootstrapped tables.
+    replica.verifyIfCkptSetForTables(replicatedDbName, acidTableNames, incrementalDump.dumpLocation);
+
+    // Take a second normal incremental dump after the ACID table bootstrap
+    prepareInc2AcidData(primaryDbName, primary.hiveConf);
+    prepareInc2NonAcidData(primaryDbName, primary.hiveConf);
+    LOG.info(testName.getMethodName()
+             + ": second incremental dump and load after the incremental with acid table " +
+            "bootstrap.");
+    WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, incrementalDump.lastReplicationId);
+    replica.load(replicatedDbName, inc2Dump.dumpLocation);
+    verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
+  }
+
+  @Test
+  public void testRetryAcidTablesBootstrapFromDifferentDump() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null,
+            dumpWithoutAcidClause);
+    LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
+    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+    verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, false);
+
+    prepareIncAcidData(primaryDbName);
+    prepareIncNonAcidData(primaryDbName);
+    LOG.info(testName.getMethodName() + ": first incremental dump with acid table bootstrap.");
+    WarehouseInstance.Tuple incDump = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, bootstrapDump.lastReplicationId, dumpWithAcidBootstrapClause);
+
+    // Fail setting the ckpt property for table t5 but succeed for the earlier tables
+    BehaviourInjection<CallerArguments, Boolean> callerVerifier
+            = new BehaviourInjection<CallerArguments, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable CallerArguments args) {
+        if (args.tblName.equalsIgnoreCase("t5") && args.dbName.equalsIgnoreCase(replicatedDbName)) {
+          injectionPathCalled = true;
+          LOG.warn("Verifier - DB : " + args.dbName + " TABLE : " + args.tblName);
+          return false;
+        }
+        return true;
+      }
+    };
+
+    // Fail repl load before the ckpt property is set for t4 and after it is set for t2.
+    // In the retry, these half-baked tables should be dropped and the bootstrap should succeed.
+    InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier);
+    try {
+      LOG.info(testName.getMethodName()
+              + ": loading first incremental dump with acid table bootstrap (will fail)");
+      replica.loadFailure(replicatedDbName, incDump.dumpLocation);
+      callerVerifier.assertInjectionsPerformed(true, false);
+    } finally {
+      InjectableBehaviourObjectStore.resetAlterTableModifier();
+    }
+
+    prepareInc2AcidData(primaryDbName, primary.hiveConf);
+    prepareInc2NonAcidData(primaryDbName, primary.hiveConf);
+    LOG.info(testName.getMethodName() + ": second incremental dump with acid table bootstrap");
+    WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, bootstrapDump.lastReplicationId, dumpWithAcidBootstrapClause);
+
+    // Set an incorrect bootstrap dump for cleaning tables. Here we use the full bootstrap
+    // dump, which is invalid, so REPL LOAD fails.
+    List<String> loadWithClause = Collections.singletonList(
+            "'" + ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
+            + bootstrapDump.dumpLocation + "'");
+    LOG.info(testName.getMethodName()
+            + ": trying to load second incremental dump with wrong bootstrap dump "
+            + "specified for cleaning ACID tables. Should fail.");
+    replica.loadFailure(replicatedDbName, inc2Dump.dumpLocation, loadWithClause);
+
+    // Set the previously failed bootstrap dump for clean-up. Now the new bootstrap should overwrite the old one.
+    loadWithClause = Collections.singletonList(
+            "'" + ReplUtils.REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
+                    + incDump.dumpLocation + "'");
+
+    LOG.info(testName.getMethodName()
+            + ": trying to load second incremental dump with correct bootstrap dump "
+            + "specified for cleaning ACID tables. Should succeed.");
+    replica.load(replicatedDbName, inc2Dump.dumpLocation, loadWithClause);
+    verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
+
+    // Once the REPL LOAD is successful, this config should be unset; otherwise the subsequent
+    // REPL LOAD will also drop those tables, which will cause data loss.
+    loadWithClause = Collections.emptyList();
+
+    // Verify that bootstrapping with the same dump is idempotent and returns the same result
+    LOG.info(testName.getMethodName()
+            + ": trying to load second incremental dump (with acid bootstrap) again."
+            + " Should succeed.");
+    replica.load(replicatedDbName, inc2Dump.dumpLocation, loadWithClause);
+    verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
+  }
+
+  @Test
+  public void retryIncBootstrapAcidFromDifferentDumpWithoutCleanTablesConfig() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null,
+            dumpWithoutAcidClause);
+    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+
+    prepareIncAcidData(primaryDbName);
+    prepareIncNonAcidData(primaryDbName);
+    WarehouseInstance.Tuple incDump = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, bootstrapDump.lastReplicationId, dumpWithAcidBootstrapClause);
+    WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, bootstrapDump.lastReplicationId, dumpWithAcidBootstrapClause);
+    replica.load(replicatedDbName, incDump.dumpLocation);
+
+    // Re-bootstrapping from different bootstrap dump without clean tables config should fail.
+    replica.loadFailure(replicatedDbName, inc2Dump.dumpLocation, Collections.emptyList(),
+            ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
   }
 
   @Test
@@ -212,23 +522,90 @@ public class TestReplicationScenariosAcidTables {
     WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName, null);
     replica.load(replicatedDbName, bootstrapDump.dumpLocation,
             Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
-    WarehouseInstance.Tuple incrDump = prepareDataAndDump(primaryDbName, bootstrapDump.lastReplicationId);
+    WarehouseInstance.Tuple incrDump = prepareDataAndDump(primaryDbName,
+            bootstrapDump.lastReplicationId, null);
     replica.load(replicatedDbName, incrDump.dumpLocation,
             Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
-    verifyLoadExecution(replicatedDbName, incrDump.lastReplicationId);
+    verifyLoadExecution(replicatedDbName, incrDump.lastReplicationId, true);
+  }
+
+  private List<Long> openTxns(int numTxns, TxnStore txnHandler, HiveConf primaryConf) throws Throwable {
+    OpenTxnsResponse otResp = txnHandler.openTxns(new OpenTxnRequest(numTxns, "u1", "localhost"));
+    List<Long> txns = otResp.getTxn_ids();
+    String txnIdRange = " txn_id >= " + txns.get(0) + " and txn_id <= " + txns.get(numTxns - 1);
+    Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
+            numTxns, TxnDbUtil.countQueryAgent(primaryConf,
+                    "select count(*) from TXNS where txn_state = 'o' and " + txnIdRange));
+    return txns;
+  }
+
+  private void allocateWriteIdsForTables(String primaryDbName, Map<String, Long> tables,
+                                         TxnStore txnHandler,
+                                         List<Long> txns, HiveConf primaryConf) throws Throwable {
+    AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest();
+    rqst.setDbName(primaryDbName);
+
+    for(Map.Entry<String, Long> entry : tables.entrySet()) {
+      rqst.setTableName(entry.getKey());
+      rqst.setTxnIds(txns);
+      txnHandler.allocateTableWriteIds(rqst);
+    }
+    verifyWriteIdsForTables(tables, primaryConf, primaryDbName);
+  }
+
+  private void verifyWriteIdsForTables(Map<String, Long> tables, HiveConf conf, String dbName)
+          throws Throwable {
+    for(Map.Entry<String, Long> entry : tables.entrySet()) {
+      Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from TXN_TO_WRITE_ID"),
+                          entry.getValue().longValue(),
+                          TxnDbUtil.countQueryAgent(conf,
+          "select count(*) from TXN_TO_WRITE_ID where t2w_database = '"
+                    + dbName.toLowerCase()
+                    + "' and t2w_table = '" + entry.getKey() + "'"));
+    }
+  }
+
+  private void verifyAllOpenTxnsAborted(List<Long> txns, HiveConf primaryConf) throws Throwable {
+    int numTxns = txns.size();
+    String txnIdRange = " txn_id >= " + txns.get(0) + " and txn_id <= " + txns.get(numTxns - 1);
+    Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
+            0, TxnDbUtil.countQueryAgent(primaryConf,
+                    "select count(*) from TXNS where txn_state = 'o' and " + txnIdRange));
+    Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
+            numTxns, TxnDbUtil.countQueryAgent(primaryConf,
+                    "select count(*) from TXNS where txn_state = 'a' and " + txnIdRange));
+  }
+
+  private void verifyNextId(Map<String, Long> tables, String dbName, HiveConf conf) throws Throwable {
+    // Verify the next write id
+    for(Map.Entry<String, Long> entry : tables.entrySet()) {
+      String[] nextWriteId =
+              TxnDbUtil.queryToString(conf,
+                      "select nwi_next from NEXT_WRITE_ID where  nwi_database = '"
+                              + dbName.toLowerCase() + "' and nwi_table = '"
+                              + entry.getKey() + "'").split("\n");
+      Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), entry.getValue() + 1);
+    }
+  }
+
+  private void verifyCompactionQueue(Map<String, Long> tables, String dbName, HiveConf conf)
+          throws Throwable {
+    for(Map.Entry<String, Long> entry : tables.entrySet()) {
+      Assert.assertEquals(TxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"),
+                          entry.getValue().longValue(),
+                          TxnDbUtil.countQueryAgent(conf,
+                      "select count(*) from COMPACTION_QUEUE where cq_database = '" + dbName
+                              + "' and cq_table = '" + entry.getKey() + "'"));
+    }
   }
 
   @Test
   public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable {
-    // Open 5 txns
+    int numTxns = 5;
     HiveConf primaryConf = primary.getConf();
     TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
-    OpenTxnsResponse otResp = txnHandler.openTxns(new OpenTxnRequest(5, "u1", "localhost"));
-    List<Long> txns = otResp.getTxn_ids();
-    String txnIdRange = " txn_id >= " + txns.get(0) + " and txn_id <= " + txns.get(4);
-    Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
-            5, TxnDbUtil.countQueryAgent(primaryConf,
-                  "select count(*) from TXNS where txn_state = 'o' and " + txnIdRange));
+    // Open 5 txns
+    List<Long> txns = openTxns(numTxns, txnHandler, primaryConf);
 
     // Create 2 tables, one partitioned and other not. Also, have both types of full ACID and MM tables.
     primary.run("use " + primaryDbName)
@@ -239,21 +616,13 @@ public class TestReplicationScenariosAcidTables {
                     "\"transactional_properties\"=\"insert_only\")")
             .run("insert into t2 partition(name='Bob') values(11)")
             .run("insert into t2 partition(name='Carl') values(10)");
+
     // Allocate write ids for both tables t1 and t2 for all txns
     // t1=5+1(insert) and t2=5+2(insert)
-    AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(primaryDbName, "t1");
-    rqst.setTxnIds(txns);
-    txnHandler.allocateTableWriteIds(rqst);
-    rqst.setTableName("t2");
-    txnHandler.allocateTableWriteIds(rqst);
-    Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXN_TO_WRITE_ID"),
-            6, TxnDbUtil.countQueryAgent(primaryConf,
-                    "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + primaryDbName.toLowerCase()
-                            + "' and t2w_table = 't1'"));
-    Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXN_TO_WRITE_ID"),
-            7, TxnDbUtil.countQueryAgent(primaryConf,
-                    "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + primaryDbName.toLowerCase()
-                            + "' and t2w_table = 't2'"));
+    Map<String, Long> tables = new HashMap<>();
+    tables.put("t1", numTxns+1L);
+    tables.put("t2", numTxns+2L);
+    allocateWriteIdsForTables(primaryDbName, tables, txnHandler, txns, primaryConf);
 
     // Bootstrap dump with open txn timeout as 1s.
     List<String> withConfigs = Arrays.asList(
@@ -263,22 +632,8 @@ public class TestReplicationScenariosAcidTables {
             .dump(primaryDbName, null, withConfigs);
 
     // After bootstrap dump, all the opened txns should be aborted. Verify it.
-    Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
-            0, TxnDbUtil.countQueryAgent(primaryConf,
-                    "select count(*) from TXNS where txn_state = 'o' and " + txnIdRange));
-    Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
-            5, TxnDbUtil.countQueryAgent(primaryConf,
-                    "select count(*) from TXNS where txn_state = 'a' and " + txnIdRange));
-
-    // Verify the next write id
-    String[] nextWriteId = TxnDbUtil.queryToString(primaryConf, "select nwi_next from NEXT_WRITE_ID where "
-            + " nwi_database = '" + primaryDbName.toLowerCase() + "' and nwi_table = 't1'")
-            .split("\n");
-    Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 7L);
-    nextWriteId = TxnDbUtil.queryToString(primaryConf, "select nwi_next from NEXT_WRITE_ID where "
-            + " nwi_database = '" + primaryDbName.toLowerCase() + "' and nwi_table = 't2'")
-            .split("\n");
-    Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 8L);
+    verifyAllOpenTxnsAborted(txns, primaryConf);
+    verifyNextId(tables, primaryDbName, primaryConf);
 
     // Bootstrap load which should also replicate the aborted write ids on both tables.
     HiveConf replicaConf = replica.getConf();
@@ -294,35 +649,82 @@ public class TestReplicationScenariosAcidTables {
             .verifyResults(new String[] {"10", "11"});
 
     // Verify if HWM is properly set after REPL LOAD
-    nextWriteId = TxnDbUtil.queryToString(replicaConf, "select nwi_next from NEXT_WRITE_ID where "
-            + " nwi_database = '" + replicatedDbName.toLowerCase() + "' and nwi_table = 't1'")
-            .split("\n");
-    Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 7L);
-    nextWriteId = TxnDbUtil.queryToString(replicaConf, "select nwi_next from NEXT_WRITE_ID where "
-            + " nwi_database = '" + replicatedDbName.toLowerCase() + "' and nwi_table = 't2'")
-            .split("\n");
-    Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 8L);
+    verifyNextId(tables, replicatedDbName, replicaConf);
+
+    // Verify if all the aborted write ids are replicated to the replicated DB
+    for(Map.Entry<String, Long> entry : tables.entrySet()) {
+      entry.setValue((long) numTxns);
+    }
+    verifyWriteIdsForTables(tables, replicaConf, replicatedDbName);
+
+    // Verify if entries added in COMPACTION_QUEUE for each table/partition
+    // t1-> 1 entry and t2-> 2 entries (1 per partition)
+    tables.clear();
+    tables.put("t1", 1L);
+    tables.put("t2", 2L);
+    verifyCompactionQueue(tables, replicatedDbName, replicaConf);
+  }
+
+  @Test
+  public void testAcidTablesBootstrapDuringIncrementalWithOpenTxnsTimeout() throws Throwable {
+    // Take a dump without ACID tables
+    WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null,
+                                                                dumpWithoutAcidClause);
+    LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
+    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+
+    // Open concurrent transactions, create data for incremental and take an incremental dump
+    // with ACID table bootstrap.
+    int numTxns = 5;
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    // Open 5 txns
+    List<Long> txns = openTxns(numTxns, txnHandler, primaryConf);
+    prepareIncNonAcidData(primaryDbName);
+    prepareIncAcidData(primaryDbName);
+    // Allocate write ids for tables t1 and t2 for all txns
+    // t1=5+2(insert) and t2=5+5(insert, alter add column)
+    Map<String, Long> tables = new HashMap<>();
+    tables.put("t1", numTxns+2L);
+    tables.put("t2", numTxns+5L);
+    allocateWriteIdsForTables(primaryDbName, tables, txnHandler, txns, primaryConf);
+
+    // Incremental dump with ACID table bootstrap and open txn timeout of 1s.
+    List<String> withConfigs = new LinkedList<>(dumpWithAcidBootstrapClause);
+    withConfigs.add("'hive.repl.bootstrap.dump.open.txn.timeout'='1s'");
+    WarehouseInstance.Tuple incDump = primary
+            .run("use " + primaryDbName)
+            .dump(primaryDbName, bootstrapDump.lastReplicationId, withConfigs);
+
+    // After bootstrap dump, all the opened txns should be aborted. Verify it.
+    verifyAllOpenTxnsAborted(txns, primaryConf);
+    verifyNextId(tables, primaryDbName, primaryConf);
+
+    // Incremental load with ACID bootstrap should also replicate the aborted write ids on
+    // tables t1 and t2
+    HiveConf replicaConf = replica.getConf();
+    LOG.info(testName.getMethodName() + ": loading incremental dump with ACID bootstrap.");
+    replica.load(replicatedDbName, incDump.dumpLocation);
+    // During an incremental dump with ACID bootstrap we do not dump ALLOC_WRITE_ID events. So
+    // the two ALLOC_WRITE_ID events corresponding to the aborted transactions on t1 and t2 will
+    // not be replicated. Discount those.
+    verifyIncLoad(replicatedDbName,
+            (new Long(Long.valueOf(incDump.lastReplicationId) - 2)).toString());
+    // Verify if HWM is properly set after REPL LOAD
+    verifyNextId(tables, replicatedDbName, replicaConf);
 
     // Verify if all the aborted write ids are replicated to the replicated DB
-    Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from TXN_TO_WRITE_ID"),
-            5, TxnDbUtil.countQueryAgent(replicaConf,
-                    "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + replicatedDbName.toLowerCase()
-                            + "' and t2w_table = 't1'"));
-    Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from TXN_TO_WRITE_ID"),
-            5, TxnDbUtil.countQueryAgent(replicaConf,
-                    "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + replicatedDbName.toLowerCase()
-                            + "' and t2w_table = 't2'"));
+    for(Map.Entry<String, Long> entry : tables.entrySet()) {
+      entry.setValue((long) numTxns);
+    }
+    verifyWriteIdsForTables(tables, replicaConf, replicatedDbName);
 
     // Verify if entries added in COMPACTION_QUEUE for each table/partition
     // t1-> 1 entry and t2-> 2 entries (1 per partition)
-    Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from COMPACTION_QUEUE"),
-            1, TxnDbUtil.countQueryAgent(replicaConf,
-                    "select count(*) from COMPACTION_QUEUE where cq_database = '" + replicatedDbName
-                            + "' and cq_table = 't1'"));
-    Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from COMPACTION_QUEUE"),
-            2, TxnDbUtil.countQueryAgent(replicaConf,
-                    "select count(*) from COMPACTION_QUEUE where cq_database = '" + replicatedDbName
-                            + "' and cq_table = 't2'"));
+    tables.clear();
+    tables.put("t1", 1L);
+    tables.put("t2", 4L);
+    verifyCompactionQueue(tables, replicatedDbName, replicaConf);
   }
 
   @Test
@@ -397,6 +799,83 @@ public class TestReplicationScenariosAcidTables {
   }
 
   @Test
+  public void testBootstrapAcidTablesDuringIncrementalWithConcurrentWrites() throws Throwable {
+    // Dump and load bootstrap without ACID tables.
+    WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null,
+                                                                dumpWithoutAcidClause);
+    LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
+    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+
+    // Create incremental data for incremental load with bootstrap of ACID
+    prepareIncNonAcidData(primaryDbName);
+    prepareIncAcidData(primaryDbName);
+    // Perform concurrent writes. Bootstrap won't see the written data, but the subsequent
+    // incremental repl should see it. We cannot inject callerVerifier since an incremental dump
+    // would not cause an ALTER DATABASE event. Instead we piggyback on
+    // getCurrentNotificationEventId(), which is required for a bootstrap anyway.
+    BehaviourInjection<CurrentNotificationEventId, CurrentNotificationEventId> callerInjectedBehavior
+            = new BehaviourInjection<CurrentNotificationEventId, CurrentNotificationEventId>() {
+      @Nullable
+      @Override
+      public CurrentNotificationEventId apply(@Nullable CurrentNotificationEventId input) {
+        if (injectionPathCalled) {
+          nonInjectedPathCalled = true;
+        } else {
+          // Do some writes through concurrent thread
+          injectionPathCalled = true;
+          Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+              LOG.info("Entered new thread");
+              try {
+                prepareInc2NonAcidData(primaryDbName, primary.hiveConf);
+                prepareInc2AcidData(primaryDbName, primary.hiveConf);
+              } catch (Throwable t) {
+                Assert.assertNull(t);
+              }
+              LOG.info("Exit new thread success");
+            }
+          });
+          t.start();
+          LOG.info("Created new thread {}", t.getName());
+          try {
+            t.join();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        return input;
+      }
+    };
+
+    InjectableBehaviourObjectStore.setGetCurrentNotificationEventIdBehaviour(callerInjectedBehavior);
+    WarehouseInstance.Tuple incDump = null;
+    try {
+      incDump = primary.dump(primaryDbName, bootstrapDump.lastReplicationId, dumpWithAcidBootstrapClause);
+      callerInjectedBehavior.assertInjectionsPerformed(true, true);
+    } finally {
+      // reset the behaviour
+      InjectableBehaviourObjectStore.resetGetCurrentNotificationEventIdBehaviour();
+    }
+
+    // While bootstrapping ACID tables, the snapshot was taken before the concurrent thread
+    // performed its writes. So the concurrent writes won't be dumped.
+    LOG.info(testName.getMethodName() +
+            ": loading incremental dump containing bootstrapped ACID tables.");
+    replica.load(replicatedDbName, incDump.dumpLocation);
+    verifyIncLoad(replicatedDbName, incDump.lastReplicationId);
+
+    // Next Incremental should include the concurrent writes
+    LOG.info(testName.getMethodName() +
+            ": dumping second normal incremental dump from event id = " + incDump.lastReplicationId);
+    WarehouseInstance.Tuple inc2Dump = primary.dump(primaryDbName, incDump.lastReplicationId);
+    LOG.info(testName.getMethodName() +
+            ": loading second normal incremental dump from event id = " + incDump.lastReplicationId);
+    replica.load(replicatedDbName, inc2Dump.dumpLocation);
+    verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
+  }
+
+  @Test
   public void testAcidTablesBootstrapWithConcurrentDropTable() throws Throwable {
     HiveConf primaryConf = primary.getConf();
     primary.run("use " + primaryDbName)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index e9a63f8..aeafe85 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -155,7 +155,7 @@ public class WarehouseInstance implements Closeable {
       hiveConf.set(entry.getKey(), entry.getValue());
     }
 
-    MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf, true);
+    MetaStoreTestUtils.startMetaStoreWithRetry(hiveConf, true, true);
 
     // Add the below mentioned dependency in metastore/pom.xml file. For postgres need to copy postgresql-42.2.1.jar to
     // .m2//repository/postgresql/postgresql/9.3-1102.jdbc41/postgresql-9.3-1102.jdbc41.jar.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index eb5c18a..11f8bc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -137,9 +137,60 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     Utils.writeOutput(values, new Path(work.resultTempPath), conf);
   }
 
+  /**
+   * Decide whether to examine all the tables to dump. We do this if
+   * 1. external tables are going to be part of the dump, in which case we need to list their
+   * locations, or
+   * 2. external or ACID tables are being bootstrapped for the first time, so that we can dump
+   * those tables as a whole.
+   * @return true if all the tables should be examined for the dump
+   */
+  private boolean shouldDumpExternalTableLocation() {
+    return conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
+            && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)
+            || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES));
+  }
+
+  private boolean shouldExamineTablesToDump() {
+    return shouldDumpExternalTableLocation() ||
+            conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
+  }
+
+  private boolean shouldBootstrapDumpTable(Table table) {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES) &&
+            TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+      return true;
+    }
+
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES) &&
+           AcidUtils.isTransactionalTable(table)) {
+      return true;
+    }
+
+    return false;
+  }
+
   private Long incrementalDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb) throws Exception {
     Long lastReplId;// get list of events matching dbPattern & tblPattern
     // go through each event, and dump out each event to a event-level dump dir inside dumproot
+    String validTxnList = null;
+    long waitUntilTime = 0;
+    long bootDumpBeginReplId = -1;
+
+    // If we are bootstrapping ACID tables, we need to perform steps similar to a regular
+    // bootstrap (see bootstrapDump() for more details). The only difference here is that
+    // instead of waiting for the concurrent transactions to finish, we start dumping the
+    // incremental events and wait only for the remaining time, if any.
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
+      bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L);
+      assert (bootDumpBeginReplId >= 0);
+      LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {} and table {}",
+              work.dbNameOrPattern,
+              work.tableNameOrPattern);
+      long timeoutInMs = HiveConf.getTimeVar(conf,
+              HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+      waitUntilTime = System.currentTimeMillis() + timeoutInMs;
+    }
 
     // TODO : instead of simply restricting by message format, we should eventually
     // move to a jdbc-driver-stype registering of message format, and picking message
@@ -147,7 +198,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     // same factory, restricting by message format is effectively a guard against
     // older leftover data that would cause us problems.
 
-    work.overrideEventTo(hiveDb);
+    work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId);
 
     IMetaStoreClient.NotificationFilter evFilter = new AndFilter(
         new DatabaseAndTableFilter(work.dbNameOrPattern, work.tableNameOrPattern),
@@ -193,27 +244,36 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     dmd.setDump(DumpType.INCREMENTAL, work.eventFrom, lastReplId, cmRoot);
     dmd.write();
 
-    // If external tables are enabled for replication and
-    // - If bootstrap is enabled, then need to combine bootstrap dump of external tables.
-    // - If metadata-only dump is enabled, then shall skip dumping external tables data locations to
-    //   _external_tables_info file. If not metadata-only, then dump the data locations.
-    if (conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)
-        && (!conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)
-        || conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES))) {
+    // If required, wait longer for any transactions that were open at the time of starting
+    // the ACID bootstrap.
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
+      assert (waitUntilTime > 0);
+      validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
+    }
+
+    // Examine all the tables if required.
+    if (shouldExamineTablesToDump()) {
       Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true);
+
       try (Writer writer = new Writer(dumpRoot, conf)) {
         for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
           Table table = hiveDb.getTable(dbName, tableName);
-          if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+
+          // Dump external table locations if required.
+          if (shouldDumpExternalTableLocation() &&
+                  TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
             writer.dataLocationDump(table);
-            if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES)) {
-              HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table);
-              dumpTable(dbName, tableName, null, dbRoot, 0, hiveDb, tableTuple);
-            }
+          }
+
+          // Dump the table to be bootstrapped if required.
+          if (shouldBootstrapDumpTable(table)) {
+            HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table);
+            dumpTable(dbName, tableName, validTxnList, dbRoot, bootDumpBeginReplId, hiveDb,
+                    tableTuple);
           }
         }
       }
     }
+
     return lastReplId;
   }
 
@@ -256,8 +316,11 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     assert (bootDumpBeginReplId >= 0L);
 
     LOG.info("Bootstrap Dump for db {} and table {}", work.dbNameOrPattern, work.tableNameOrPattern);
+    long timeoutInMs = HiveConf.getTimeVar(conf,
+            HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+    long waitUntilTime = System.currentTimeMillis() + timeoutInMs;
 
-    String validTxnList = getValidTxnListForReplDump(hiveDb);
+    String validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
     for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
       Database db = hiveDb.getDatabase(dbName);
@@ -397,18 +460,18 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return openTxns;
   }
 
-  String getValidTxnListForReplDump(Hive hiveDb) throws HiveException {
-    // Key design point for REPL DUMP is to not have any txns older than current txn in which dump runs.
-    // This is needed to ensure that Repl dump doesn't copy any data files written by any open txns
-    // mainly for streaming ingest case where one delta file shall have data from committed/aborted/open txns.
-    // It may also have data inconsistency if the on-going txns doesn't have corresponding open/write
-    // events captured which means, catch-up incremental phase won't be able to replicate those txns.
-    // So, the logic is to wait for configured amount of time to see if all open txns < current txn is
-    // getting aborted/committed. If not, then we forcefully abort those txns just like AcidHouseKeeperService.
+  // Get list of valid transactions for Repl Dump. Also wait for a given amount of time for the
+  // open transactions to finish. Abort any open transactions after the wait is over.
+  String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) throws HiveException {
+    // Key design point for REPL DUMP is to not have any txns older than current txn in which
+    // dump runs. This is needed to ensure that Repl dump doesn't copy any data files written by
+    // any open txns mainly for streaming ingest case where one delta file shall have data from
+    // committed/aborted/open txns. It may also have data inconsistency if the on-going txns
+    // doesn't have corresponding open/write events captured which means, catch-up incremental
+    // phase won't be able to replicate those txns. So, the logic is to wait for the given amount
+    // of time to see if all open txns < current txn are getting aborted/committed. If not, then
+    // we forcefully abort those txns, just like AcidHouseKeeperService.
     ValidTxnList validTxnList = getTxnMgr().getValidTxns();
-    long timeoutInMs = HiveConf.getTimeVar(conf,
-            HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
-    long waitUntilTime = System.currentTimeMillis() + timeoutInMs;
     while (System.currentTimeMillis() < waitUntilTime) {
       // If there are no txns which are open for the given ValidTxnList snapshot, then just return it.
       if (getOpenTxns(validTxnList).isEmpty()) {
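[Editor's context] The timing difference between the two paths is worth spelling out: the deadline is computed before event dumping begins, so the event dump itself consumes part of the open-txn timeout. A minimal sketch of the incremental-with-bootstrap path, assuming the methods shown above (dumpIncrementalEvents is a hypothetical stand-in for the event loop):

    long timeoutInMs = HiveConf.getTimeVar(conf,
        HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
    long waitUntilTime = System.currentTimeMillis() + timeoutInMs; // deadline fixed up front
    dumpIncrementalEvents(); // hypothetical stand-in; overlaps with the open-txn wait
    // Only the remaining time, if any, is spent waiting here; leftover open txns
    // older than the dump's txn are then forcefully aborted.
    String validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
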
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 61fa424..d32be96 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -65,7 +65,22 @@ public class ReplDumpWork implements Serializable {
     return maxEventLimit;
   }
 
-  void overrideEventTo(Hive fromDb) throws Exception {
+  // Override any user specification that changes the last event to be dumped.
+  void overrideLastEventToDump(Hive fromDb, long bootstrapLastId) throws Exception {
+    // If we are bootstrapping ACID tables, we need to dump all the events up to the event id
+    // at the beginning of the bootstrap dump and not dump any event after that. So we override
+    // both the last event and any user-specified limit on the number of events. See
+    // bootstrapDump() for more details.
+    if (bootstrapLastId > 0) {
+      eventTo = bootstrapLastId;
+      maxEventLimit = null;
+      LoggerFactory.getLogger(this.getClass())
+              .debug("eventTo restricted to event id : {} because of bootstrap of ACID tables",
+                      eventTo);
+      return;
+    }
+
+    // If no last event is specified, get the current last event id from the metastore.
     if (eventTo == null) {
       eventTo = fromDb.getMSC().getCurrentNotificationEventId().getEventId();
       LoggerFactory.getLogger(this.getClass())
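[Editor's context] On the caller side (ReplDumpTask.incrementalDump above), the contract is: pass a positive bootstrapLastId only when ACID tables are being bootstrapped; otherwise pass -1 and any user-specified eventTo/limit stands. A hedged caller sketch, restating the diff above:

    long bootDumpBeginReplId = -1;
    if (conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
      // Event id captured at the beginning of the dump; events after it are
      // excluded so the bootstrapped table snapshot stays consistent.
      bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L);
    }
    // Caps eventTo at bootDumpBeginReplId and clears maxEventLimit when > 0.
    work.overrideLastEventToDump(hiveDb, bootDumpBeginReplId);
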
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index fbdbbdd..f9f13e1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -88,6 +88,10 @@ public class ReplUtils {
   // duplicate check. Note : Stmt id is not used for base directory now, but to avoid misuse later, its maintained.
   public static final int REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID = 0;
 
+  // Configuration to enable/disable dumping ACID tables. Used only for testing; it should not
+  // be set in production or in tests other than the ones that require it.
+  public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables";
+
   /**
    * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
    */
@@ -264,4 +268,13 @@ public class ReplUtils {
     }
     return Long.parseLong(writeIdString);
   }
+
+  // Only for testing: we exclude ACID tables from the dump (and hence from replication) if the
+  // config says so.
+  public static boolean includeAcidTableInDump(HiveConf conf) {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL)) {
+      return conf.getBoolean(REPL_DUMP_INCLUDE_ACID_TABLES, true);
+    }
+
+    return true;
+  }
 }
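[Editor's context] A hedged sketch of how a test flips this switch, mirroring the WITH clauses in TestReplicationScenariosAcidTables above; outside repl tests the method always returns true:

    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL, true); // i.e. hive.in.repl.test=true
    conf.setBoolean(ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES, false);
    assert !ReplUtils.includeAcidTableInDump(conf); // ACID tables left out of the dump
    conf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST_REPL, false);
    assert ReplUtils.includeAcidTableInDump(conf);  // switch is ignored outside repl tests
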
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index 3cac813..b09ec25 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -196,6 +197,17 @@ public class Utils {
         }
         return shouldReplicateExternalTables;
       }
+
+      if (AcidUtils.isTransactionalTable(tableHandle.getTTable())) {
+        if (!ReplUtils.includeAcidTableInDump(hiveConf)) {
+          return false;
+        }
+
+        // Skip dumping events related to ACID tables if bootstrap is enabled for them.
+        if (isEventDump) {
+          return !hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
+        }
+      }
     }
     return true;
   }
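Isolating the new branch, the decision for a transactional table reduces to roughly the sketch below. The method name is invented and the surrounding shouldReplicate() checks (external tables, metadata-only dumps, and so on) are elided:

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;

    final class AcidReplicationGateSketch {
      static boolean replicateAcidTable(HiveConf conf, boolean isEventDump) {
        if (!ReplUtils.includeAcidTableInDump(conf)) {
          return false; // test-only exclusion of ACID tables
        }
        if (isEventDump) {
          // While an ACID bootstrap is pending, per-event changes to ACID tables
          // are skipped; the bootstrap phase captures those tables wholesale.
          return !conf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES);
        }
        return true; // a table-level (bootstrap) dump still includes the table
      }
    }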
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
index 7602d1f..bd25a6c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
@@ -35,6 +37,18 @@ class AllocWriteIdHandler extends AbstractEventHandler<AllocWriteIdMessage> {
   @Override
   public void handle(Context withinContext) throws Exception {
     LOG.info("Processing#{} ALLOC_WRITE_ID message : {}", fromEventId(), eventMessageAsJSON);
+
+    // If we are bootstrapping ACID tables during an incremental dump, the events corresponding
+    // to those tables are not dumped. Hence no writeId needs to be allocated on the target, and
+    // this event need not be dumped either.
+    if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
+      return;
+    }
+
+    if (!ReplUtils.includeAcidTableInDump(withinContext.hiveConf)) {
+      return;
+    }
+
     DumpMetaData dmd = withinContext.createDmd(this);
     dmd.setPayload(eventMessageAsJSON);
     dmd.write();
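The handler changes above are the event-side half of the feature; the user-facing trigger is setting hive.repl.bootstrap.acid.tables through the WITH clause of REPL DUMP. A hedged end-to-end sketch over JDBC; the URL, database name, and event id are placeholders:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    final class AcidBootstrapDumpExample {
      public static void main(String[] args) throws Exception {
        long lastReplId = 100L; // placeholder: last event id already replicated to the target
        try (Connection conn =
                 DriverManager.getConnection("jdbc:hive2://source-host:10000/");
             Statement stmt = conn.createStatement()) {
          // One-time incremental dump that also bootstraps ACID/MM tables.
          stmt.execute("REPL DUMP srcdb FROM " + lastReplId
              + " WITH ('hive.repl.bootstrap.acid.tables'='true')");
        }
      }
    }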
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index 620263f..5c2b3d1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
@@ -107,12 +108,32 @@ class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {
 
     if (!withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
 
+      boolean replicatingAcidEvents = true;
+      if (withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES)) {
+        // We do not dump ACID-table-related events when taking a bootstrap dump of ACID tables
+        // as part of an incremental dump, so we shouldn't dump any changes to ACID tables as
+        // part of the commit either. But we must still dump the commit transaction event itself,
+        // so that replication can close a transaction opened while replaying an open-txn event.
+        LOG.debug("writeEventInfoList will be removed from the commit message because we are " +
+                "bootstrapping acid tables.");
+        replicatingAcidEvents = false;
+      } else if (!ReplUtils.includeAcidTableInDump(withinContext.hiveConf)) {
+        // Similar to the above condition, but only for testing purposes: if the config doesn't
+        // allow ACID tables to be replicated, we don't dump any changes to the ACID tables as
+        // part of the commit.
+        LOG.debug("writeEventInfoList will be removed from the commit message because we are " +
+                "not dumping acid tables.");
+        replicatingAcidEvents = false;
+      }
       String contextDbName =  withinContext.dbName == null ? null :
               StringUtils.normalizeIdentifier(withinContext.dbName);
       String contextTableName =  withinContext.tableName == null ? null :
               StringUtils.normalizeIdentifier(withinContext.tableName);
-      List<WriteEventInfo> writeEventInfoList = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf).
-          getAllWriteEventInfo(eventMessage.getTxnId(), contextDbName, contextTableName);
+      List<WriteEventInfo> writeEventInfoList = null;
+      if (replicatingAcidEvents) {
+        writeEventInfoList = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf).
+                getAllWriteEventInfo(eventMessage.getTxnId(), contextDbName, contextTableName);
+      }
       int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0);
       if (numEntry != 0) {
         eventMessage.addWriteEventInfo(writeEventInfoList);
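To summarise the shape of this change: the commit event itself always reaches the dump, and only its write payload is conditional. A compressed, illustrative restatement with simplified types and invented names:

    import java.util.List;

    final class CommitTxnDumpShape {
      // The COMMIT_TXN event is always dumped so the target can close the
      // transaction it opened while replaying OPEN_TXN; per-table write info
      // rides along only when ACID events are actually being replicated.
      static <T> List<T> writePayload(boolean replicatingAcidEvents, List<T> fetchedWriteInfo) {
        return replicatingAcidEvents ? fetchedWriteInfo : null;
      }
    }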
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index 39f342f..7a58dba 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -78,7 +78,7 @@ public class TestReplDumpTask {
     }
 
     @Override
-    String getValidTxnListForReplDump(Hive hiveDb) {
+    String getValidTxnListForReplDump(Hive hiveDb, long waitUntilTime) {
       return "";
     }
 
@@ -116,6 +116,8 @@ public class TestReplDumpTask {
     when(queryState.getConf()).thenReturn(conf);
     when(conf.getLong("hive.repl.last.repl.id", -1L)).thenReturn(1L);
     when(conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)).thenReturn(false);
+    when(HiveConf.getVar(conf,
+            HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT)).thenReturn("1h");
 
     whenNew(Writer.class).withAnyArguments().thenReturn(mock(Writer.class));
     whenNew(HiveWrapper.class).withAnyArguments().thenReturn(mock(HiveWrapper.class));
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
index efbcfb2..412314c 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/MetaStoreTestUtils.java
@@ -106,10 +106,11 @@ public class MetaStoreTestUtils {
     return MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), conf);
   }
 
-  public static int startMetaStoreWithRetry(Configuration conf, boolean keepJdbcUri)
+  public static int startMetaStoreWithRetry(Configuration conf, boolean keepJdbcUri,
+                                            boolean keepWarehousePath)
       throws Exception {
     return MetaStoreTestUtils.startMetaStoreWithRetry(HadoopThriftAuthBridge.getBridge(), conf,
-        keepJdbcUri);
+        keepJdbcUri, keepWarehousePath);
   }
 
   public static int startMetaStoreWithRetry() throws Exception {
@@ -119,7 +120,7 @@ public class MetaStoreTestUtils {
 
   public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge,
                                             Configuration conf) throws Exception {
-    return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, conf, false);
+    return MetaStoreTestUtils.startMetaStoreWithRetry(bridge, conf, false, false);
   }
 
   /**
@@ -130,20 +131,24 @@ public class MetaStoreTestUtils {
    * @param bridge The Thrift bridge to uses
    * @param conf The configuration to use
    * @param keepJdbcUri If set to true, then the JDBC url is not changed
+   * @param keepWarehousePath If set to true, then the warehouse directory is not changed
    * @return The port on which the MetaStore finally started
    * @throws Exception
    */
   public static int startMetaStoreWithRetry(HadoopThriftAuthBridge bridge,
-      Configuration conf, boolean keepJdbcUri) throws Exception {
+      Configuration conf, boolean keepJdbcUri, boolean keepWarehousePath) throws Exception {
     Exception metaStoreException = null;
     String warehouseDir = MetastoreConf.getVar(conf, ConfVars.WAREHOUSE);
 
     for (int tryCount = 0; tryCount < MetaStoreTestUtils.RETRY_COUNT; tryCount++) {
       try {
         int metaStorePort = findFreePort();
-        // Setting metastore instance specific warehouse directory, postfixing with port
-        Path postfixedWarehouseDir = new Path(warehouseDir, String.valueOf(metaStorePort));
-        MetastoreConf.setVar(conf, ConfVars.WAREHOUSE, postfixedWarehouseDir.toString());
+        if (!keepWarehousePath) {
+          // Setting metastore instance specific warehouse directory, postfixing with port
+          Path postfixedWarehouseDir = new Path(warehouseDir, String.valueOf(metaStorePort));
+          MetastoreConf.setVar(conf, ConfVars.WAREHOUSE, postfixedWarehouseDir.toString());
+          warehouseDir = postfixedWarehouseDir.toString();
+        }
 
         String jdbcUrl = MetastoreConf.getVar(conf, ConfVars.CONNECT_URL_KEY);
         if (!keepJdbcUri) {
@@ -167,11 +172,11 @@ public class MetaStoreTestUtils {
           fs.mkdirs(wh.getWhRoot());
           fs.setPermission(wh.getWhRoot(),
               new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
-          LOG.info("MetaStore warehouse root dir ({}) is created", postfixedWarehouseDir);
+          LOG.info("MetaStore warehouse root dir ({}) is created", warehouseDir);
         }
 
         LOG.info("MetaStore Thrift Server started on port: {} with warehouse dir: {} with " +
-            "jdbcUrl: {}", metaStorePort, postfixedWarehouseDir, jdbcUrl);
+            "jdbcUrl: {}", metaStorePort, warehouseDir, jdbcUrl);
         return metaStorePort;
       } catch (ConnectException ce) {
         metaStoreException = ce;