Posted to commits@hive.apache.org by an...@apache.org on 2020/03/26 05:55:46 UTC

[hive] branch master updated: HIVE-22997: Copy external table to target during Repl Dump operation ( Pravin Kumar Sinha, reviewed by Aasha Medhi, Anishek Agarwal)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 4a591b0  HIVE-22997: Copy external table to target during Repl Dump operation ( Pravin Kumar Sinha, reviewed by Aasha Medhi, Anishek Agarwal)
4a591b0 is described below

commit 4a591b0bf79a3e8c8592f2383f209788fd4f86d3
Author: Pravin Kumar Sinha <ma...@gmail.com>
AuthorDate: Thu Mar 26 11:25:27 2020 +0530

    HIVE-22997: Copy external table to target during Repl Dump operation ( Pravin Kumar Sinha, reviewed by Aasha Medhi, Anishek Agarwal)
---
 .../hadoop/hive/ql/parse/ReplicationTestUtils.java |  15 ++
 .../parse/TestReplicationOnHDFSEncryptedZones.java |  12 +-
 .../hive/ql/parse/TestReplicationScenarios.java    | 229 +++++++++++++++++-
 .../TestReplicationScenariosExternalTables.java    | 107 +++++----
 ...icationScenariosExternalTablesMetaDataOnly.java |   4 +-
 .../parse/TestScheduledReplicationScenarios.java   |  49 ++++
 .../parse/TestTableLevelReplicationScenarios.java  |  21 +-
 .../apache/hadoop/hive/ql/plan/api/StageType.java  |   8 +-
 .../apache/hadoop/hive/ql/exec/TaskFactory.java    |  13 +-
 .../{ReplLoadCompleteAckTask.java => AckTask.java} |  17 +-
 .../{ReplLoadCompleteAckWork.java => AckWork.java} |  19 +-
 .../hadoop/hive/ql/exec/repl/DirCopyTask.java      | 210 ++++++++++++++++
 .../hadoop/hive/ql/exec/repl/DirCopyWork.java      |  53 +++++
 .../ql/exec/repl/ExternalTableCopyTaskBuilder.java | 264 ---------------------
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     | 219 +++++++++++------
 .../hadoop/hive/ql/exec/repl/ReplDumpWork.java     |  82 +++++++
 .../hive/ql/exec/repl/ReplExternalTables.java      |  13 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  55 ++---
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java     |  17 +-
 .../org/apache/hadoop/hive/ql/parse/EximUtil.java  |  47 +++-
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java |  59 ++---
 .../hive/ql/parse/repl/dump/PartitionExport.java   |  15 +-
 .../hive/ql/parse/repl/dump/TableExport.java       |  23 +-
 .../hadoop/hive/ql/exec/repl/TestReplDumpTask.java |   7 +-
 24 files changed, 999 insertions(+), 559 deletions(-)
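
As the updated tests show, external-table replication is now driven through a REPL DUMP WITH clause assembled from HiveConf.ConfVars options plus an external-table base directory and distcp options. Below is a minimal, non-authoritative sketch of that pattern (it mirrors the style of ReplicationTestUtils.externalTableWithClause and assumes hive-common on the classpath); the database name "some_db" and the base directory "/replica_external_base" are illustrative placeholders, not values prescribed by this patch.

    import org.apache.hadoop.hive.conf.HiveConf;

    public class ReplDumpWithClauseSketch {
      public static void main(String[] args) {
        // Compose the WITH clause as 'key'='value' pairs, using ConfVars.varname
        // instead of hard-coded property names, as the new tests do.
        String withClause = String.join(", ",
            "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'",
            "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='/replica_external_base'",
            "'distcp.options.pugpb'=''");
        // With this change, REPL DUMP itself copies external table data to the
        // target base directory instead of deferring the copy to REPL LOAD.
        System.out.println("REPL DUMP some_db WITH (" + withClause + ")");
      }
    }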

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java
index a82bbad..e0c3ed2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java
@@ -27,6 +27,7 @@ import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Arrays;
 import java.util.Set;
@@ -516,6 +517,20 @@ public class ReplicationTestUtils {
     );
   }
 
+  public static List<String> externalTableWithClause(List<String> externalTableBasePathWithClause, Boolean bootstrap,
+                                                     Boolean includeExtTbl) {
+    List<String> withClause = new ArrayList<>(externalTableBasePathWithClause);
+    if (bootstrap != null) {
+      withClause.add("'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES + "'='" + Boolean.toString(bootstrap)
+              + "'");
+    }
+    if (includeExtTbl != null) {
+      withClause.add("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES + "'='" + Boolean.toString(includeExtTbl)
+              + "'");
+    }
+    return withClause;
+  }
+
   public static void assertExternalFileInfo(WarehouseInstance primary,
                                       List<String> expected,
                                       Path externalTableInfoFile) throws IOException {
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
index f6a33bc..bed0235 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
@@ -36,7 +36,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_AGGREGATE_STATS_CACHE_ENABLED;
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
@@ -102,12 +104,20 @@ public class TestReplicationOnHDFSEncryptedZones {
           put(HiveConf.ConfVars.REPLDIR.varname, primary.repldDir);
         }}, "test_key123");
 
+    List<String> dumpWithClause = Arrays.asList(
+            "'hive.repl.add.raw.reserved.namespace'='true'",
+            "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
+                    + replica.externalTableWarehouseRoot + "'",
+            "'distcp.options.skipcrccheck'=''",
+            "'" + HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+                    + UserGroupInformation.getCurrentUser().getUserName() +"'");
     WarehouseInstance.Tuple tuple =
         primary.run("use " + primaryDbName)
             .run("create table encrypted_table (id int, value string)")
             .run("insert into table encrypted_table values (1,'value1')")
             .run("insert into table encrypted_table values (2,'value2')")
-            .dump(primaryDbName);
+            .dump(primaryDbName, dumpWithClause);
 
     replica
         .run("repl load " + primaryDbName + " into " + replicatedDbName
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 49027a3..efe9fff 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -92,7 +92,6 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -326,6 +325,58 @@ public class TestReplicationScenarios {
     verifyRun("SELECT * from " + replicatedDbName + ".unptned_empty", empty, driverMirror);
   }
 
+  @Test
+  public void testBootstrapFailedDump() throws IOException {
+    String name = testName.getMethodName();
+    String dbName = createDB(name, driver);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+
+    String[] unptnData = new String[]{"eleven", "twelve"};
+    String[] ptnData1 = new String[]{"thirteen", "fourteen", "fifteen"};
+    String[] ptnData2 = new String[]{"fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[]{};
+
+    String unptnLocn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptnLocn1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptnLocn2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptnLocn, unptnData);
+    createTestDataFile(ptnLocn1, ptnData1);
+    createTestDataFile(ptnLocn2, ptnData2);
+
+    run("LOAD DATA LOCAL INPATH '" + unptnLocn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned", unptnData, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver);
+    verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driver);
+    verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driver);
+
+    String replicatedDbName = dbName + "_dupe";
+
+
+    EximUtil.ManagedTableCopyPath.setNullSrcPath(hconf, true);
+    verifyFail("REPL DUMP " + dbName, driver);
+    advanceDumpDir();
+    EximUtil.ManagedTableCopyPath.setNullSrcPath(hconf, false);
+    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replicatedDbName);
+    advanceDumpDir();
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT)));
+    assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT)));
+
+    verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptnData, driverMirror);
+    verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptnData1, driverMirror);
+    verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=2", ptnData2, driverMirror);
+    verifyRun("SELECT a from " + replicatedDbName + ".ptned_empty", empty, driverMirror);
+    verifyRun("SELECT * from " + replicatedDbName + ".unptned_empty", empty, driverMirror);
+  }
+
   private abstract class checkTaskPresent {
     public boolean hasTask(Task rootTask) {
       if (rootTask == null) {
@@ -376,8 +427,7 @@ public class TestReplicationScenarios {
     confTemp.set("hive.repl.enable.move.optimization", "true");
     Path loadPath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
     ReplLoadWork replLoadWork = new ReplLoadWork(confTemp, loadPath.toString(), replicadb,
-            null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId),
-        Collections.emptyList());
+            null, null, isIncrementalDump, Long.valueOf(tuple.lastReplId));
     Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
     replLoadTask.initialize(null, null, new TaskQueue(driver.getContext()), driver.getContext());
     replLoadTask.executeTask(null);
@@ -1457,20 +1507,20 @@ public class TestReplicationScenarios {
 
     run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver);
     run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName
-        + ".ptned PARTITION(b=1)", driver);
+            + ".ptned PARTITION(b=1)", driver);
     verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver);
     run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName
-        + ".ptned PARTITION(b=2)", driver);
+            + ".ptned PARTITION(b=2)", driver);
     verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver);
 
     run("CREATE TABLE " + dbName
-        + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver);
+            + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver);
     run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName
-        + ".ptned WHERE b=1", driver);
+            + ".ptned WHERE b=1", driver);
     verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptnData1, driver);
 
     run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName
-        + ".ptned WHERE b=2", driver);
+            + ".ptned WHERE b=2", driver);
     verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver);
 
     incrementalLoadAndVerify(dbName, replDbName);
@@ -1635,6 +1685,169 @@ public class TestReplicationScenarios {
   }
 
   @Test
+  public void testIncrementalLoadWithOneFailedDump() throws IOException {
+    String nameOfTest = "testIncrementalLoadWithOneFailedDump";
+    String dbName = createDB(nameOfTest, driver);
+    String replDbName = dbName + "_dupe";
+
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName
+            + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+
+    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
+
+    String[] unptnData = new String[] {"eleven", "twelve"};
+    String[] ptnData1 = new String[] {"thirteen", "fourteen", "fifteen"};
+    String[] ptnData2 = new String[] {"fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[] {};
+
+    String unptnLocn = new Path(TEST_PATH, nameOfTest + "_unptn").toUri().getPath();
+    String ptnLocn1 = new Path(TEST_PATH, nameOfTest + "_ptn1").toUri().getPath();
+    String ptnLocn2 = new Path(TEST_PATH, nameOfTest + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptnLocn, unptnData);
+    createTestDataFile(ptnLocn1, ptnData1);
+    createTestDataFile(ptnLocn2, ptnData2);
+
+    verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driverMirror);
+    verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driverMirror);
+
+    run("LOAD DATA LOCAL INPATH '" + unptnLocn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned", unptnData, driver);
+    run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned", driver);
+    run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned_late", unptnData, driver);
+
+    Tuple incrementalDump = replDumpDb(dbName);
+
+    //Remove the dump ack file, so that dump is treated as an invalid dump.
+    String ackFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + ReplUtils.DUMP_ACKNOWLEDGEMENT;
+    Path dumpFinishedAckFilePath = new Path(incrementalDump.dumpLocation, ackFileRelativePath);
+    Path tmpDumpFinishedAckFilePath = new Path(dumpFinishedAckFilePath.getParent(),
+            "old_" + dumpFinishedAckFilePath.getName());
+    FileSystem fs  = FileSystem.get(new Path(incrementalDump.dumpLocation).toUri(), hconf);
+    fs.rename(dumpFinishedAckFilePath, tmpDumpFinishedAckFilePath);
+    loadAndVerify(replDbName, dbName, bootstrapDump.lastReplId);
+
+    fs.rename(tmpDumpFinishedAckFilePath, dumpFinishedAckFilePath);
+    //Repl Load should recover when it finds valid load
+    loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
+
+    verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror);
+
+    run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver);
+    run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName
+            + ".ptned PARTITION(b=1)", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName
+            + ".ptned PARTITION(b=2)", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver);
+
+    run("CREATE TABLE " + dbName
+            + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver);
+    run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName
+            + ".ptned WHERE b=1", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptnData1, driver);
+
+    run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName
+            + ".ptned WHERE b=2", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver);
+
+    incrementalLoadAndVerify(dbName, replDbName);
+    verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptnData1, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptnData2, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror);
+  }
+
+  @Test
+  public void testIncrementalLoadWithPreviousDumpDeleteFailed() throws IOException {
+    String nameOfTest = "testIncrementalLoadWithPreviousDumpDeleteFailed";
+    String dbName = createDB(nameOfTest, driver);
+    String replDbName = dbName + "_dupe";
+
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName
+            + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+
+    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
+
+    String[] unptnData = new String[] {"eleven", "twelve"};
+    String[] ptnData1 = new String[] {"thirteen", "fourteen", "fifteen"};
+    String[] ptnData2 = new String[] {"fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[] {};
+
+    String unptnLocn = new Path(TEST_PATH, nameOfTest + "_unptn").toUri().getPath();
+    String ptnLocn1 = new Path(TEST_PATH, nameOfTest + "_ptn1").toUri().getPath();
+    String ptnLocn2 = new Path(TEST_PATH, nameOfTest + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptnLocn, unptnData);
+    createTestDataFile(ptnLocn1, ptnData1);
+    createTestDataFile(ptnLocn2, ptnData2);
+
+    verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driverMirror);
+    verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driverMirror);
+
+    run("LOAD DATA LOCAL INPATH '" + unptnLocn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned", unptnData, driver);
+    run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned", driver);
+    run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned_late", unptnData, driver);
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(true);
+
+    Tuple incrDump = replDumpDb(dbName);
+
+    // Delete some file except ack.
+    Path bootstrapDumpDir = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    String tablePath = dbName + File.separator + "unptned";
+    Path fileToDelete = new Path(bootstrapDumpDir, tablePath);
+    FileSystem fs = FileSystem.get(fileToDelete.toUri(), hconf);
+    fs.delete(fileToDelete, true);
+    assertTrue(fs.exists(bootstrapDumpDir));
+    assertTrue(fs.exists(new Path(bootstrapDumpDir, ReplUtils.DUMP_ACKNOWLEDGEMENT)));
+
+    loadAndVerify(replDbName, dbName, incrDump.lastReplId);
+
+    verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror);
+
+    run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver);
+    run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName
+            + ".ptned PARTITION(b=1)", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName
+            + ".ptned PARTITION(b=2)", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver);
+
+    run("CREATE TABLE " + dbName
+            + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver);
+    run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName
+            + ".ptned WHERE b=1", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptnData1, driver);
+
+    run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName
+            + ".ptned WHERE b=2", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver);
+
+    ReplDumpWork.testDeletePreviousDumpMetaPath(false);
+
+    Path incrHiveDumpDir = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    incrDump = replDumpDb(dbName);
+    //This time delete previous dump dir should work fine.
+    assertFalse(FileSystem.get(fileToDelete.toUri(), hconf).exists(incrHiveDumpDir));
+    assertFalse(fs.exists(bootstrapDumpDir));
+    loadAndVerify(replDbName, dbName, incrDump.lastReplId);
+    verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptnData1, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptnData2, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror);
+  }
+
+  @Test
   public void testIncrementalInsertToPartition() throws IOException {
     String testName = "incrementalInsertToPartition";
     String dbName = createDB(testName, driver);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 6372d26..1325789 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -138,6 +138,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
 
   @Test
   public void externalTableReplicationWithDefaultPaths() throws Throwable {
+    List<String> withClauseOptions = externalTableBasePathWithClause();
     //creates external tables with partitions
     WarehouseInstance.Tuple tuple = primary
         .run("use " + primaryDbName)
@@ -148,12 +149,12 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("insert into table t2 partition(country='india') values ('bangalore')")
         .run("insert into table t2 partition(country='us') values ('austin')")
         .run("insert into table t2 partition(country='france') values ('paris')")
-        .dumpWithCommand("repl dump " + primaryDbName);
+        .dump(primaryDbName, withClauseOptions);
 
     // verify that the external table info is written correctly for bootstrap
     assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName);
 
-    List<String> withClauseOptions = externalTableBasePathWithClause();
+
 
     replica.load(replicatedDbName, primaryDbName, withClauseOptions)
         .run("use " + replicatedDbName)
@@ -180,7 +181,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("create external table t3 (id int)")
         .run("insert into table t3 values (10)")
         .run("create external table t4 as select id from t3")
-        .dumpWithCommand("repl dump " + primaryDbName);
+        .dump(primaryDbName, withClauseOptions);
 
     // verify that the external table info is written correctly for incremental
     assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation);
@@ -244,7 +245,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     // Create base directory but use HDFS path without schema or authority details.
     // Hive should pick up the local cluster's HDFS schema/authority.
     externalTableBasePathWithClause();
-    List<String> loadWithClause = Arrays.asList(
+    List<String> withClause = Arrays.asList(
             "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='"
                     + REPLICA_EXTERNAL_BASE + "'",
             "'distcp.options.update'=''"
@@ -254,9 +255,9 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("create external table a (i int, j int) "
             + "row format delimited fields terminated by ',' "
             + "location '" + externalTableLocation.toUri() + "'")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("show tables like 'a'")
         .verifyResults(Collections.singletonList("a"))
@@ -270,11 +271,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
       outputStream.write("1,2\n".getBytes());
       outputStream.write("13,21\n".getBytes());
     }
+    WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)")
+        .dump(primaryDbName, withClause);
 
-    primary.run("create table b (i int)")
-        .dump(primaryDbName);
-
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("select i From a")
         .verifyResults(new String[] { "1", "13" })
         .run("select j from a")
@@ -285,9 +285,9 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/");
     primary.run("use " + primaryDbName)
         .run("alter table a set location '" + externalTableLocation + "'")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("select i From a")
         .verifyResults(Collections.emptyList());
@@ -301,18 +301,18 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
     fs.mkdirs(externalTableLocation, new FsPermission("777"));
 
-    List<String> loadWithClause = externalTableBasePathWithClause();
+    List<String> withClause = externalTableBasePathWithClause();
 
     WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
         .run("create external table t2 (place string) partitioned by (country string) row format "
             + "delimited fields terminated by ',' location '" + externalTableLocation.toString()
             + "'")
         .run("insert into t2 partition(country='india') values ('bangalore')")
-        .dumpWithCommand("repl dump " + primaryDbName);
+        .dump(primaryDbName, withClause);
 
     assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, primaryDbName);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't2'")
         .verifyResults(new String[] { "t2" })
@@ -331,11 +331,11 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
 
     tuple = primary.run("use " + primaryDbName)
         .run("insert into t2 partition(country='australia') values ('sydney')")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
     assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("select distinct(country) from t2")
         .verifyResults(new String[] { "india", "australia" })
@@ -360,9 +360,9 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     primary.run("use " + primaryDbName)
         .run("ALTER TABLE t2 ADD PARTITION (country='france') LOCATION '" + customPartitionLocation
             .toString() + "'")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("select place from t2 where country='france'")
         .verifyResults(new String[] { "paris" })
@@ -376,9 +376,9 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
 
     primary.run("use " + primaryDbName)
         .run("alter table t2 partition (country='france') set location '" + tmpLocation + "'")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("select place from t2 where country='france'")
         .verifyResults(new String[] {})
@@ -396,17 +396,17 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     primary.run("use " + primaryDbName)
             .run("insert into table t2 partition(country='france') values ('lyon')")
             .run("alter table t2 set location '" + tmpLocation2 + "'")
-            .dump(primaryDbName);
+            .dump(primaryDbName, withClause);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
     assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
   }
 
   @Test
   public void externalTableIncrementalReplication() throws Throwable {
-    WarehouseInstance.Tuple tuple = primary.dumpWithCommand("repl dump " + primaryDbName);
+    List<String> withClause = externalTableBasePathWithClause();
+    WarehouseInstance.Tuple tuple = primary.dump(primaryDbName, withClause);
     replica.load(replicatedDbName, primaryDbName);
-
     Path externalTableLocation =
             new Path("/" + testName.getMethodName() + "/t1/");
     DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
@@ -418,12 +418,12 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             + "'")
         .run("alter table t1 add partition(country='india')")
         .run("alter table t1 add partition(country='us')")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
     assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation);
 
     // Add new data externally, to a partition, but under the partition level top directory
-    // Also, it is added after dumping the events but data should be seen at target after REPL LOAD.
+    // Also, it is added after dumping the events so data should not be seen at target after REPL LOAD.
     Path partitionDir = new Path(externalTableLocation, "country=india");
     try (FSDataOutputStream outputStream = fs.create(new Path(partitionDir, "file.txt"))) {
       outputStream.write("pune\n".getBytes());
@@ -435,16 +435,29 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     }
 
     List<String> loadWithClause = externalTableBasePathWithClause();
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't1'")
         .verifyResult("t1")
         .run("show partitions t1")
         .verifyResults(new String[] { "country=india", "country=us" })
         .run("select place from t1 order by place")
-        .verifyResults(new String[] { "bangalore", "mumbai", "pune" })
+        .verifyResults(new String[] {})
         .verifyReplTargetProperty(replicatedDbName);
 
+    // The Data should be seen after next dump-and-load cycle.
+    tuple = primary.run("use " + primaryDbName)
+            .dump(primaryDbName, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("show partitions t1")
+            .verifyResults(new String[] {"country=india", "country=us"})
+            .run("select place from t1 order by place")
+            .verifyResults(new String[] {"bangalore", "mumbai", "pune"})
+            .verifyReplTargetProperty(replicatedDbName);
+
     // Delete one of the file and update another one.
     fs.delete(new Path(partitionDir, "file.txt"), true);
     fs.delete(new Path(partitionDir, "file1.txt"), true);
@@ -453,10 +466,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     }
 
     // Repl load with zero events but external tables location info should present.
-    tuple = primary.dump(primaryDbName);
+    tuple = primary.dump(primaryDbName, withClause);
     assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
             .run("use " + replicatedDbName)
             .run("show tables like 't1'")
             .verifyResult("t1")
@@ -475,7 +488,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     tuple = primary
         .run("alter table t1 drop partition (country='india')")
         .run("alter table t1 drop partition (country='us')")
-        .dump(primaryDbName);
+        .dump(primaryDbName, withClause);
 
     replica.load(replicatedDbName, primaryDbName)
         .run("select * From t1")
@@ -519,8 +532,8 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .verifyFailure(new String[] {"t2" })
             .verifyReplTargetProperty(replicatedDbName);
 
-    dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
-                                   "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+    dumpWithClause = externalTableWithClause(true, true);
+
     tuple = primary.run("use " + primaryDbName)
             .run("drop table t1")
             .run("create external table t3 (id int)")
@@ -613,8 +626,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .verifyResult("1")
             .verifyReplTargetProperty(replicatedDbName);
 
-    dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
-            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+    dumpWithClause = externalTableWithClause(true, true);
     primary.run("use " + primaryDbName)
             .run("drop table t1")
             .run("create external table t4 (id int)")
@@ -709,9 +721,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
 
   @Test
   public void testExternalTablesIncReplicationWithConcurrentDropTable() throws Throwable {
-    List<String> dumpWithClause = Collections.singletonList(
-            "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'"
-    );
+    List<String> dumpWithClause = this.externalTableWithClause(null, true);
     List<String> loadWithClause = externalTableBasePathWithClause();
     WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName)
             .run("create external table t1 (id int)")
@@ -836,8 +846,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .verifyResult(incTuple.lastReplicationId);
 
     // Take a dump with external tables bootstrapped and load it
-    dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
-            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'");
+    dumpWithClause = externalTableWithClause(true, true);
     WarehouseInstance.Tuple inc2Tuple = primary.run("use " + primaryDbName)
             .dump(primaryDbName, dumpWithClause);
 
@@ -854,7 +863,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
 
   @Test
   public void replicationWithTableNameContainsKeywords() throws Throwable {
-    List<String> loadWithClause = externalTableBasePathWithClause();
+    List<String> withClause = externalTableBasePathWithClause();
 
     WarehouseInstance.Tuple tuple = primary
             .run("use " + primaryDbName)
@@ -865,9 +874,9 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .run("insert into table t2_constraints partition(country='india') values ('bangalore')")
             .run("insert into table t2_constraints partition(country='us') values ('austin')")
             .run("insert into table t2_constraints partition(country='france') values ('paris')")
-            .dump(primaryDbName);
+            .dump(primaryDbName, withClause);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
             .run("repl status " + replicatedDbName)
             .verifyResult(tuple.lastReplicationId)
             .run("use " + replicatedDbName)
@@ -886,9 +895,9 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .run("create table t4_tables (id int)")
             .run("insert into table t4_tables values (10)")
             .run("insert into table t4_tables values (20)")
-            .dump(primaryDbName);
+            .dump(primaryDbName, withClause);
 
-    replica.load(replicatedDbName, primaryDbName, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
             .run("use " + replicatedDbName)
             .run("show tables like 't3_bootstrap'")
             .verifyResults(new String[] {"t3_bootstrap"})
@@ -902,6 +911,12 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     return ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica);
   }
 
+  private List<String> externalTableWithClause(Boolean bootstrapExtTbl, Boolean includeExtTbl)
+          throws IOException, SemanticException {
+    List<String> extTblBaseDir = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica);
+    return ReplicationTestUtils.externalTableWithClause(extTblBaseDir, bootstrapExtTbl, includeExtTbl);
+  }
+
   private void assertExternalFileInfo(List<String> expected, String dumplocation) throws IOException {
     assertExternalFileInfo(expected, dumplocation, null);
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
index c260a7d..8b2c556 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
@@ -477,7 +477,9 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
 
     dumpWithClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
                                    "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'",
-            "'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'");
+            "'" + HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY_FOR_EXTERNAL_TABLE.varname + "'='false'",
+            "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'",
+            "'distcp.options.pugpb'=''");
     tuple = primary.run("use " + primaryDbName)
             .run("drop table t1")
             .run("create external table t3 (id int)")
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
index afb53b8..692d40d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
@@ -156,4 +156,53 @@ public class TestScheduledReplicationScenarios extends BaseReplicationScenariosA
       replica.run("drop scheduled query s2");
     }
   }
+
+  @Test
+  public void testExternalTablesReplLoadBootstrapIncr() throws Throwable {
+    // Bootstrap
+    String withClause = " WITH('" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname
+            + "'='/replica_external_base')";
+    primary.run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into t1 values(1)")
+            .run("insert into t1 values(2)");
+    try (ScheduledQueryExecutionService schqS =
+                 ScheduledQueryExecutionService.startScheduledQueryExecutorService(primary.hiveConf)) {
+      int next = 0;
+      ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
+      primary.run("create scheduled query s1 every 10 minutes as repl dump " + primaryDbName + withClause);
+      primary.run("alter scheduled query s1 execute");
+      Thread.sleep(80000);
+      replica.run("create scheduled query s2 every 10 minutes as repl load " + primaryDbName + " INTO "
+              + replicatedDbName);
+      replica.run("alter scheduled query s2 execute");
+      Thread.sleep(80000);
+      replica.run("use " + replicatedDbName)
+              .run("show tables like 't1'")
+              .verifyResult("t1")
+              .run("select id from t1 order by id")
+              .verifyResults(new String[]{"1", "2"});
+
+      // First incremental, after bootstrap
+      primary.run("use " + primaryDbName)
+              .run("insert into t1 values(3)")
+              .run("insert into t1 values(4)");
+      next++;
+      ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
+      primary.run("alter scheduled query s1 execute");
+      Thread.sleep(80000);
+      replica.run("alter scheduled query s2 execute");
+      Thread.sleep(80000);
+      replica.run("use " + replicatedDbName)
+              .run("show tables like 't1'")
+              .verifyResult("t1")
+              .run("select id from t1 order by id")
+              .verifyResults(new String[]{"1", "2", "3", "4"});
+
+
+    } finally {
+      primary.run("drop scheduled query s1");
+      replica.run("drop scheduled query s2");
+    }
+  }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index ad6c002..78251f2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -866,7 +866,9 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
             "'" +  HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
             "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'",
             "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'",
-            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"
+            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'",
+            "'distcp.options.pugpb'=''"
     );
     replicatedTables = new String[] {"in1", "in2", "in3", "in4", "in5"};
     bootstrapTables = new String[] {"in2", "in3", "in4", "in5"};
@@ -892,10 +894,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
   @Test
   public void testRenameTableScenariosWithReplaceExternalTable() throws Throwable {
     List<String> loadWithClause = ReplicationTestUtils.externalTableBasePathWithClause(REPLICA_EXTERNAL_BASE, replica);
-    List<String> dumpWithClause = Arrays.asList(
-            "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
-            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'"
-    );
+    List<String> dumpWithClause = ReplicationTestUtils.externalTableWithClause(loadWithClause, true, true);
     String replPolicy = primaryDbName + ".'(in[0-9]+)|(out4)|(out5)|(out1500)'";
     String lastReplId = replicateAndVerify(replPolicy, null, null, dumpWithClause,
             loadWithClause, new String[] {}, new String[] {});
@@ -918,7 +917,9 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     String newPolicy = primaryDbName + ".'(in[0-9]+)|(out1500)|(in2)'";
     dumpWithClause = Arrays.asList(
             "'" + HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
-            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='false'"
+            "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='false'",
+            "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'",
+            "'distcp.options.pugpb'=''"
     );
 
     // in2 should be dropped.
@@ -1044,7 +1045,9 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
             "'" +  HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
             "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_EXTERNAL_TABLES.varname + "'='true'",
             "'" + HiveConf.ConfVars.REPL_BOOTSTRAP_ACID_TABLES.varname + "'='true'",
-            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"
+            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'",
+            "'distcp.options.pugpb'=''"
     );
 
     replicatedTables = new String[] {"in1", "in2", "in3", "in4", "in5", "in6", "in7", "in9"};
@@ -1059,7 +1062,9 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
 
     dumpWithClause = Arrays.asList(
             "'" +  HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES.varname + "'='true'",
-            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'"
+            "'" + ReplUtils.REPL_DUMP_INCLUDE_ACID_TABLES + "'='true'",
+            "'" + HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname + "'='" + REPLICA_EXTERNAL_BASE + "'",
+            "'distcp.options.pugpb'=''"
     );
 
     // Database replication with ACID and EXTERNAL table.
diff --git a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
index 9b9b68f..25d530c 100644
--- a/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
+++ b/ql/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/ql/plan/api/StageType.java
@@ -7,10 +7,6 @@
 package org.apache.hadoop.hive.ql.plan.api;
 
 
-import java.util.Map;
-import java.util.HashMap;
-import org.apache.thrift.TEnum;
-
 public enum StageType implements org.apache.thrift.TEnum {
   CONDITIONAL(0),
   COPY(1),
@@ -30,11 +26,11 @@ public enum StageType implements org.apache.thrift.TEnum {
   REPL_TXN(15),
   REPL_INCREMENTAL_LOAD(16),
   SCHEDULED_QUERY_MAINT(17),
-  REPL_LOAD_COMPLETE_ACK(18);
+  ACK(18);
 
   private final int value;
 
-  private StageType(int value) {
+  StageType(int value) {
     this.value = value;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 4f87bac..c82e8d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -31,12 +31,12 @@ import org.apache.hadoop.hive.ql.exec.repl.ReplDumpTask;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplLoadTask;
 import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
+import org.apache.hadoop.hive.ql.exec.repl.AckTask;
+import org.apache.hadoop.hive.ql.exec.repl.AckWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogTask;
 import org.apache.hadoop.hive.ql.exec.repl.ReplStateLogWork;
-import org.apache.hadoop.hive.ql.exec.repl.ReplLoadCompleteAckTask;
-import org.apache.hadoop.hive.ql.exec.repl.ReplLoadCompleteAckWork;
-import org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyTask;
-import org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork;
+import org.apache.hadoop.hive.ql.exec.repl.DirCopyTask;
+import org.apache.hadoop.hive.ql.exec.repl.DirCopyWork;
 import org.apache.hadoop.hive.ql.exec.schq.ScheduledQueryMaintenanceTask;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
@@ -113,11 +113,12 @@ public final class TaskFactory {
     taskvec.add(new TaskTuple<>(ReplDumpWork.class, ReplDumpTask.class));
     taskvec.add(new TaskTuple<>(ReplLoadWork.class, ReplLoadTask.class));
     taskvec.add(new TaskTuple<>(ReplStateLogWork.class, ReplStateLogTask.class));
-    taskvec.add(new TaskTuple<ReplLoadCompleteAckWork>(ReplLoadCompleteAckWork.class, ReplLoadCompleteAckTask.class));
+    taskvec.add(new TaskTuple<AckWork>(AckWork.class, AckTask.class));
     taskvec.add(new TaskTuple<ExportWork>(ExportWork.class, ExportTask.class));
     taskvec.add(new TaskTuple<ReplTxnWork>(ReplTxnWork.class, ReplTxnTask.class));
     taskvec.add(new TaskTuple<DirCopyWork>(DirCopyWork.class, DirCopyTask.class));
-    taskvec.add(new TaskTuple<ScheduledQueryMaintenanceWork>(ScheduledQueryMaintenanceWork.class, ScheduledQueryMaintenanceTask.class));
+    taskvec.add(new TaskTuple<ScheduledQueryMaintenanceWork>(ScheduledQueryMaintenanceWork.class,
+            ScheduledQueryMaintenanceTask.class));
   }
 
   private static ThreadLocal<Integer> tid = new ThreadLocal<Integer>() {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java
similarity index 75%
rename from ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckTask.java
rename to ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java
index 975dfb0..03e8c4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckTask.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hive.ql.exec.repl;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -29,18 +28,18 @@ import org.apache.hadoop.hive.ql.plan.api.StageType;
 import java.io.Serializable;
 
 /**
- * ReplLoadCompleteAckTask.
+ * AckTask.
  *
- * Add the load complete acknoledgement.
+ * Add the repl dump/ repl load complete acknowledgement.
  **/
-public class ReplLoadCompleteAckTask extends Task<ReplLoadCompleteAckWork> implements Serializable {
+public class AckTask extends Task<AckWork> implements Serializable {
 
   private static final long serialVersionUID = 1L;
 
   @Override
   public int execute() {
     try {
-      Path ackPath = new Path(work.getDumpPath(), ReplUtils.LOAD_ACKNOWLEDGEMENT);
+      Path ackPath = work.getAckFilePath();
       Utils.create(ackPath, conf);
     } catch (SemanticException e) {
       setException(e);
@@ -51,19 +50,17 @@ public class ReplLoadCompleteAckTask extends Task<ReplLoadCompleteAckWork> imple
 
   @Override
   public StageType getType() {
-    return StageType.REPL_LOAD_COMPLETE_ACK;
+    return StageType.ACK;
   }
 
   @Override
   public String getName() {
-    return "REPL_LOAD_COMPLETE_ACK";
+    return "ACK_TASK";
   }
 
   @Override
   public boolean canExecuteInParallel() {
-    // REPL_LOAD_COMPLETE_ACK is executed only when all its parents are done with execution.
-    // So running it in parallel has no
-    // benefits.
+    // ACK_TASK must be executed only when all its parents are done with execution.
     return false;
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckWork.java
similarity index 71%
rename from ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckWork.java
rename to ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckWork.java
index c36ee6d..0fa0a95 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadCompleteAckWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/AckWork.java
@@ -18,27 +18,28 @@
 
 package org.apache.hadoop.hive.ql.exec.repl;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
 
 import java.io.Serializable;
 
 /**
- * ReplLoadCompleteAckWork.
- * FS based Acknowledgement for repl load complete
+ * AckWork.
+ * FS based acknowledgement on repl dump and repl load completion.
  *
  */
-@Explain(displayName = "Repl Load Complete Ack", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class ReplLoadCompleteAckWork implements Serializable {
+@Explain(displayName = "Replication Ack", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+public class AckWork implements Serializable {
   private static final long serialVersionUID = 1L;
-  private String dumpPath;
+  private Path ackFilePath;
 
-  public String getDumpPath() {
-    return dumpPath;
+  public Path getAckFilePath() {
+    return ackFilePath;
   }
 
-  public ReplLoadCompleteAckWork(String dumpPath) {
-    this.dumpPath = dumpPath;
+  public AckWork(Path ackFilePath) {
+    this.ackFilePath = ackFilePath;
   }
 
 }
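
The ReplLoadCompleteAckWork/ReplLoadCompleteAckTask pair is generalized into AckWork/AckTask, which carry the acknowledgement file path directly so the same task type can write either the dump or the load acknowledgement. A minimal sketch of how such a task could be wired up, assuming the ReplUtils constants and the two-argument TaskFactory.get used elsewhere in this patch; the dump root path below is an illustrative placeholder.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.exec.TaskFactory;
    import org.apache.hadoop.hive.ql.exec.repl.AckWork;
    import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;

    public class AckTaskSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Illustrative dump root; a real caller would use the current dump directory.
        Path hiveDumpRoot = new Path("/tmp/repl/next", ReplUtils.REPL_HIVE_BASE_DIR);
        // AckWork points at the acknowledgement file; AckTask creates it on execute().
        AckWork ackWork = new AckWork(new Path(hiveDumpRoot, ReplUtils.DUMP_ACKNOWLEDGEMENT));
        Task ackTask = TaskFactory.get(ackWork, conf);
        System.out.println("Ack task created: " + ackTask.getName());
      }
    }
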
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
new file mode 100644
index 0000000..e8a8df1
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyTask.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
+import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import java.security.PrivilegedExceptionAction;
+import java.io.Serializable;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+
+/**
+ * DirCopyTask, mainly used to copy external table data.
+ */
+public class DirCopyTask extends Task<DirCopyWork> implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class);
+  private static final int MAX_COPY_RETRY = 5;
+
+  private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOException {
+    FileSystem targetFs = destPath.getFileSystem(conf);
+    boolean createdDir = false;
+    if (!targetFs.exists(destPath)) {
+      // Target path is created even if the source path is missing, so that the DDL task does not try to create it.
+      if (!targetFs.mkdirs(destPath)) {
+        throw new IOException(destPath + " is not a directory or unable to create one");
+      }
+      createdDir = true;
+    }
+
+    FileStatus status;
+    try {
+      status = sourcePath.getFileSystem(conf).getFileStatus(sourcePath);
+    } catch (FileNotFoundException e) {
+      // Don't delete the target path if it was created here; otherwise the DDL task will try to create it as user hive and may fail.
+      LOG.warn("source path missing " + sourcePath);
+      return createdDir;
+    }
+    LOG.info("Setting permission for path dest {} from source {} owner {} : {} : {}",
+            destPath, sourcePath, status.getOwner(), status.getGroup(), status.getPermission());
+    destPath.getFileSystem(conf).setOwner(destPath, status.getOwner(), status.getGroup());
+    destPath.getFileSystem(conf).setPermission(destPath, status.getPermission());
+    return createdDir;
+  }
+
+  private boolean setTargetPathOwner(Path targetPath, Path sourcePath, UserGroupInformation proxyUser)
+            throws IOException, InterruptedException {
+    if (proxyUser == null) {
+      return createAndSetPathOwner(targetPath, sourcePath);
+    }
+    return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> createAndSetPathOwner(targetPath, sourcePath));
+  }
+
+  private boolean checkIfPathExist(Path sourcePath, UserGroupInformation proxyUser) throws Exception {
+    if (proxyUser == null) {
+      return sourcePath.getFileSystem(conf).exists(sourcePath);
+    }
+    return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () -> sourcePath.getFileSystem(conf).exists(sourcePath));
+  }
+
+  private int handleException(Exception e, Path sourcePath, Path targetPath,
+                              int currentRetry, UserGroupInformation proxyUser) {
+    try {
+      LOG.info("Checking if source path " + sourcePath + " is missing for exception ", e);
+      if (!checkIfPathExist(sourcePath, proxyUser)) {
+        LOG.info("Source path is missing. Ignoring exception.");
+        return 0;
+      }
+    } catch (Exception ex) {
+      LOG.warn("Source path missing check failed. ", ex);
+    }
+    // Retry logic applies only to I/O exceptions.
+    if (!(e instanceof IOException)) {
+      LOG.error("Unable to copy {} to {}", sourcePath, targetPath, e);
+      setException(e);
+      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+    }
+
+    if (currentRetry <= MAX_COPY_RETRY) {
+      LOG.warn("Unable to copy {} to {}", sourcePath, targetPath, e);
+    } else {
+      LOG.error("Unable to copy {} to {} even after retrying for {} time", sourcePath, targetPath, currentRetry, e);
+      setException(e);
+      return ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getErrorCode();
+    }
+    int sleepTime = FileUtils.getSleepTime(currentRetry);
+    LOG.info("Sleep for " + sleepTime + " milliseconds before retry no " + (currentRetry));
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException timerEx) {
+      LOG.info("Sleep interrupted", timerEx.getMessage());
+    }
+    try {
+      if (proxyUser == null) {
+        proxyUser = Utils.getUGI();
+      }
+      FileSystem.closeAllForUGI(proxyUser);
+    } catch (Exception ex) {
+      LOG.warn("Unable to closeAllForUGI for user " + proxyUser, ex);
+    }
+    return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+  }
+
+  @Override
+  public int execute() {
+    String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
+
+    Path sourcePath = work.getFullyQualifiedSourcePath();
+    Path targetPath = work.getFullyQualifiedTargetPath();
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
+      sourcePath = reservedRawPath(work.getFullyQualifiedSourcePath().toUri());
+      targetPath = reservedRawPath(work.getFullyQualifiedTargetPath().toUri());
+    }
+    int currentRetry = 0;
+    int error = 0;
+    UserGroupInformation proxyUser = null;
+    while (currentRetry <= MAX_COPY_RETRY) {
+      try {
+        UserGroupInformation ugi = Utils.getUGI();
+        String currentUser = ugi.getShortUserName();
+        if (distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) {
+          proxyUser = UserGroupInformation.createProxyUser(
+                  distCpDoAsUser, UserGroupInformation.getLoginUser());
+        }
+
+        setTargetPathOwner(targetPath, sourcePath, proxyUser);
+
+        // Should we create a new conf and, only here, add distcp.options.delete to it so that the
+        // source and target directories do not diverge for the same paths?
+        // (i.e. add distcp.options.delete to a new HiveConf object?)
+        FileUtils.distCp(
+                sourcePath.getFileSystem(conf), // source file system
+                Collections.singletonList(sourcePath),  // list of source paths
+                targetPath,
+                false,
+                proxyUser,
+                conf,
+                ShimLoader.getHadoopShims());
+        return 0;
+      } catch (Exception e) {
+        currentRetry++;
+        error = handleException(e, sourcePath, targetPath, currentRetry, proxyUser);
+        if (error == 0) {
+          return 0;
+        }
+      } finally {
+        if (proxyUser != null) {
+          try {
+            FileSystem.closeAllForUGI(proxyUser);
+          } catch (IOException e) {
+            LOG.error("Unable to closeAllForUGI for user " + proxyUser, e);
+            if (error == 0) {
+              setException(e);
+              error = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
+            }
+            break;
+          }
+        }
+      }
+    }
+    return error;
+  }
+
+  private static Path reservedRawPath(URI uri) {
+    return new Path(uri.getScheme(), uri.getAuthority(), CopyUtils.RAW_RESERVED_VIRTUAL_PATH + uri.getPath());
+  }
+
+  @Override
+  public StageType getType() {
+    return StageType.COPY;
+  }
+
+  @Override
+  public String getName() {
+    return "DIR_COPY_TASK";
+  }
+
+  @Override
+  public boolean canExecuteInParallel() {
+    return true;
+  }
+}
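
The copy above retries only on IOException, backs off between attempts via FileUtils.getSleepTime, and treats a vanished source path as success. A stripped-down sketch of that control flow follows; copyOnce is a hypothetical stand-in for the FileUtils.distCp call, and the missing-source-path check is omitted, so this is illustrative rather than the committed code:

    // Hedged illustration of the bounded-retry pattern in DirCopyTask.execute()/handleException().
    int copyWithRetry(Path sourcePath, Path targetPath) throws InterruptedException {
      int attempt = 0;
      while (attempt <= MAX_COPY_RETRY) {
        try {
          copyOnce(sourcePath, targetPath);               // hypothetical helper in place of FileUtils.distCp
          return 0;                                       // success
        } catch (IOException e) {
          attempt++;
          if (attempt > MAX_COPY_RETRY) {
            return ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getErrorCode();
          }
          Thread.sleep(FileUtils.getSleepTime(attempt));  // back off before the next attempt
        } catch (Exception e) {
          return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();  // non-IO failures are not retried
        }
      }
      return 0;
    }
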
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
new file mode 100644
index 0000000..efef052
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.plan.Explain;
+import java.io.Serializable;
+
+/**
+ * DirCopyWork, mainly used to copy external table data.
+ */
+@Explain(displayName = "HDFS Copy Operator", explainLevels = { Explain.Level.USER,
+        Explain.Level.DEFAULT,
+        Explain.Level.EXTENDED })
+public class DirCopyWork implements Serializable {
+  private final Path fullyQualifiedSourcePath;
+  private final Path fullyQualifiedTargetPath;
+
+  public DirCopyWork(Path fullyQualifiedSourcePath, Path fullyQualifiedTargetPath) {
+    this.fullyQualifiedSourcePath = fullyQualifiedSourcePath;
+    this.fullyQualifiedTargetPath = fullyQualifiedTargetPath;
+  }
+  @Override
+  public String toString() {
+    return "DirCopyWork{"
+            + "fullyQualifiedSourcePath=" + getFullyQualifiedSourcePath()
+            + ", fullyQualifiedTargetPath=" + getFullyQualifiedTargetPath()
+            + '}';
+  }
+
+  public Path getFullyQualifiedSourcePath() {
+    return fullyQualifiedSourcePath;
+  }
+
+  public Path getFullyQualifiedTargetPath() {
+    return fullyQualifiedTargetPath;
+  }
+}
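
Each DirCopyWork simply pairs a fully qualified source and target path; the dump side hands the pair to the task factory, which (with the TaskFactory change in this patch) is expected to resolve it to a DirCopyTask. A hedged example with placeholder paths:

    // Illustrative only; the paths below are placeholders.
    Path src = new Path("hdfs://source-nn:8020/warehouse/ext_db.db/ext_tbl");
    Path dst = new Path("hdfs://target-nn:8020/replica/base/warehouse/ext_db.db/ext_tbl");
    DirCopyWork dirCopyWork = new DirCopyWork(src, dst);
    Task<DirCopyWork> copyTask = TaskFactory.get(dirCopyWork, conf);
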
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
deleted file mode 100644
index 5c5543c..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ExternalTableCopyTaskBuilder.java
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec.repl;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
-import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
-import org.apache.hadoop.hive.ql.plan.Explain;
-import org.apache.hadoop.hive.ql.plan.api.StageType;
-import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.hive.shims.Utils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import java.security.PrivilegedExceptionAction;
-
-import java.io.Serializable;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-public class ExternalTableCopyTaskBuilder {
-  private static final Logger LOG = LoggerFactory.getLogger(ExternalTableCopyTaskBuilder.class);
-  private final ReplLoadWork work;
-  private final HiveConf conf;
-
-  ExternalTableCopyTaskBuilder(ReplLoadWork work, HiveConf conf) {
-    this.work = work;
-    this.conf = conf;
-  }
-
-  List<Task<?>> tasks(TaskTracker tracker) {
-    List<Task<?>> tasks = new ArrayList<>();
-    Iterator<DirCopyWork> itr = work.getPathsToCopyIterator();
-    while (tracker.canAddMoreTasks() && itr.hasNext()) {
-      DirCopyWork dirCopyWork = itr.next();
-      Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
-      tasks.add(task);
-      tracker.addTask(task);
-      LOG.debug("added task for {}", dirCopyWork);
-    }
-    return tasks;
-  }
-
-  public static class DirCopyTask extends Task<DirCopyWork> implements Serializable {
-    private static final Logger LOG = LoggerFactory.getLogger(DirCopyTask.class);
-    private static final int MAX_COPY_RETRY = 5;
-
-    private boolean createAndSetPathOwner(Path destPath, Path sourcePath) throws IOException {
-      FileSystem targetFs = destPath.getFileSystem(conf);
-      boolean createdDir = false;
-      if (!targetFs.exists(destPath)) {
-        // target path is created even if the source path is missing, so that ddl task does not try to create it.
-        if (!targetFs.mkdirs(destPath)) {
-          throw new IOException(destPath + " is not a directory or unable to create one");
-        }
-        createdDir = true;
-      }
-
-      FileStatus status;
-      try {
-        status = sourcePath.getFileSystem(conf).getFileStatus(sourcePath);
-      } catch (FileNotFoundException e) {
-        // Don't delete target path created else ddl task will try to create it using user hive and may fail.
-        LOG.warn("source path missing " + sourcePath);
-        return createdDir;
-      }
-      LOG.info("Setting permission for path dest {} from source {} owner {} : {} : {}",
-              destPath, sourcePath, status.getOwner(), status.getGroup(), status.getPermission());
-      destPath.getFileSystem(conf).setOwner(destPath, status.getOwner(), status.getGroup());
-      destPath.getFileSystem(conf).setPermission(destPath, status.getPermission());
-      return createdDir;
-    }
-
-    private boolean setTargetPathOwner(Path targetPath, Path sourcePath, UserGroupInformation proxyUser)
-            throws IOException, InterruptedException {
-      if (proxyUser == null) {
-        return createAndSetPathOwner(targetPath, sourcePath);
-      }
-      return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () ->
-                createAndSetPathOwner(targetPath, sourcePath));
-    }
-
-    private boolean checkIfPathExist(Path sourcePath, UserGroupInformation proxyUser) throws Exception {
-      if (proxyUser == null) {
-        return sourcePath.getFileSystem(conf).exists(sourcePath);
-      }
-      return proxyUser.doAs((PrivilegedExceptionAction<Boolean>) () ->
-              sourcePath.getFileSystem(conf).exists(sourcePath));
-    }
-
-    private int handleException(Exception e, Path sourcePath, Path targetPath,
-                                int currentRetry, UserGroupInformation proxyUser) {
-      try {
-        LOG.info("Checking if source path " + sourcePath + " is missing for exception ", e);
-        if (!checkIfPathExist(sourcePath, proxyUser)) {
-          LOG.info("Source path is missing. Ignoring exception.");
-          return 0;
-        }
-      } catch (Exception ex) {
-        LOG.warn("Source path missing check failed. ", ex);
-      }
-
-      // retry logic only for i/o exception
-      if (!(e instanceof IOException)) {
-        LOG.error("Unable to copy {} to {}", sourcePath, targetPath, e);
-        setException(e);
-        return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
-      }
-
-      if (currentRetry <= MAX_COPY_RETRY) {
-        LOG.warn("Unable to copy {} to {}", sourcePath, targetPath, e);
-      } else {
-        LOG.error("Unable to copy {} to {} even after retrying for {} time", sourcePath, targetPath, currentRetry, e);
-        setException(e);
-        return ErrorMsg.REPL_FILE_SYSTEM_OPERATION_RETRY.getErrorCode();
-      }
-
-      int sleepTime = FileUtils.getSleepTime(currentRetry);
-      LOG.info("Sleep for " + sleepTime + " milliseconds before retry no " + (currentRetry));
-      try {
-        Thread.sleep(sleepTime);
-      } catch (InterruptedException timerEx) {
-        LOG.info("Sleep interrupted", timerEx.getMessage());
-      }
-
-      try {
-        if (proxyUser == null) {
-          proxyUser = Utils.getUGI();
-        }
-        FileSystem.closeAllForUGI(proxyUser);
-      } catch (Exception ex) {
-        LOG.warn("Unable to closeAllForUGI for user " + proxyUser, ex);
-      }
-      return ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
-    }
-
-    @Override
-    public int execute() {
-      String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
-
-      Path sourcePath = work.fullyQualifiedSourcePath;
-      Path targetPath = work.fullyQualifiedTargetPath;
-      if (conf.getBoolVar(HiveConf.ConfVars.REPL_ADD_RAW_RESERVED_NAMESPACE)) {
-        sourcePath = reservedRawPath(work.fullyQualifiedSourcePath.toUri());
-        targetPath = reservedRawPath(work.fullyQualifiedTargetPath.toUri());
-      }
-      int currentRetry = 0;
-      int error = 0;
-      UserGroupInformation proxyUser = null;
-      while (currentRetry <= MAX_COPY_RETRY) {
-        try {
-          UserGroupInformation ugi = Utils.getUGI();
-          String currentUser = ugi.getShortUserName();
-          if (distCpDoAsUser != null && !currentUser.equals(distCpDoAsUser)) {
-            proxyUser = UserGroupInformation.createProxyUser(
-                    distCpDoAsUser, UserGroupInformation.getLoginUser());
-          }
-
-          setTargetPathOwner(targetPath, sourcePath, proxyUser);
-
-          // do we create a new conf and only here provide this additional option so that we get away from
-          // differences of data in two location for the same directories ?
-          // basically add distcp.options.delete to hiveconf new object ?
-          FileUtils.distCp(
-              sourcePath.getFileSystem(conf), // source file system
-              Collections.singletonList(sourcePath),  // list of source paths
-              targetPath,
-              false,
-              proxyUser,
-              conf,
-              ShimLoader.getHadoopShims());
-          return 0;
-        } catch (Exception e) {
-          currentRetry++;
-          error = handleException(e, sourcePath, targetPath, currentRetry, proxyUser);
-          if (error == 0) {
-            return 0;
-          }
-        } finally {
-          if (proxyUser != null) {
-            try {
-              FileSystem.closeAllForUGI(proxyUser);
-            } catch (IOException e) {
-              LOG.error("Unable to closeAllForUGI for user " + proxyUser, e);
-              if (error == 0) {
-                setException(e);
-                error = ErrorMsg.getErrorMsg(e.getMessage()).getErrorCode();
-              }
-              break;
-            }
-          }
-        }
-      }
-      return error;
-    }
-
-    private static Path reservedRawPath(URI uri) {
-      return new Path(uri.getScheme(), uri.getAuthority(),
-          CopyUtils.RAW_RESERVED_VIRTUAL_PATH + uri.getPath());
-    }
-
-    @Override
-    public StageType getType() {
-      return StageType.COPY;
-    }
-
-    @Override
-    public String getName() {
-      return "DIR_COPY_TASK";
-    }
-
-    @Override
-    public boolean canExecuteInParallel(){
-      return true;
-    }
-  }
-
-  @Explain(displayName = "HDFS Copy Operator", explainLevels = { Explain.Level.USER,
-      Explain.Level.DEFAULT,
-      Explain.Level.EXTENDED })
-  public static class DirCopyWork implements Serializable {
-    private final Path fullyQualifiedSourcePath, fullyQualifiedTargetPath;
-
-    public DirCopyWork(Path fullyQualifiedSourcePath, Path fullyQualifiedTargetPath) {
-      this.fullyQualifiedSourcePath = fullyQualifiedSourcePath;
-      this.fullyQualifiedTargetPath = fullyQualifiedTargetPath;
-    }
-
-    @Override
-    public String toString() {
-      return "DirCopyWork{" +
-          "fullyQualifiedSourcePath=" + fullyQualifiedSourcePath +
-          ", fullyQualifiedTargetPath=" + fullyQualifiedTargetPath +
-          '}';
-    }
-  }
-}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 8da1c48..69f6ffe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -40,9 +40,12 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.AndFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFilter;
 import org.apache.hadoop.hive.metastore.messaging.event.filters.ReplEventFilter;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -52,7 +55,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.EximUtil.ReplPathMapping;
+import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -75,6 +78,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.login.LoginException;
+import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
@@ -84,16 +88,18 @@ import java.util.List;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Base64;
-import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.UUID;
+import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
 
 public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
+  private static final long serialVersionUID = 1L;
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
   private static final String FUNCTION_METADATA_FILE_NAME = EximUtil.METADATA_NAME;
   private static final long SLEEP_TIME = 60000;
-  Set<String> tablesForBootstrap = new HashSet<>();
+  private Set<String> tablesForBootstrap = new HashSet<>();
 
   public enum ConstraintFileType {COMMON("common", "c_"), FOREIGNKEY("fk", "f_");
     private final String name;
@@ -122,33 +128,35 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   @Override
   public int execute() {
     try {
-      Hive hiveDb = getHive();
-      Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
-              Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
-                      .getBytes(StandardCharsets.UTF_8.name())));
-      Path previousDumpMetaPath = getPreviousDumpMetadataPath(dumpRoot);
-      Path previousHiveDumpPath =
-              previousDumpMetaPath != null ? new Path(previousDumpMetaPath, ReplUtils.REPL_HIVE_BASE_DIR) : null;
-      //If no previous dump is present or previous dump was loaded, proceed
-      if (shouldDump(previousHiveDumpPath)) {
-        Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
-        Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
-        DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf);
-        // Initialize ReplChangeManager instance since we will require it to encode file URI.
-        ReplChangeManager.getInstance(conf);
-        Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
-        Long lastReplId;
-        if (previousHiveDumpPath == null) {
-          lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+      if (work.tableDataCopyIteratorsInitialized()) {
+        initiateDataCopyTasks();
+      } else {
+        Hive hiveDb = getHive();
+        Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
+                Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
+                        .getBytes(StandardCharsets.UTF_8.name())));
+        Path previousHiveDumpPath = getPreviousDumpMetadataPath(dumpRoot);
+        //If no previous dump is present or previous dump is already loaded, proceed with the dump operation.
+        if (shouldDump(previousHiveDumpPath)) {
+          Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
+          Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
+          DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf);
+          // Initialize ReplChangeManager instance since we will require it to encode file URI.
+          ReplChangeManager.getInstance(conf);
+          Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
+          Long lastReplId;
+          if (previousHiveDumpPath == null) {
+            lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+          } else {
+            work.setEventFrom(getEventFromPreviousDumpMetadata(previousHiveDumpPath));
+            lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+          }
+          work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
+          work.setCurrentDumpPath(currentDumpPath);
+          initiateDataCopyTasks();
         } else {
-          work.setEventFrom(getEventFromPreviousDumpMetadata(previousHiveDumpPath));
-          lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
+          LOG.info("Previous Dump is not yet loaded");
         }
-        prepareReturnValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
-        writeDumpCompleteAck(hiveDumpRoot);
-        deleteAllPreviousDumpMeta(dumpRoot, currentDumpPath);
-      } else {
-        LOG.warn("Previous Dump is not yet loaded");
       }
     } catch (Exception e) {
       LOG.error("failed", e);
@@ -158,20 +166,59 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return 0;
   }
 
-  private void deleteAllPreviousDumpMeta(Path dumpRoot, Path currentDumpPath) throws IOException {
-    FileSystem fs = dumpRoot.getFileSystem(conf);
-    if (fs.exists(dumpRoot)) {
-      FileStatus[] statuses = fs.listStatus(dumpRoot,
-        path -> !path.equals(currentDumpPath) && !path.toUri().getPath().equals(currentDumpPath.toString()));
-      for (FileStatus status : statuses) {
-        fs.delete(status.getPath(), true);
+  private void initiateDataCopyTasks() throws SemanticException, IOException {
+    TaskTracker taskTracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS));
+    List<Task<?>> childTasks = new ArrayList<>();
+    childTasks.addAll(work.externalTableCopyTasks(taskTracker, conf));
+    childTasks.addAll(work.managedTableCopyTasks(taskTracker, conf));
+    if (childTasks.isEmpty()) {
+      //All table data copy work finished.
+      finishRemainingTasks();
+    } else {
+      DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
+      this.childTasks = childTasks;
+    }
+  }
+
+  private void finishRemainingTasks() throws SemanticException, IOException {
+    prepareReturnValues(work.getResultValues());
+    Path dumpAckFile = new Path(work.getCurrentDumpPath(),
+            ReplUtils.REPL_HIVE_BASE_DIR + File.separator + ReplUtils.DUMP_ACKNOWLEDGEMENT);
+    Utils.create(dumpAckFile, conf);
+    deleteAllPreviousDumpMeta(work.getCurrentDumpPath());
+  }
+
+  private void prepareReturnValues(List<String> values) throws SemanticException {
+    LOG.debug("prepareReturnValues : " + dumpSchema);
+    for (String s : values) {
+      LOG.debug("    > " + s);
+    }
+    Utils.writeOutput(Collections.singletonList(values), new Path(work.resultTempPath), conf);
+  }
+
+  private void deleteAllPreviousDumpMeta(Path currentDumpPath) {
+    try {
+      Path dumpRoot = getDumpRoot(currentDumpPath);
+      FileSystem fs = dumpRoot.getFileSystem(conf);
+      if (fs.exists(dumpRoot)) {
+        FileStatus[] statuses = fs.listStatus(dumpRoot,
+          path -> !path.equals(currentDumpPath) && !path.toUri().getPath().equals(currentDumpPath.toString()));
+        for (FileStatus status : statuses) {
+          fs.delete(status.getPath(), true);
+        }
       }
+    } catch (Exception ex) {
+      LOG.warn("Possible leak on disk, could not delete the previous dump directory:" + currentDumpPath, ex);
     }
   }
 
-  private void writeDumpCompleteAck(Path currentDumpPath) throws SemanticException {
-    Path ackPath = new Path(currentDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT);
-    Utils.create(ackPath, conf);
+  private Path getDumpRoot(Path currentDumpPath) {
+    if (ReplDumpWork.testDeletePreviousDumpMetaPath && conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+      //testDeletePreviousDumpMetaPath is to be used only in tests.
+      return null;
+    } else {
+      return currentDumpPath.getParent();
+    }
   }
 
   private Long getEventFromPreviousDumpMetadata(Path previousDumpPath) throws SemanticException {
@@ -187,20 +234,30 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   }
 
   private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException {
+    FileStatus latestValidStatus = null;
     FileSystem fs = dumpRoot.getFileSystem(conf);
     if (fs.exists(dumpRoot)) {
       FileStatus[] statuses = fs.listStatus(dumpRoot);
-      if (statuses.length > 0)  {
-        FileStatus latestUpdatedStatus = statuses[0];
-        for (FileStatus status : statuses) {
-          if (status.getModificationTime() > latestUpdatedStatus.getModificationTime()) {
-            latestUpdatedStatus = status;
-          }
+      for (FileStatus status : statuses) {
+        LOG.info("Evaluating previous dump dir path:{}", status.getPath());
+        if (latestValidStatus == null) {
+          latestValidStatus = validDump(fs, status.getPath()) ? status : null;
+        } else if (validDump(fs, status.getPath())
+                && status.getModificationTime() > latestValidStatus.getModificationTime()) {
+          latestValidStatus = status;
         }
-        return latestUpdatedStatus.getPath();
       }
     }
-    return null;
+    Path latestDumpDir = (latestValidStatus == null)
+         ? null : new Path(latestValidStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR);
+    LOG.info("Selecting latest valid dump dir as {}", (latestDumpDir == null) ? "null" : latestDumpDir.toString());
+    return latestDumpDir;
+  }
+
+  private boolean validDump(FileSystem fs, Path dumpDir) throws IOException {
+    //Check if it was a successful dump
+    Path hiveDumpDir = new Path(dumpDir, ReplUtils.REPL_HIVE_BASE_DIR);
+    return fs.exists(new Path(hiveDumpDir, ReplUtils.DUMP_ACKNOWLEDGEMENT));
   }
 
   private boolean shouldDump(Path previousDumpPath) throws IOException {
@@ -214,14 +271,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     }
   }
 
-  private void prepareReturnValues(List<String> values) throws SemanticException {
-    LOG.debug("prepareReturnValues : " + dumpSchema);
-    for (String s : values) {
-      LOG.debug("    > " + s);
-    }
-    Utils.writeOutput(Collections.singletonList(values), new Path(work.resultTempPath), conf);
-  }
-
   /**
    * Decide whether to examine all the tables to dump. We do this if
    * 1. External tables are going to be part of the dump : In which case we need to list their
@@ -326,6 +375,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     String validTxnList = null;
     long waitUntilTime = 0;
     long bootDumpBeginReplId = -1;
+    List<EximUtil.ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
+    List<DirCopyWork> extTableCopyWorks = Collections.emptyList();
 
     List<String> tableList = work.replScope.includeAllTables() ? null : new ArrayList<>();
 
@@ -403,8 +454,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
       }
 
+      managedTableCopyPaths = new ArrayList<>();
       Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, true);
-
+      List<Path> extTableLocations = new LinkedList<>();
       try (Writer writer = new Writer(dumpRoot, conf)) {
         for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
           try {
@@ -413,13 +465,15 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
             // Dump external table locations if required.
             if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
                   && shouldDumpExternalTableLocation()) {
-              writer.dataLocationDump(table);
+              extTableLocations.addAll(writer.dataLocationDump(table));
             }
 
             // Dump the table to be bootstrapped if required.
             if (shouldBootstrapDumpTable(table)) {
               HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table);
-              dumpTable(dbName, tableName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb, tableTuple);
+              managedTableCopyPaths.addAll(
+                      dumpTable(dbName, tableName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId,
+                              hiveDb, tableTuple));
             }
             if (tableList != null && isTableSatifiesConfig(table)) {
               tableList.add(tableName);
@@ -432,8 +486,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         }
       }
       dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
+      extTableCopyWorks = dirLocationsToCopy(extTableLocations);
     }
-
+    work.setDirCopyIterator(extTableCopyWorks.iterator());
+    work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
     return lastReplId;
   }
 
@@ -530,6 +586,20 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList);
   }
 
+  private List<DirCopyWork> dirLocationsToCopy(List<Path> sourceLocations)
+          throws HiveException {
+    List<DirCopyWork> list = new ArrayList<>(sourceLocations.size());
+    String baseDir = conf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname);
+    // This is done to remove any scheme-related information that may be present in the base path,
+    // specifically when we are replicating to cloud storage.
+    Path basePath = new Path(baseDir);
+    for (Path sourcePath : sourceLocations) {
+      Path targetPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath);
+      list.add(new DirCopyWork(sourcePath, targetPath));
+    }
+    return list;
+  }
+
   Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
           throws Exception {
     // bootstrap case
@@ -546,6 +616,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     long waitUntilTime = System.currentTimeMillis() + timeoutInMs;
 
     String validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
+    List<DirCopyWork> extTableCopyWorks = new ArrayList<>();
+    List<ManagedTableCopyPath> managedTableCopyPaths = new ArrayList<>();
     for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       LOG.debug("Dumping db: " + dbName);
 
@@ -568,6 +640,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
       Exception caught = null;
       try (Writer writer = new Writer(dbRoot, conf)) {
+        List<Path> extTableLocations = new LinkedList<>();
         for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
           LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri());
           Table table = null;
@@ -578,10 +651,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
             if (shouldDumpExternalTableLocation()
                     && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) {
               LOG.debug("Adding table {} to external tables list", tblName);
-              writer.dataLocationDump(tableTuple.object);
+              extTableLocations.addAll(writer.dataLocationDump(tableTuple.object));
             }
-            dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId, hiveDb,
-                tableTuple);
+            managedTableCopyPaths.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId,
+                    hiveDb, tableTuple));
           } catch (InvalidTableException te) {
             // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
             // Just log a debug message and skip it.
@@ -593,6 +666,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
           }
         }
         dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
+        extTableCopyWorks = dirLocationsToCopy(extTableLocations);
       } catch (Exception e) {
         caught = e;
       } finally {
@@ -618,9 +692,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
     dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
     dmd.write();
-
-    // Set the correct last repl id to return to the user
-    // Currently returned bootDumpBeginReplId as we don't consolidate the events after bootstrap
+    work.setDirCopyIterator(extTableCopyWorks.iterator());
+    work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
     return bootDumpBeginReplId;
   }
 
@@ -638,8 +711,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return dbRoot;
   }
 
-  void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path dumproot, long lastReplId,
-      Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception {
+  List<ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path dumproot,
+                                  long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception {
     LOG.info("Bootstrap Dump for table " + tblName);
     TableSpec tableSpec = new TableSpec(tuple.object);
     TableExport.Paths exportPaths =
@@ -657,20 +730,14 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     }
     MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
     tuple.replicationSpec.setRepl(true);
-    List<ReplPathMapping> replPathMappings = new TableExport(
+    List<ManagedTableCopyPath> managedTableCopyPaths = new TableExport(
             exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write(false);
     replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
     if (tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE)
             || Utils.shouldDumpMetaDataOnly(conf)) {
-      return;
-    }
-    for (ReplPathMapping replPathMapping: replPathMappings) {
-      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
-              tuple.replicationSpec, replPathMapping.getSrcPath(), replPathMapping.getTargetPath(), conf, false);
-      this.addDependentTask(copyTask);
-      LOG.info("Scheduled a repl copy task from [{}] to [{}]",
-              replPathMapping.getSrcPath(), replPathMapping.getTargetPath());
+      return Collections.emptyList();
     }
+    return managedTableCopyPaths;
   }
 
   private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException {
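
With these changes REPL DUMP completes in two passes: the first pass writes metadata and records the pending copy work on ReplDumpWork, and the ReplDumpTask chained behind the copy tasks writes the dump acknowledgement and prunes older dump directories. The mapping from each external table location to its copy target in dirLocationsToCopy can be pictured roughly as below (a hedged sketch; `extTableLocations` is assumed to be the list collected from Writer.dataLocationDump):

    // Hedged sketch of dirLocationsToCopy(): map each source location under the configured base dir.
    String baseDir = conf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname);
    Path basePath = new Path(baseDir);
    List<DirCopyWork> copyWorks = new ArrayList<>(extTableLocations.size());
    for (Path sourcePath : extTableLocations) {
      Path targetPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath);
      copyWorks.add(new DirCopyWork(sourcePath, targetPath));
    }
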
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 9b11bae..1f0d702 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -18,29 +18,51 @@
 package org.apache.hadoop.hive.ql.exec.repl;
 
 import com.google.common.primitives.Ints;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.repl.ReplScope;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.plan.Explain;
+import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 
 @Explain(displayName = "Replication Dump Operator", explainLevels = { Explain.Level.USER,
     Explain.Level.DEFAULT,
     Explain.Level.EXTENDED })
 public class ReplDumpWork implements Serializable {
+  private static final long serialVersionUID = 1L;
+  private static final Logger LOG = LoggerFactory.getLogger(ReplDumpWork.class);
   final ReplScope replScope;
   final ReplScope oldReplScope;
   final String dbNameOrPattern, astRepresentationForErrorMsg, resultTempPath;
   Long eventTo;
   Long eventFrom;
   static String testInjectDumpDir = null;
+  static boolean testDeletePreviousDumpMetaPath = false;
   private Integer maxEventLimit;
+  private transient Iterator<DirCopyWork> dirCopyIterator;
+  private transient Iterator<EximUtil.ManagedTableCopyPath> managedTableCopyPathIterator;
+  private Path currentDumpPath;
+  private List<String> resultValues;
 
   public static void injectNextDumpDirForTest(String dumpDir) {
     testInjectDumpDir = dumpDir;
   }
 
+  public static void testDeletePreviousDumpMetaPath(boolean failDeleteDumpMeta) {
+    testDeletePreviousDumpMetaPath = failDeleteDumpMeta;
+  }
+
   public ReplDumpWork(ReplScope replScope, ReplScope oldReplScope,
                       String astRepresentationForErrorMsg,
                       String resultTempPath) {
@@ -87,4 +109,64 @@ public class ReplDumpWork implements Serializable {
           .debug("eventTo not specified, using current event id : {}", eventTo);
     }
   }
+
+  public void setDirCopyIterator(Iterator<DirCopyWork> dirCopyIterator) {
+    if (this.dirCopyIterator != null) {
+      throw new IllegalStateException("Dir Copy iterator has already been initialized");
+    }
+    this.dirCopyIterator = dirCopyIterator;
+  }
+
+  public void setManagedTableCopyPathIterator(Iterator<EximUtil.ManagedTableCopyPath> managedTableCopyPathIterator) {
+    if (this.managedTableCopyPathIterator != null) {
+      throw new IllegalStateException("Managed table copy path iterator has already been initialized");
+    }
+    this.managedTableCopyPathIterator = managedTableCopyPathIterator;
+  }
+
+  public boolean tableDataCopyIteratorsInitialized() {
+    return dirCopyIterator != null || managedTableCopyPathIterator != null;
+  }
+
+  public Path getCurrentDumpPath() {
+    return currentDumpPath;
+  }
+
+  public void setCurrentDumpPath(Path currentDumpPath) {
+    this.currentDumpPath = currentDumpPath;
+  }
+
+  public List<String> getResultValues() {
+    return resultValues;
+  }
+
+  public void setResultValues(List<String> resultValues) {
+    this.resultValues = resultValues;
+  }
+
+  public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) {
+    List<Task<?>> tasks = new ArrayList<>();
+    while (dirCopyIterator.hasNext() && tracker.canAddMoreTasks()) {
+      DirCopyWork dirCopyWork = dirCopyIterator.next();
+      Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
+      tasks.add(task);
+      tracker.addTask(task);
+      LOG.debug("added task for {}", dirCopyWork);
+    }
+    return tasks;
+  }
+
+  public List<Task<?>> managedTableCopyTasks(TaskTracker tracker, HiveConf conf) {
+    List<Task<?>> tasks = new ArrayList<>();
+    while (managedTableCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
+      EximUtil.ManagedTableCopyPath managedTableCopyPath = managedTableCopyPathIterator.next();
+      Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
+              managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(),
+              managedTableCopyPath.getTargetPath(), conf, false);
+      tasks.add(copyTask);
+      tracker.addTask(copyTask);
+      LOG.debug("added task for {}", managedTableCopyPath);
+    }
+    return tasks;
+  }
 }
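
The two iterators are drained in batches bounded by a TaskTracker, so a single ReplDumpTask invocation schedules at most REPL_APPROX_MAX_LOAD_TASKS copy tasks and re-queues itself behind them for the next round. A condensed sketch of that dispatch, as driven from initiateDataCopyTasks in ReplDumpTask above (`work` is the ReplDumpWork instance; illustrative only):

    // Hedged sketch of the batched dispatch and continuation.
    TaskTracker tracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS));
    List<Task<?>> batch = new ArrayList<>();
    batch.addAll(work.externalTableCopyTasks(tracker, conf));
    batch.addAll(work.managedTableCopyTasks(tracker, conf));
    if (batch.isEmpty()) {
      // All copy work is done; the ack file can be written and older dumps cleaned up.
    } else {
      // Chain this ReplDumpWork behind the batch so the remaining iterator entries are picked up next round.
      DAGTraversal.traverse(batch, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
    }
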
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
index c7aa007..fddee28 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
@@ -43,6 +43,7 @@ import java.io.StringWriter;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
@@ -120,11 +121,12 @@ public final class ReplExternalTables {
     /**
      * this will dump a single line per external table. it can include additional lines for the same
      * table if the table is partitioned and the partition location is outside the table.
+     * It returns a list of all the external table data locations.
      */
-    void dataLocationDump(Table table)
-        throws InterruptedException, IOException, HiveException {
+    List<Path> dataLocationDump(Table table) throws InterruptedException, IOException, HiveException {
+      List<Path> extTableLocations = new LinkedList<>();
       if (!shouldWrite()) {
-        return;
+        return extTableLocations;
       }
       if (!TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
         throw new IllegalArgumentException(
@@ -134,6 +136,7 @@ public final class ReplExternalTables {
       Path fullyQualifiedDataLocation =
           PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf));
       write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf));
+      extTableLocations.add(fullyQualifiedDataLocation);
       if (table.isPartitioned()) {
         List<Partition> partitions;
         try {
@@ -142,7 +145,7 @@ public final class ReplExternalTables {
           if (e.getCause() instanceof NoSuchObjectException) {
             // If table is dropped when dump in progress, just skip partitions data location dump
             LOG.debug(e.getMessage());
-            return;
+            return extTableLocations;
           }
           throw e;
         }
@@ -155,9 +158,11 @@ public final class ReplExternalTables {
             fullyQualifiedDataLocation = PathBuilder
                 .fullyQualifiedHDFSUri(partition.getDataLocation(), FileSystem.get(hiveConf));
             write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf));
+            extTableLocations.add(fullyQualifiedDataLocation);
           }
         }
       }
+      return extTableLocations;
     }
 
     private static String lineFor(String tableName, Path dataLoc, HiveConf hiveConf)
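
Since Writer.dataLocationDump now returns the table's (and out-of-table partitions') data locations, callers can collect the locations for the later directory copy while still writing the external table info lines, e.g. (a hedged fragment; exception handling and the surrounding loop variables such as `externalTables` and `dbRoot` are assumed):

    // Hedged fragment: accumulate external table locations while dumping their metadata lines.
    List<Path> extTableLocations = new LinkedList<>();
    try (ReplExternalTables.Writer writer = new ReplExternalTables.Writer(dbRoot, conf)) {
      for (Table table : externalTables) {            // placeholder iterable of external tables
        extTableLocations.addAll(writer.dataLocationDump(table));
      }
    }
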
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index e792bc5..3427b59 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.exec.repl;
 
 import com.google.common.collect.Collections2;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -44,6 +45,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -126,16 +128,8 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       if (!iterator.hasNext() && constraintIterator.hasNext()) {
         loadingConstraint = true;
       }
-      while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext()) ||
-              (work.getPathsToCopyIterator().hasNext())) && loadTaskTracker.canAddMoreTasks()) {
-        // First start the distcp tasks to copy the files related to external table. The distcp tasks should be
-        // started first to avoid ddl task trying to create table/partition directory. Distcp task creates these
-        // directory with proper permission and owner.
-        if (work.getPathsToCopyIterator().hasNext()) {
-          scope.rootTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(loadTaskTracker));
-          break;
-        }
-
+      while ((iterator.hasNext() || (loadingConstraint && constraintIterator.hasNext()))
+              && loadTaskTracker.canAddMoreTasks()) {
         BootstrapEvent next;
         if (!loadingConstraint) {
           next = iterator.next();
@@ -252,8 +246,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
 
       boolean addAnotherLoadTask = iterator.hasNext()
           || loadTaskTracker.hasReplicationState()
-          || constraintIterator.hasNext()
-          || work.getPathsToCopyIterator().hasNext();
+          || constraintIterator.hasNext();
 
       if (addAnotherLoadTask) {
         createBuilderTask(scope.rootTasks);
@@ -262,8 +255,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       // Update last repl ID of the database only if the current dump is not incremental. If bootstrap
       // is combined with incremental dump, it contains only tables to bootstrap. So, needn't change
       // last repl ID of the database.
-      if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.getPathsToCopyIterator().hasNext()
-              && !work.isIncrementalLoad()) {
+      if (!iterator.hasNext() && !constraintIterator.hasNext() && !work.isIncrementalLoad()) {
         loadTaskTracker.update(updateDatabaseLastReplID(maxTasks, loadContext, scope));
         work.updateDbEventState(null);
       }
@@ -320,18 +312,17 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
   }
 
   private void createReplLoadCompleteAckTask() {
-    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks()
-            && !work.getPathsToCopyIterator().hasNext())
-            || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks()
-            && !work.getPathsToCopyIterator().hasNext())) {
+    if ((work.isIncrementalLoad() && !work.incrementalLoadTasksBuilder().hasMoreWork() && !work.hasBootstrapLoadTasks())
+            || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
-      ReplLoadCompleteAckWork replLoadCompleteAckWork = new ReplLoadCompleteAckWork(work.dumpDirectory);
-      Task<ReplLoadCompleteAckWork> loadCompleteAckWorkTask = TaskFactory.get(replLoadCompleteAckWork, conf);
+      AckWork replLoadAckWork = new AckWork(
+              new Path(work.dumpDirectory, ReplUtils.LOAD_ACKNOWLEDGEMENT));
+      Task<AckWork> loadAckWorkTask = TaskFactory.get(replLoadAckWork, conf);
       if (this.childTasks.isEmpty()) {
-        this.childTasks.add(loadCompleteAckWorkTask);
+        this.childTasks.add(loadAckWorkTask);
       } else {
         DAGTraversal.traverse(this.childTasks,
-                new AddDependencyToLeaves(Collections.singletonList(loadCompleteAckWorkTask)));
+                new AddDependencyToLeaves(Collections.singletonList(loadAckWorkTask)));
       }
     }
   }
@@ -431,7 +422,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       IncrementalLoadTasksBuilder builder = work.incrementalLoadTasksBuilder();
 
       // If incremental events are already applied, then check and perform if need to bootstrap any tables.
-      if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) {
+      if (!builder.hasMoreWork() && work.isLastReplIDUpdated()) {
         if (work.hasBootstrapLoadTasks()) {
           LOG.debug("Current incremental dump have tables to be bootstrapped. Switching to bootstrap "
                   + "mode after applying all events.");
@@ -442,20 +433,13 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
       List<Task<?>> childTasks = new ArrayList<>();
       int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
 
-      // First start the distcp tasks to copy the files related to external table. The distcp tasks should be
-      // started first to avoid ddl task trying to create table/partition directory. Distcp task creates these
-      // directory with proper permission and owner.
       TaskTracker tracker = new TaskTracker(maxTasks);
-      if (work.getPathsToCopyIterator().hasNext()) {
-        childTasks.addAll(new ExternalTableCopyTaskBuilder(work, conf).tasks(tracker));
-      } else {
-        childTasks.add(builder.build(context, getHive(), LOG, tracker));
-      }
+      childTasks.add(builder.build(context, getHive(), LOG, tracker));
 
       // If there are no more events to be applied, add a task to update the last.repl.id of the
       // target database to the event id of the last event considered by the dump. Next
       // incremental cycle won't consider the events in this dump again if it starts from this id.
-      if (!builder.hasMoreWork() && !work.getPathsToCopyIterator().hasNext()) {
+      if (!builder.hasMoreWork()) {
         // The name of the database to be loaded into is either specified directly in REPL LOAD
         // command i.e. when dbNameToLoadIn has a valid dbname or is available through dump
         // metadata during table level replication.
@@ -484,14 +468,13 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
                   TaskFactory.get(new DDLWork(new HashSet<>(), new HashSet<>(), alterDbDesc), conf);
 
           DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(updateReplIdTask));
+          work.setLastReplIDUpdated(true);
           LOG.debug("Added task to set last repl id of db " + dbName + " to " + lastEventid);
         }
       }
 
-      // Either the incremental has more work or the external table file copy has more paths to process.
-      // Once all the incremental events are applied and external tables file copies are done, enable
-      // bootstrap of tables if exist.
-      if (builder.hasMoreWork() || work.getPathsToCopyIterator().hasNext() || work.hasBootstrapLoadTasks()) {
+      // Once all the incremental events are applied, enable bootstrap of tables if exist.
+      if (builder.hasMoreWork() || work.hasBootstrapLoadTasks()) {
         DAGTraversal.traverse(childTasks, new AddDependencyToLeaves(TaskFactory.get(work, conf)));
       }
       this.childTasks = childTasks;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 370c5ec..474d8c2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -34,9 +34,6 @@ import org.apache.hadoop.hive.ql.exec.Task;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Iterator;
-import java.util.List;
-import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork;
 
 @Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER,
     Explain.Level.DEFAULT,
@@ -45,6 +42,7 @@ public class ReplLoadWork implements Serializable {
   final String dbNameToLoadIn;
   final ReplScope currentReplScope;
   final String dumpDirectory;
+  private boolean lastReplIDUpdated;
 
   private final ConstraintEventsIterator constraintsIterator;
   private int loadTaskRunCount = 0;
@@ -52,7 +50,6 @@ public class ReplLoadWork implements Serializable {
   private final transient BootstrapEventsIterator bootstrapIterator;
   private transient IncrementalLoadTasksBuilder incrementalLoadTasksBuilder;
   private transient Task<?> rootTask;
-  private final transient Iterator<DirCopyWork> pathsToCopyIterator;
 
   /*
   these are sessionState objects that are copied over to work to allow for parallel execution.
@@ -63,8 +60,7 @@ public class ReplLoadWork implements Serializable {
 
   public ReplLoadWork(HiveConf hiveConf, String dumpDirectory,
                       String dbNameToLoadIn, ReplScope currentReplScope,
-                      LineageState lineageState, boolean isIncrementalDump, Long eventTo,
-                      List<DirCopyWork> pathsToCopyIterator) throws IOException {
+                      LineageState lineageState, boolean isIncrementalDump, Long eventTo) throws IOException {
     sessionStateLineageState = lineageState;
     this.dumpDirectory = dumpDirectory;
     this.dbNameToLoadIn = dbNameToLoadIn;
@@ -99,7 +95,6 @@ public class ReplLoadWork implements Serializable {
       this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
       incrementalLoadTasksBuilder = null;
     }
-    this.pathsToCopyIterator = pathsToCopyIterator.iterator();
   }
 
   BootstrapEventsIterator bootstrapIterator() {
@@ -147,7 +142,11 @@ public class ReplLoadWork implements Serializable {
     this.rootTask = rootTask;
   }
 
-  public Iterator<DirCopyWork> getPathsToCopyIterator() {
-    return pathsToCopyIterator;
+  public boolean isLastReplIDUpdated() {
+    return lastReplIDUpdated;
+  }
+
+  public void setLastReplIDUpdated(boolean lastReplIDUpdated) {
+    this.lastReplIDUpdated = lastReplIDUpdated;
   }
 }
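
Since ReplLoadWork now carries a boolean instead of the external-table path iterator, the getter/setter pair is enough to make the one-time "update last repl id" step idempotent across repeated invocations of the load task. A self-contained sketch of that pattern follows, using placeholder names rather than the real ReplLoadWork/ReplLoadTask:

    import java.util.ArrayList;
    import java.util.List;

    class LoadWorkSketch {
      private boolean lastReplIdUpdated;               // mirrors ReplLoadWork.lastReplIDUpdated

      boolean isLastReplIdUpdated() { return lastReplIdUpdated; }
      void setLastReplIdUpdated(boolean updated) { lastReplIdUpdated = updated; }
    }

    class LoadPassSketch {
      // One pass of the load loop: always apply events; add the repl-id update exactly once.
      static List<String> runOnce(LoadWorkSketch work, boolean moreEventsPending) {
        List<String> tasks = new ArrayList<>();
        tasks.add("apply-incremental-events");
        if (!moreEventsPending && !work.isLastReplIdUpdated()) {
          tasks.add("alter-db-set-last-repl-id");
          work.setLastReplIdUpdated(true);
        }
        return tasks;
      }

      public static void main(String[] args) {
        LoadWorkSketch work = new LoadWorkSketch();
        System.out.println(runOnce(work, true));   // [apply-incremental-events]
        System.out.println(runOnce(work, false));  // update task added once
        System.out.println(runOnce(work, false));  // flag prevents adding it again
      }
    }
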
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index e65cbf5..bc90ea1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -51,7 +51,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.StandardCharsets;
@@ -158,37 +157,61 @@ public class EximUtil {
   }
 
   /**
-   * Wrapper class for mapping replication source and target path for copying data.
+   * Wrapper class for mapping the source and target paths used when copying managed table data.
    */
-  public static class ReplPathMapping {
+  public static class ManagedTableCopyPath {
+    private ReplicationSpec replicationSpec;
+    private static boolean nullSrcPathForTest = false;
     private Path srcPath;
     private Path tgtPath;
 
-    public ReplPathMapping(Path srcPath, Path tgtPath) {
+    public ManagedTableCopyPath(ReplicationSpec replicationSpec, Path srcPath, Path tgtPath) {
+      this.replicationSpec = replicationSpec;
       if (srcPath == null) {
-        throw new IllegalArgumentException("Source Path can not be null.");
+        throw new IllegalArgumentException("Source path can not be null.");
       }
       this.srcPath = srcPath;
       if (tgtPath == null) {
-        throw new IllegalArgumentException("Target Path can not be null.");
+        throw new IllegalArgumentException("Target path can not be null.");
       }
       this.tgtPath = tgtPath;
     }
 
     public Path getSrcPath() {
+      if (nullSrcPathForTest) {
+        return null;
+      }
       return srcPath;
     }
 
-    public void setSrcPath(Path srcPath) {
-      this.srcPath = srcPath;
-    }
-
     public Path getTargetPath() {
       return tgtPath;
     }
 
-    public void setTargetPath(Path targetPath) {
-      this.tgtPath = targetPath;
+    @Override
+    public String toString() {
+      return "ManagedTableCopyPath{"
+              + "fullyQualifiedSourcePath=" + srcPath
+              + ", fullyQualifiedTargetPath=" + tgtPath
+              + '}';
+    }
+
+    public ReplicationSpec getReplicationSpec() {
+      return replicationSpec;
+    }
+
+    public void setReplicationSpec(ReplicationSpec replicationSpec) {
+      this.replicationSpec = replicationSpec;
+    }
+
+    /**
+     * To be used only for testing purposes.
+     * It allows tests to force the repl dump operation to fail.
+     */
+    public static void setNullSrcPath(HiveConf conf, boolean aNullSrcPath) {
+      if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
+        nullSrcPathForTest = aNullSrcPath;
+      }
     }
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index a6b15bc..8802139 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
-import org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables;
 import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
@@ -43,22 +42,18 @@ import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.Base64;
 import java.util.List;
-import java.util.ArrayList;
 import java.util.Collections;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES;
-import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Reader;
-import static org.apache.hadoop.hive.ql.exec.repl.ExternalTableCopyTaskBuilder.DirCopyWork;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
@@ -400,8 +395,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
         ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
                 dmd.getReplScope(),
-                queryState.getLineageState(), evDump, dmd.getEventTo(),
-                dirLocationsToCopy(loadPath, evDump));
+                queryState.getLineageState(), evDump, dmd.getEventTo());
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
       } else {
         LOG.warn("Previous Dump Already Loaded");
@@ -415,51 +409,30 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   private Path getCurrentLoadPath() throws IOException, SemanticException {
     Path loadPathBase = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
             Base64.getEncoder().encodeToString(sourceDbNameOrPattern.toLowerCase()
-            .getBytes(StandardCharsets.UTF_8.name())));
+                    .getBytes(StandardCharsets.UTF_8.name())));
     final FileSystem fs = loadPathBase.getFileSystem(conf);
-
     // Make fully qualified path for further use.
     loadPathBase = fs.makeQualified(loadPathBase);
-
-    if (!fs.exists(loadPathBase)) {
-      // supposed dump path does not exist.
-      LOG.error("File not found " + loadPathBase.toUri().toString());
-      throw new FileNotFoundException(ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getMsg());
-    }
-    FileStatus[] statuses = loadPathBase.getFileSystem(conf).listStatus(loadPathBase);
-    if (statuses.length > 0) {
-      //sort based on last modified. Recent one is at the beginning
-      FileStatus latestUpdatedStatus = statuses[0];
-      for (FileStatus status : statuses) {
-        if (status.getModificationTime() > latestUpdatedStatus.getModificationTime()) {
-          latestUpdatedStatus = status;
+    if (fs.exists(loadPathBase)) {
+      FileStatus[] statuses = loadPathBase.getFileSystem(conf).listStatus(loadPathBase);
+      if (statuses.length > 0) {
+        // Sort based on last modified time; the most recent one is at the beginning.
+        FileStatus latestUpdatedStatus = statuses[0];
+        for (FileStatus status : statuses) {
+          if (status.getModificationTime() > latestUpdatedStatus.getModificationTime()) {
+            latestUpdatedStatus = status;
+          }
+        }
+        Path hiveDumpPath = new Path(latestUpdatedStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR);
+        if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))
+                && !loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))) {
+          return hiveDumpPath;
         }
-      }
-      Path hiveDumpPath = new Path(latestUpdatedStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR);
-      if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))
-              && !loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))) {
-        return hiveDumpPath;
       }
     }
     return null;
   }
 
-  private List<DirCopyWork> dirLocationsToCopy(Path loadPath, boolean isIncrementalPhase)
-      throws HiveException, IOException {
-    List<DirCopyWork> list = new ArrayList<>();
-    String baseDir = conf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname);
-    // this is done to remove any scheme related information that will be present in the base path
-    // specifically when we are replicating to cloud storage
-    Path basePath = new Path(baseDir);
-
-    for (String location : new Reader(conf, loadPath, isIncrementalPhase).sourceLocationsToCopy()) {
-      Path sourcePath = new Path(location);
-      Path targetPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath);
-      list.add(new DirCopyWork(sourcePath, targetPath));
-    }
-    return list;
-  }
-
   private void setConfigs(ASTNode node) throws SemanticException {
     Map<String, String> replConfigs = getProps(node);
     if (null != replConfigs) {
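
The rewritten getCurrentLoadPath() above now returns null instead of throwing when the dump directory is missing. A hedged restatement of its selection rule, to make the control flow easier to follow: take the most recently modified dump under the base path and use it only when the dump acknowledgement exists and the load acknowledgement does not. The method below is illustrative; the real acknowledgement file names come from ReplUtils.

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import java.io.IOException;

    class CurrentLoadPathSketch {
      static Path currentLoadPath(FileSystem fs, Path loadPathBase, String hiveBaseDir,
                                  String dumpAck, String loadAck) throws IOException {
        if (!fs.exists(loadPathBase)) {
          return null;                                  // no dump directory yet: nothing to load
        }
        FileStatus latest = null;
        for (FileStatus status : fs.listStatus(loadPathBase)) {
          if (latest == null || status.getModificationTime() > latest.getModificationTime()) {
            latest = status;                            // keep the most recently modified dump
          }
        }
        if (latest == null) {
          return null;                                  // base directory is empty
        }
        Path hiveDumpPath = new Path(latest.getPath(), hiveBaseDir);
        boolean dumpFinished = fs.exists(new Path(hiveDumpPath, dumpAck));
        boolean alreadyLoaded = fs.exists(new Path(hiveDumpPath, loadAck));
        return (dumpFinished && !alreadyLoaded) ? hiveDumpPath : null;
      }
    }
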
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 454998f..c3b1081 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.EximUtil.ReplPathMapping;
+import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
 import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
@@ -75,10 +75,10 @@ class PartitionExport {
     this.callersSession = SessionState.get();
   }
 
-  List<ReplPathMapping> write(final ReplicationSpec forReplicationSpec, boolean isExportTask)
+  List<ManagedTableCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask)
           throws InterruptedException, HiveException {
     List<Future<?>> futures = new LinkedList<>();
-    List<ReplPathMapping> replCopyPathMappings = new LinkedList<>(); //Collections.synchronizedList(new LinkedList<>());
+    List<ManagedTableCopyPath> managedTableCopyPaths = new LinkedList<>();
     ExecutorService producer = Executors.newFixedThreadPool(1,
         new ThreadFactoryBuilder().setNameFormat("partition-submitter-thread-%d").build());
     futures.add(producer.submit(() -> {
@@ -122,7 +122,8 @@ class PartitionExport {
           new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx)
                   .export(isExportTask);
           LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
-          return new ReplPathMapping(partition.getDataLocation(), new Path(rootDataDumpDir, EximUtil.DATA_PATH_NAME));
+          return new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(),
+                  new Path(rootDataDumpDir, EximUtil.DATA_PATH_NAME));
         } catch (Exception e) {
           throw new RuntimeException(e.getMessage(), e);
         }
@@ -133,8 +134,8 @@ class PartitionExport {
       try {
         Object retVal =  future.get();
         if (retVal != null) {
-          ReplPathMapping replPathMapping = (ReplPathMapping)retVal;
-          replCopyPathMappings.add(replPathMapping);
+          ManagedTableCopyPath managedTableCopyPath = (ManagedTableCopyPath) retVal;
+          managedTableCopyPaths.add(managedTableCopyPath);
         }
       } catch (Exception e) {
         LOG.error("failed", e.getCause());
@@ -143,6 +144,6 @@ class PartitionExport {
     }
     // Maybe drive this via configuration as well.
     consumer.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
-    return replCopyPathMappings;
+    return managedTableCopyPaths;
   }
 }
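
The write() method above keeps its producer/consumer structure; only the element type of the collected results changed. Below is a self-contained sketch of the collection half, submitting one job per partition and gathering the non-null results from the futures, with strings standing in for ManagedTableCopyPath and no Hive dependencies.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    class PartitionResultCollectionSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService consumer = Executors.newFixedThreadPool(2);
        List<Future<String>> futures = new ArrayList<>();
        for (String partition : new String[] {"p=1", "p=2", "p=3"}) {
          // Each job stands in for dumping one partition and reporting its copy path.
          futures.add(consumer.submit(() -> "copy " + partition + " -> dumpDir/data"));
        }
        List<String> copyPaths = new ArrayList<>();
        for (Future<String> future : futures) {
          String result = future.get();  // the real code logs failures and keeps only non-null results
          if (result != null) {
            copyPaths.add(result);
          }
        }
        consumer.shutdown();
        consumer.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        System.out.println(copyPaths);
      }
    }
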
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index a26b159..683f3c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -33,7 +33,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.EximUtil.ReplPathMapping;
+import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
@@ -47,7 +47,6 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -97,8 +96,8 @@ public class TableExport {
     this.mmCtx = mmCtx;
   }
 
-  public List<ReplPathMapping> write(boolean isExportTask) throws SemanticException {
-    List<ReplPathMapping> replPathMappings = Collections.emptyList();
+  public List<ManagedTableCopyPath> write(boolean isExportTask) throws SemanticException {
+    List<ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
     if (tableSpec == null) {
       writeMetaData(null);
     } else if (shouldExport()) {
@@ -106,12 +105,12 @@ public class TableExport {
       writeMetaData(withPartitions);
       if (!replicationSpec.isMetadataOnly()
               && !(replicationSpec.isRepl() && tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE))) {
-        replPathMappings = writeData(withPartitions, isExportTask);
+        managedTableCopyPaths = writeData(withPartitions, isExportTask);
       }
     } else if (isExportTask) {
       throw new SemanticException(ErrorMsg.INCOMPATIBLE_SCHEMA.getMsg());
     }
-    return replPathMappings;
+    return managedTableCopyPaths;
   }
 
   private PartitionIterable getPartitions() throws SemanticException {
@@ -162,24 +161,26 @@ public class TableExport {
     }
   }
 
-  private List<ReplPathMapping> writeData(PartitionIterable partitions, boolean isExportTask) throws SemanticException {
-    List<ReplPathMapping> replCopyPathMappings = new LinkedList<>();
+  private List<ManagedTableCopyPath> writeData(PartitionIterable partitions, boolean isExportTask)
+          throws SemanticException {
+    List<ManagedTableCopyPath> managedTableCopyPaths = new ArrayList<>();
     try {
       if (tableSpec.tableHandle.isPartitioned()) {
         if (partitions == null) {
           throw new IllegalStateException("partitions cannot be null for partitionTable :"
               + tableSpec.getTableName().getTable());
         }
-        replCopyPathMappings = new PartitionExport(
+        managedTableCopyPaths = new PartitionExport(
                 paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec, isExportTask);
       } else {
         List<Path> dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(),
                 replicationSpec, conf);
-        replCopyPathMappings.add(new ReplPathMapping(tableSpec.tableHandle.getDataLocation(), paths.dataExportDir()));
+        managedTableCopyPaths.add(new ManagedTableCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(),
+                paths.dataExportDir()));
         new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx)
                 .export(isExportTask);
       }
-      return replCopyPathMappings;
+      return managedTableCopyPaths;
     } catch (Exception e) {
       throw new SemanticException(e.getMessage(), e);
     }
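
The condition guarding writeData() in TableExport.write() is easy to misread, so here is a self-contained restatement with plain booleans standing in for ReplicationSpec and TableType: data is exported only for non-metadata-only dumps, and never for external tables during replication, since their data is not written through this export path.

    class TableExportGuardSketch {
      static boolean shouldWriteData(boolean metadataOnly, boolean isRepl, boolean isExternalTable) {
        return !metadataOnly && !(isRepl && isExternalTable);
      }

      public static void main(String[] args) {
        System.out.println(shouldWriteData(false, true, false)); // managed table during replication -> true
        System.out.println(shouldWriteData(false, true, true));  // external table during replication -> false
        System.out.println(shouldWriteData(true,  true, false)); // metadata-only dump -> false
      }
    }
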
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index 2651ea4..81fac25 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.junit.Test;
@@ -127,13 +128,15 @@ public class TestReplDumpTask {
       private int tableDumpCount = 0;
 
       @Override
-      void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path replDataDir,
-                     long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple)
+      List<EximUtil.ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
+                                               Path replDataDir, long lastReplId, Hive hiveDb,
+                                               HiveWrapper.Tuple<Table> tuple)
           throws Exception {
         tableDumpCount++;
         if (tableDumpCount > 1) {
           throw new TestException();
         }
+        return Collections.emptyList();
       }
     };
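
The overridden dumpTable above follows a common test pattern: subclass anonymously, count invocations, and fail on the second call so the surrounding error handling is exercised. A minimal, Hive-free sketch of that pattern (the classes below are stand-ins, not ReplDumpTask/TestReplDumpTask):

    class DumperSketch {
      void dumpTable(String db, String table) throws Exception {
        // real work elided
      }
    }

    class FailOnSecondCallExample {
      public static void main(String[] args) {
        DumperSketch failing = new DumperSketch() {
          private int tableDumpCount = 0;
          @Override
          void dumpTable(String db, String table) throws Exception {
            if (++tableDumpCount > 1) {
              throw new IllegalStateException("simulated failure on second table");
            }
          }
        };
        try {
          failing.dumpTable("db", "t1");
          failing.dumpTable("db", "t2");
        } catch (Exception e) {
          System.out.println("caught: " + e.getMessage());
        }
      }
    }
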