Posted to commits@hive.apache.org by an...@apache.org on 2020/10/23 03:39:07 UTC

[hive] branch master updated: HIVE-24109: Load partitions in batches for managed tables in the bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e033ebe  HIVE-24109: Load partitions in batches for managed tables in the bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha)
e033ebe is described below

commit e033ebe3e37357244fe35d3ff9f080742b4e1b5f
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Fri Oct 23 09:08:53 2020 +0530

    HIVE-24109: Load partitions in batches for managed tables in the bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  16 +-
 .../hive/ql/parse/TestReplicationScenarios.java    |  10 +-
 .../parse/TestReplicationScenariosAcidTables.java  | 266 +++++++++++----------
 .../TestReplicationScenariosAcrossInstances.java   |  64 +++--
 .../partition/add/AlterTableAddPartitionDesc.java  |   7 +
 .../apache/hadoop/hive/ql/exec/ReplCopyTask.java   |  81 ++-----
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |   2 +
 .../hadoop/hive/ql/exec/repl/ReplDumpWork.java     |  17 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |   3 +-
 .../exec/repl/bootstrap/events/PartitionEvent.java |   5 +
 .../events/filesystem/FSPartitionEvent.java        |  12 +
 .../exec/repl/bootstrap/load/ReplicationState.java |  18 ++
 .../repl/bootstrap/load/table/LoadPartitions.java  | 228 ++++++++----------
 .../exec/repl/bootstrap/load/table/LoadTable.java  |   6 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java      |  48 ++--
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java |  39 ---
 .../TestPrimaryToReplicaResourceFunction.java      |   2 +-
 .../metastore/InjectableBehaviourObjectStore.java  |  23 ++
 18 files changed, 411 insertions(+), 436 deletions(-)

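For orientation, a minimal sketch of the batching scheme this patch introduces for managed tables during bootstrap load (illustrative plain Java, not code from the patch; the partition names and batch size are made up): partition specs are grouped into fixed-size batches, the data files for every partition in a batch are copied first, and then a single metastore call adds the metadata for the whole batch.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionBatchSketch {
  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("country=india", "country=france", "country=australia");
    int batchSize = 2; // analogous to hive.repl.load.partitions.with.data.copy.batch.size
    for (int i = 0; i < partitions.size(); i += batchSize) {
      List<String> batch =
          new ArrayList<>(partitions.subList(i, Math.min(i + batchSize, partitions.size())));
      // 1) copy the data files for every partition in this batch
      batch.forEach(p -> System.out.println("copy data for " + p));
      // 2) one metastore call to add the whole batch's partition metadata
      System.out.println("add_partitions(" + batch + ")");
    }
  }
}
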
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index aab4913..edaa75b 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -596,18 +596,6 @@ public class HiveConf extends Configuration {
           + "'hive.repl.include.external.tables' when sets to true. If 'hive.repl.include.external.tables' is \n"
           + "set to false, then this config parameter has no effect. It should be set to true only once for \n"
           + "incremental repl dump on each existing replication policy after enabling external tables replication."),
-    REPL_ENABLE_MOVE_OPTIMIZATION("hive.repl.enable.move.optimization", false,
-          "If its set to true, REPL LOAD copies data files directly to the target table/partition location \n"
-          + "instead of copying to staging directory first and then move to target location. This optimizes \n"
-          + " the REPL LOAD on object data stores such as S3 or WASB where creating a directory and move \n"
-          + " files are costly operations. In file system like HDFS where move operation is atomic, this \n"
-          + " optimization should not be enabled as it may lead to inconsistent data read for non acid tables."),
-    REPL_MOVE_OPTIMIZED_FILE_SCHEMES("hive.repl.move.optimized.scheme", "s3a, wasb",
-        "Comma separated list of schemes for which move optimization will be enabled during repl load. \n"
-        + "This configuration overrides the value set using REPL_ENABLE_MOVE_OPTIMIZATION for the given schemes. \n"
-        + " Schemes of the file system which does not support atomic move (rename) can be specified here to \n "
-        + " speed up the repl load operation. In file system like HDFS where move operation is atomic, this \n"
-        + " optimization should not be enabled as it may lead to inconsistent data read for non acid tables."),
     REPL_EXTERNAL_TABLE_BASE_DIR("hive.repl.replica.external.table.base.dir", null,
         "This is the fully qualified base directory on the target/replica warehouse under which data for "
             + "external tables is stored. This is relative base path and hence prefixed to the source "
@@ -655,6 +643,10 @@ public class HiveConf extends Configuration {
       "Provide the maximum number of partitions of a table that will be batched together during  \n"
         + "repl load. All the partitions in a batch will make a single metastore call to update the metadata. \n"
         + "The data for these partitions will be copied before copying the metadata batch. "),
+    REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE("hive.repl.load.partitions.with.data.copy.batch.size", 1000,
+      "Provide the maximum number of partitions of a table that will be batched together during  \n"
+        + "repl load. All the partitions in a batch will make a single metastore call to update the metadata. \n"
+        + "The data for these partitions will be copied before copying the metadata batch. "),
     LOCALSCRATCHDIR("hive.exec.local.scratchdir",
         "${system:java.io.tmpdir}" + File.separator + "${system:user.name}",
         "Local scratch space for Hive jobs"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index bef0a95..27e97b9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -475,7 +475,6 @@ public class TestReplicationScenarios {
   private Task getReplLoadRootTask(String sourceDb, String replicadb, boolean isIncrementalDump,
                                    Tuple tuple) throws Throwable {
     HiveConf confTemp = new HiveConf();
-    confTemp.set("hive.repl.enable.move.optimization", "true");
     Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
     ReplicationMetricCollector metricCollector;
     if (isIncrementalDump) {
@@ -4022,8 +4021,7 @@ public class TestReplicationScenarios {
 
     String replDbName = dbName + "_replica";
     Tuple dump = replDumpDb(dbName);
-    run("REPL LOAD " + dbName + " INTO " + replDbName +
-            " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
     verifyRun("REPL STATUS " + replDbName, dump.lastReplId, driverMirror);
 
     run(" use " + replDbName, driverMirror);
@@ -4056,8 +4054,7 @@ public class TestReplicationScenarios {
     verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data, driver);
 
     Tuple incrementalDump = replDumpDb(dbName);
-    run("REPL LOAD " + dbName + " INTO " + replDbName +
-            " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
     verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror);
 
     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
@@ -4073,8 +4070,7 @@ public class TestReplicationScenarios {
     verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite, driver);
 
     incrementalDump = replDumpDb(dbName);
-    run("REPL LOAD " + dbName + " INTO " + replDbName +
-            " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
     verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror);
 
     verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data_after_ins, driverMirror);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 575eeab..3d22770 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -61,6 +61,7 @@ import java.util.List;
 import java.util.Collections;
 import java.util.Map;
 
+
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
 import static org.junit.Assert.assertEquals;
@@ -161,25 +162,6 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
   }
 
   @Test
-  public void testAcidTablesMoveOptimizationBootStrap() throws Throwable {
-    WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null);
-    replica.load(replicatedDbName, primaryDbName,
-            Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
-    verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true);
-  }
-
-  @Test
-  public void testAcidTablesMoveOptimizationIncremental() throws Throwable {
-    WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, primaryDbName,
-            Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
-    WarehouseInstance.Tuple incrDump = prepareDataAndDump(primaryDbName, null);
-    replica.load(replicatedDbName, primaryDbName,
-            Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
-    verifyLoadExecution(replicatedDbName, incrDump.lastReplicationId, true);
-  }
-
-  @Test
   /**
    * Testcase for getting immutable dataset dump, and its corresponding repl load.
    */
@@ -2301,131 +2283,155 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
   public void testORCTableDistcpCopyWithCopyOnTarget() throws Throwable {
     //Distcp copy
     List<String> withClause = Arrays.asList(
-            "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'",
-            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
-            "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
-            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
-            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
-                    + UserGroupInformation.getCurrentUser().getUserName() + "'");
+      "'" + HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET.varname + "'='true'",
+      "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+      "'" + HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname + "'='false'",
+      "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+      "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+        + UserGroupInformation.getCurrentUser().getUserName() + "'");
     WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
-            .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES ('transactional'='true')")
-            .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
-                    " stored as orc TBLPROPERTIES ('transactional'='true')")
-            .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" +
-                    " stored as orc TBLPROPERTIES ('transactional'='true')")
-            .run("CREATE TABLE tpart1(a int) partitioned by (name string)" +
-                    " stored as orc TBLPROPERTIES ('transactional'='true')")
-            .run("CREATE TABLE tpart2(a int) partitioned by (name string) clustered by (a) into 2 buckets" +
-                    " stored as orc TBLPROPERTIES ('transactional'='true')")
-            .run("CREATE TABLE text1(a string) STORED AS TEXTFILE")
-            .run("insert into t1 values (1)")
-            .run("insert into t1 values (11)")
-            .run("insert into t2 values (2)")
-            .run("insert into t2 values (22)")
-            .run("insert into t3 values (33)")
-            .run("insert into tpart1 partition(name='Tom') values(100)")
-            .run("insert into tpart1 partition(name='Jerry') values(101)")
-            .run("insert into tpart2 partition(name='Bob') values(200)")
-            .run("insert into tpart2 partition(name='Carl') values(201)")
-            .run("insert into text1 values ('ricky')")
-            .dump(primaryDbName, withClause);
+      .run("CREATE TABLE t1(a int) stored as orc TBLPROPERTIES ('transactional'='true')")
+      .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
+        " stored as orc TBLPROPERTIES ('transactional'='true')")
+      .run("CREATE TABLE t3(a string) clustered by (a) into 2 buckets" +
+        " stored as orc TBLPROPERTIES ('transactional'='true')")
+      .run("CREATE TABLE tpart1(a int) partitioned by (name string)" +
+        " stored as orc TBLPROPERTIES ('transactional'='true')")
+      .run("CREATE TABLE tpart2(a int) partitioned by (name string) clustered by (a) into 2 buckets" +
+        " stored as orc TBLPROPERTIES ('transactional'='true')")
+      .run("CREATE TABLE text1(a string) STORED AS TEXTFILE")
+      .run("insert into t1 values (1)")
+      .run("insert into t1 values (11)")
+      .run("insert into t2 values (2)")
+      .run("insert into t2 values (22)")
+      .run("insert into t3 values (33)")
+      .run("insert into tpart1 partition(name='Tom') values(100)")
+      .run("insert into tpart1 partition(name='Jerry') values(101)")
+      .run("insert into tpart2 partition(name='Bob') values(200)")
+      .run("insert into tpart2 partition(name='Carl') values(201)")
+      .run("insert into text1 values ('ricky')")
+      .dump(primaryDbName, withClause);
 
     replica.run("DROP TABLE t3");
 
     replica.load(replicatedDbName, primaryDbName, withClause)
-            .run("use " + replicatedDbName)
-            .run("show tables")
-            .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2", "text1"})
-            .run("select * from " + replicatedDbName + ".t1")
-            .verifyResults(new String[] {"1", "11"})
-            .run("select * from " + replicatedDbName + ".t2")
-            .verifyResults(new String[]{"2", "22"})
-            .run("select a from " + replicatedDbName + ".tpart1")
-            .verifyResults(new String[]{"100", "101"})
-            .run("show partitions " + replicatedDbName + ".tpart1")
-            .verifyResults(new String[]{"name=Tom", "name=Jerry"})
-            .run("select a from " + replicatedDbName + ".tpart2")
-            .verifyResults(new String[]{"200", "201"})
-            .run("show partitions " + replicatedDbName + ".tpart2")
-            .verifyResults(new String[]{"name=Bob", "name=Carl"})
-            .run("select a from " + replicatedDbName + ".text1")
-            .verifyResults(new String[]{"ricky"});
+      .run("use " + replicatedDbName)
+      .run("show tables")
+      .verifyResults(new String[]{"t1", "t2", "t3", "tpart1", "tpart2", "text1"})
+      .run("select * from " + replicatedDbName + ".t1")
+      .verifyResults(new String[]{"1", "11"})
+      .run("select * from " + replicatedDbName + ".t2")
+      .verifyResults(new String[]{"2", "22"})
+      .run("select a from " + replicatedDbName + ".tpart1")
+      .verifyResults(new String[]{"100", "101"})
+      .run("show partitions " + replicatedDbName + ".tpart1")
+      .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+      .run("select a from " + replicatedDbName + ".tpart2")
+      .verifyResults(new String[]{"200", "201"})
+      .run("show partitions " + replicatedDbName + ".tpart2")
+      .verifyResults(new String[]{"name=Bob", "name=Carl"})
+      .run("select a from " + replicatedDbName + ".text1")
+      .verifyResults(new String[]{"ricky"});
 
     WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
-            .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" +
-                    " stored as orc TBLPROPERTIES ('transactional'='true')")
-            .run("CREATE TABLE tpart3(a int) partitioned by (name string)" +
-                    " stored as orc TBLPROPERTIES ('transactional'='true')")
-            .run("CREATE TABLE tpart4(a int) partitioned by (name string) clustered by (a) into 2 buckets" +
-                    " stored as orc TBLPROPERTIES ('transactional'='true')")
-            .run("insert into t1 values (111)")
-            .run("insert into t2 values (222)")
-            .run("insert into t4 values (4)")
-            .run("insert into tpart1 partition(name='Tom') values(102)")
-            .run("insert into tpart1 partition(name='Jerry') values(103)")
-            .run("insert into tpart2 partition(name='Bob') values(202)")
-            .run("insert into tpart2 partition(name='Carl') values(203)")
-            .run("insert into tpart3 partition(name='Tom3') values(300)")
-            .run("insert into tpart3 partition(name='Jerry3') values(301)")
-            .run("insert into tpart4 partition(name='Bob4') values(400)")
-            .run("insert into tpart4 partition(name='Carl4') values(401)")
-            .run("insert into text1 values ('martin')")
-            .dump(primaryDbName, withClause);
+      .run("CREATE TABLE t4(a int) clustered by (a) into 2 buckets" +
+        " stored as orc TBLPROPERTIES ('transactional'='true')")
+      .run("CREATE TABLE tpart3(a int) partitioned by (name string)" +
+        " stored as orc TBLPROPERTIES ('transactional'='true')")
+      .run("CREATE TABLE tpart4(a int) partitioned by (name string) clustered by (a) into 2 buckets" +
+        " stored as orc TBLPROPERTIES ('transactional'='true')")
+      .run("insert into t1 values (111)")
+      .run("insert into t2 values (222)")
+      .run("insert into t4 values (4)")
+      .run("insert into tpart1 partition(name='Tom') values(102)")
+      .run("insert into tpart1 partition(name='Jerry') values(103)")
+      .run("insert into tpart2 partition(name='Bob') values(202)")
+      .run("insert into tpart2 partition(name='Carl') values(203)")
+      .run("insert into tpart3 partition(name='Tom3') values(300)")
+      .run("insert into tpart3 partition(name='Jerry3') values(301)")
+      .run("insert into tpart4 partition(name='Bob4') values(400)")
+      .run("insert into tpart4 partition(name='Carl4') values(401)")
+      .run("insert into text1 values ('martin')")
+      .dump(primaryDbName, withClause);
 
     replica.load(replicatedDbName, primaryDbName, withClause)
-            .run("use " + replicatedDbName)
-            .run("show tables ")
-            .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2", "tpart3", "tpart4", "text1"})
-            .run("select * from " + replicatedDbName + ".t1")
-            .verifyResults(new String[] {"1", "11", "111"})
-            .run("select * from " + replicatedDbName + ".t2")
-            .verifyResults(new String[]{"2", "22", "222"})
-            .run("select * from " + replicatedDbName + ".t4")
-            .verifyResults(new String[]{"4"})
-            .run("select a from " + replicatedDbName + ".tpart1")
-            .verifyResults(new String[]{"100", "101", "102", "103"})
-            .run("show partitions " + replicatedDbName + ".tpart1")
-            .verifyResults(new String[]{"name=Tom", "name=Jerry"})
-            .run("select a from " + replicatedDbName + ".tpart2")
-            .verifyResults(new String[]{"200", "201", "202", "203"})
-            .run("show partitions " + replicatedDbName + ".tpart2")
-            .verifyResults(new String[]{"name=Bob", "name=Carl"})
-            .run("select a from " + replicatedDbName + ".tpart3")
-            .verifyResults(new String[]{"300", "301"})
-            .run("show partitions " + replicatedDbName + ".tpart3")
-            .verifyResults(new String[]{"name=Tom3", "name=Jerry3"})
-            .run("select a from " + replicatedDbName + ".tpart4")
-            .verifyResults(new String[]{"400", "401"})
-            .run("show partitions " + replicatedDbName + ".tpart4")
-            .verifyResults(new String[]{"name=Bob4", "name=Carl4"})
-            .run("select a from " + replicatedDbName + ".text1")
-            .verifyResults(new String[]{"ricky", "martin"});
+      .run("use " + replicatedDbName)
+      .run("show tables ")
+      .verifyResults(new String[]{"t1", "t2", "t4", "tpart1", "tpart2", "tpart3", "tpart4", "text1"})
+      .run("select * from " + replicatedDbName + ".t1")
+      .verifyResults(new String[]{"1", "11", "111"})
+      .run("select * from " + replicatedDbName + ".t2")
+      .verifyResults(new String[]{"2", "22", "222"})
+      .run("select * from " + replicatedDbName + ".t4")
+      .verifyResults(new String[]{"4"})
+      .run("select a from " + replicatedDbName + ".tpart1")
+      .verifyResults(new String[]{"100", "101", "102", "103"})
+      .run("show partitions " + replicatedDbName + ".tpart1")
+      .verifyResults(new String[]{"name=Tom", "name=Jerry"})
+      .run("select a from " + replicatedDbName + ".tpart2")
+      .verifyResults(new String[]{"200", "201", "202", "203"})
+      .run("show partitions " + replicatedDbName + ".tpart2")
+      .verifyResults(new String[]{"name=Bob", "name=Carl"})
+      .run("select a from " + replicatedDbName + ".tpart3")
+      .verifyResults(new String[]{"300", "301"})
+      .run("show partitions " + replicatedDbName + ".tpart3")
+      .verifyResults(new String[]{"name=Tom3", "name=Jerry3"})
+      .run("select a from " + replicatedDbName + ".tpart4")
+      .verifyResults(new String[]{"400", "401"})
+      .run("show partitions " + replicatedDbName + ".tpart4")
+      .verifyResults(new String[]{"name=Bob4", "name=Carl4"})
+      .run("select a from " + replicatedDbName + ".text1")
+      .verifyResults(new String[]{"ricky", "martin"});
 
     incrementalDump = primary.run("use " + primaryDbName)
-            .run("insert into t4 values (44)")
-            .run("insert into t1 values (1111)")
-            .run("DROP TABLE t1")
-            .run("insert into t2 values (2222)")
-            .run("insert into tpart1 partition(name='Tom') values(104)")
-            .run("insert into tpart1 partition(name='Tom_del') values(1000)")
-            .run("insert into tpart1 partition(name='Harry') values(10001)")
-            .run("insert into tpart1 partition(name='Jerry') values(105)")
-            .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')")
-            .run("DROP TABLE tpart2")
-            .dump(primaryDbName, withClause);
+      .run("insert into t4 values (44)")
+      .run("insert into t1 values (1111)")
+      .run("DROP TABLE t1")
+      .run("insert into t2 values (2222)")
+      .run("insert into tpart1 partition(name='Tom') values(104)")
+      .run("insert into tpart1 partition(name='Tom_del') values(1000)")
+      .run("insert into tpart1 partition(name='Harry') values(10001)")
+      .run("insert into tpart1 partition(name='Jerry') values(105)")
+      .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom')")
+      .run("DROP TABLE tpart2")
+      .dump(primaryDbName, withClause);
 
     replica.run("DROP TABLE t4")
-            .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')");
+      .run("ALTER TABLE tpart1 DROP PARTITION(name='Tom_del')");
 
     replica.load(replicatedDbName, primaryDbName, withClause)
-            .run("use " + replicatedDbName)
-            .run("show tables ")
-            .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3", "tpart4", "text1"})
-            .run("select * from " + replicatedDbName + ".t2")
-            .verifyResults(new String[]{"2", "22", "222", "2222"})
-            .run("select a from " + replicatedDbName + ".tpart1")
-            .verifyResults(new String[]{"101", "103", "105", "1000", "10001"})
-            .run("show partitions " + replicatedDbName + ".tpart1")
-            .verifyResults(new String[]{"name=Harry", "name=Jerry", "name=Tom_del"});
+      .run("use " + replicatedDbName)
+      .run("show tables ")
+      .verifyResults(new String[]{"t2", "t4", "tpart1", "tpart3", "tpart4", "text1"})
+      .run("select * from " + replicatedDbName + ".t2")
+      .verifyResults(new String[]{"2", "22", "222", "2222"})
+      .run("select a from " + replicatedDbName + ".tpart1")
+      .verifyResults(new String[]{"101", "103", "105", "1000", "10001"})
+      .run("show partitions " + replicatedDbName + ".tpart1")
+      .verifyResults(new String[]{"name=Harry", "name=Jerry", "name=Tom_del"});
+  }
+
+  @Test
+  public void testTableWithPartitionsInBatch() throws Throwable {
+
+    List<String> withClause = new ArrayList<>();
+    withClause.add("'" + HiveConf.ConfVars.REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE.varname + "'='" + 1 + "'");
+
+    primary.run("use " + primaryDbName)
+      .run("create table t2 (place string) partitioned by (country string)")
+      .run("insert into t2 partition(country='india') values ('bangalore')")
+      .run("insert into t2 partition(country='france') values ('paris')")
+      .run("insert into t2 partition(country='australia') values ('sydney')")
+      .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+      .run("use " + replicatedDbName)
+      .run("show tables like 't2'")
+      .verifyResults(new String[] { "t2" })
+      .run("select distinct(country) from t2")
+      .verifyResults(new String[] { "india", "france", "australia" })
+      .run("select place from t2")
+      .verifyResults(new String[] { "bangalore", "paris", "sydney" })
+      .verifyReplTargetProperty(replicatedDbName);
   }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index dbdee9d..e01c6c4 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -453,6 +453,34 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
   }
 
   @Test
+  public void testMultipleStagesOfReplicationLoadTaskWithPartitionBatching() throws Throwable {
+    WarehouseInstance.Tuple tuple = primary
+      .run("use " + primaryDbName)
+      .run("create table t1 (id int)")
+      .run("insert into t1 values (1), (2)")
+      .run("create table t2 (place string) partitioned by (country string)")
+      .run("insert into table t2 partition(country='india') values ('bangalore')")
+      .run("insert into table t2 partition(country='us') values ('austin')")
+      .run("insert into table t2 partition(country='france') values ('paris')")
+      .run("create table t3 (rank int)")
+      .dump(primaryDbName);
+
+    // each table creation itself takes more than one task; given we allow a max of 1, we should hit multiple runs.
+    List<String> withClause = new ArrayList<>();
+    withClause.add("'" + HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname + "'='1'");
+    withClause.add("'" + HiveConf.ConfVars.REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE.varname + "'='1'");
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+      .run("use " + replicatedDbName)
+      .run("show tables")
+      .verifyResults(new String[] { "t1", "t2", "t3" })
+      .run("repl status " + replicatedDbName)
+      .verifyResult(tuple.lastReplicationId)
+      .run("select country from t2 order by country")
+      .verifyResults(new String[] { "france", "india", "us" });
+  }
+
+  @Test
   public void testParallelExecutionOfReplicationBootStrapLoad() throws Throwable {
     WarehouseInstance.Tuple tuple = primary
         .run("use " + primaryDbName)
@@ -1337,12 +1365,12 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
 
     // Inject a behavior where REPL LOAD failed when try to load table "t2" and partition "uk".
     // So, table "t2" will exist and partition "india" will exist, rest failed as operation failed.
-    BehaviourInjection<List<Partition>, Boolean> alterPartitionStub
+    BehaviourInjection<List<Partition>, Boolean> addPartitionStub
             = new BehaviourInjection<List<Partition>, Boolean>() {
       @Override
       public Boolean apply(List<Partition> ptns) {
         for (Partition ptn : ptns) {
-          if (ptn.getValues().get(0).equals("india")) {
+          if (ptn.getValues().get(0).equals("uk")) {
             injectionPathCalled = true;
             LOG.warn("####getPartition Stub called");
             return false;
@@ -1351,14 +1379,15 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         return true;
       }
     };
-    InjectableBehaviourObjectStore.setAlterPartitionsBehaviour(alterPartitionStub);
+    InjectableBehaviourObjectStore.setAddPartitionsBehaviour(addPartitionStub);
 
     // Make sure that there's some order in which the objects are loaded.
     List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'",
-            "'hive.in.repl.test.files.sorted'='true'");
+            "'hive.in.repl.test.files.sorted'='true'",
+      "'" + HiveConf.ConfVars.REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE + "' = '1'");
     replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
-    InjectableBehaviourObjectStore.setAlterPartitionsBehaviour(null); // reset the behaviour
-    alterPartitionStub.assertInjectionsPerformed(true, false);
+    InjectableBehaviourObjectStore.resetAddPartitionModifier(); // reset the behaviour
+    addPartitionStub.assertInjectionsPerformed(true, false);
 
     replica.run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
@@ -1420,21 +1449,19 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
 
   @Test
   public void testMoveOptimizationIncrementalFailureAfterCopyReplace() throws Throwable {
-    List<String> withConfigs =
-        Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
     String replicatedDbName_CM = replicatedDbName + "_CM";
     WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
             .run("create table t2 (place string) partitioned by (country string)")
             .run("insert into table t2 partition(country='india') values ('bangalore')")
             .run("create table t1 (place string) partitioned by (country string)")
             .dump(primaryDbName);
-    replica.load(replicatedDbName, primaryDbName, withConfigs);
+    replica.load(replicatedDbName, primaryDbName);
     //delete load ack to reuse the dump
     new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation
             + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR
             + LOAD_ACKNOWLEDGEMENT.toString()), true);
 
-    replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
+    replica.load(replicatedDbName_CM, primaryDbName);
     replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
         .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
 
@@ -1448,18 +1475,16 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
 
   @Test
   public void testMoveOptimizationIncrementalFailureAfterCopy() throws Throwable {
-    List<String> withConfigs =
-        Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
     String replicatedDbName_CM = replicatedDbName + "_CM";
     WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
             .run("create table t2 (place string) partitioned by (country string)")
             .run("ALTER TABLE t2 ADD PARTITION (country='india')")
             .dump(primaryDbName);
-    replica.load(replicatedDbName, primaryDbName, withConfigs);
+    replica.load(replicatedDbName, primaryDbName);
     //delete load ack to reuse the dump
     new Path(bootstrapDump.dumpLocation).getFileSystem(conf).delete(new Path(bootstrapDump.dumpLocation
             + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + LOAD_ACKNOWLEDGEMENT.toString()), true);
-    replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
+    replica.load(replicatedDbName_CM, primaryDbName);
     replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
         .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
 
@@ -1472,9 +1497,6 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
 
   private void testMoveOptimization(String primaryDb, String replicaDb, String replicatedDbName_CM,
                                     String tbl,  String eventType, WarehouseInstance.Tuple tuple) throws Throwable {
-    List<String> withConfigs =
-        Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
-
     // fail add notification for given event type.
     BehaviourInjection<NotificationEvent, Boolean> callerVerifier
             = new BehaviourInjection<NotificationEvent, Boolean>() {
@@ -1493,13 +1515,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
 
     InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
     try {
-      replica.loadFailure(replicaDb, primaryDbName, withConfigs);
+      replica.loadFailure(replicaDb, primaryDbName);
     } finally {
       InjectableBehaviourObjectStore.resetAddNotificationModifier();
     }
 
     callerVerifier.assertInjectionsPerformed(true, false);
-    replica.load(replicaDb, primaryDbName, withConfigs);
+    replica.load(replicaDb, primaryDbName);
 
     replica.run("use " + replicaDb)
             .run("select country from " + tbl + " where country == 'india'")
@@ -1516,13 +1538,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
 
     InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
     try {
-      replica.loadFailure(replicatedDbName_CM, primaryDbName, withConfigs);
+      replica.loadFailure(replicatedDbName_CM, primaryDbName);
     } finally {
       InjectableBehaviourObjectStore.resetAddNotificationModifier();
     }
 
     callerVerifier.assertInjectionsPerformed(true, false);
-    replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
+    replica.load(replicatedDbName_CM, primaryDbName);
 
     replica.run("use " + replicatedDbName_CM)
             .run("select country from " + tbl + " where country == 'india'")
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java
index c4b2dab..d61c575 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/partition/add/AlterTableAddPartitionDesc.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.ddl.table.partition.add;
 
 import java.io.Serializable;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -105,6 +106,12 @@ public class AlterTableAddPartitionDesc implements DDLDescWithWriteId, Serializa
       return params;
     }
 
+    public void addPartParams(Map<String, String> partParams) {
+      if (params != null) {
+        params.putAll(partParams);
+      }
+    }
+
     @Explain(displayName = "params", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
     public String getPartParamsForExplain() {
       return params.toString();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 3b07b73..57ce1aa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.exec;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
@@ -33,10 +32,8 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.Serializable;
-import java.io.FileNotFoundException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.ListIterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,7 +47,6 @@ import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.util.StringUtils;
 
 import static org.apache.hadoop.hive.common.FileUtils.HIDDEN_FILES_PATH_FILTER;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
 
 public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
 
@@ -142,7 +138,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
 
       LOG.debug("ReplCopyTask numFiles: {}", srcFiles.size());
 
-      // in case of move optimization, file is directly copied to destination. So we need to clear the old content, if
+      // in case of acid tables, the file is directly copied to the destination. So we need to clear the old content, if
       // its a replace (insert overwrite ) operation.
       if (work.getDeleteDestIfExist() && dstFs.exists(toPath)) {
         LOG.debug(" path " + toPath + " is cleaned before renaming");
@@ -226,52 +222,17 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
 
   public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
                                         HiveConf conf, boolean isAutoPurge, boolean needRecycle,
-                                        boolean readSourceAsFileList) {
-    return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle,
-            readSourceAsFileList, false);
-  }
-
-  public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
-                                        HiveConf conf, boolean isAutoPurge, boolean needRecycle,
                                         boolean readSourceAsFileList, String dumpDirectory,
                                         ReplicationMetricCollector metricCollector) {
     return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, isAutoPurge, needRecycle,
-            readSourceAsFileList, false, dumpDirectory, metricCollector);
+            readSourceAsFileList, false, true, dumpDirectory, metricCollector);
   }
 
-  private static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
-                                        HiveConf conf, boolean isAutoPurge, boolean needRecycle,
-                                        boolean readSourceAsFileList,
-                                        boolean overWrite) {
-    Task<?> copyTask = null;
-    LOG.debug("ReplCopyTask:getLoadCopyTask: {}=>{}", srcPath, dstPath);
-    if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
-      ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false, overWrite);
-      rcwork.setReadSrcAsFilesList(readSourceAsFileList);
-      if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION))) {
-        rcwork.setDeleteDestIfExist(true);
-        rcwork.setAutoPurge(isAutoPurge);
-        rcwork.setNeedRecycle(needRecycle);
-      }
-      // For replace case, duplicate check should not be done. The new base directory will automatically make the older
-      // data invisible. Doing duplicate check and ignoring copy will cause consistency issue if there are multiple
-      // replace events getting replayed in the first incremental load.
-      rcwork.setCheckDuplicateCopy(replicationSpec.needDupCopyCheck() && !replicationSpec.isReplace());
-      LOG.debug("ReplCopyTask:\trcwork");
-      String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
-      rcwork.setDistCpDoAsUser(distCpDoAsUser);
-      copyTask = TaskFactory.get(rcwork, conf);
-    } else {
-      LOG.debug("ReplCopyTask:\tcwork");
-      copyTask = TaskFactory.get(new CopyWork(srcPath, dstPath, false), conf);
-    }
-    return copyTask;
-  }
 
   private static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
                                          HiveConf conf, boolean isAutoPurge, boolean needRecycle,
                                          boolean readSourceAsFileList,
-                                         boolean overWrite,
+                                         boolean overWrite, boolean deleteDestination,
                                          String dumpDirectory,
                                          ReplicationMetricCollector metricCollector) {
     Task<?> copyTask = null;
@@ -280,7 +241,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
       ReplCopyWork rcwork = new ReplCopyWork(srcPath, dstPath, false, overWrite, dumpDirectory,
               metricCollector);
       rcwork.setReadSrcAsFilesList(readSourceAsFileList);
-      if (replicationSpec.isReplace() && (conf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION))) {
+      if (replicationSpec.isReplace() && deleteDestination) {
         rcwork.setDeleteDestIfExist(true);
         rcwork.setAutoPurge(isAutoPurge);
         rcwork.setNeedRecycle(needRecycle);
@@ -300,35 +261,41 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
     return copyTask;
   }
 
-
-  public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
-                                        HiveConf conf) {
-    return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false,
-      true, false);
-  }
-
   public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
                                         HiveConf conf, String dumpDirectory, ReplicationMetricCollector metricCollector) {
     return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false,
-            true, false, dumpDirectory, metricCollector);
+      true, false, true, dumpDirectory, metricCollector);
   }
 
-
   /*
    * Invoked in the bootstrap path.
    * Overwrite set to true
    */
   public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
-                                          HiveConf conf, boolean readSourceAsFileList, boolean overWrite) {
+                                        HiveConf conf, boolean readSourceAsFileList, boolean overWrite,
+                                        String dumpDirectory, ReplicationMetricCollector metricCollector) {
     return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false,
-            readSourceAsFileList, overWrite);
+      readSourceAsFileList, overWrite, true, dumpDirectory, metricCollector);
   }
 
-  public static Task<?> getLoadCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
+  /*
+   * Invoked in the bootstrap dump path. Bootstrap dump purge is false.
+   * No purge for the dump dir in case of checkpointing.
+   */
+  public static Task<?> getDumpCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
                                         HiveConf conf, boolean readSourceAsFileList, boolean overWrite,
-                                        String dumpDirectory, ReplicationMetricCollector metricCollector) {
+                                        boolean deleteDestination, String dumpDirectory,
+                                        ReplicationMetricCollector metricCollector) {
+    return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false,
+      readSourceAsFileList, overWrite, deleteDestination, dumpDirectory, metricCollector);
+  }
+
+
+  public static Task<?> getDumpCopyTask(ReplicationSpec replicationSpec, Path srcPath, Path dstPath,
+                                        HiveConf conf, String dumpDirectory,
+                                        ReplicationMetricCollector metricCollector) {
     return getLoadCopyTask(replicationSpec, srcPath, dstPath, conf, false, false,
-            readSourceAsFileList, overWrite, dumpDirectory, metricCollector);
+      true, false, true, dumpDirectory, metricCollector);
   }
 
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index ea9bf9a..1fce791 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -163,6 +163,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         }
         Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot);
         boolean isBootstrap = (previousValidHiveDumpPath == null);
+        work.setBootstrap(isBootstrap);
         //If no previous dump is present or previous dump is already loaded, proceed with the dump operation.
         if (shouldDump(previousValidHiveDumpPath)) {
           Path currentDumpPath = getCurrentDumpPath(dumpRoot, isBootstrap);
@@ -524,6 +525,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     // waiting for the concurrent transactions to finish, we start dumping the incremental events
     // and wait only for the remaining time if any.
     if (needBootstrapAcidTablesDuringIncrementalDump()) {
+      work.setBootstrap(true);
       bootDumpBeginReplId = queryState.getConf().getLong(ReplUtils.LAST_REPL_ID_KEY, -1L);
       assert (bootDumpBeginReplId >= 0);
       LOG.info("Dump for bootstrapping ACID tables during an incremental dump for db {}",
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 4da5bac..aecfe75 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -51,6 +51,7 @@ public class ReplDumpWork implements Serializable {
   final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath;
   Long eventTo;
   Long eventFrom;
+  private boolean isBootstrap;
   private static String testInjectDumpDir = null;
   private static boolean testInjectDumpDirAutoIncrement = false;
   static boolean testDeletePreviousDumpMetaPath = false;
@@ -134,6 +135,10 @@ public class ReplDumpWork implements Serializable {
     }
   }
 
+  void setBootstrap(boolean bootstrap) {
+    isBootstrap = bootstrap;
+  }
+
   public void setExternalTblCopyPathIterator(Iterator<String> externalTblCopyPathIterator) {
     if (this.externalTblCopyPathIterator != null) {
       throw new IllegalStateException("External table copy path iterator has already been initialized");
@@ -204,10 +209,12 @@ public class ReplDumpWork implements Serializable {
       replSpec.setInReplicationScope(true);
       EximUtil.DataCopyPath managedTableCopyPath = new EximUtil.DataCopyPath(replSpec);
       managedTableCopyPath.loadFromString(managedTblCopyPathIterator.next());
-      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+      //If it's incremental, in the checkpointing case the dump dir may exist. We will delete the event dir.
+      //In case of bootstrap checkpointing, we will not delete the entire dir and will just do a sync.
+      Task<?> copyTask = ReplCopyTask.getDumpCopyTask(
               managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(),
-              managedTableCopyPath.getTargetPath(), conf, false, shouldOverwrite,
-              getCurrentDumpPath().toString(), getMetricCollector());
+              managedTableCopyPath.getTargetPath(), conf, false, shouldOverwrite, !isBootstrap,
+        getCurrentDumpPath().toString(), getMetricCollector());
       tasks.add(copyTask);
       tracker.addTask(copyTask);
       LOG.debug("added task for {}", managedTableCopyPath);
@@ -220,9 +227,9 @@ public class ReplDumpWork implements Serializable {
     if (functionCopyPathIterator != null) {
       while (functionCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
         EximUtil.DataCopyPath binaryCopyPath = functionCopyPathIterator.next();
-        Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+        Task<?> copyTask = ReplCopyTask.getDumpCopyTask(
                 binaryCopyPath.getReplicationSpec(), binaryCopyPath.getSrcPath(), binaryCopyPath.getTargetPath(), conf,
-                getCurrentDumpPath().toString(), getMetricCollector()
+          getCurrentDumpPath().toString(), getMetricCollector()
         );
         tasks.add(copyTask);
         tracker.addTask(copyTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 16c906c..e7245bd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -371,7 +371,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     TableContext tableContext = new TableContext(dbTracker, work.dbNameToLoadIn);
     LoadPartitions loadPartitions =
         new LoadPartitions(loadContext, iterator.replLogger(), tableContext, loadTaskTracker,
-        event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), work.getMetricCollector());
+        event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), work.getMetricCollector(),
+          event.lastPartSpecReplicated(), event.lastStageReplicated());
         /*
              the tableTracker here should be a new instance and not an existing one as this can
              only happen when we break in between loading partitions.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java
index b9d6679..f5ab30c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/PartitionEvent.java
@@ -18,9 +18,14 @@
 package org.apache.hadoop.hive.ql.exec.repl.bootstrap.events;
 
 import org.apache.hadoop.hive.ql.ddl.table.partition.add.AlterTableAddPartitionDesc;
+import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
 
 public interface PartitionEvent extends TableEvent {
   AlterTableAddPartitionDesc lastPartitionReplicated();
 
+  ReplicationState.PartitionState.Stage lastStageReplicated();
+
+  AlterTableAddPartitionDesc.PartitionDesc lastPartSpecReplicated();
+
   TableEvent asTableEvent();
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
index 2d82408..c7705e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
@@ -52,6 +52,18 @@ public class FSPartitionEvent implements PartitionEvent {
   }
 
   @Override
+  public ReplicationState.PartitionState.Stage lastStageReplicated() {
+    assert replicationState != null && replicationState.partitionState != null;
+    return replicationState.partitionState.stage;
+  }
+
+  @Override
+  public AlterTableAddPartitionDesc.PartitionDesc lastPartSpecReplicated() {
+    assert replicationState != null && replicationState.partitionState != null;
+    return replicationState.partitionState.partSpec;
+  }
+
+  @Override
   public TableEvent asTableEvent() {
     return tableEvent;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java
index a67184d..d8c995c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/ReplicationState.java
@@ -20,16 +20,34 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load;
 import java.io.Serializable;
 
 import org.apache.hadoop.hive.ql.ddl.table.partition.add.AlterTableAddPartitionDesc;
+import org.apache.hadoop.hive.ql.exec.CopyTask;
+import org.apache.hadoop.hive.ql.exec.MoveTask;
 
 public class ReplicationState implements Serializable {
 
   public static class PartitionState {
     final String tableName;
     public final AlterTableAddPartitionDesc lastReplicatedPartition;
+    public AlterTableAddPartitionDesc.PartitionDesc partSpec;
+    public Stage stage;
+
+    public enum Stage {
+      COPY,
+      PARTITION
+    }
 
     public PartitionState(String tableName, AlterTableAddPartitionDesc lastReplicatedPartition) {
       this.tableName = tableName;
       this.lastReplicatedPartition = lastReplicatedPartition;
+      this.stage = Stage.PARTITION;
+    }
+
+    public PartitionState(String tableName, AlterTableAddPartitionDesc lastReplicatedPartition,
+                          AlterTableAddPartitionDesc.PartitionDesc lastProcessedPartSpec, Stage stage) {
+      this.tableName = tableName;
+      this.lastReplicatedPartition = lastReplicatedPartition;
+      this.partSpec = lastProcessedPartSpec;
+      this.stage = stage;
     }
   }
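
The Stage enum added here records whether the last replicated partition batch stopped during the data-copy step or during the add-partition (metadata) step. A minimal sketch of how such a checkpoint could drive resumption (an illustrative assumption, not the actual LoadPartitions resume logic):

public class ResumeStageSketch {
  enum Stage { COPY, PARTITION }

  public static void main(String[] args) {
    Stage lastStage = Stage.COPY; // pretend the previous load failed mid data copy
    if (lastStage == Stage.COPY) {
      System.out.println("redo the data copy for the last batch, then add its partitions");
    } else {
      System.out.println("data already copied; redo only the add-partitions metastore call");
    }
  }
}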
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index e0c9b96..bcbf20c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -30,7 +30,6 @@ import org.apache.hadoop.hive.ql.ddl.table.partition.drop.AlterTableDropPartitio
 import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.ReplLoadOpType;
@@ -38,8 +37,6 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
-import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -50,10 +47,6 @@ import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
-import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
-import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
-import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.datanucleus.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,7 +61,6 @@ import java.util.Map;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
 import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState.PartitionState;
 import static org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils.REPL_CHECKPOINT_KEY;
 import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
@@ -83,6 +75,8 @@ public class LoadPartitions {
   private final TableEvent event;
   private final TaskTracker tracker;
   private final AlterTableAddPartitionDesc lastReplicatedPartition;
+  private final AlterTableAddPartitionDesc.PartitionDesc lastReplicatedPartitionDesc;
+  private final PartitionState.Stage lastReplicatedStage;
   private final ReplicationMetricCollector metricCollector;
 
   private final ImportTableDesc tableDesc;
@@ -92,23 +86,26 @@ public class LoadPartitions {
                         TableEvent event, String dbNameToLoadIn,
                         TableContext tableContext, ReplicationMetricCollector metricCollector) throws HiveException {
     this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null,
-      metricCollector);
+      metricCollector, null, PartitionState.Stage.PARTITION);
   }
 
   public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext,
                         TaskTracker limiter, TableEvent event, String dbNameToLoadIn,
                         AlterTableAddPartitionDesc lastReplicatedPartition,
-                        ReplicationMetricCollector metricCollector) throws HiveException {
+                        ReplicationMetricCollector metricCollector,
+                        AlterTableAddPartitionDesc.PartitionDesc lastReplicatedPartitionDesc,
+                        ReplicationState.PartitionState.Stage lastReplicatedStage) throws HiveException {
     this.tracker = new TaskTracker(limiter);
     this.event = event;
     this.context = context;
     this.replLogger = replLogger;
     this.lastReplicatedPartition = lastReplicatedPartition;
     this.tableContext = tableContext;
-
     this.tableDesc = event.tableDesc(dbNameToLoadIn);
     this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
     this.metricCollector = metricCollector;
+    this.lastReplicatedPartitionDesc = lastReplicatedPartitionDesc;
+    this.lastReplicatedStage = lastReplicatedStage;
   }
 
   public TaskTracker tasks() throws Exception {
@@ -176,11 +173,29 @@ public class LoadPartitions {
    * Also, copy relevant stats and other information from original request.
    *
    * @throws SemanticException
+   * @param lastAlterTableAddPartitionDesc add-partition desc of the last partition replicated in the previous run, or null for a fresh load
    */
-  private void addConsolidatedPartitionDesc() throws Exception {
+  private void addConsolidatedPartitionDesc(AlterTableAddPartitionDesc lastAlterTableAddPartitionDesc) throws Exception {
+    int maxTasks = 0;
     //Load partitions equal to batch size at one go for metadata only and for external tables.
-    int maxTasks = context.hiveConf.getIntVar(HiveConf.ConfVars.REPL_LOAD_PARTITIONS_BATCH_SIZE);
+    if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
+      maxTasks = context.hiveConf.getIntVar(HiveConf.ConfVars.REPL_LOAD_PARTITIONS_BATCH_SIZE);
+    } else {
+      maxTasks = context.hiveConf.getIntVar(HiveConf.ConfVars.REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE);
+    }
     int currentPartitionCount = 0;
+    Iterator<AlterTableAddPartitionDesc> partitionIterator = event.partitionDescriptions(tableDesc).iterator();
+    //If a set of partitions was already processed as part of a previous run, skip those
+    if (lastAlterTableAddPartitionDesc != null) {
+      while (partitionIterator.hasNext()) {
+        currentPartitionCount++;
+        AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next();
+        if (lastAlterTableAddPartitionDesc.getPartitions().get(0).getPartSpec()
+          .equals(addPartitionDesc.getPartitions().get(0).getPartSpec())) {
+          break;
+        }
+      }
+    }
     List<AlterTableAddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
     int totalPartitionCount = partitionDescs.size();
     while (currentPartitionCount < totalPartitionCount) {
@@ -207,37 +222,28 @@ public class LoadPartitions {
         tableDesc.getTableName(), true, partitions);
 
       //don't need to add ckpt task separately. Added as part of add partition task
-      addPartition((toPartitionCount < totalPartitionCount), consolidatedPartitionDesc, null);
-      if (partitions.size() > 0) {
-        LOG.info("Added {} partitions", partitions.size());
+      addPartition((toPartitionCount < totalPartitionCount), consolidatedPartitionDesc);
+      if (!tracker.canAddMoreTasks()) {
+        //No more tasks can be added in this run; the remaining partitions will be processed in the next run.
+        //The replication state is already updated by the add partition task
+        return;
       }
       currentPartitionCount = toPartitionCount;
     }
   }
 
   private TaskTracker forNewTable() throws Exception {
-    if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-      // Place all partitions in single task to reduce load on HMS.
-      addConsolidatedPartitionDesc();
-      return tracker;
-    }
-
-    Iterator<AlterTableAddPartitionDesc> iterator = event.partitionDescriptions(tableDesc).iterator();
-    while (iterator.hasNext() && tracker.canAddMoreTasks()) {
-      AlterTableAddPartitionDesc currentPartitionDesc = iterator.next();
-      /*
-       the currentPartitionDesc cannot be inlined as we need the hasNext() to be evaluated post the
-       current retrieved lastReplicatedPartition
-      */
-      addPartition(iterator.hasNext(), currentPartitionDesc, null);
-    }
+    // Consolidate partitions into batched add-partition tasks to reduce load on HMS.
+    addConsolidatedPartitionDesc(null);
     return tracker;
   }
 
-  private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
+  private void addPartition(boolean hasMorePartitions, AlterTableAddPartitionDesc addPartitionDesc)
           throws Exception {
-    tracker.addTask(tasksForAddPartition(table, addPartitionDesc, ptnRootTask));
-    if (hasMorePartitions && !tracker.canAddMoreTasks()) {
+    boolean processingComplete = addTasksForPartition(table, addPartitionDesc, null);
+    //If processing is not complete, the replication state has already been updated with the copy tasks that
+    //still need to be processed
+    if (processingComplete && hasMorePartitions && !tracker.canAddMoreTasks()) {
       ReplicationState currentReplicationState =
           new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc));
       updateReplicationState(currentReplicationState);
@@ -245,113 +251,56 @@ public class LoadPartitions {
   }
 
   /**
-   * returns the root task for adding a partition
+   * Adds copy and add-partition tasks for all partitions in a batch; returns false if the task tracker fills up before the whole batch is added
    */
-  private Task<?> tasksForAddPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc, Task<?> ptnRootTask)
+  private boolean addTasksForPartition(Table table, AlterTableAddPartitionDesc addPartitionDesc,
+                                    AlterTableAddPartitionDesc.PartitionDesc lastPartSpec)
           throws MetaException, HiveException {
     Task<?> addPartTask = TaskFactory.get(
       new DDLWork(new HashSet<>(), new HashSet<>(), addPartitionDesc,
               true, (new Path(context.dumpDirectory)).getParent().toString(), this.metricCollector),
       context.hiveConf
     );
-    //checkpointing task already added as part of add batch of partition in case for metadata only and external tables
+    //the checkpointing task is already added as part of the batched add-partition work
     if (isMetaDataOp() || TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
-      if (ptnRootTask == null) {
-        ptnRootTask = addPartTask;
-      } else {
-        ptnRootTask.addDependentTask(addPartTask);
-      }
-      return ptnRootTask;
+      tracker.addTask(addPartTask);
+      return true;
     }
-
-    AlterTableAddPartitionDesc.PartitionDesc partSpec = addPartitionDesc.getPartitions().get(0);
-    Path sourceWarehousePartitionLocation = new Path(partSpec.getLocation());
-    Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
-    partSpec.setLocation(replicaWarehousePartitionLocation.toString());
-    LOG.debug("adding dependent CopyWork/AddPart/MoveWork for partition "
-      + partSpecToString(partSpec.getPartSpec()) + " with source location: "
-      + partSpec.getLocation());
-    Task<?> ckptTask = ReplUtils.getTableCheckpointTask(
-      tableDesc,
-      (HashMap<String, String>)partSpec.getPartSpec(),
-      context.dumpDirectory,
-      this.metricCollector,
-      context.hiveConf
-    );
-
-    Path stagingDir = replicaWarehousePartitionLocation;
-    // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
-    LoadFileType loadFileType;
-    if (event.replicationSpec().isInReplicationScope() &&
-            context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
-      loadFileType = LoadFileType.IGNORE;
-    } else {
-      loadFileType = event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING;
-      stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
-    }
-    boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
-    Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+    //Add a copy task for each partition in the batch
+    boolean lastProcessedStageFound = false;
+    for (AlterTableAddPartitionDesc.PartitionDesc partSpec : addPartitionDesc.getPartitions()) {
+      if (!tracker.canAddMoreTasks()) {
+        //update the replication state with the partition whose copy task is pending so the next run resumes from it
+        ReplicationState currentReplicationState =
+          new ReplicationState(new PartitionState(table.getTableName(), addPartitionDesc,
+            partSpec, PartitionState.Stage.COPY));
+        updateReplicationState(currentReplicationState);
+        return false;
+      }
+      Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, partSpec);
+      partSpec.setLocation(replicaWarehousePartitionLocation.toString());
+      LOG.debug("adding dependent CopyWork for partition "
+        + partSpecToString(partSpec.getPartSpec()) + " with source location: "
+        + partSpec.getLocation());
+      if (!lastProcessedStageFound && lastPartSpec != null &&
+        !lastPartSpec.getLocation().equals(partSpec.getLocation())) {
+        //Skip copy tasks that were already created as part of the previous run
+        continue;
+      }
+      lastProcessedStageFound = true;
+      boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_RUN_DATA_COPY_TASKS_ON_TARGET);
+      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
         event.replicationSpec(),
-        new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)),
-        stagingDir,
+        new Path(event.dataPath() + Path.SEPARATOR + Warehouse.makePartPath(partSpec.getPartSpec())),
+        replicaWarehousePartitionLocation,
         context.hiveConf, copyAtLoad, false, (new Path(context.dumpDirectory)).getParent().toString(),
         this.metricCollector
-    );
-
-    Task<?> movePartitionTask = null;
-    if (loadFileType != LoadFileType.IGNORE) {
-      // no need to create move task, if file is moved directly to target location.
-      movePartitionTask = movePartitionTask(table, partSpec, stagingDir, loadFileType);
-    }
-
-    if (ptnRootTask == null) {
-      ptnRootTask = copyTask;
-    } else {
-      ptnRootTask.addDependentTask(copyTask);
-    }
-
-    // Set Checkpoint task as dependant to the tail of add partition tasks. So, if same dump is
-    // retried for bootstrap, we skip current partition update.
-    copyTask.addDependentTask(addPartTask);
-    if (movePartitionTask != null) {
-      addPartTask.addDependentTask(movePartitionTask);
-      movePartitionTask.addDependentTask(ckptTask);
-    } else {
-      addPartTask.addDependentTask(ckptTask);
-    }
-    return ptnRootTask;
-  }
-
-  private String getPartitionName(Path partitionMetadataFullPath) {
-    //Get partition name by removing the metadata base path.
-    //Needed for getting the data path
-    return partitionMetadataFullPath.toString().substring(event.metadataPath().toString().length());
-  }
-
-  /**
-   * This will create the move of partition data from temp path to actual path
-   */
-  private Task<?> movePartitionTask(Table table, AlterTableAddPartitionDesc.PartitionDesc partSpec, Path tmpPath,
-                                    LoadFileType loadFileType) {
-    MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false,
-                                    (new Path(context.dumpDirectory)).getParent().toString(), this.metricCollector,
-                                     true);
-    if (AcidUtils.isTransactionalTable(table)) {
-      LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
-        Collections.singletonList(tmpPath),
-        Collections.singletonList(new Path(partSpec.getLocation())),
-        true, null, null);
-      moveWork.setMultiFilesDesc(loadFilesWork);
-    } else {
-      LoadTableDesc loadTableWork = new LoadTableDesc(
-              tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
-              loadFileType, 0L
       );
-      loadTableWork.setInheritTableSpecs(false);
-      moveWork.setLoadTableWork(loadTableWork);
+      tracker.addTask(copyTask);
     }
-    moveWork.setIsInReplicationScope(event.replicationSpec().isInReplicationScope());
-    return TaskFactory.get(moveWork, context.hiveConf);
+    //add partition metadata task once all the copy tasks are added
+    tracker.addDependentTask(addPartTask);
+    return true;
   }
 
   /**
@@ -415,24 +364,39 @@ public class LoadPartitions {
       Map<String, String> currentSpec = addPartitionDesc.getPartitions().get(0).getPartSpec();
       encounteredTheLastReplicatedPartition = lastReplicatedPartSpec.equals(currentSpec);
     }
-
+    //Re-add the copy tasks still pending for the partition that was being replicated in the previous run
+    if (PartitionState.Stage.COPY.equals(lastReplicatedStage)) {
+      addTasksForPartition(table, lastPartitionReplicated,
+        lastReplicatedPartitionDesc);
+    }
+    boolean pendingPartitions = false;
     while (partitionIterator.hasNext() && tracker.canAddMoreTasks()) {
+      pendingPartitions = true;
       AlterTableAddPartitionDesc addPartitionDesc = partitionIterator.next();
-      Map<String, String> partSpec = addPartitionDesc.getPartitions().get(0).getPartSpec();
-      Task<?> ptnRootTask = null;
+      AlterTableAddPartitionDesc.PartitionDesc src = addPartitionDesc.getPartitions().get(0);
+      //Set the checkpoint key in the partition params so it is written as part of the add-partition task
+      Map<String, String> partParams = new HashMap<>();
+      partParams.put(REPL_CHECKPOINT_KEY, context.dumpDirectory);
+      Path replicaWarehousePartitionLocation = locationOnReplicaWarehouse(table, src);
+      src.setLocation(replicaWarehousePartitionLocation.toString());
+      src.addPartParams(partParams);
+      Map<String, String> partSpec = src.getPartSpec();
+
       ReplLoadOpType loadPtnType = getLoadPartitionType(partSpec);
       switch (loadPtnType) {
         case LOAD_NEW:
           break;
         case LOAD_REPLACE:
-          ptnRootTask = dropPartitionTask(table, partSpec);
+          tracker.addDependentTask(dropPartitionTask(table, partSpec));
           break;
         case LOAD_SKIP:
           continue;
         default:
           break;
       }
-      addPartition(partitionIterator.hasNext(), addPartitionDesc, ptnRootTask);
+    }
+    if (pendingPartitions) {
+      addConsolidatedPartitionDesc(lastPartitionReplicated);
     }
     return tracker;
   }
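
Taken together, the LoadPartitions changes above turn partition loading into batched work: managed-table batches are sized by REPL_LOAD_PARTITIONS_WITH_DATA_COPY_BATCH_SIZE, partitions already handled by a previous run are skipped, and loading stops cleanly once the task tracker is full. A rough, generic sketch of that batch-and-resume pattern (types and names here are illustrative, not the Hive classes):

    // Generic batch-and-resume helper; it mirrors the skip loop and batching
    // in addConsolidatedPartitionDesc under those assumptions.
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    class BatchedLoadSketch {
      // Skips items up to and including lastProcessed (if any), then groups the
      // remainder into batches of at most batchSize, stopping after maxBatches
      // batches; the caller persists the last item of the final batch as the
      // resume point for the next run.
      static <T> List<List<T>> nextBatches(List<T> all, T lastProcessed,
                                           int batchSize, int maxBatches) {
        Iterator<T> it = all.iterator();
        if (lastProcessed != null) {
          while (it.hasNext() && !it.next().equals(lastProcessed)) {
            // partitions before the resume point were replicated already
          }
        }
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        while (it.hasNext() && batches.size() < maxBatches) {
          current.add(it.next());
          if (current.size() == batchSize || !it.hasNext()) {
            batches.add(current);
            current = new ArrayList<>();
          }
        }
        return batches;
      }
    }
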
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 11a1036..fb31159 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -60,7 +60,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.TreeMap;
 
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
 import static org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer.isPartitioned;
 
 public class LoadTable {
@@ -270,10 +269,9 @@ public class LoadTable {
     Path dataPath = fromURI;
     Path tmpPath = tgtPath;
 
-    // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
+    // For ACID tables, copy the files directly to the target path; no need to create a staging dir.
     LoadFileType loadFileType;
-    if (replicationSpec.isInReplicationScope() &&
-            context.hiveConf.getBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION)) {
+    if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(table)) {
       loadFileType = LoadFileType.IGNORE;
     } else {
       loadFileType = (replicationSpec.isReplace())
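
With hive.repl.enable.move.optimization removed, LoadTable now decides between a staging-directory copy and a direct copy purely from whether the table is transactional. A hedged sketch of that decision, assuming the else branch keeps the usual REPLACE_ALL/OVERWRITE_EXISTING choice used elsewhere in this patch (FileType and the method below are illustrative):

    // Sketch of the staging-vs-direct-copy decision; only the IGNORE rule for
    // transactional tables in replication scope comes from the change above.
    class LoadFileTypeChoiceSketch {
      enum FileType { IGNORE, REPLACE_ALL, OVERWRITE_EXISTING }

      static FileType choose(boolean inReplicationScope, boolean isTransactional,
                             boolean isReplace) {
        if (inReplicationScope && isTransactional) {
          // data already lands in its final write-id directory, so no staging
          // dir and no MoveTask are needed
          return FileType.IGNORE;
        }
        return isReplace ? FileType.REPLACE_ALL : FileType.OVERWRITE_EXISTING;
      }
    }
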
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 5e05c73..0e7209d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -89,7 +89,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
 
 /**
  * ImportSemanticAnalyzer.
@@ -437,10 +436,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     boolean isSkipTrash = false;
     boolean needRecycle = false;
 
-    if (replicationSpec.isInReplicationScope() && (x.getCtx().getConf().getBoolean(
-      REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
-      lft = LoadFileType.IGNORE;
-      destPath = loadPath = tgtPath;
+    if (replicationSpec.isInReplicationScope()) {
       isSkipTrash = MetaStoreUtils.isSkipTrash(table.getParameters());
       if (table.isTemporary()) {
         needRecycle = false;
@@ -448,27 +444,26 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         org.apache.hadoop.hive.metastore.api.Database db = x.getHive().getDatabase(table.getDbName());
         needRecycle = db != null && ReplChangeManager.shouldEnableCm(db, table.getTTable());
       }
+    }
+    if (AcidUtils.isTransactionalTable(table)) {
+      String mmSubdir = replace ? AcidUtils.baseDir(writeId)
+              : AcidUtils.deltaSubdir(writeId, writeId, stmtId);
+      destPath = new Path(tgtPath, mmSubdir);
+      /**
+       * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition
+       * directory, i.e. the final destination for these files.  This has to be a copy to preserve
+       * the archive.  MoveTask is optimized to do a 'rename' if files are on the same FileSystem.
+       * So setting 'loadPath' this way will make
+       * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean,
+       * boolean, Long, int)}
+       * skip the unnecessary file (rename) operation but it will perform other things.
+       */
+      loadPath = tgtPath;
+      lft = LoadFileType.KEEP_EXISTING;
     } else {
-      if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) {
-        String mmSubdir = replace ? AcidUtils.baseDir(writeId)
-                : AcidUtils.deltaSubdir(writeId, writeId, stmtId);
-        destPath = new Path(tgtPath, mmSubdir);
-        /**
-         * CopyTask below will copy files from the 'archive' to a delta_x_x in the table/partition
-         * directory, i.e. the final destination for these files.  This has to be a copy to preserve
-         * the archive.  MoveTask is optimized to do a 'rename' if files are on the same FileSystem.
-         * So setting 'loadPath' this way will make
-         * {@link Hive#loadTable(Path, String, LoadFileType, boolean, boolean, boolean,
-         * boolean, Long, int)}
-         * skip the unnecessary file (rename) operation but it will perform other things.
-         */
-        loadPath = tgtPath;
-        lft = LoadFileType.KEEP_EXISTING;
-      } else {
-        destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath);
-        lft = replace ? LoadFileType.REPLACE_ALL :
-                LoadFileType.OVERWRITE_EXISTING;
-      }
+      destPath = loadPath = x.getCtx().getExternalTmpPath(tgtPath);
+      lft = replace ? LoadFileType.REPLACE_ALL :
+              LoadFileType.OVERWRITE_EXISTING;
     }
 
     if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
@@ -649,8 +644,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       LoadFileType loadFileType;
       Path destPath;
-      if (replicationSpec.isInReplicationScope() && (x.getCtx().getConf().getBoolean(
-        REPL_ENABLE_MOVE_OPTIMIZATION.varname, false))) {
+      if (replicationSpec.isInReplicationScope()) {
         loadFileType = LoadFileType.IGNORE;
         destPath = tgtLocation;
         isSkipTrash = MetaStoreUtils.isSkipTrash(table.getParameters());
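
In ImportSemanticAnalyzer the transactional-table path now always copies into a write-id based base/delta subdirectory, while loadPath stays at the table or partition root so the later load can skip the rename. A small sketch of how such a destination path can be derived; baseDir and deltaSubdir below are simplified stand-ins for the AcidUtils helpers, and the digit widths are assumptions:

    // Sketch of the destination-path derivation for transactional imports.
    import org.apache.hadoop.fs.Path;

    class AcidImportPathSketch {
      static String baseDir(long writeId) {
        return String.format("base_%07d", writeId);
      }

      static String deltaSubdir(long minWriteId, long maxWriteId, int stmtId) {
        return String.format("delta_%07d_%07d_%04d", minWriteId, maxWriteId, stmtId);
      }

      // replace == true lands the files under base_N, otherwise under a
      // delta_N_N_S directory below the table/partition location.
      static Path destinationFor(Path tgtPath, boolean replace, long writeId, int stmtId) {
        String subdir = replace ? baseDir(writeId) : deltaSubdir(writeId, writeId, stmtId);
        return new Path(tgtPath, subdir);
      }
    }
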
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index b73921c..4c10499 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
@@ -48,16 +47,12 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils;
 
 import java.io.IOException;
 import java.net.URI;
-import java.nio.charset.StandardCharsets;
 import java.util.Map;
-import java.util.Base64;
 import java.util.List;
 import java.util.Collections;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE;
@@ -264,29 +259,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
-  private boolean ifEnableMoveOptimization(Path filePath, org.apache.hadoop.conf.Configuration conf) throws Exception {
-    if (filePath == null) {
-      throw new HiveException("filePath cannot be null");
-    }
-
-    URI uri = filePath.toUri();
-    String scheme = uri.getScheme();
-    scheme = StringUtils.isBlank(scheme) ? FileSystem.get(uri, conf).getScheme() : scheme;
-    if (StringUtils.isBlank(scheme)) {
-      throw new HiveException("Cannot get valid scheme for " + filePath);
-    }
-
-    LOG.info("scheme is " + scheme);
-
-    String[] schmeList = conf.get(REPL_MOVE_OPTIMIZED_FILE_SCHEMES.varname).toLowerCase().split(",");
-    for (String schemeIter : schmeList) {
-      if (schemeIter.trim().equalsIgnoreCase(scheme.trim())) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   // REPL LOAD
   private void initReplLoad(ASTNode ast) throws HiveException {
     sourceDbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
@@ -363,17 +335,6 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     try {
       assert(sourceDbNameOrPattern != null);
       Path loadPath = getCurrentLoadPath();
-      // Ths config is set to make sure that in case of s3 replication, move is skipped.
-      try {
-        Warehouse wh = new Warehouse(conf);
-        Path filePath = wh.getWhRoot();
-        if (ifEnableMoveOptimization(filePath, conf)) {
-          conf.setBoolVar(REPL_ENABLE_MOVE_OPTIMIZATION, true);
-          LOG.info(" Set move optimization to true for warehouse " + filePath.toString());
-        }
-      } catch (Exception e) {
-        throw new SemanticException(e.getMessage(), e);
-      }
 
       // Now, the dumped path can be one of three things:
       // a) It can be a db dump, in which case we expect a set of dirs, each with a
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java
index faba6e4..ddf687b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/repl/load/message/TestPrimaryToReplicaResourceFunction.java
@@ -92,7 +92,7 @@ public class TestPrimaryToReplicaResourceFunction {
     mockStatic(ReplCopyTask.class);
     Task mock = mock(Task.class);
     when(ReplCopyTask.getLoadCopyTask(any(ReplicationSpec.class), any(Path.class), any(Path.class),
-        any(HiveConf.class))).thenReturn(mock);
+        any(HiveConf.class), any(), any())).thenReturn(mock);
 
     ResourceUri resourceUri = function.destinationResourceUri(new ResourceUri(ResourceType.JAR,
         "hdfs://localhost:9000/user/someplace/ab.jar#e094828883"));
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
index 8673186..b828d32 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
@@ -93,6 +93,8 @@ public class InjectableBehaviourObjectStore extends ObjectStore {
 
   private static com.google.common.base.Function<List<Partition>, Boolean> alterPartitionsModifier = null;
 
+  private static com.google.common.base.Function<List<Partition>, Boolean> addPartitionsModifier = null;
+
   // Methods to set/reset getTable modifier
   public static void setGetTableBehaviour(com.google.common.base.Function<Table, Table> modifier){
     getTableModifier = (modifier == null) ? com.google.common.base.Functions.identity() : modifier;
@@ -154,10 +156,18 @@ public class InjectableBehaviourObjectStore extends ObjectStore {
     setAlterTableModifier(null);
   }
 
+  public static void resetAddPartitionModifier() {
+    setAddPartitionsBehaviour(null);
+  }
+
   public static void setAlterPartitionsBehaviour(com.google.common.base.Function<List<Partition>, Boolean> modifier){
     alterPartitionsModifier = modifier;
   }
 
+  public static void setAddPartitionsBehaviour(com.google.common.base.Function<List<Partition>, Boolean> modifier){
+    addPartitionsModifier = modifier;
+  }
+
 
   // ObjectStore methods to be overridden with injected behavior
   @Override
@@ -317,4 +327,17 @@ public class InjectableBehaviourObjectStore extends ObjectStore {
     }
     return super.alterPartitions(catName, dbname, name, part_vals, newParts, writeId, queryWriteIdList);
   }
+
+  @Override
+  public boolean addPartitions(String catName, String dbName, String tblName, List<Partition> parts)
+    throws InvalidObjectException, MetaException {
+    if (addPartitionsModifier != null) {
+      Boolean success = addPartitionsModifier.apply(parts);
+      if ((success != null) && !success) {
+        throw new MetaException("InjectableBehaviourObjectStore: Invalid addPartitions operation on Catalog : "
+          + catName + " DB: " + dbName + " table: " + tblName);
+      }
+    }
+    return super.addPartitions(catName, dbName, tblName, parts);
+  }
 }
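
The new addPartitions hook mirrors the existing alterPartitions one: a test installs a callback that fails the HMS call, which makes it possible to interrupt a bootstrap load in the middle of a partition batch and then verify that the next REPL LOAD resumes correctly. A rough usage sketch, assuming the test clears the modifier once the failure has been observed; only setAddPartitionsBehaviour and resetAddPartitionModifier come from the change above, the rest is illustrative:

    import java.util.List;

    import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
    import org.apache.hadoop.hive.metastore.api.Partition;

    class AddPartitionsFailureSketch {
      // Fails the first addPartitions() call seen by the metastore so that a
      // REPL LOAD stops partway through a partition batch; returning false
      // makes InjectableBehaviourObjectStore throw a MetaException.
      static void injectOneFailure() {
        InjectableBehaviourObjectStore.setAddPartitionsBehaviour(
            new com.google.common.base.Function<List<Partition>, Boolean>() {
              private boolean failed = false;
              @Override
              public Boolean apply(List<Partition> parts) {
                if (!failed) {
                  failed = true;
                  return false;
                }
                return true;
              }
            });
      }

      // Clear the modifier so a retried load can run to completion.
      static void clear() {
        InjectableBehaviourObjectStore.resetAddPartitionModifier();
      }
    }
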