Posted to commits@hive.apache.org by an...@apache.org on 2020/07/21 10:24:23 UTC

[hive] branch master updated: HIVE-23069:Memory efficient iterator should be used during replication.(Pravin Kumar Sinha, reviewed by Aasha Medhi)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d13f23  HIVE-23069:Memory efficient iterator should be used during replication.(Pravin Kumar Sinha, reviewed by Aasha Medhi)
6d13f23 is described below

commit 6d13f23fd5458575c47bd8ef943a69e1e1899f31
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Tue Jul 21 15:54:11 2020 +0530

    HIVE-23069:Memory efficient iterator should be used during replication.(Pravin Kumar Sinha, reviewed by Aasha Medhi)
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   5 +
 .../hive/ql/parse/TestReplicationScenarios.java    | 160 ++++++++++-
 .../parse/TestReplicationScenariosAcidTables.java  |  49 ++++
 .../TestReplicationScenariosExternalTables.java    | 115 ++++++++
 .../org/apache/hadoop/hive/ql/exec/ExportTask.java |   2 +-
 .../hadoop/hive/ql/exec/repl/DirCopyWork.java      |  27 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     | 317 +++++++++++----------
 .../hadoop/hive/ql/exec/repl/ReplDumpWork.java     |  44 ++-
 .../hive/ql/exec/repl/ReplExternalTables.java      |  22 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |  20 ++
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java     |  35 +++
 .../repl/bootstrap/load/table/LoadPartitions.java  |   4 +-
 .../exec/repl/bootstrap/load/table/LoadTable.java  |   5 +-
 .../hadoop/hive/ql/exec/repl/util/FileList.java    | 175 ++++++++++++
 .../hive/ql/exec/repl/util/FileListStreamer.java   | 141 +++++++++
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |   2 -
 .../ql/exec/repl/util/StringConvertibleObject.java |  26 ++
 .../org/apache/hadoop/hive/ql/parse/EximUtil.java  |  30 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java      |   6 +-
 .../hadoop/hive/ql/parse/ReplicationSpec.java      |  15 +-
 .../hive/ql/parse/repl/dump/PartitionExport.java   |  23 +-
 .../hive/ql/parse/repl/dump/TableExport.java       |  27 +-
 .../repl/dump/events/AbstractEventHandler.java     |  19 ++
 .../repl/dump/events/AddPartitionHandler.java      |  14 +-
 .../parse/repl/dump/events/CommitTxnHandler.java   |  22 +-
 .../parse/repl/dump/events/CreateTableHandler.java |  24 +-
 .../ql/parse/repl/dump/events/InsertHandler.java   |  20 +-
 .../hive/ql/parse/repl/dump/io/FileOperations.java |  94 +++++-
 .../hadoop/hive/ql/exec/repl/TestReplDumpTask.java |  15 +-
 .../hive/ql/exec/repl/util/TestFileList.java       | 164 +++++++++++
 .../hadoop/hive/metastore/ReplChangeManager.java   |   7 +
 31 files changed, 1372 insertions(+), 257 deletions(-)
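
The two new HiveConf knobs this patch introduces (hive.repl.data.copy.lazy and hive.repl.file.list.cache.size, defined in the first hunk below) are driven through the usual WITH clause of the REPL commands. A minimal illustration in the style of the tests added by this patch (dbName, replDbName, driver and driverMirror are test-harness placeholders; only the config name and the clause syntax come from the patch itself):

    // Per-policy override, mirroring testBasicWithCMLazyCopy below.
    String lazyCopyClause = " with ('" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true')";
    run("REPL DUMP " + dbName + lazyCopyClause, driver);
    run("REPL LOAD " + dbName + " INTO " + replDbName + lazyCopyClause, driverMirror);

hive.repl.file.list.cache.size, by contrast, is read by the dump task itself and, per its description, is not considered once lazy copy is enabled.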

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index d9e3681..2f3ee46 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -524,6 +524,11 @@ public class HiveConf extends Configuration {
             + "task increment that would cross the specified limit."),
     REPL_PARTITIONS_DUMP_PARALLELISM("hive.repl.partitions.dump.parallelism",100,
         "Number of threads that will be used to dump partition data information during repl dump."),
+    REPL_DATA_COPY_LAZY("hive.repl.data.copy.lazy", false,
+            "Indicates whether replication should run data copy tasks during repl load operation."),
+    REPL_FILE_LIST_CACHE_SIZE("hive.repl.file.list.cache.size", 10000,
+        "This config indicates the threshold for the maximum number of data copy locations to be kept in memory. \n"
+                + "When the config 'hive.repl.data.copy.lazy' is set to true, this config is not considered."),
     REPL_DUMPDIR_CLEAN_FREQ("hive.repl.dumpdir.clean.freq", "0s",
         new TimeValidator(TimeUnit.SECONDS),
         "Frequency at which timer task runs to purge expired dump dirs."),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 169fed8..66b0d07 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -256,15 +257,32 @@ public class TestReplicationScenarios {
     return incrementalLoadAndVerify(dbName, replDbName);
   }
 
+  private Tuple bootstrapLoadAndVerify(String dbName, String replDbName, List<String> withClauseOptions)
+          throws IOException {
+    return incrementalLoadAndVerify(dbName, replDbName, withClauseOptions);
+  }
+
   private Tuple incrementalLoadAndVerify(String dbName, String replDbName) throws IOException {
     Tuple dump = replDumpDb(dbName);
     loadAndVerify(replDbName, dbName, dump.lastReplId);
     return dump;
   }
 
+  private Tuple incrementalLoadAndVerify(String dbName, String replDbName, List<String> withClauseOptions)
+          throws IOException {
+    Tuple dump = replDumpDb(dbName, withClauseOptions);
+    loadAndVerify(replDbName, dbName, dump.lastReplId, withClauseOptions);
+    return dump;
+  }
+
   private Tuple replDumpDb(String dbName) throws IOException {
+    return replDumpDb(dbName, null);
+  }
+
+  private Tuple replDumpDb(String dbName, List<String> withClauseOptions) throws IOException {
+    String withClause = getWithClause(withClauseOptions);
     advanceDumpDir();
-    String dumpCmd = "REPL DUMP " + dbName;
+    String dumpCmd = "REPL DUMP " + dbName + withClause;
     run(dumpCmd, driver);
     String dumpLocation = getResult(0, 0, driver);
     String lastReplId = getResult(0, 1, true, driver);
@@ -272,8 +290,21 @@ public class TestReplicationScenarios {
     return new Tuple(dumpLocation, lastReplId);
   }
 
+  private String getWithClause(List<String> withClauseOptions) {
+    if (withClauseOptions != null && !withClauseOptions.isEmpty()) {
+      return  " with (" + StringUtils.join(withClauseOptions, ",") + ")";
+    }
+    return "";
+  }
+
   private void loadAndVerify(String replDbName, String sourceDbNameOrPattern, String lastReplId) throws IOException {
-    run("REPL LOAD " + sourceDbNameOrPattern + " INTO " + replDbName, driverMirror);
+    loadAndVerify(replDbName, sourceDbNameOrPattern, lastReplId, null);
+  }
+
+  private void loadAndVerify(String replDbName, String sourceDbNameOrPattern, String lastReplId,
+                             List<String> withClauseOptions) throws IOException {
+    String withClause = getWithClause(withClauseOptions);
+    run("REPL LOAD " + sourceDbNameOrPattern + " INTO " + replDbName + withClause, driverMirror);
     verifyRun("REPL STATUS " + replDbName, lastReplId, driverMirror);
     return;
   }
@@ -547,6 +578,61 @@ public class TestReplicationScenarios {
   }
 
   @Test
+  public void testBasicWithCMLazyCopy() throws Exception {
+    String name = testName.getMethodName();
+    String dbName = createDB(name, driver);
+    String replDbName = dbName + "_dupe";
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+    String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[]{};
+
+    String unptn_locn = new Path(TEST_PATH, name + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, name + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, name + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned", unptn_data, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptn_data_1, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptn_data_2, driver);
+    verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driver);
+    verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driver);
+
+    String lazyCopyClause = " with ('" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname  + "'='true')";
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + lazyCopyClause, driver);
+    String replDumpLocn = getResult(0,0, driver);
+    String replDumpId = getResult(0,1,true, driver);
+
+    // Table dropped after "repl dump"
+    run("DROP TABLE " + dbName + ".unptned", driver);
+
+    // Partition dropped after "repl dump"
+    run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)", driver);
+
+    run("REPL LOAD " + dbName + " INTO " + replDbName + lazyCopyClause, driverMirror);
+    verifyRun("REPL STATUS " + replDbName, new String[] {replDumpId}, driverMirror);
+
+    verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptn_data_2, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned_empty", empty, driverMirror);
+    verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror);
+  }
+
+  @Test
   public void testBootstrapLoadOnExistingDb() throws IOException {
     String testName = "bootstrapLoadOnExistingDb";
     String dbName = createDB(testName, driver);
@@ -1560,6 +1646,76 @@ public class TestReplicationScenarios {
   }
 
   @Test
+  public void testIncrementalLoadLazyCopy() throws IOException {
+    String testName = "testIncrementalLoadLazyCopy";
+    String dbName = createDB(testName, driver);
+    String replDbName = dbName + "_dupe";
+
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE", driver);
+    run("CREATE TABLE " + dbName
+            + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE", driver);
+
+    List<String> lazyCopyClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'");
+    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName, lazyCopyClause);
+
+    String[] unptnData = new String[] {"eleven", "twelve"};
+    String[] ptnData1 = new String[] {"thirteen", "fourteen", "fifteen"};
+    String[] ptnData2 = new String[] {"fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[] {};
+
+    String unptnLocn = new Path(TEST_PATH, testName + "_unptn").toUri().getPath();
+    String ptnLocn1 = new Path(TEST_PATH, testName + "_ptn1").toUri().getPath();
+    String ptnLocn2 = new Path(TEST_PATH, testName + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptnLocn, unptnData);
+    createTestDataFile(ptnLocn1, ptnData1);
+    createTestDataFile(ptnLocn2, ptnData2);
+
+    verifySetup("SELECT a from " + dbName + ".ptned_empty", empty, driverMirror);
+    verifySetup("SELECT * from " + dbName + ".unptned_empty", empty, driverMirror);
+
+    run("LOAD DATA LOCAL INPATH '" + unptnLocn + "' OVERWRITE INTO TABLE " + dbName + ".unptned", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned", unptnData, driver);
+    run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned", driver);
+    run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned", driver);
+    verifySetup("SELECT * from " + dbName + ".unptned_late", unptnData, driver);
+
+    Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName, lazyCopyClause);
+
+    Path hiveDumpDir = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    FileSystem fs = FileSystem.get(hiveDumpDir.toUri(), hconf);
+    verifyRun("SELECT * from " + replDbName + ".unptned_late", unptnData, driverMirror);
+
+    run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=1)", driver);
+    run("LOAD DATA LOCAL INPATH '" + ptnLocn1 + "' OVERWRITE INTO TABLE " + dbName
+            + ".ptned PARTITION(b=1)", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=1", ptnData1, driver);
+    run("LOAD DATA LOCAL INPATH '" + ptnLocn2 + "' OVERWRITE INTO TABLE " + dbName
+            + ".ptned PARTITION(b=2)", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned WHERE b=2", ptnData2, driver);
+
+    run("CREATE TABLE " + dbName
+            + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver);
+    run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName
+            + ".ptned WHERE b=1", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=1", ptnData1, driver);
+
+    run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName
+            + ".ptned WHERE b=2", driver);
+    verifySetup("SELECT a from " + dbName + ".ptned_late WHERE b=2", ptnData2, driver);
+
+    incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
+    hiveDumpDir = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+
+    verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=1", ptnData1, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned_late WHERE b=2", ptnData2, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", ptnData1, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=2", ptnData2, driverMirror);
+  }
+
+  @Test
   public void testIncrementalInserts() throws IOException {
     String testName = "incrementalInserts";
     String dbName = createDB(testName, driver);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index bf8a00d..cd48d4b 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -1805,6 +1805,55 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
   }
 
   @Test
+  public void testManagedTableLazyCopy() throws Throwable {
+    List<String> withClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'");
+
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (2)")
+            .run("insert into t2 values (11)")
+            .run("insert into t2 values (12)")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("select * from t1")
+            .verifyResults(new String[]{"1", "2"})
+            .run("select * from t2")
+            .verifyResults(new String[]{"11", "12"})
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2"});
+
+    primary.run("use " + primaryDbName)
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (13)")
+            .run("create table t3(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t4(a string) clustered by (a) into 2 buckets" +
+                    " stored as orc TBLPROPERTIES ('transactional'='true')")
+            .run("insert into t3 values (21)")
+            .run("insert into t4 values (31)")
+            .run("insert into t4 values (32)")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("select * from t1")
+            .verifyResults(new String[]{"1", "2", "3"})
+            .run("select * from t2")
+            .verifyResults(new String[]{"11", "12", "13"})
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2", "t3", "t4"})
+            .run("select * from t3")
+            .verifyResults(new String[]{"21"})
+            .run("select * from t4")
+            .verifyResults(new String[]{"31","32"});
+  }
+
+  @Test
   public void testCheckPointingWithSourceTableDeleted() throws Throwable {
     //To force distcp copy
     List<String> dumpClause = Arrays.asList(
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 493a467..cc1701f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -210,6 +210,121 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true);
   }
 
+  @Test
+  public void externalTableReplicationWithDefaultPathsLazyCopy() throws Throwable {
+    List<String> lazyCopyClause = Arrays.asList("'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'");
+    //creates external tables with partitions
+    WarehouseInstance.Tuple tuple = primary
+            .run("use " + primaryDbName)
+            .run("create external table t1 (id int)")
+            .run("insert into table t1 values (1)")
+            .run("insert into table t1 values (2)")
+            .run("create external table t2 (place string) partitioned by (country string)")
+            .run("insert into table t2 partition(country='india') values ('bangalore')")
+            .run("insert into table t2 partition(country='us') values ('austin')")
+            .run("insert into table t2 partition(country='france') values ('paris')")
+            .dump(primaryDbName, lazyCopyClause);
+
+    // verify that the external table info is written correctly for bootstrap
+    assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false);
+
+
+
+    replica.load(replicatedDbName, primaryDbName, lazyCopyClause)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't1'")
+            .verifyResult("t1")
+            .run("show tables like 't2'")
+            .verifyResult("t2")
+            .run("repl status " + replicatedDbName)
+            .verifyResult(tuple.lastReplicationId)
+            .run("select country from t2 where country = 'us'")
+            .verifyResult("us")
+            .run("select country from t2 where country = 'france'")
+            .verifyResult("france")
+            .run("show partitions t2").verifyResults(new String[] {"country=france", "country=india", "country=us"});
+
+    String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
+    // Ckpt should be set on bootstrapped db.
+    replica.verifyIfCkptSet(replicatedDbName, hiveDumpLocation);
+
+    assertTablePartitionLocation(primaryDbName + ".t1", replicatedDbName + ".t1");
+    assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
+
+    tuple = primary.run("use " + primaryDbName)
+            .run("create external table t3 (id int)")
+            .run("insert into table t3 values (10)")
+            .run("create external table t4 as select id from t3")
+            .dump(primaryDbName, lazyCopyClause);
+
+    // verify that the external table info is written correctly for incremental
+    assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation, true);
+
+    replica.load(replicatedDbName, primaryDbName, lazyCopyClause)
+            .run("use " + replicatedDbName)
+            .run("show tables like 't3'")
+            .verifyResult("t3")
+            .run("select id from t3")
+            .verifyResult("10")
+            .run("select id from t4")
+            .verifyResult("10");
+  }
+
+  @Test
+  public void externalTableReplicationWithCustomPathsLazyCopy() throws Throwable {
+    Path externalTableLocation =
+            new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/" + "a/");
+    DistributedFileSystem fs = primary.miniDFSCluster.getFileSystem();
+    fs.mkdirs(externalTableLocation, new FsPermission("777"));
+
+    List<String> withClause = Arrays.asList(
+            "'distcp.options.update'=''",
+            "'" + HiveConf.ConfVars.REPL_DATA_COPY_LAZY.varname + "'='true'"
+    );
+
+    primary.run("use " + primaryDbName)
+            .run("create external table a (i int, j int) "
+                    + "row format delimited fields terminated by ',' "
+                    + "location '" + externalTableLocation.toUri() + "'")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("show tables like 'a'")
+            .verifyResults(Collections.singletonList("a"))
+            .run("select * From a").verifyResults(Collections.emptyList());
+
+    assertTablePartitionLocation(primaryDbName + ".a", replicatedDbName + ".a");
+
+    //externally add data to location
+    try (FSDataOutputStream outputStream =
+                 fs.create(new Path(externalTableLocation, "file1.txt"))) {
+      outputStream.write("1,2\n".getBytes());
+      outputStream.write("13,21\n".getBytes());
+    }
+    WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("select i From a")
+            .verifyResults(new String[] { "1", "13" })
+            .run("select j from a")
+            .verifyResults(new String[] { "2", "21" });
+
+    // alter table location to something new.
+    externalTableLocation =
+            new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/");
+    primary.run("use " + primaryDbName)
+            .run("alter table a set location '" + externalTableLocation + "'")
+            .dump(primaryDbName, withClause);
+
+    replica.load(replicatedDbName, primaryDbName, withClause)
+            .run("use " + replicatedDbName)
+            .run("select i From a")
+            .verifyResults(Collections.emptyList());
+    assertTablePartitionLocation(primaryDbName + ".a", replicatedDbName + ".a");
+  }
+
   /**
    * @param sourceTableName  -- Provide the fully qualified table name
    * @param replicaTableName -- Provide the fully qualified table name
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
index d3e9413..b81d961 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
@@ -50,7 +50,7 @@ public class ExportTask extends Task<ExportWork> implements Serializable {
       work.acidPostProcess(db);
       TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(),
           work.getReplicationSpec(), db, null, conf, work.getMmContext());
-      tableExport.write(true);
+      tableExport.write(true, null, false);
     } catch (Exception e) {
       LOG.error("failed", e);
       setException(e);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
index 46f9bb3..04bbd56 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.exec.repl;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.repl.util.StringConvertibleObject;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import java.io.Serializable;
 
@@ -27,10 +28,14 @@ import java.io.Serializable;
 @Explain(displayName = "HDFS Copy Operator", explainLevels = { Explain.Level.USER,
         Explain.Level.DEFAULT,
         Explain.Level.EXTENDED })
-public class DirCopyWork implements Serializable {
+public class DirCopyWork implements Serializable, StringConvertibleObject {
+  private static final String URI_SEPARATOR = "#";
   private static final long serialVersionUID = 1L;
-  private final Path fullyQualifiedSourcePath;
-  private final Path fullyQualifiedTargetPath;
+  private Path fullyQualifiedSourcePath;
+  private Path fullyQualifiedTargetPath;
+
+  public DirCopyWork() {
+  }
 
   public DirCopyWork(Path fullyQualifiedSourcePath, Path fullyQualifiedTargetPath) {
     this.fullyQualifiedSourcePath = fullyQualifiedSourcePath;
@@ -51,4 +56,20 @@ public class DirCopyWork implements Serializable {
   public Path getFullyQualifiedTargetPath() {
     return fullyQualifiedTargetPath;
   }
+
+  @Override
+  public String convertToString() {
+    StringBuilder objInStr = new StringBuilder();
+    objInStr.append(fullyQualifiedSourcePath)
+            .append(URI_SEPARATOR)
+            .append(fullyQualifiedTargetPath);
+    return objInStr.toString();
+  }
+
+  @Override
+  public void loadFromString(String objectInStr) {
+    String paths[] = objectInStr.split(URI_SEPARATOR);
+    this.fullyQualifiedSourcePath = new Path(paths[0]);
+    this.fullyQualifiedTargetPath = new Path(paths[1]);
+  }
 }
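
The StringConvertibleObject contract that DirCopyWork now implements (added under ql/.../exec/repl/util/ by this patch, but not shown in this mail) is what lets an external-table copy unit be written out as a single line and rebuilt later. A minimal round-trip sketch, using only the constructors and methods visible in the hunk above; the interface body and the paths are inferred/illustrative:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.repl.DirCopyWork;

    // Shape inferred from the @Override methods above:
    //   public interface StringConvertibleObject {
    //     String convertToString();
    //     void loadFromString(String objectInStr);
    //   }
    public class DirCopyWorkRoundTrip {
      public static void main(String[] args) {
        // Illustrative paths; '#' is the separator, so locations must not contain it.
        DirCopyWork original = new DirCopyWork(
            new Path("hdfs://source-ns/warehouse/ext_tbl"),
            new Path("hdfs://target-ns/base/warehouse/ext_tbl"));
        String line = original.convertToString();     // "<source>#<target>"

        DirCopyWork restored = new DirCopyWork();      // no-arg constructor added for rehydration
        restored.loadFromString(line);
        assert restored.convertToString().equals(line);
      }
    }
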
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 68cc69a..8d76557 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
@@ -63,7 +64,6 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.metadata.events.EventUtils;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
-import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -85,6 +85,7 @@ import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.parse.repl.metric.event.Status;
 import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -471,9 +472,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     String validTxnList = null;
     long waitUntilTime = 0;
     long bootDumpBeginReplId = -1;
-    List<EximUtil.ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
-    List<DirCopyWork> extTableCopyWorks = Collections.emptyList();
+
     List<String> tableList = work.replScope.includeAllTables() ? null : new ArrayList<>();
+
     // If we are bootstrapping ACID tables, we need to perform steps similar to a regular
     // bootstrap (See bootstrapDump() for more details. Only difference here is instead of
     // waiting for the concurrent transactions to finish, we start dumping the incremental events
@@ -546,63 +547,75 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       dmd.setReplScope(work.replScope);
     }
     dmd.write(true);
-    // Examine all the tables if required.
-    if (shouldExamineTablesToDump() || (tableList != null)) {
-      // If required wait more for any transactions open at the time of starting the ACID bootstrap.
-      if (needBootstrapAcidTablesDuringIncrementalDump()) {
-        assert (waitUntilTime > 0);
-        validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
-      }
+    int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);
+    try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize);
+         FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, cacheSize)) {
+      // Examine all the tables if required.
+      if (shouldExamineTablesToDump() || (tableList != null)) {
+        // If required wait more for any transactions open at the time of starting the ACID bootstrap.
+        if (needBootstrapAcidTablesDuringIncrementalDump()) {
+          assert (waitUntilTime > 0);
+          validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
+        }
       /* When same dump dir is resumed because of check-pointing, we need to clear the existing metadata.
       We need to rewrite the metadata as the write id list will be changed.
       We can't reuse the previous write id as it might be invalid due to compaction. */
-      Path bootstrapRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
-      Path metadataPath = new Path(bootstrapRoot, EximUtil.METADATA_PATH_NAME);
-      FileSystem fs = FileSystem.get(metadataPath.toUri(), conf);
-      try {
-        fs.delete(metadataPath, true);
-      } catch (FileNotFoundException e) {
-        // no worries
-      }
-      Path dbRootMetadata = new Path(metadataPath, dbName);
-      Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName);
-      managedTableCopyPaths = new ArrayList<>();
-      List<Path> extTableLocations = new LinkedList<>();
-      try (Writer writer = new Writer(dumpRoot, conf)) {
-        for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
-          try {
-            Table table = hiveDb.getTable(dbName, tableName);
-
-            // Dump external table locations if required.
-            if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
-                  && shouldDumpExternalTableLocation()) {
-              extTableLocations.addAll(writer.dataLocationDump(table));
-            }
-
-            // Dump the table to be bootstrapped if required.
-            if (shouldBootstrapDumpTable(table)) {
-              HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table);
-              managedTableCopyPaths.addAll(
-                      dumpTable(dbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId,
-                              hiveDb, tableTuple));
-            }
-            if (tableList != null && isTableSatifiesConfig(table)) {
-              tableList.add(tableName);
+        Path bootstrapRoot = new Path(dumpRoot, ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME);
+        Path metadataPath = new Path(bootstrapRoot, EximUtil.METADATA_PATH_NAME);
+        FileSystem fs = FileSystem.get(metadataPath.toUri(), conf);
+        try {
+          fs.delete(metadataPath, true);
+        } catch (FileNotFoundException e) {
+          // no worries
+        }
+        Path dbRootMetadata = new Path(metadataPath, dbName);
+        Path dbRootData = new Path(bootstrapRoot, EximUtil.DATA_PATH_NAME + File.separator + dbName);
+        boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+        try (Writer writer = new Writer(dumpRoot, conf)) {
+          for (String tableName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
+            try {
+              Table table = hiveDb.getTable(dbName, tableName);
+
+              // Dump external table locations if required.
+              if (TableType.EXTERNAL_TABLE.equals(table.getTableType())
+                      && shouldDumpExternalTableLocation()) {
+                writer.dataLocationDump(table, extTableFileList, conf);
+              }
+
+              // Dump the table to be bootstrapped if required.
+              if (shouldBootstrapDumpTable(table)) {
+                HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table);
+                dumpTable(dbName, tableName, validTxnList, dbRootMetadata, dbRootData, bootDumpBeginReplId,
+                        hiveDb, tableTuple, managedTblList, dataCopyAtLoad);
+              }
+              if (tableList != null && isTableSatifiesConfig(table)) {
+                tableList.add(tableName);
+              }
+            } catch (InvalidTableException te) {
+              // Repl dump shouldn't fail if the table is dropped/renamed while dumping it.
+              // Just log a debug message and skip it.
+              LOG.debug(te.getMessage());
             }
-          } catch (InvalidTableException te) {
-            // Repl dump shouldn't fail if the table is dropped/renamed while dumping it.
-            // Just log a debug message and skip it.
-            LOG.debug(te.getMessage());
           }
         }
+        dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
       }
-      dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
-      extTableCopyWorks = dirLocationsToCopy(extTableLocations);
+      setDataCopyIterators(extTableFileList, managedTblList);
+      work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, lastReplId);
+      return lastReplId;
+    }
+  }
+
+  private void setDataCopyIterators(FileList extTableFileList, FileList managedTableFileList) {
+    boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    if (dataCopyAtLoad) {
+      work.setManagedTableCopyPathIterator(Collections.<String>emptyList().iterator());
+      work.setExternalTblCopyPathIterator(Collections.<String>emptyList().iterator());
+      LOG.info("Deferring table/partition data copy during dump. It should be done at load.");
+    } else {
+      work.setManagedTableCopyPathIterator(managedTableFileList);
+      work.setExternalTblCopyPathIterator(extTableFileList);
     }
-    work.setDirCopyIterator(extTableCopyWorks.iterator());
-    work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
-    work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, lastReplId);
-    return lastReplId;
   }
 
   private ReplicationMetricCollector initMetricCollection(boolean isBootstrap, Path dumpRoot) {
@@ -756,23 +769,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     LOG.info("Table list file " + tableListFile.toUri() + " is created for table list - " + tableList);
   }
 
-  private List<DirCopyWork> dirLocationsToCopy(List<Path> sourceLocations)
-          throws HiveException {
-    if (sourceLocations.isEmpty()) {
-      return Collections.emptyList();
-    }
-    List<DirCopyWork> list = new ArrayList<>(sourceLocations.size());
-    String baseDir = conf.get(HiveConf.ConfVars.REPL_EXTERNAL_TABLE_BASE_DIR.varname);
-    // this is done to remove any scheme related information that will be present in the base path
-    // specifically when we are replicating to cloud storage
-    Path basePath = ReplExternalTables.getExternalTableBaseDir(conf);
-    for (Path sourcePath : sourceLocations) {
-      Path targetPath = ReplExternalTables.externalTableDataPath(conf, basePath, sourcePath);
-      list.add(new DirCopyWork(sourcePath, targetPath));
-    }
-    return list;
-  }
-
   Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot, Hive hiveDb)
           throws Exception {
     // bootstrap case
@@ -784,8 +780,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     List<String> tableList;
 
     LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern);
-    List<DirCopyWork> extTableCopyWorks = new ArrayList<>();
-    List<ManagedTableCopyPath> managedTableCopyPaths = new ArrayList<>();
     long timeoutInMs = HiveConf.getTimeVar(conf,
             HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
     long waitUntilTime = System.currentTimeMillis() + timeoutInMs;
@@ -796,95 +790,103 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       //We can't reuse the previous write id as it might be invalid due to compaction
       metadataPath.getFileSystem(conf).delete(metadataPath, true);
     }
-    for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
-      LOG.debug("Dumping db: " + dbName);
-      // TODO : Currently we don't support separate table list for each database.
-      tableList = work.replScope.includeAllTables() ? null : new ArrayList<>();
-      Database db = hiveDb.getDatabase(dbName);
-      if ((db != null) && (ReplUtils.isFirstIncPending(db.getParameters()))) {
-        // For replicated (target) database, until after first successful incremental load, the database will not be
-        // in a consistent state. Avoid allowing replicating this database to a new target.
-        throw new HiveException("Replication dump not allowed for replicated database" +
-                " with first incremental dump pending : " + dbName);
-      }
-      int estimatedNumTables = Utils.getAllTables(hiveDb, dbName, work.replScope).size();
-      int estimatedNumFunctions = hiveDb.getAllFunctions().size();
-      replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(),
-              estimatedNumTables,
-              estimatedNumFunctions);
-      replLogger.startLog();
-      Map<String, Long> metricMap = new HashMap<>();
-      metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) estimatedNumTables);
-      metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) estimatedNumFunctions);
-      work.getMetricCollector().reportStageStart(getName(), metricMap);
-      Path dbRoot = dumpDbMetadata(dbName, metadataPath, bootDumpBeginReplId, hiveDb);
-      Path dbDataRoot = new Path(new Path(dumpRoot, EximUtil.DATA_PATH_NAME), dbName);
-      dumpFunctionMetadata(dbName, dbRoot, hiveDb);
-
-      String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
-      Exception caught = null;
-      try (Writer writer = new Writer(dbRoot, conf)) {
-        List<Path> extTableLocations = new LinkedList<>();
-        for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
-          LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri());
-          Table table = null;
-
-          try {
-            HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName, conf);
-            table = tableTuple != null ? tableTuple.object : null;
-            if (shouldDumpExternalTableLocation()
-                    && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) {
-              LOG.debug("Adding table {} to external tables list", tblName);
-              extTableLocations.addAll(writer.dataLocationDump(tableTuple.object));
+    int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);
+    try (FileList managedTblList = createTableFileList(dumpRoot, EximUtil.FILE_LIST, cacheSize);
+         FileList extTableFileList = createTableFileList(dumpRoot, EximUtil.FILE_LIST_EXTERNAL, cacheSize)) {
+      for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
+        LOG.debug("Dumping db: " + dbName);
+        // TODO : Currently we don't support separate table list for each database.
+        tableList = work.replScope.includeAllTables() ? null : new ArrayList<>();
+        Database db = hiveDb.getDatabase(dbName);
+        if ((db != null) && (ReplUtils.isFirstIncPending(db.getParameters()))) {
+          // For replicated (target) database, until after first successful incremental load, the database will not be
+          // in a consistent state. Avoid allowing replicating this database to a new target.
+          throw new HiveException("Replication dump not allowed for replicated database" +
+                  " with first incremental dump pending : " + dbName);
+        }
+        int estimatedNumTables = Utils.getAllTables(hiveDb, dbName, work.replScope).size();
+        int estimatedNumFunctions = hiveDb.getAllFunctions().size();
+        replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(),
+                estimatedNumTables,
+                estimatedNumFunctions);
+        replLogger.startLog();
+        Map<String, Long> metricMap = new HashMap<>();
+        metricMap.put(ReplUtils.MetricName.TABLES.name(), (long) estimatedNumTables);
+        metricMap.put(ReplUtils.MetricName.FUNCTIONS.name(), (long) estimatedNumFunctions);
+        work.getMetricCollector().reportStageStart(getName(), metricMap);
+        Path dbRoot = dumpDbMetadata(dbName, metadataPath, bootDumpBeginReplId, hiveDb);
+        Path dbDataRoot = new Path(new Path(dumpRoot, EximUtil.DATA_PATH_NAME), dbName);
+        dumpFunctionMetadata(dbName, dbRoot, hiveDb);
+
+        String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
+        Exception caught = null;
+        try (Writer writer = new Writer(dbRoot, conf)) {
+          List<Path> extTableLocations = new LinkedList<>();
+          for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.replScope)) {
+            LOG.debug("Dumping table: " + tblName + " to db root " + dbRoot.toUri());
+            Table table = null;
+
+            try {
+              HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(tblName, conf);
+              table = tableTuple != null ? tableTuple.object : null;
+              if (shouldDumpExternalTableLocation()
+                      && TableType.EXTERNAL_TABLE.equals(tableTuple.object.getTableType())) {
+                LOG.debug("Adding table {} to external tables list", tblName);
+                writer.dataLocationDump(tableTuple.object, extTableFileList, conf);
+              }
+              boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+              dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot,
+                      bootDumpBeginReplId,
+                      hiveDb, tableTuple, managedTblList, dataCopyAtLoad);
+            } catch (InvalidTableException te) {
+              // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
+              // Just log a debug message and skip it.
+              LOG.debug(te.getMessage());
+            }
+            dumpConstraintMetadata(dbName, tblName, dbRoot, hiveDb);
+            if (tableList != null && isTableSatifiesConfig(table)) {
+              tableList.add(tblName);
             }
-            managedTableCopyPaths.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot,
-                    bootDumpBeginReplId,
-                    hiveDb, tableTuple));
-          } catch (InvalidTableException te) {
-            // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
-            // Just log a debug message and skip it.
-            LOG.debug(te.getMessage());
-          }
-          dumpConstraintMetadata(dbName, tblName, dbRoot, hiveDb);
-          if (tableList != null && isTableSatifiesConfig(table)) {
-            tableList.add(tblName);
           }
-        }
-        dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
-        extTableCopyWorks = dirLocationsToCopy(extTableLocations);
-      } catch (Exception e) {
-        caught = e;
-      } finally {
-        try {
-          Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey);
+          dumpTableListToDumpLocation(tableList, dumpRoot, dbName, conf);
         } catch (Exception e) {
-          if (caught == null) {
-            throw e;
-          } else {
-            LOG.error("failed to reset the db state for " + uniqueKey
-                    + " on failure of repl dump", e);
+          caught = e;
+        } finally {
+          try {
+            Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey);
+          } catch (Exception e) {
+            if (caught == null) {
+              throw e;
+            } else {
+              LOG.error("failed to reset the db state for " + uniqueKey
+                      + " on failure of repl dump", e);
+              throw caught;
+            }
+          }
+          if (caught != null) {
             throw caught;
           }
         }
-        if (caught != null) {
-          throw caught;
-        }
+        replLogger.endLog(bootDumpBeginReplId.toString());
+        work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId);
       }
-      replLogger.endLog(bootDumpBeginReplId.toString());
-      work.getMetricCollector().reportStageEnd(getName(), Status.SUCCESS, bootDumpBeginReplId);
+      Long bootDumpEndReplId = currentNotificationId(hiveDb);
+      LOG.info("Preparing to return {},{}->{}",
+              dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
+      long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
+      dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId);
+      dmd.write(true);
+      setDataCopyIterators(extTableFileList, managedTblList);
+      return bootDumpBeginReplId;
     }
-    Long bootDumpEndReplId = currentNotificationId(hiveDb);
-    LOG.info("Preparing to return {},{}->{}",
-            dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
-    long executorId = conf.getLong(Constants.SCHEDULED_QUERY_EXECUTIONID, 0L);
-    dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot, executorId);
-    dmd.write(true);
+  }
 
-    work.setDirCopyIterator(extTableCopyWorks.iterator());
-    work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
-    return bootDumpBeginReplId;
+  private FileList createTableFileList(Path dumpRoot, String fileName, int cacheSize) {
+    Path backingFile = new Path(dumpRoot, fileName);
+    return new FileList(backingFile, cacheSize, conf);
   }
 
+
   private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) {
     try {
       return dumpMetaData.getEventFrom() != null;
@@ -927,9 +929,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return dbRoot;
   }
 
-  List<ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRootMetadata,
+  void dumpTable(String dbName, String tblName, String validTxnList, Path dbRootMetadata,
                                        Path dbRootData, long lastReplId, Hive hiveDb,
-                                       HiveWrapper.Tuple<Table> tuple) throws Exception {
+                                       HiveWrapper.Tuple<Table> tuple, FileList managedTbleList, boolean dataCopyAtLoad)
+          throws Exception {
     LOG.info("Bootstrap Dump for table " + tblName);
     TableSpec tableSpec = new TableSpec(tuple.object);
     TableExport.Paths exportPaths =
@@ -947,16 +950,18 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     }
     MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
     tuple.replicationSpec.setRepl(true);
-    List<ManagedTableCopyPath> managedTableCopyPaths = new TableExport(
-            exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser,
-        conf, mmCtx).write(false);
+    new TableExport(exportPaths, tableSpec, tuple.replicationSpec, hiveDb, distCpDoAsUser, conf, mmCtx).write(
+            false, managedTbleList, dataCopyAtLoad);
     work.getMetricCollector().reportStageProgress(getName(), ReplUtils.MetricName.TABLES.name(), 1);
     replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
+  }
+
+  private boolean dataCopyRequired(TableSpec tableSpec) {
     if (tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE)
             || Utils.shouldDumpMetaDataOnly(conf)) {
-      return Collections.emptyList();
+      return false;
     }
-    return managedTableCopyPaths;
+    return true;
   }
 
   private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException {
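
The memory efficiency the JIRA title refers to comes from the FileList used above (ql/.../exec/repl/util/FileList.java, new in this patch but not shown in this mail): instead of accumulating List<ManagedTableCopyPath>/List<DirCopyWork> in memory, each copy unit is handed over as a plain string backed by a file under the dump root, with only a bounded number of entries cached in memory, and the same object is then consumed as an Iterator<String>. A condensed sketch of the dump-side pattern, using only calls that appear in this diff (dumpRoot, sourcePath, targetPath, conf and work are context from the surrounding code):

    // Owns the backing file for external-table copy entries for the lifetime of the dump.
    int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);
    try (FileList extTableFileList = new FileList(
            new Path(dumpRoot, EximUtil.FILE_LIST_EXTERNAL), cacheSize, conf)) {
      // Producer side (see ReplExternalTables.dirLocationToCopy later in this patch):
      // one serialized DirCopyWork per external table/partition location.
      extTableFileList.add(new DirCopyWork(sourcePath, targetPath).convertToString());
      // Consumer side: unless hive.repl.data.copy.lazy defers data copy to REPL LOAD,
      // the FileList itself is installed as the Iterator<String> that ReplDumpWork drains.
      work.setExternalTblCopyPathIterator(extTableFileList);
    }
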
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index bccaf94..21caf44 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -24,9 +24,11 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.ReplCopyTask;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.slf4j.Logger;
@@ -53,12 +55,13 @@ public class ReplDumpWork implements Serializable {
   private static boolean testInjectDumpDirAutoIncrement = false;
   static boolean testDeletePreviousDumpMetaPath = false;
   private Integer maxEventLimit;
-  private transient Iterator<DirCopyWork> dirCopyIterator;
-  private transient Iterator<EximUtil.ManagedTableCopyPath> managedTableCopyPathIterator;
+  private transient Iterator<String> externalTblCopyPathIterator;
+  private transient Iterator<String> managedTblCopyPathIterator;
   private Path currentDumpPath;
   private List<String> resultValues;
   private boolean shouldOverwrite;
   private transient ReplicationMetricCollector metricCollector;
+  private ReplicationSpec replicationSpec;
 
   public static void injectNextDumpDirForTest(String dumpDir) {
     injectNextDumpDirForTest(dumpDir, false);
@@ -130,22 +133,22 @@ public class ReplDumpWork implements Serializable {
     }
   }
 
-  public void setDirCopyIterator(Iterator<DirCopyWork> dirCopyIterator) {
-    if (this.dirCopyIterator != null) {
-      throw new IllegalStateException("Dir Copy iterator has already been initialized");
+  public void setExternalTblCopyPathIterator(Iterator<String> externalTblCopyPathIterator) {
+    if (this.externalTblCopyPathIterator != null) {
+      throw new IllegalStateException("External table copy path iterator has already been initialized");
     }
-    this.dirCopyIterator = dirCopyIterator;
+    this.externalTblCopyPathIterator = externalTblCopyPathIterator;
   }
 
-  public void setManagedTableCopyPathIterator(Iterator<EximUtil.ManagedTableCopyPath> managedTableCopyPathIterator) {
-    if (this.managedTableCopyPathIterator != null) {
+  public void setManagedTableCopyPathIterator(Iterator<String> managedTblCopyPathIterator) {
+    if (this.managedTblCopyPathIterator != null) {
       throw new IllegalStateException("Managed table copy path iterator has already been initialized");
     }
-    this.managedTableCopyPathIterator = managedTableCopyPathIterator;
+    this.managedTblCopyPathIterator = managedTblCopyPathIterator;
   }
 
   public boolean tableDataCopyIteratorsInitialized() {
-    return dirCopyIterator != null || managedTableCopyPathIterator != null;
+    return externalTblCopyPathIterator != null || managedTblCopyPathIterator != null;
   }
 
   public Path getCurrentDumpPath() {
@@ -169,8 +172,9 @@ public class ReplDumpWork implements Serializable {
       return Collections.emptyList();
     }
     List<Task<?>> tasks = new ArrayList<>();
-    while (dirCopyIterator.hasNext() && tracker.canAddMoreTasks()) {
-      DirCopyWork dirCopyWork = dirCopyIterator.next();
+    while (externalTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
+      DirCopyWork dirCopyWork = new DirCopyWork();
+      dirCopyWork.loadFromString(externalTblCopyPathIterator.next());
       Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
       tasks.add(task);
       tracker.addTask(task);
@@ -184,8 +188,12 @@ public class ReplDumpWork implements Serializable {
       return Collections.emptyList();
     }
     List<Task<?>> tasks = new ArrayList<>();
-    while (managedTableCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
-      EximUtil.ManagedTableCopyPath managedTableCopyPath = managedTableCopyPathIterator.next();
+    while (managedTblCopyPathIterator.hasNext() && tracker.canAddMoreTasks()) {
+      ReplicationSpec replSpec = new ReplicationSpec();
+      replSpec.setIsReplace(true);
+      replSpec.setInReplicationScope(true);
+      EximUtil.ManagedTableCopyPath managedTableCopyPath = new EximUtil.ManagedTableCopyPath(replSpec);
+      managedTableCopyPath.loadFromString(managedTblCopyPathIterator.next());
       Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
               managedTableCopyPath.getReplicationSpec(), managedTableCopyPath.getSrcPath(),
               managedTableCopyPath.getTargetPath(), conf, false, shouldOverwrite);
@@ -207,4 +215,12 @@ public class ReplDumpWork implements Serializable {
   public void setMetricCollector(ReplicationMetricCollector metricCollector) {
     this.metricCollector = metricCollector;
   }
+
+  public ReplicationSpec getReplicationSpec() {
+    return replicationSpec;
+  }
+
+  public void setReplicationSpec(ReplicationSpec replicationSpec) {
+    this.replicationSpec = replicationSpec;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
index a47a728..b6e0858 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplExternalTables.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.utils.StringUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -45,7 +46,6 @@ import java.net.URI;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 
@@ -135,10 +135,10 @@ public final class ReplExternalTables {
      * table if the table is partitioned and the partition location is outside the table.
      * It returns list of all the external table locations.
      */
-    List<Path> dataLocationDump(Table table) throws InterruptedException, IOException, HiveException {
-      List<Path> extTableLocations = new LinkedList<>();
+    void dataLocationDump(Table table, FileList fileList, HiveConf conf)
+            throws InterruptedException, IOException, HiveException {
       if (!shouldWrite()) {
-        return extTableLocations;
+        return;
       }
       if (!TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
         throw new IllegalArgumentException(
@@ -148,7 +148,7 @@ public final class ReplExternalTables {
       Path fullyQualifiedDataLocation =
           PathBuilder.fullyQualifiedHDFSUri(table.getDataLocation(), FileSystem.get(hiveConf));
       write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf));
-      extTableLocations.add(fullyQualifiedDataLocation);
+      dirLocationToCopy(fileList, fullyQualifiedDataLocation, conf);
       if (table.isPartitioned()) {
         List<Partition> partitions;
         try {
@@ -157,7 +157,7 @@ public final class ReplExternalTables {
           if (e.getCause() instanceof NoSuchObjectException) {
             // If table is dropped when dump in progress, just skip partitions data location dump
             LOG.debug(e.getMessage());
-            return extTableLocations;
+            return;
           }
           throw e;
         }
@@ -170,11 +170,17 @@ public final class ReplExternalTables {
             fullyQualifiedDataLocation = PathBuilder
                 .fullyQualifiedHDFSUri(partition.getDataLocation(), FileSystem.get(hiveConf));
             write(lineFor(table.getTableName(), fullyQualifiedDataLocation, hiveConf));
-            extTableLocations.add(fullyQualifiedDataLocation);
+            dirLocationToCopy(fileList, fullyQualifiedDataLocation, conf);
           }
         }
       }
-      return extTableLocations;
+    }
+
+    private void dirLocationToCopy(FileList fileList, Path sourcePath, HiveConf conf)
+            throws HiveException {
+      Path basePath = getExternalTableBaseDir(conf);
+      Path targetPath = externalTableDataPath(conf, basePath, sourcePath);
+      fileList.add(new DirCopyWork(sourcePath, targetPath).convertToString());
     }
 
     private static String lineFor(String tableName, Path dataLoc, HiveConf hiveConf)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 7ebc7cf..87543f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -48,12 +48,14 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.HiveTableName;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
@@ -182,6 +184,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     Context loadContext = new Context(work.dumpDirectory, conf, getHive(),
         work.sessionStateLineageState, context);
     TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
+    addLazyDataCopyTask(loadTaskTracker);
     /*
         for now for simplicity we are doing just one directory ( one database ), come back to use
         of multiple databases once we have the basic flow to chain creating of tasks in place for
@@ -330,6 +333,22 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     return 0;
   }
 
+  private void addLazyDataCopyTask(TaskTracker loadTaskTracker) throws IOException {
+    boolean dataCopyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    if (dataCopyAtLoad) {
+      if (work.getExternalTableDataCopyItr() == null) {
+        Path extTableBackingFile = new Path(work.dumpDirectory, EximUtil.FILE_LIST_EXTERNAL);
+        try(FileList fileList = new FileList(extTableBackingFile, 0, conf)) {
+          work.setExternalTableDataCopyItr(fileList);
+        }
+      }
+      if (childTasks == null) {
+        childTasks = new ArrayList<>();
+      }
+      childTasks.addAll(work.externalTableCopyTasks(loadTaskTracker, conf));
+    }
+  }
+
   private TaskTracker addLoadPartitionTasks(Context loadContext, BootstrapEvent next, TaskTracker dbTracker,
                                             BootstrapEventsIterator iterator, Scope scope, TaskTracker loadTaskTracker,
                                             TaskTracker tableTracker) throws Exception {
@@ -541,6 +560,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
     List<Task<?>> childTasks = new ArrayList<>();
     int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
     TaskTracker tracker = new TaskTracker(maxTasks);
+    addLazyDataCopyTask(tracker);
     childTasks.add(builder.build(context, getHive(), LOG, tracker));
     // If there are no more events to be applied, add a task to update the last.repl.id of the
     // target database to the event id of the last event considered by the dump. Next
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 43bf365..4050235 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -22,26 +22,35 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.DatabaseEvent;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.BootstrapEventsIterator;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.ConstraintEventsIterator;
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator;
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
 
 @Explain(displayName = "Replication Load Operator", explainLevels = { Explain.Level.USER,
     Explain.Level.DEFAULT,
     Explain.Level.EXTENDED })
 public class ReplLoadWork implements Serializable {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplLoadWork.class);
   final String dbNameToLoadIn;
   final ReplScope currentReplScope;
   final String dumpDirectory;
@@ -56,6 +65,7 @@ public class ReplLoadWork implements Serializable {
   private final transient BootstrapEventsIterator bootstrapIterator;
   private transient IncrementalLoadTasksBuilder incrementalLoadTasksBuilder;
   private transient Task<?> rootTask;
+  private Iterator<String> externalTableDataCopyItr;
 
   /*
   these are sessionState objects that are copied over to work to allow for parallel execution.
@@ -176,4 +186,29 @@ public class ReplLoadWork implements Serializable {
   public Long getDumpExecutionId() {
     return dumpExecutionId;
   }
+
+  public List<Task<?>> externalTableCopyTasks(TaskTracker tracker, HiveConf conf) {
+    if (conf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_SKIP_IMMUTABLE_DATA_COPY)) {
+      return Collections.emptyList();
+    }
+    List<Task<?>> tasks = new ArrayList<>();
+    while (externalTableDataCopyItr.hasNext() && tracker.canAddMoreTasks()) {
+      DirCopyWork dirCopyWork = new DirCopyWork();
+      dirCopyWork.loadFromString(externalTableDataCopyItr.next());
+      Task<DirCopyWork> task = TaskFactory.get(dirCopyWork, conf);
+      tasks.add(task);
+      tracker.addTask(task);
+      LOG.debug("Added task for {}", dirCopyWork);
+    }
+    LOG.info("Added total {} tasks for external table locations copy.", tasks.size());
+    return tasks;
+  }
+
+  public Iterator<String> getExternalTableDataCopyItr() {
+    return externalTableDataCopyItr;
+  }
+
+  public void setExternalTableDataCopyItr(Iterator<String> externalTableDataCopyItr) {
+    this.externalTableDataCopyItr = externalTableDataCopyItr;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 025457b..c4cfcf9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -285,12 +285,12 @@ public class LoadPartitions {
               : LoadFileType.OVERWRITE_EXISTING);
       stagingDir = PathUtils.getExternalTmpPath(replicaWarehousePartitionLocation, context.pathInfo);
     }
-
+    boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
     Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
         event.replicationSpec(),
         new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)),
         stagingDir,
-        context.hiveConf, false, false
+        context.hiveConf, copyAtLoad, false
     );
 
     Task<?> movePartitionTask = null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 9e236fd..35ea777 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -56,7 +57,6 @@ import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
 import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashSet;
@@ -298,8 +298,9 @@ public class LoadTable {
             + table.getCompleteName() + " with source location: "
             + dataPath.toString() + " and target location " + tgtPath.toString());
 
+    boolean copyAtLoad = context.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
     Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf,
-            false, false);
+            copyAtLoad, false);
 
     MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false);
     if (AcidUtils.isTransactionalTable(table)) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
new file mode 100644
index 0000000..b834521
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileList.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+
+/**
+ * A file-backed list of Strings that stays in memory until a size threshold is hit, after which entries are streamed to the backing file.
+ */
+public class FileList implements AutoCloseable, Iterator<String> {
+  private static final Logger LOG = LoggerFactory.getLogger(FileList.class);
+  private static int fileListStreamerID = 0;
+  private static final String FILE_LIST_STREAMER_PREFIX = "file-list-streamer-";
+
+  private LinkedBlockingQueue<String> cache;
+  private volatile boolean thresholdHit = false;
+  private int thresholdPoint;
+  private float thresholdFactor = 0.9f;
+  private Path backingFile;
+  private FileListStreamer fileListStreamer;
+  private String nextElement;
+  private boolean noMoreElement;
+  private HiveConf conf;
+  private BufferedReader backingFileReader;
+
+
+  public FileList(Path backingFile, int cacheSize, HiveConf conf) {
+    this.backingFile = backingFile;
+    this.conf = conf;
+    if (cacheSize > 0) {
+      // Cache size must be > 0 for this list to be used for the write operation.
+      this.cache = new LinkedBlockingQueue<>(cacheSize);
+      fileListStreamer = new FileListStreamer(cache, backingFile, conf);
+      thresholdPoint = getThreshold(cacheSize);
+      LOG.debug("File list backed by {} can be used for write operation.", backingFile);
+    } else {
+      thresholdHit = true;
+    }
+  }
+
+  @VisibleForTesting
+  FileList(Path backingFile, FileListStreamer fileListStreamer, LinkedBlockingQueue<String> cache, HiveConf conf) {
+    this.backingFile = backingFile;
+    this.fileListStreamer = fileListStreamer;
+    this.cache = cache;
+    this.conf = conf;
+    thresholdPoint = getThreshold(cache.remainingCapacity());
+  }
+
+  /**
+   * Only the add operation is safe for concurrent use.
+   */
+  public void add(String entry) throws SemanticException {
+    if (thresholdHit && !fileListStreamer.isAlive()) {
+      throw new SemanticException("List is not getting saved anymore to file " + backingFile.toString());
+    }
+    try {
+      cache.put(entry);
+    } catch (InterruptedException e) {
+      throw new SemanticException(e);
+    }
+    if (!thresholdHit && cache.size() >= thresholdPoint) {
+      initStoreToFile(cache.size());
+    }
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!thresholdHit) {
+      return (cache != null && !cache.isEmpty());
+    }
+    if (nextElement != null) {
+      return true;
+    }
+    if (noMoreElement) {
+      return false;
+    }
+    nextElement = readNextLine();
+    if (nextElement == null) {
+      noMoreElement = true;
+    }
+    return !noMoreElement;
+  }
+
+  @Override
+  public String next() {
+    if (!hasNext()) {
+      throw new NoSuchElementException("No more element in the list backed by " + backingFile);
+    }
+    String retVal = nextElement;
+    nextElement = null;
+    return thresholdHit ? retVal : cache.poll();
+  }
+
+  private synchronized void initStoreToFile(int cacheSize) {
+    if (!thresholdHit) {
+      fileListStreamer.setName(getNextID());
+      fileListStreamer.setDaemon(true);
+      fileListStreamer.start();
+      thresholdHit = true;
+      LOG.info("Started streaming the list elements to file: {}, cache size {}", backingFile, cacheSize);
+    }
+  }
+
+  private String readNextLine() {
+    String nextElement = null;
+    try {
+      if (backingFileReader == null) {
+        FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+        if (fs.exists(backingFile)) {
+          backingFileReader = new BufferedReader(new InputStreamReader(fs.open(backingFile)));
+        }
+      }
+      nextElement = (backingFileReader == null) ? null : backingFileReader.readLine();
+    } catch (IOException e) {
+      LOG.error("Unable to read list from backing file " + backingFile, e);
+    }
+    return nextElement;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (thresholdHit && fileListStreamer != null) {
+      fileListStreamer.close();
+    }
+    if (backingFileReader != null) {
+      backingFileReader.close();
+    }
+    LOG.info("Completed close for File List backed by:{}, thresholdHit:{} ", backingFile, thresholdHit);
+  }
+
+  private static String getNextID() {
+    if (Integer.MAX_VALUE == fileListStreamerID) {
+      //reset the counter
+      fileListStreamerID = 0;
+    }
+    fileListStreamerID++;
+    return FILE_LIST_STREAMER_PREFIX  + fileListStreamerID;
+  }
+
+  public int getThreshold(int cacheSize) {
+    boolean copyAtLoad = conf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    return copyAtLoad ? 0 : (int)(cacheSize * thresholdFactor);
+  }
+}
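
A usage sketch for the new FileList, assuming REPL_DATA_COPY_LAZY is enabled so the threshold is zero and every entry is streamed straight to the backing file (with lazy copy disabled, entries below the threshold stay in the cache and the same instance serves them back as an iterator). The dump directory and entry are illustrative:

    Path backingFile = new Path(dumpDir, EximUtil.FILE_LIST_EXTERNAL);        // dumpDir is illustrative
    int cacheSize = conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE);

    // Writer side (repl dump): add() hands entries over to the background FileListStreamer.
    try (FileList writeList = new FileList(backingFile, cacheSize, conf)) {
      writeList.add("one-line-entry-per-copy-unit");                          // illustrative entry
    }

    // Reader side (repl load): cache size 0 puts the list in read mode against the backing file.
    try (FileList readList = new FileList(backingFile, 0, conf)) {
      while (readList.hasNext()) {
        String entry = readList.next();
        // rebuild the copy work from the entry ...
      }
    }
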
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
new file mode 100644
index 0000000..36fd055
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/FileListStreamer.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Streams entries from an in-memory cache to the backing file.
+ */
+
+public class FileListStreamer extends Thread implements Closeable {
+  private static final Logger LOG = LoggerFactory.getLogger(FileListStreamer.class);
+  private static final long TIMEOUT_IN_SECS = 5L;
+  private volatile boolean signalToStop;
+  private LinkedBlockingQueue<String> cache;
+  private Path backingFile;
+  private Configuration conf;
+  private BufferedWriter backingFileWriter;
+  private volatile boolean valid = true;
+  private final Object COMPLETION_LOCK = new Object();
+  private volatile boolean completed = false;
+
+  public FileListStreamer(LinkedBlockingQueue<String> cache, Path backingFile, Configuration conf) {
+    this.cache = cache;
+    this.backingFile = backingFile;
+    this.conf = conf;
+  }
+
+  BufferedWriter lazyInitWriter() throws IOException {
+    FileSystem fs = FileSystem.get(backingFile.toUri(), conf);
+    BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(fs.create(backingFile)));
+    LOG.info("Initialized a file based store to save a list at: {}", backingFile);
+    return writer;
+  }
+
+  public boolean isValid() {
+    return valid;
+  }
+
+  // Blocks until the remaining entries are flushed to the backing file.
+  @Override
+  public void close() throws IOException {
+    signalToStop = true;
+    synchronized (COMPLETION_LOCK) {
+      while (motiveToWait()) {
+        try {
+          COMPLETION_LOCK.wait(TimeUnit.SECONDS.toMillis(TIMEOUT_IN_SECS));
+        } catch (InterruptedException e) {
+          // no-op
+        }
+      }
+    }
+    if (!isValid()) {
+      throw new IOException("File list is not in a valid state: " + backingFile);
+    }
+  }
+
+  private boolean motiveToWait() {
+    return !completed && valid;
+  }
+
+  @Override
+  public void run() {
+    try {
+      backingFileWriter = lazyInitWriter();
+    } catch (IOException e) {
+      valid = false;
+      throw new RuntimeException("Unable to initialize the file list streamer", e);
+    }
+    boolean exThrown = false;
+    while (!exThrown && (!signalToStop || !cache.isEmpty())) {
+      try {
+        String nextEntry = cache.poll(TIMEOUT_IN_SECS, TimeUnit.SECONDS);
+        if (nextEntry != null) {
+          backingFileWriter.write(nextEntry);
+          backingFileWriter.newLine();
+          LOG.debug("Writing entry {} to file list backed by {}", nextEntry, backingFile);
+        }
+      } catch (Exception iEx) {
+        if (!(iEx instanceof InterruptedException)) {
+          // not draining any more. Inform the producer to avoid OOM.
+          valid = false;
+          LOG.error("Exception while saving the list to file " + backingFile, iEx);
+          exThrown = true;
+        }
+      }
+    }
+    try {
+      closeBackingFile();
+      completed = true;
+    } finally {
+      synchronized (COMPLETION_LOCK) {
+        COMPLETION_LOCK.notify();
+      }
+    }
+    LOG.info("Completed the file list streamer backed by: {}", backingFile);
+  }
+
+  private void closeBackingFile() {
+    try {
+      backingFileWriter.close();
+      LOG.debug("Closed the file list backing file: {}", backingFile);
+    } catch (IOException e) {
+      LOG.error("Exception while closing the file list backing file", e);
+      valid = false;
+    }
+  }
+
+  @VisibleForTesting
+  boolean isInitialized() {
+    return backingFileWriter != null;
+  }
+}
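
FileList normally owns and starts the streamer, but FileListStreamer itself is just a daemon thread draining a LinkedBlockingQueue; a direct-use sketch with an illustrative path:

    LinkedBlockingQueue<String> cache = new LinkedBlockingQueue<>(100);
    FileListStreamer streamer =
        new FileListStreamer(cache, new Path("/tmp/_file_list"), new Configuration()); // illustrative path
    streamer.setDaemon(true);
    streamer.start();            // lazily creates the backing file and starts draining the queue

    cache.put("entry-1");
    cache.put("entry-2");

    streamer.close();            // blocks until the remaining entries are flushed and the writer is closed
    boolean ok = streamer.isValid(); // false if any write to the backing file failed
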
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index eaa6690..d531f5b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hive.ql.exec.repl.util;
 
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.TableName;
 import org.apache.hadoop.hive.common.repl.ReplConst;
@@ -56,7 +55,6 @@ import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;
 import org.apache.thrift.TException;
 
 import java.io.IOException;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/StringConvertibleObject.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/StringConvertibleObject.java
new file mode 100644
index 0000000..8c8120b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/StringConvertibleObject.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+public interface StringConvertibleObject {
+
+  String convertToString();
+
+  void loadFromString(String objectInStr);
+}
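
DirCopyWork and EximUtil.ManagedTableCopyPath implement this contract so that each copy unit can be stored as one line in a FileList. A minimal, purely illustrative implementation (the class and its separator are not part of the commit):

    public class CopySpec implements StringConvertibleObject {
      private static final String SEPARATOR = "#";
      private String source;
      private String target;

      @Override
      public String convertToString() {
        return source + SEPARATOR + target;
      }

      @Override
      public void loadFromString(String objectInStr) {
        String[] parts = objectInStr.split(SEPARATOR);
        source = parts[0];
        target = parts[1];
      }
    }
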
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index fb6a38c..b35f7ab 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.exec.repl.util.StringConvertibleObject;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -74,6 +75,8 @@ public class EximUtil {
 
   public static final String METADATA_NAME = "_metadata";
   public static final String FILES_NAME = "_files";
+  public static final String FILE_LIST = "_file_list";
+  public static final String FILE_LIST_EXTERNAL = "_file_list_external";
   public static final String DATA_PATH_NAME = "data";
   public static final String METADATA_PATH_NAME = "metadata";
 
@@ -161,12 +164,17 @@ public class EximUtil {
   /**
    * Wrapper class for mapping source and target path for copying managed table data.
    */
-  public static class ManagedTableCopyPath {
+  public static class ManagedTableCopyPath implements StringConvertibleObject {
+    private static final String URI_SEPARATOR = "#";
     private ReplicationSpec replicationSpec;
     private static boolean nullSrcPathForTest = false;
     private Path srcPath;
     private Path tgtPath;
 
+    public ManagedTableCopyPath(ReplicationSpec replicationSpec) {
+      this.replicationSpec = replicationSpec;
+    }
+
     public ManagedTableCopyPath(ReplicationSpec replicationSpec, Path srcPath, Path tgtPath) {
       this.replicationSpec = replicationSpec;
       if (srcPath == null) {
@@ -215,6 +223,26 @@ public class EximUtil {
         nullSrcPathForTest = aNullSrcPath;
       }
     }
+
+    @Override
+    public String convertToString() {
+      StringBuilder objInStr = new StringBuilder();
+      objInStr.append(srcPath)
+              .append(URI_SEPARATOR)
+              .append(tgtPath);
+      return objInStr.toString();
+    }
+
+    @Override
+    public void loadFromString(String objectInStr) {
+      String[] paths = objectInStr.split(URI_SEPARATOR);
+      this.srcPath = new Path(paths[0]);
+      this.tgtPath = new Path(paths[1]);
+    }
+
+    private String getEmptyOrString(String str) {
+      return (str == null) ? "" : str;
+    }
   }
 
   private EximUtil() {
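
ManagedTableCopyPath entries take the same one-line trip through the file list; a sketch with illustrative paths:

    ReplicationSpec replSpec = new ReplicationSpec();
    replSpec.setIsReplace(true);
    replSpec.setInReplicationScope(true);

    Path src = new Path("hdfs://source-cluster/warehouse/db.db/t1");   // illustrative
    Path tgt = new Path("hdfs://dump-root/db/t1/data");                // illustrative
    String line = new EximUtil.ManagedTableCopyPath(replSpec, src, tgt).convertToString();

    EximUtil.ManagedTableCopyPath restored = new EximUtil.ManagedTableCopyPath(replSpec);
    restored.loadFromString(line);   // restores only srcPath and tgtPath; the spec comes from the caller
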
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 9cd9137..55c8427 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -501,8 +501,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     Task<?> copyTask = null;
     if (replicationSpec.isInReplicationScope()) {
+      boolean copyAtLoad = x.getConf().getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
       copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, destPath, x.getConf(),
-              isSkipTrash, needRecycle, copyToMigratedTxnTable, false);
+              isSkipTrash, needRecycle, copyToMigratedTxnTable, copyAtLoad);
     } else {
       copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
     }
@@ -648,8 +649,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
       Task<?> copyTask = null;
       if (replicationSpec.isInReplicationScope()) {
+        boolean copyAtLoad = x.getConf().getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
         copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, new Path(srcLocation), destPath,
-                x.getConf(), isSkipTrash, needRecycle, copyToMigratedTxnTable, false);
+                x.getConf(), isSkipTrash, needRecycle, copyToMigratedTxnTable, copyAtLoad);
       } else {
         copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 5c8d0ed..8675ace 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 
 import javax.annotation.Nullable;
-import java.text.Collator;
 import java.util.Map;
 
 /**
@@ -54,6 +53,10 @@ public class ReplicationSpec {
   private boolean isRepl = false;
   private boolean isMetadataOnlyForExternalTables = false;
 
+  public void setInReplicationScope(boolean inReplicationScope) {
+    isInReplicationScope = inReplicationScope;
+  }
+
   // Key definitions related to replication.
   public enum KEY {
     REPL_SCOPE("repl.scope"),
@@ -122,7 +125,7 @@ public class ReplicationSpec {
   public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly,
                          String eventReplicationState, String currentReplicationState,
                          boolean isNoop, boolean isReplace) {
-    this.isInReplicationScope = isInReplicationScope;
+    this.setInReplicationScope(isInReplicationScope);
     this.isMetadataOnly = isMetadataOnly;
     this.eventId = eventReplicationState;
     this.currStateId = currentReplicationState;
@@ -133,15 +136,15 @@ public class ReplicationSpec {
 
   public ReplicationSpec(Function<String, String> keyFetcher) {
     String scope = keyFetcher.apply(ReplicationSpec.KEY.REPL_SCOPE.toString());
-    this.isInReplicationScope = false;
+    this.setInReplicationScope(false);
     this.isMetadataOnly = false;
     this.specType = Type.DEFAULT;
     if (scope != null) {
       if (scope.equalsIgnoreCase("metadata")) {
         this.isMetadataOnly = true;
-        this.isInReplicationScope = true;
+        this.setInReplicationScope(true);
       } else if (scope.equalsIgnoreCase("all")) {
-        this.isInReplicationScope = true;
+        this.setInReplicationScope(true);
       }
     }
     this.eventId = keyFetcher.apply(ReplicationSpec.KEY.EVENT_ID.toString());
@@ -227,7 +230,7 @@ public class ReplicationSpec {
 
   private void init(ASTNode node){
     // -> ^(TOK_REPLICATION $replId $isMetadataOnly)
-    isInReplicationScope = true;
+    setInReplicationScope(true);
     eventId = PlanUtils.stripQuotes(node.getChild(0).getText());
     if ((node.getChildCount() > 1)
             && node.getChild(1).getText().toLowerCase().equals("metadata")) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index 3de5832..65d4fbf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse.repl.dump;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
@@ -74,7 +75,8 @@ class PartitionExport {
     this.callersSession = SessionState.get();
   }
 
-  List<ManagedTableCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask)
+  List<ManagedTableCopyPath> write(final ReplicationSpec forReplicationSpec, boolean isExportTask,
+                                   FileList fileList, boolean dataCopyAtLoad)
           throws InterruptedException, HiveException {
     List<Future<?>> futures = new LinkedList<>();
     List<ManagedTableCopyPath> managedTableCopyPaths = new LinkedList<>();
@@ -114,16 +116,19 @@ class PartitionExport {
         String threadName = Thread.currentThread().getName();
         LOG.debug("Thread: {}, start partition dump {}", threadName, partitionName);
         try {
-          // Data Copy in case of ExportTask
+          // For an export task the data is copied here; with lazy data copy (dataCopyAtLoad) only the _files list is written.
           List<Path> dataPathList = Utils.getDataPathList(partition.getDataLocation(),
                   forReplicationSpec, hiveConf);
-          Path rootDataDumpDir = paths.partitionMetadataExportDir(partitionName);
+          Path rootDataDumpDir = isExportTask
+                  ? paths.partitionMetadataExportDir(partitionName) : paths.partitionDataExportDir(partitionName);
           new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx)
-                  .export(isExportTask);
+                  .export(isExportTask, dataCopyAtLoad);
           Path dataDumpDir = new Path(paths.dataExportRootDir(), partitionName);
           LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
-          return new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(),
-                  dataDumpDir);
+          if (!(isExportTask || dataCopyAtLoad)) {
+            fileList.add(new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(),
+                    dataDumpDir).convertToString());
+          }
         } catch (Exception e) {
           throw new RuntimeException(e.getMessage(), e);
         }
@@ -132,11 +137,7 @@ class PartitionExport {
     consumer.shutdown();
     for (Future<?> future : futures) {
       try {
-        Object retVal =  future.get();
-        if (retVal != null) {
-          ManagedTableCopyPath managedTableCopyPath = (ManagedTableCopyPath) retVal;
-          managedTableCopyPaths.add(managedTableCopyPath);
-        }
+        future.get();
       } catch (Exception e) {
         LOG.error("failed", e.getCause());
         throw new HiveException(e.getCause().getMessage(), e.getCause());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index b11afe8..66cf494 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -96,8 +97,7 @@ public class TableExport {
     this.mmCtx = mmCtx;
   }
 
-  public List<ManagedTableCopyPath> write(boolean isExportTask) throws SemanticException {
-    List<ManagedTableCopyPath> managedTableCopyPaths = Collections.emptyList();
+  public void write(boolean isExportTask, FileList fileList, boolean dataCopyAtLoad) throws SemanticException {
     if (tableSpec == null) {
       writeMetaData(null);
     } else if (shouldExport()) {
@@ -105,12 +105,11 @@ public class TableExport {
       writeMetaData(withPartitions);
       if (!replicationSpec.isMetadataOnly()
               && !(replicationSpec.isRepl() && tableSpec.tableHandle.getTableType().equals(TableType.EXTERNAL_TABLE))) {
-        managedTableCopyPaths = writeData(withPartitions, isExportTask);
+        writeData(withPartitions, isExportTask, fileList, dataCopyAtLoad);
       }
     } else if (isExportTask) {
       throw new SemanticException(ErrorMsg.INCOMPATIBLE_SCHEMA.getMsg());
     }
-    return managedTableCopyPaths;
   }
 
   private PartitionIterable getPartitions() throws SemanticException {
@@ -161,26 +160,26 @@ public class TableExport {
     }
   }
 
-  private List<ManagedTableCopyPath> writeData(PartitionIterable partitions, boolean isExportTask)
+  private void writeData(PartitionIterable partitions, boolean isExportTask, FileList fileList, boolean dataCopyAtLoad)
           throws SemanticException {
-    List<ManagedTableCopyPath> managedTableCopyPaths = new ArrayList<>();
     try {
       if (tableSpec.tableHandle.isPartitioned()) {
         if (partitions == null) {
           throw new IllegalStateException("partitions cannot be null for partitionTable :"
               + tableSpec.getTableName().getTable());
         }
-        managedTableCopyPaths = new PartitionExport(
-                paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec, isExportTask);
+        new PartitionExport(paths, partitions, distCpDoAsUser, conf, mmCtx).write(replicationSpec, isExportTask,
+                fileList, dataCopyAtLoad);
       } else {
         List<Path> dataPathList = Utils.getDataPathList(tableSpec.tableHandle.getDataLocation(),
                 replicationSpec, conf);
-        managedTableCopyPaths.add(new ManagedTableCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(),
-                paths.dataExportDir()));
+        if (!(isExportTask || dataCopyAtLoad)) {
+          fileList.add(new ManagedTableCopyPath(replicationSpec, tableSpec.tableHandle.getDataLocation(),
+                  paths.dataExportDir()).convertToString());
+        }
         new FileOperations(dataPathList, paths.dataExportDir(), distCpDoAsUser, conf, mmCtx)
-                .export(isExportTask);
+                .export(isExportTask, dataCopyAtLoad);
       }
-      return managedTableCopyPaths;
     } catch (Exception e) {
       throw new SemanticException(e.getMessage(), e);
     }
@@ -244,6 +243,10 @@ public class TableExport {
       return exportDir(new Path(metadataExportRootDir(), partitionName));
     }
 
+    Path partitionDataExportDir(String partitionName) throws SemanticException {
+      return exportDir(new Path(dataExportRootDir(), partitionName));
+    }
+
     /**
      * Access to the {@link #metadataExportRootDir} should only be done via this method
      * since the creation of the directory is delayed until we figure out if we want
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
index 82bf384..e758c8d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AbstractEventHandler.java
@@ -38,7 +38,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.security.auth.login.LoginException;
+import java.io.BufferedWriter;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -88,6 +90,23 @@ abstract class AbstractEventHandler<T extends EventMessage> implements EventHand
     return event.getEventId();
   }
 
+  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
+    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
+    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+  }
+
+  protected void writeEncodedDumpFiles(Context withinContext, Iterable<String> files, Path dataPath)
+          throws IOException {
+    // encoded filename/checksum of files, write into _files
+    try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
+      for (String file : files) {
+        fileListWriter.write(file);
+        fileListWriter.newLine();
+      }
+    }
+  }
+
   protected void writeFileEntry(Table table, Partition ptn, String file, Context withinContext)
           throws IOException, LoginException, MetaException, HiveFatalException {
     HiveConf hiveConf = withinContext.hiveConf;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
index 6462e17..3120c96 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AddPartitionHandler.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 
+import java.io.File;
 import java.util.Iterator;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -95,6 +96,7 @@ class AddPartitionHandler extends AbstractEventHandler {
         withinContext.replicationSpec,
         withinContext.hiveConf);
 
+    boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
     Iterator<PartitionFiles> partitionFilesIter = apm.getPartitionFilesIter().iterator();
 
     // We expect one to one mapping between partitions and file iterators. For external table, this
@@ -103,9 +105,15 @@ class AddPartitionHandler extends AbstractEventHandler {
       for (Partition qlPtn : qlPtns) {
         Iterable<String> files = partitionFilesIter.next().getFiles();
         if (files != null) {
-          // encoded filename/checksum of files, write into _files
-          for (String file : files) {
-            writeFileEntry(qlMdTable, qlPtn, file, withinContext);
+          if (copyAtLoad) {
+            // encoded filename/checksum of files, write into _files
+            Path ptnDataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator
+                    + qlPtn.getName());
+            writeEncodedDumpFiles(withinContext, files, ptnDataPath);
+          } else {
+            for (String file : files) {
+              writeFileEntry(qlMdTable, qlPtn, file, withinContext);
+            }
           }
         }
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index 36369db..fcb919a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
 import javax.security.auth.login.LoginException;
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -56,11 +57,17 @@ class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {
     return deserializer.getCommitTxnMessage(stringRepresentation);
   }
 
-  private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> files, Context withinContext)
+  private void writeDumpFiles(Table qlMdTable, Partition ptn, Iterable<String> files, Context withinContext,
+                              Path dataPath)
           throws IOException, LoginException, MetaException, HiveFatalException {
-    // encoded filename/checksum of files, write into _files
-    for (String file : files) {
-      writeFileEntry(qlMdTable, ptn, file, withinContext);
+    boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+    if (copyAtLoad) {
+      // encoded filename/checksum of files, write into _files
+      writeEncodedDumpFiles(withinContext, files, dataPath);
+    } else {
+      for (String file : files) {
+        writeFileEntry(qlMdTable, ptn, file, withinContext);
+      }
     }
   }
 
@@ -81,10 +88,13 @@ class CommitTxnHandler extends AbstractEventHandler<CommitTxnMessage> {
             withinContext.hiveConf);
 
     if ((null == qlPtns) || qlPtns.isEmpty()) {
-      writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext);
+      Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+      writeDumpFiles(qlMdTable, null, fileListArray.get(0), withinContext, dataPath);
     } else {
       for (int idx = 0; idx < qlPtns.size(); idx++) {
-        writeDumpFiles(qlMdTable, qlPtns.get(idx), fileListArray.get(idx), withinContext);
+        Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator
+                + qlPtns.get(idx).getName());
+        writeDumpFiles(qlMdTable, qlPtns.get(idx), fileListArray.get(idx), withinContext, dataPath);
       }
     }
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
index c732b21..ed78bc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CreateTableHandler.java
@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -26,10 +26,6 @@ import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 
-import java.io.BufferedWriter;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-
 class CreateTableHandler extends AbstractEventHandler<CreateTableMessage> {
 
   CreateTableHandler(NotificationEvent event) {
@@ -79,23 +75,23 @@ class CreateTableHandler extends AbstractEventHandler<CreateTableMessage> {
         withinContext.replicationSpec,
         withinContext.hiveConf);
 
+    boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
     Iterable<String> files = eventMessage.getFiles();
     if (files != null) {
-      // encoded filename/checksum of files, write into _files
-      for (String file : files) {
-        writeFileEntry(qlMdTable, null, file, withinContext);
+      if (copyAtLoad) {
+        // encoded filename/checksum of files, write into _files
+        Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+        writeEncodedDumpFiles(withinContext, files, dataPath);
+      } else {
+        for (String file : files) {
+          writeFileEntry(qlMdTable, null, file, withinContext);
+        }
       }
     }
 
     withinContext.createDmd(this).write();
   }
 
-  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
-    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
-    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
-    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
-  }
-
   @Override
   public DumpType dumpType() {
     return DumpType.EVENT_CREATE_TABLE;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 09662da..704b996 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
 
+import java.io.File;
 import java.util.Collections;
 import java.util.List;
 
@@ -76,6 +77,8 @@ class InsertHandler extends AbstractEventHandler<InsertMessage> {
         withinContext.hiveConf);
     Iterable<String> files = eventMessage.getFiles();
 
+    boolean copyAtLoad = withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY);
+
     /*
       * Insert into/overwrite operation shall operate on one or more partitions or even partitions from multiple tables.
       * But, Insert event is generated for each partition to which the data is inserted.
@@ -83,9 +86,20 @@ class InsertHandler extends AbstractEventHandler<InsertMessage> {
      */
     Partition ptn = (null == qlPtns || qlPtns.isEmpty()) ? null : qlPtns.get(0);
     if (files != null) {
-      // encoded filename/checksum of files, write into _files
-      for (String file : files) {
-        writeFileEntry(qlMdTable, ptn, file, withinContext);
+      if (copyAtLoad) {
+        // encoded filename/checksum of files, write into _files
+        Path dataPath = null;
+        if ((null == qlPtns) || qlPtns.isEmpty()) {
+          dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+        } else {
+          dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME + File.separator
+                  + qlPtns.get(0).getName());
+        }
+        writeEncodedDumpFiles(withinContext, files, dataPath);
+      } else {
+        for (String file : files) {
+          writeFileEntry(qlMdTable, ptn, file, withinContext);
+        }
       }
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
index 5754bf2..80bfdfb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/io/FileOperations.java
@@ -17,8 +17,11 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump.io;
 
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.OutputStreamWriter;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
@@ -28,14 +31,22 @@ import javax.security.auth.login.LoginException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.utils.Retry;
+import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.CopyUtils;
 import org.apache.hadoop.hive.ql.plan.ExportWork.MmContext;
+import org.apache.hadoop.hive.shims.Utils;
+import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,9 +79,11 @@ public class FileOperations {
     exportFileSystem = exportRootDataDir.getFileSystem(hiveConf);
   }
 
-  public void export(boolean isExportTask) throws Exception {
+  public void export(boolean isExportTask, boolean dataCopyAtLoad) throws Exception {
     if (isExportTask) {
       copyFiles();
+    } else if (dataCopyAtLoad) {
+      exportFilesAsList();
     } else {
       validateSrcPathListExists();
     }
@@ -165,4 +178,83 @@ public class FileOperations {
       throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
     }
   }
+
+  /**
+   * Requires the root data directory to which the data needs to be exported.
+   * Instead of copying data, this writes the list of files of the table/partition into the _files
+   * file under the provided exportRootDataDir.
+   */
+  void exportFilesAsList() throws SemanticException, IOException, LoginException {
+    if (dataPathList.isEmpty()) {
+      return;
+    }
+    Retry<Void> retryable = new Retry<Void>(IOException.class) {
+      @Override
+      public Void execute() throws Exception {
+        try (BufferedWriter writer = writer()) {
+          for (Path dataPath : dataPathList) {
+            writeFilesList(listFilesInDir(dataPath), writer, AcidUtils.getAcidSubDir(dataPath));
+          }
+        } catch (IOException e) {
+          if (e instanceof FileNotFoundException) {
+            logger.error("exporting data files in dir : " + dataPathList + " to " + exportRootDataDir + " failed");
+            throw new FileNotFoundException(FILE_NOT_FOUND.format(e.getMessage()));
+          }
+          // in case of io error, reset the file system object
+          FileSystem.closeAllForUGI(Utils.getUGI());
+          dataFileSystem = dataPathList.get(0).getFileSystem(hiveConf);
+          exportFileSystem = exportRootDataDir.getFileSystem(hiveConf);
+          Path exportPath = new Path(exportRootDataDir, EximUtil.FILES_NAME);
+          if (exportFileSystem.exists(exportPath)) {
+            exportFileSystem.delete(exportPath, true);
+          }
+          throw e;
+        }
+        return null;
+      }
+    };
+    try {
+      retryable.run();
+    } catch (Exception e) {
+      throw new SemanticException(e);
+    }
+  }
+
+  private void writeFilesList(FileStatus[] fileStatuses, BufferedWriter writer, String encodedSubDirs)
+          throws IOException {
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        // Write files inside the sub-directory.
+        Path subDir = fileStatus.getPath();
+        writeFilesList(listFilesInDir(subDir), writer, encodedSubDir(encodedSubDirs, subDir));
+      } else {
+        writer.write(encodedUri(fileStatus, encodedSubDirs));
+        writer.newLine();
+      }
+    }
+  }
+
+  private BufferedWriter writer() throws IOException {
+    Path exportToFile = new Path(exportRootDataDir, EximUtil.FILES_NAME);
+    logger.debug("exporting data files in dir : " + dataPathList + " to " + exportToFile);
+    return new BufferedWriter(
+            new OutputStreamWriter(exportFileSystem.create(exportToFile))
+    );
+  }
+
+  private String encodedSubDir(String encodedParentDirs, Path subDir) {
+    if (null == encodedParentDirs) {
+      return subDir.getName();
+    } else {
+      return encodedParentDirs + Path.SEPARATOR + subDir.getName();
+    }
+  }
+
+  private String encodedUri(FileStatus fileStatus, String encodedSubDir)
+          throws IOException {
+    ReplChangeManager replChangeManager = ReplChangeManager.getInstance();
+    Path currentDataFilePath = fileStatus.getPath();
+    String checkSum = ReplChangeManager.checksumFor(currentDataFilePath, dataFileSystem);
+    return replChangeManager.encodeFileUri(currentDataFilePath.toString(), checkSum, encodedSubDir);
+  }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index 8454b9c..a6def15 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -21,9 +21,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.repl.ReplScope;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.repl.util.FileList;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.dump.HiveWrapper;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
 import org.apache.hadoop.hive.ql.parse.repl.metric.ReplicationMetricCollector;
@@ -121,8 +121,8 @@ public class TestReplDumpTask {
     when(queryState.getConf()).thenReturn(conf);
     when(conf.getLong("hive.repl.last.repl.id", -1L)).thenReturn(1L);
     when(conf.getBoolVar(HiveConf.ConfVars.REPL_INCLUDE_EXTERNAL_TABLES)).thenReturn(false);
-    when(HiveConf.getVar(conf,
-            HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT)).thenReturn("1h");
+    when(HiveConf.getVar(conf, HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT)).thenReturn("1h");
+    when(conf.getIntVar(HiveConf.ConfVars.REPL_FILE_LIST_CACHE_SIZE)).thenReturn(10000);
 
     whenNew(Writer.class).withAnyArguments().thenReturn(mock(Writer.class));
     whenNew(HiveWrapper.class).withAnyArguments().thenReturn(mock(HiveWrapper.class));
@@ -131,16 +131,15 @@ public class TestReplDumpTask {
       private int tableDumpCount = 0;
 
       @Override
-      List<EximUtil.ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList,
-                                                    Path dbRootMetadata, Path dbRootData,
-                                               long lastReplId, Hive hiveDb,
-                                               HiveWrapper.Tuple<Table> tuple)
+      void dumpTable(String dbName, String tblName, String validTxnList,
+                     Path dbRootMetadata, Path dbRootData,
+                     long lastReplId, Hive hiveDb,
+                     HiveWrapper.Tuple<Table> tuple, FileList managedTableDirFileList, boolean dataCopyAtLoad)
           throws Exception {
         tableDumpCount++;
         if (tableDumpCount > 1) {
           throw new TestException();
         }
-        return Collections.emptyList();
       }
     };
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java
new file mode 100644
index 0000000..37ac5d6
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/util/TestFileList.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.repl.util;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedWriter;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Tests the FileList implementation and its backing FileListStreamer.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LoggerFactory.class})
+public class TestFileList {
+
+  @Mock
+  private BufferedWriter bufferedWriter;
+
+
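+  // Entries fit comfortably in the cache and lazy data copy is disabled, so nothing should be streamed to the backing file.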
+  @Test
+  public void testNoStreaming() throws Exception {
+    Object[] tuple = setupAndGetTuple(100, false);
+    FileList fileList = (FileList) tuple[0];
+    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
+    fileList.add("Entry1");
+    fileList.add("Entry2");
+    assertFalse(isStreamingToFile(fileListStreamer));
+  }
+
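+  // With lazy data copy enabled, streaming to the backing file should start as soon as the first entry is added.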
+  @Test
+  public void testAlwaysStreaming() throws Exception {
+    Object[] tuple = setupAndGetTuple(100, true);
+    FileList fileList = (FileList) tuple[0];
+    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
+    assertFalse(fileListStreamer.isInitialized());
+    fileList.add("Entry1");
+    waitForStreamingInitialization(fileListStreamer);
+    assertTrue(isStreamingToFile(fileListStreamer));
+    fileList.close();
+    waitForStreamingClosure(fileListStreamer);
+  }
+
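+  // With lazy data copy disabled, streaming is expected to kick in only once the in-memory cache (size 5) is saturated.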
+  @Test
+  public void testStreamingOnCacheHit() throws Exception {
+    Object[] tuple = setupAndGetTuple(5, false);
+    FileList fileList = (FileList) tuple[0];
+    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
+    fileList.add("Entry1");
+    fileList.add("Entry2");
+    fileList.add("Entry3");
+    Thread.sleep(5000L);
+    assertFalse(fileListStreamer.isInitialized());
+    fileList.add("Entry4");
+    fileList.add("Entry5");
+    waitForStreamingInitialization(fileListStreamer);
+    fileList.close();
+    waitForStreamingClosure(fileListStreamer);
+  }
+
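+  // Entries added concurrently from multiple threads should each reach the backing file exactly once.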
+  @Test
+  public void testConcurrentAdd() throws Exception {
+    Object[] tuple = setupAndGetTuple(100, false);
+    FileList fileList = (FileList) tuple[0];
+    FileListStreamer fileListStreamer = (FileListStreamer) tuple[1];
+    int numOfEntries = 1000;
+    int numOfThreads = 10;
+    ExecutorService executorService = Executors.newFixedThreadPool(numOfThreads);
+
+    for (int i = 1; i <= numOfEntries; i++) {
+      executorService.submit(() -> {
+        try {
+          fileList.add("someEntry");
+        } catch (SemanticException e) {
+          throw new RuntimeException("Unbale to add to file list.");
+        }
+      });
+    }
+    executorService.shutdown();
+    executorService.awaitTermination(1, TimeUnit.MINUTES);
+    waitForStreamingInitialization(fileListStreamer);
+    fileList.close();
+    waitForStreamingClosure(fileListStreamer);
+    ArgumentCaptor<String> entryArgs = ArgumentCaptor.forClass(String.class);
+    Mockito.verify(bufferedWriter, Mockito.times(numOfEntries)).write(entryArgs.capture());
+  }
+
+  private void waitForStreamingInitialization(FileListStreamer fileListStreamer) throws InterruptedException {
+    long sleepTime = 1000L;
+    int iter = 0;
+    while (!fileListStreamer.isInitialized()) {
+      Thread.sleep(sleepTime);
+      iter++;
+      if (iter == 5) {
+        throw new IllegalStateException("File Streamer not initialized till 5s.");
+      }
+    }
+  }
+
+  private void waitForStreamingClosure(FileListStreamer fileListStreamer) throws InterruptedException {
+    long sleepTime = 1000L;
+    int iter = 0;
+    while (!isStreamingClosedProperly(fileListStreamer)) {
+      Thread.sleep(sleepTime);
+      iter++;
+      if (iter == 5) {
+        throw new IllegalStateException("File Streamer not getting closed till 5s.");
+      }
+    }
+  }
+
+  private Object[] setupAndGetTuple(int cacheSize, boolean lazyDataCopy) throws Exception {
+    HiveConf hiveConf = Mockito.mock(HiveConf.class);
+    Mockito.when(hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DATA_COPY_LAZY)).thenReturn(lazyDataCopy);
+    Path backingFile = new Path("/tmp/backingFile");
+    LinkedBlockingQueue<String> cache =  new LinkedBlockingQueue<>(cacheSize);
+    FileListStreamer fileListStreamer = Mockito.spy(new FileListStreamer(cache, backingFile, hiveConf));
+    FileList fileList = new FileList(backingFile, fileListStreamer, cache, hiveConf);
+    Mockito.doReturn(bufferedWriter).when(fileListStreamer).lazyInitWriter();
+    Object[] tuple = new Object[] {fileList, fileListStreamer};
+    return tuple;
+  }
+
+  private boolean isStreamingToFile(FileListStreamer fileListStreamer) {
+    return fileListStreamer.isInitialized() && fileListStreamer.isAlive();
+  }
+
+  private boolean isStreamingClosedProperly(FileListStreamer fileListStreamer) {
+    return fileListStreamer.isInitialized() && !fileListStreamer.isAlive() && fileListStreamer.isValid();
+  }
+}
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 110b335..3af74ba 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -148,6 +148,13 @@ public class ReplChangeManager {
     return instance;
   }
 
+  public static synchronized ReplChangeManager getInstance() {
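+    // Unlike getInstance(Configuration), this variant never initializes the singleton; it fails fast if init has not run.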
+    if (!inited) {
+      throw new IllegalStateException("Replication Change Manager is not initialized.");
+    }
+    return instance;
+  }
+
   private ReplChangeManager(Configuration conf) throws MetaException {
     try {
       if (!inited) {