Posted to commits@hive.apache.org by an...@apache.org on 2020/04/09 11:30:20 UTC

[hive] branch master updated: HIVE-23039: Checkpointing for repl dump bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha, Anishek Agarwal)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 997fb16  HIVE-23039: Checkpointing for repl dump bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha, Anishek Agarwal)
997fb16 is described below

commit 997fb162eb58d2422d36d6d3171c59b8c40754a7
Author: Aasha Medhi <aa...@gmail.com>
AuthorDate: Thu Apr 9 17:00:05 2020 +0530

    HIVE-23039: Checkpointing for repl dump bootstrap phase (Aasha Medhi, reviewed by Pravin Kumar Sinha, Anishek Agarwal)
---
 .../hive/ql/parse/TestReplicationScenarios.java    |  32 ++-
 .../parse/TestReplicationScenariosAcidTables.java  | 310 +++++++++++++++++++++
 .../TestReplicationScenariosAcrossInstances.java   |  11 +-
 .../TestReplicationScenariosExternalTables.java    |  56 ++--
 .../parse/TestTableLevelReplicationScenarios.java  |  13 +-
 .../org/apache/hadoop/hive/ql/exec/ExportTask.java |   2 +-
 .../hadoop/hive/ql/exec/repl/DirCopyWork.java      |   1 +
 .../apache/hadoop/hive/ql/exec/repl/ReplAck.java   |  35 +++
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     | 120 +++++---
 .../hadoop/hive/ql/exec/repl/ReplLoadTask.java     |   4 +-
 .../hadoop/hive/ql/exec/repl/ReplLoadWork.java     |   7 +-
 .../ql/exec/repl/bootstrap/events/TableEvent.java  |   2 +
 .../events/filesystem/BootstrapEventsIterator.java |   6 +-
 .../events/filesystem/DatabaseEventsIterator.java  |  22 +-
 .../events/filesystem/FSPartitionEvent.java        |   9 +-
 .../bootstrap/events/filesystem/FSTableEvent.java  |  20 +-
 .../repl/bootstrap/load/table/LoadPartitions.java  |  10 +-
 .../exec/repl/bootstrap/load/table/LoadTable.java  |   5 +-
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |  10 +-
 .../org/apache/hadoop/hive/ql/parse/EximUtil.java  |   1 +
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java |   7 +-
 .../hive/ql/parse/repl/dump/PartitionExport.java   |   6 +-
 .../hive/ql/parse/repl/dump/TableExport.java       |  98 +++++--
 .../hadoop/hive/ql/exec/repl/TestReplDumpTask.java |  11 +-
 24 files changed, 660 insertions(+), 138 deletions(-)
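In short, this patch makes the bootstrap phase of REPL DUMP resumable: table data is now written under EximUtil.DATA_PATH_NAME and metadata under EximUtil.METADATA_PATH_NAME inside the <dump>/hive directory, the new ReplAck enum supplies the _finished_dump and _finished_load marker files, and ReplDumpTask reuses an unfinished dump directory instead of always creating a new one. A minimal sketch of that reuse decision (not the committed code: previousMetadataReadable stands in for the DumpMetaData check done by shouldResumePreviousDump(), and the timestamp-named directory stands in for getNextDumpDir()):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import java.io.IOException;

    public class BootstrapDumpCheckpointSketch {
      private static final String REPL_HIVE_BASE_DIR = "hive";         // same value as ReplUtils.REPL_HIVE_BASE_DIR
      private static final String DUMP_ACK = "_finished_dump";         // same value as ReplAck.DUMP_ACKNOWLEDGEMENT

      // A dump directory is complete only once the dump acknowledgement exists under <dump>/hive.
      static boolean validDump(FileSystem fs, Path dumpDir) throws IOException {
        return fs.exists(new Path(new Path(dumpDir, REPL_HIVE_BASE_DIR), DUMP_ACK));
      }

      // Reuse the previous dump dir when it exists, is not yet acknowledged and its dump
      // metadata is still readable; otherwise start a brand-new dump directory.
      static Path currentDumpPath(FileSystem fs, Path dumpRoot, Path previousDumpPath,
                                  boolean previousMetadataReadable) throws IOException {
        if (previousDumpPath != null && !validDump(fs, previousDumpPath) && previousMetadataReadable) {
          return previousDumpPath;                                     // checkpoint: already-copied table data is kept
        }
        return new Path(dumpRoot, String.valueOf(System.currentTimeMillis()));  // fresh dump dir
      }
    }

If the dump metadata itself is gone (as exercised by testCheckPointingMetadataDumpFailure below), checkpointing is not used and everything is redumped into a new directory.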

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index efe9fff..c79d4c3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -98,6 +98,8 @@ import java.util.Map;
 
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 import static org.apache.hadoop.hive.metastore.Warehouse.DEFAULT_CATALOG_NAME;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -315,8 +317,8 @@ public class TestReplicationScenarios {
 
     FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
     Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
-    assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT)));
-    assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT)));
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
 
     verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptn_data, driverMirror);
     verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptn_data_1, driverMirror);
@@ -367,8 +369,8 @@ public class TestReplicationScenarios {
     advanceDumpDir();
     FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
     Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
-    assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT)));
-    assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT)));
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
 
     verifyRun("SELECT * from " + replicatedDbName + ".unptned", unptnData, driverMirror);
     verifyRun("SELECT a from " + replicatedDbName + ".ptned WHERE b=1", ptnData1, driverMirror);
@@ -452,7 +454,7 @@ public class TestReplicationScenarios {
 
     Path loadPath = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
     //delete load ack to reload the same dump
-    loadPath.getFileSystem(hconf).delete(new Path(loadPath, ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
+    loadPath.getFileSystem(hconf).delete(new Path(loadPath, LOAD_ACKNOWLEDGEMENT.toString()), true);
     loadAndVerify(dbNameReplica, dbName, dump.lastReplId);
 
     run("insert into table " + dbName + ".t2 partition(country='india') values ('delhi')", driver);
@@ -466,7 +468,7 @@ public class TestReplicationScenarios {
 
     loadPath = new Path(dump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
     //delete load ack to reload the same dump
-    loadPath.getFileSystem(hconf).delete(new Path(loadPath, ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
+    loadPath.getFileSystem(hconf).delete(new Path(loadPath, LOAD_ACKNOWLEDGEMENT.toString()), true);
     loadAndVerify(dbNameReplica, dbName, dump.lastReplId);
 
     run("insert into table " + dbName + ".t2 partition(country='us') values ('sf')", driver);
@@ -902,8 +904,8 @@ public class TestReplicationScenarios {
     Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDbName);
     FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(hconf);
     Path dumpPath = new Path(incrementalDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
-    assertTrue(fs.exists(new Path(dumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT)));
-    assertTrue(fs.exists(new Path(dumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT)));
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertTrue(fs.exists(new Path(dumpPath, LOAD_ACKNOWLEDGEMENT.toString())));
 
     // VERIFY tables and partitions on destination for equivalence.
     verifyRun("SELECT * from " + replDbName + ".unptned_empty", empty, driverMirror);
@@ -1439,8 +1441,8 @@ public class TestReplicationScenarios {
     Path path = new Path(System.getProperty("test.warehouse.dir", ""));
     String tableRelativeSrcPath = dbName.toLowerCase()+".db" + File.separator + "unptned";
     Path srcFileLocation = new Path(path, tableRelativeSrcPath + File.separator + unptnedFileName1);
-    String tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + dbName.toLowerCase() + File.separator
-            + "unptned" + File.separator + EximUtil.DATA_PATH_NAME +File.separator + unptnedFileName1;
+    String tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + EximUtil.DATA_PATH_NAME
+            + File.separator + dbName.toLowerCase() + File.separator + "unptned" +File.separator + unptnedFileName1;
     Path tgtFileLocation = new Path(dump.dumpLocation, tgtFileRelativePath);
     //A file in table at src location should be copied to $dumplocation/hive/<db>/<table>/data/<unptned_fileName>
     verifyChecksum(srcFileLocation, tgtFileLocation, true);
@@ -1449,9 +1451,10 @@ public class TestReplicationScenarios {
 
     String partitionRelativeSrcPath = dbName.toLowerCase()+".db" + File.separator + "ptned" + File.separator + "b=1";
     srcFileLocation = new Path(path, partitionRelativeSrcPath + File.separator + ptnedFileName1);
-    tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + dbName.toLowerCase()
+    tgtFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + EximUtil.DATA_PATH_NAME
+            + File.separator + dbName.toLowerCase()
             + File.separator + "ptned" + File.separator + "b=1" + File.separator
-            + EximUtil.DATA_PATH_NAME +File.separator + ptnedFileName1;
+            + ptnedFileName1;
     tgtFileLocation = new Path(dump.dumpLocation, tgtFileRelativePath);
     //A partitioned file in table at src location should be copied to
     // $dumplocation/hive/<db>/<table>/<partition>/data/<unptned_fileName>
@@ -1723,7 +1726,8 @@ public class TestReplicationScenarios {
     Tuple incrementalDump = replDumpDb(dbName);
 
     //Remove the dump ack file, so that dump is treated as an invalid dump.
-    String ackFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator + ReplUtils.DUMP_ACKNOWLEDGEMENT;
+    String ackFileRelativePath = ReplUtils.REPL_HIVE_BASE_DIR + File.separator
+            + DUMP_ACKNOWLEDGEMENT.toString();
     Path dumpFinishedAckFilePath = new Path(incrementalDump.dumpLocation, ackFileRelativePath);
     Path tmpDumpFinishedAckFilePath = new Path(dumpFinishedAckFilePath.getParent(),
             "old_" + dumpFinishedAckFilePath.getName());
@@ -1809,7 +1813,7 @@ public class TestReplicationScenarios {
     FileSystem fs = FileSystem.get(fileToDelete.toUri(), hconf);
     fs.delete(fileToDelete, true);
     assertTrue(fs.exists(bootstrapDumpDir));
-    assertTrue(fs.exists(new Path(bootstrapDumpDir, ReplUtils.DUMP_ACKNOWLEDGEMENT)));
+    assertTrue(fs.exists(new Path(bootstrapDumpDir, DUMP_ACKNOWLEDGEMENT.toString())));
 
     loadAndVerify(replDbName, dbName, incrDump.lastReplId);
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 1e25598..39722a0 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -32,10 +33,12 @@ import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.Behaviour
 import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.shims.Utils;
 
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -51,7 +54,11 @@ import java.util.Collections;
 import java.util.Map;
 
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.DUMP_ACKNOWLEDGEMENT;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * TestReplicationScenariosAcidTables - test replication for ACID tables.
@@ -655,4 +662,307 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
       assertEquals("REPL LOAD * is not supported", e.getMessage());
     }
   }
+
+  @Test
+  public void testCheckPointingDataDumpFailure() throws Throwable {
+    //To force distcp copy
+    List<String> dumpClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+                    + UserGroupInformation.getCurrentUser().getUserName() + "'");
+
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (2)")
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (11)")
+            .run("insert into t2 values (21)")
+            .dump(primaryDbName);
+
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    Path metadataPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME);
+    long modifiedTimeMetadata = fs.getFileStatus(metadataPath).getModificationTime();
+    Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+    Path dbDataPath = new Path(dataPath, primaryDbName.toLowerCase());
+    Path tablet1Path = new Path(dbDataPath, "t1");
+    Path tablet2Path = new Path(dbDataPath, "t2");
+    //Delete dump ack and t2 data, metadata should be rewritten, data should be same for t1 but rewritten for t2
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+    assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    FileStatus[] statuses = fs.listStatus(tablet2Path);
+    //Delete t2 data
+    fs.delete(statuses[0].getPath(), true);
+    long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime();
+    long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime();
+    long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime();
+    //Do another dump. It should only dump table t2. Modification time of table t1 should be same while t2 is greater
+    WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName, dumpClause);
+    assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    assertEquals(modifiedTimeTable1, fs.getFileStatus(tablet1Path).getModificationTime());
+    assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime());
+    assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
+    assertTrue(modifiedTimeMetadata < fs.getFileStatus(metadataPath).getModificationTime());
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "2", "3"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"11", "21"});
+  }
+
+  @Test
+  public void testCheckPointingDataDumpFailureRegularCopy() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (2)")
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (11)")
+            .run("insert into t2 values (21)")
+            .dump(primaryDbName);
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    Path metadataPath = new Path(dumpPath, EximUtil.METADATA_PATH_NAME);
+    long modifiedTimeMetadata = fs.getFileStatus(metadataPath).getModificationTime();
+    Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+    Path dbPath = new Path(dataPath, primaryDbName.toLowerCase());
+    Path tablet1Path = new Path(dbPath, "t1");
+    Path tablet2Path = new Path(dbPath, "t2");
+    //Delete dump ack and t2 data, metadata should be rewritten, data should be same for t1 but rewritten for t2
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+    assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    FileStatus[] statuses = fs.listStatus(tablet2Path);
+    //Delete t2 data
+    fs.delete(statuses[0].getPath(), true);
+    long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime();
+    long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime();
+    long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime();
+    //Do another dump. It should only dump table t2. Modification time of table t1 should be same while t2 is greater
+    WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName);
+    assertEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    //File is copied again as we are using regular copy
+    assertTrue(modifiedTimeTable1 < fs.getFileStatus(tablet1Path).getModificationTime());
+    assertTrue(modifiedTimeTable1CopyFile < fs.listStatus(tablet1Path)[0].getModificationTime());
+    assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
+    assertTrue(modifiedTimeMetadata < fs.getFileStatus(metadataPath).getModificationTime());
+    replica.load(replicatedDbName, primaryDbName)
+            .run("select * from " + replicatedDbName + ".t1")
+            .verifyResults(new String[] {"1", "2", "3"})
+            .run("select * from " + replicatedDbName + ".t2")
+            .verifyResults(new String[]{"11", "21"});
+  }
+
+  @Test
+  public void testCheckPointingWithSourceTableDataInserted() throws Throwable {
+    //To force distcp copy
+    List<String> dumpClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+                    + UserGroupInformation.getCurrentUser().getUserName() + "'");
+
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (2)")
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (11)")
+            .run("insert into t2 values (21)")
+            .dump(primaryDbName);
+
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+    Path dbPath = new Path(dataPath, primaryDbName.toLowerCase());
+    Path tablet1Path = new Path(dbPath, "t1");
+    Path tablet2Path = new Path(dbPath, "t2");
+    long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime();
+    //Delete table 2 data
+    FileStatus[] statuses = fs.listStatus(tablet2Path);
+    //Delete t2 data
+    fs.delete(statuses[0].getPath(), true);
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+    long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime();
+
+    //Do another dump. It should only dump table t2. Also insert new data in existing tables.
+    // New data should be there in target
+    primary.run("use " + primaryDbName)
+            .run("insert into t2 values (13)")
+            .run("insert into t2 values (24)")
+            .run("insert into t1 values (4)")
+            .dump(primaryDbName, dumpClause);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("select * from t1")
+            .verifyResults(new String[]{"1", "2", "3", "4"})
+            .run("select * from t2")
+            .verifyResults(new String[]{"11", "21", "13", "24"});
+    assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime());
+    assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
+  }
+
+  @Test
+  public void testCheckPointingWithNewTablesAdded() throws Throwable {
+    //To force distcp copy
+    List<String> dumpClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+                    + UserGroupInformation.getCurrentUser().getUserName() + "'");
+
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (2)")
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (11)")
+            .run("insert into t2 values (21)")
+            .dump(primaryDbName);
+
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+    Path dbPath = new Path(dataPath, primaryDbName.toLowerCase());
+    Path tablet1Path = new Path(dbPath, "t1");
+    Path tablet2Path = new Path(dbPath, "t2");
+    long modifiedTimeTable1 = fs.getFileStatus(tablet1Path).getModificationTime();
+    long modifiedTimeTable2 = fs.getFileStatus(tablet2Path).getModificationTime();
+    //Delete table 2 data
+    FileStatus[] statuses = fs.listStatus(tablet2Path);
+    fs.delete(statuses[0].getPath(), true);
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+    long modifiedTimeTable1CopyFile = fs.listStatus(tablet1Path)[0].getModificationTime();
+
+    // Do another dump. It should only dump table t2 and next table.
+    // Also insert new tables. New tables will be there in target
+    primary.run("use " + primaryDbName)
+            .run("insert into t2 values (13)")
+            .run("insert into t2 values (24)")
+            .run("create table t3(a string) STORED AS TEXTFILE")
+            .run("insert into t3 values (1)")
+            .run("insert into t3 values (2)")
+            .run("insert into t3 values (3)")
+            .dump(primaryDbName, dumpClause);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("select * from t1")
+            .verifyResults(new String[]{"1", "2", "3"})
+            .run("select * from t2")
+            .verifyResults(new String[]{"11", "21", "13", "24"})
+            .run("show tables")
+            .verifyResults(new String[]{"t1", "t2", "t3"})
+            .run("select * from t3")
+            .verifyResults(new String[]{"1", "2", "3"});
+    assertEquals(modifiedTimeTable1, fs.getFileStatus(tablet1Path).getModificationTime());
+    assertEquals(modifiedTimeTable1CopyFile, fs.listStatus(tablet1Path)[0].getModificationTime());
+    assertTrue(modifiedTimeTable2 < fs.getFileStatus(tablet2Path).getModificationTime());
+  }
+
+  @Test
+  public void testCheckPointingWithSourceTableDeleted() throws Throwable {
+    //To force distcp copy
+    List<String> dumpClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+                    + UserGroupInformation.getCurrentUser().getUserName() + "'");
+
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (2)")
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (11)")
+            .run("insert into t2 values (21)")
+            .dump(primaryDbName);
+
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+
+
+    //Delete dump ack and t2 data, Also drop table. New data will be there in target
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+    assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    Path dataPath = new Path(dumpPath, EximUtil.DATA_PATH_NAME);
+    Path dbPath = new Path(dataPath, primaryDbName.toLowerCase());
+    Path tablet2Path = new Path(dbPath, "t2");
+    FileStatus[] statuses = fs.listStatus(tablet2Path);
+    //Delete t2 data.
+    fs.delete(statuses[0].getPath(), true);
+    //Drop table t1. Target shouldn't have t1 table as metadata dump is rewritten
+    primary.run("use " + primaryDbName)
+            .run("drop table t1")
+            .dump(primaryDbName, dumpClause);
+
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[]{"t2"})
+            .run("select * from t2")
+            .verifyResults(new String[]{"11", "21"});
+  }
+
+  @Test
+  public void testCheckPointingMetadataDumpFailure() throws Throwable {
+    //To force distcp copy
+    List<String> dumpClause = Arrays.asList(
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXSIZE.varname + "'='1'",
+            "'" + HiveConf.ConfVars.HIVE_IN_TEST.varname + "'='false'",
+            "'" + HiveConf.ConfVars.HIVE_EXEC_COPYFILE_MAXNUMFILES.varname + "'='0'",
+            "'" + HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname + "'='"
+                    + UserGroupInformation.getCurrentUser().getUserName() + "'");
+
+    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+            .run("CREATE TABLE t1(a string) STORED AS TEXTFILE")
+            .run("CREATE TABLE t2(a string) STORED AS TEXTFILE")
+            .run("insert into t1 values (1)")
+            .run("insert into t1 values (2)")
+            .run("insert into t1 values (3)")
+            .run("insert into t2 values (11)")
+            .run("insert into t2 values (21)")
+            .dump(primaryDbName);
+    FileSystem fs = new Path(bootstrapDump.dumpLocation).getFileSystem(conf);
+    Path dumpPath = new Path(bootstrapDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+
+    //Delete dump ack and metadata ack, everything should be rewritten in a new dump dir
+    fs.delete(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString()), true);
+    fs.delete(new Path(dumpPath, "_dumpmetadata"), true);
+    assertFalse(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+    //Insert new data
+    primary.run("insert into "+ primaryDbName +".t1 values (12)");
+    primary.run("insert into "+ primaryDbName +".t1 values (13)");
+    //Do another dump. It should dump everything in a new dump dir
+    // checkpointing will not be used
+    WarehouseInstance.Tuple nextDump = primary.dump(primaryDbName, dumpClause);
+    replica.load(replicatedDbName, primaryDbName)
+            .run("use " + replicatedDbName)
+            .run("select * from t2")
+            .verifyResults(new String[]{"11", "21"})
+            .run("select * from t1")
+            .verifyResults(new String[]{"1", "2", "3", "12", "13"});
+    assertNotEquals(nextDump.dumpLocation, bootstrapDump.dumpLocation);
+    dumpPath = new Path(nextDump.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    assertTrue(fs.exists(new Path(dumpPath, DUMP_ACKNOWLEDGEMENT.toString())));
+  }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 56b27a5..33124c8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -55,6 +55,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -1093,13 +1094,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
 
     // To retry with same dump delete the load ack
     new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(
-            hiveDumpLocation, ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
+            hiveDumpLocation, LOAD_ACKNOWLEDGEMENT.toString()), true);
     // Retry with same dump with which it was already loaded also fails.
     replica.loadFailure(replicatedDbName, primaryDbName);
 
     // To retry with same dump delete the load ack
     new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(
-            hiveDumpLocation, ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
+            hiveDumpLocation, LOAD_ACKNOWLEDGEMENT.toString()), true);
     // Retry from same dump when the database is empty is also not allowed.
     replica.run("drop table t1")
             .run("drop table t2")
@@ -1344,7 +1345,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     //delete load ack to reuse the dump
     new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation
             + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR
-            + ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
+            + LOAD_ACKNOWLEDGEMENT.toString()), true);
 
     replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
     replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
@@ -1370,7 +1371,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     replica.load(replicatedDbName, primaryDbName, withConfigs);
     //delete load ack to reuse the dump
     new Path(bootstrapDump.dumpLocation).getFileSystem(conf).delete(new Path(bootstrapDump.dumpLocation
-            + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
+            + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR + LOAD_ACKNOWLEDGEMENT.toString()), true);
     replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
     replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
         .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
@@ -1423,7 +1424,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     //delete load ack to reuse the dump
     new Path(tuple.dumpLocation).getFileSystem(conf).delete(new Path(tuple.dumpLocation
             + Path.SEPARATOR + ReplUtils.REPL_HIVE_BASE_DIR + Path.SEPARATOR
-            + ReplUtils.LOAD_ACKNOWLEDGEMENT), true);
+            + LOAD_ACKNOWLEDGEMENT.toString()), true);
 
 
     InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 1325789..b7a9888 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -106,8 +106,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .dump(primaryDbName, dumpWithClause);
 
     // the _external_tables_file info only should be created if external tables are to be replicated not otherwise
+    Path metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR),
+            EximUtil.METADATA_PATH_NAME);
     assertFalse(primary.miniDFSCluster.getFileSystem()
-        .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(primaryDbName))));
+        .exists(new Path(metadataPath + relativeExtInfoPath(primaryDbName))));
 
     replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("repl status " + replicatedDbName)
@@ -126,8 +128,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .dump(primaryDbName, dumpWithClause);
 
     // the _external_tables_file info only should be created if external tables are to be replicated not otherwise
+    metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR),
+            EximUtil.METADATA_PATH_NAME);
     assertFalse(primary.miniDFSCluster.getFileSystem()
-        .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(null))));
+        .exists(new Path(metadataPath + relativeExtInfoPath(null))));
 
     replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
@@ -152,7 +156,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .dump(primaryDbName, withClauseOptions);
 
     // verify that the external table info is written correctly for bootstrap
-    assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName);
+    assertExternalFileInfo(Arrays.asList("t1", "t2"), tuple.dumpLocation, primaryDbName, false);
 
 
 
@@ -184,7 +188,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .dump(primaryDbName, withClauseOptions);
 
     // verify that the external table info is written correctly for incremental
-    assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation);
+    assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"), tuple.dumpLocation, true);
 
     replica.load(replicatedDbName, primaryDbName, withClauseOptions)
         .run("use " + replicatedDbName)
@@ -202,7 +206,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .dumpWithCommand("repl dump " + primaryDbName);
 
     // verify that the external table info is written correctly for incremental
-    assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation);
+    assertExternalFileInfo(Arrays.asList("t2", "t3", "t4"), tuple.dumpLocation, true);
   }
 
   /**
@@ -310,7 +314,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("insert into t2 partition(country='india') values ('bangalore')")
         .dump(primaryDbName, withClause);
 
-    assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, primaryDbName);
+    assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, primaryDbName, false);
 
     replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
@@ -333,7 +337,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("insert into t2 partition(country='australia') values ('sydney')")
         .dump(primaryDbName, withClause);
 
-    assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation);
+    assertExternalFileInfo(Collections.singletonList("t2"), tuple.dumpLocation, true);
 
     replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
@@ -420,7 +424,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("alter table t1 add partition(country='us')")
         .dump(primaryDbName, withClause);
 
-    assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation);
+    assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation, true);
 
     // Add new data externally, to a partition, but under the partition level top directory
     // Also, it is added after dumping the events so data should not be seen at target after REPL LOAD.
@@ -467,7 +471,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
 
     // Repl load with zero events but external tables location info should present.
     tuple = primary.dump(primaryDbName, withClause);
-    assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation);
+    assertExternalFileInfo(Collections.singletonList("t1"), tuple.dumpLocation, true);
 
     replica.load(replicatedDbName, primaryDbName, withClause)
             .run("use " + replicatedDbName)
@@ -519,8 +523,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .dump(primaryDbName, dumpWithClause);
 
     // the _external_tables_file info only should be created if external tables are to be replicated not otherwise
+    Path metadataPath = new Path(new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR),
+            EximUtil.METADATA_PATH_NAME);
     assertFalse(primary.miniDFSCluster.getFileSystem()
-            .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(primaryDbName))));
+            .exists(new Path(metadataPath + relativeExtInfoPath(null))));
 
     replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .status(replicatedDbName)
@@ -543,11 +549,12 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .dump(primaryDbName, dumpWithClause);
 
     // the _external_tables_file info should be created as external tables are to be replicated.
+    Path hivePath = new Path(tuple.dumpLocation, ReplUtils.REPL_HIVE_BASE_DIR);
     assertTrue(primary.miniDFSCluster.getFileSystem()
-            .exists(new Path(tuple.dumpLocation, relativeExtInfoPath(null))));
+            .exists(new Path(hivePath + relativeExtInfoPath(null))));
 
     // verify that the external table info is written correctly for incremental
-    assertExternalFileInfo(Arrays.asList("t2", "t3"), tuple.dumpLocation);
+    assertExternalFileInfo(Arrays.asList("t2", "t3"), tuple.dumpLocation, true);
 
     // _bootstrap directory should be created as bootstrap enabled on external tables.
     String hiveDumpLocation = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
@@ -762,7 +769,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     }
 
     // Only table t2 should exist in the data location list file.
-    assertExternalFileInfo(Collections.singletonList("t2"), tupleInc.dumpLocation);
+    assertExternalFileInfo(Collections.singletonList("t2"), tupleInc.dumpLocation, true);
 
     // The newly inserted data "2" should be missing in table "t1". But, table t2 should exist and have
     // inserted data.
@@ -917,20 +924,29 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     return ReplicationTestUtils.externalTableWithClause(extTblBaseDir, bootstrapExtTbl, includeExtTbl);
   }
 
-  private void assertExternalFileInfo(List<String> expected, String dumplocation) throws IOException {
-    assertExternalFileInfo(expected, dumplocation, null);
+  private void assertExternalFileInfo(List<String> expected, String dumplocation,
+                                      boolean isIncremental) throws IOException {
+    assertExternalFileInfo(expected, dumplocation, null, isIncremental);
   }
-  private void assertExternalFileInfo(List<String> expected, String dumplocation, String dbName)
+  private void assertExternalFileInfo(List<String> expected, String dumplocation, String dbName,
+                                      boolean isIncremental)
       throws IOException {
-    Path externalTableInfoFile = new Path(dumplocation, relativeExtInfoPath(dbName));
+    Path hivePath = new Path(dumplocation, ReplUtils.REPL_HIVE_BASE_DIR);
+    Path metadataPath = new Path(hivePath, EximUtil.METADATA_PATH_NAME);
+    Path externalTableInfoFile;
+    if (isIncremental) {
+      externalTableInfoFile = new Path(hivePath + relativeExtInfoPath(dbName));
+    } else {
+      externalTableInfoFile = new Path(metadataPath + relativeExtInfoPath(dbName));
+    }
     ReplicationTestUtils.assertExternalFileInfo(primary, expected, externalTableInfoFile);
   }
-  private String relativeExtInfoPath(String dbName) {
 
+  private String relativeExtInfoPath(String dbName) {
     if (dbName == null) {
-      return ReplUtils.REPL_HIVE_BASE_DIR + File.separator + FILE_NAME;
+      return File.separator + FILE_NAME;
     } else {
-      return ReplUtils.REPL_HIVE_BASE_DIR + File.separator + dbName.toLowerCase() + File.separator + FILE_NAME;
+      return File.separator + dbName.toLowerCase() + File.separator + FILE_NAME;
     }
   }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 78251f2..93e24ef 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
@@ -242,7 +243,12 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
 
     // Check if the DB dump path have any tables other than the ones listed in bootstrappedTables.
     Path dbPath = new Path(dumpPath, primaryDbName);
-    FileStatus[] fileStatuses = primary.miniDFSCluster.getFileSystem().listStatus(dbPath);
+    FileStatus[] fileStatuses = primary.miniDFSCluster.getFileSystem().listStatus(dbPath, new PathFilter() {
+      @Override
+      public boolean accept(Path path) {
+        return !path.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME);
+      }
+    });
     Assert.assertEquals(fileStatuses.length, bootstrappedTables.length);
 
     // Eg: _bootstrap/<db_name>/t2, _bootstrap/<db_name>/t3 etc
@@ -500,13 +506,14 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
             .dump(replPolicy, dumpWithClause);
 
     String hiveDumpDir = tuple.dumpLocation + File.separator + ReplUtils.REPL_HIVE_BASE_DIR;
+    Path metaDataPath = new Path(hiveDumpDir, EximUtil.METADATA_PATH_NAME);
     // the _external_tables_file info should be created as external tables are to be replicated.
     Assert.assertTrue(primary.miniDFSCluster.getFileSystem()
-            .exists(new Path(new Path(hiveDumpDir, primaryDbName.toLowerCase()), FILE_NAME)));
+            .exists(new Path(new Path(metaDataPath, primaryDbName.toLowerCase()), FILE_NAME)));
 
     // Verify that the external table info contains only table "a2".
     ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"),
-            new Path(new Path(hiveDumpDir, primaryDbName.toLowerCase()), FILE_NAME));
+            new Path(new Path(metaDataPath, primaryDbName.toLowerCase()), FILE_NAME));
 
     replica.load(replicatedDbName, replPolicy, loadWithClause)
             .run("use " + replicatedDbName)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
index 56f0c93..d3e9413 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ExportTask.java
@@ -46,7 +46,7 @@ public class ExportTask extends Task<ExportWork> implements Serializable {
       TableExport.Paths exportPaths = new TableExport.Paths(
           work.getAstRepresentationForErrorMsg(), work.getExportRootDir(), conf, false);
       Hive db = getHive();
-      LOG.debug("Exporting data to: {}", exportPaths.exportRootDir());
+      LOG.debug("Exporting data to: {}", exportPaths.metadataExportRootDir());
       work.acidPostProcess(db);
       TableExport tableExport = new TableExport(exportPaths, work.getTableSpec(),
           work.getReplicationSpec(), db, null, conf, work.getMmContext());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
index efef052..46f9bb3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/DirCopyWork.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
         Explain.Level.DEFAULT,
         Explain.Level.EXTENDED })
 public class DirCopyWork implements Serializable {
+  private static final long serialVersionUID = 1L;
   private final Path fullyQualifiedSourcePath;
   private final Path fullyQualifiedTargetPath;
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java
new file mode 100644
index 0000000..db8db5f
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplAck.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec.repl;
+
+/**
+ * ReplAck, used for repl acknowledgement constants.
+ */
+public enum ReplAck {
+    DUMP_ACKNOWLEDGEMENT("_finished_dump"),
+    LOAD_ACKNOWLEDGEMENT("_finished_load");
+  private String ack;
+  ReplAck(String ack) {
+    this.ack = ack;
+  }
+
+  @Override
+  public String toString() {
+    return ack;
+  }
+}
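
For reference, the rest of the patch consumes this enum through toString() when building acknowledgement paths; a tiny illustration (the /repl/dump/next root is only a placeholder for the directory returned by getCurrentDumpPath()):

    Path hiveDumpDir = new Path("/repl/dump/next", "hive");                          // placeholder root + ReplUtils.REPL_HIVE_BASE_DIR
    Path dumpAck = new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString());   // .../hive/_finished_dump
    Path loadAck = new Path(hiveDumpDir, ReplAck.LOAD_ACKNOWLEDGEMENT.toString());   // .../hive/_finished_load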
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 69f6ffe..2e0af02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -93,6 +93,7 @@ import java.util.UUID;
 import java.util.ArrayList;
 import java.util.concurrent.TimeUnit;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 
 public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   private static final long serialVersionUID = 1L;
@@ -135,20 +136,20 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
                 Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
                         .getBytes(StandardCharsets.UTF_8.name())));
-        Path previousHiveDumpPath = getPreviousDumpMetadataPath(dumpRoot);
+        Path previousValidHiveDumpPath = getPreviousValidDumpMetadataPath(dumpRoot);
         //If no previous dump is present or previous dump is already loaded, proceed with the dump operation.
-        if (shouldDump(previousHiveDumpPath)) {
-          Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
+        if (shouldDump(previousValidHiveDumpPath)) {
+          Path currentDumpPath = getCurrentDumpPath(dumpRoot);
           Path hiveDumpRoot = new Path(currentDumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
           DumpMetaData dmd = new DumpMetaData(hiveDumpRoot, conf);
           // Initialize ReplChangeManager instance since we will require it to encode file URI.
           ReplChangeManager.getInstance(conf);
           Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
           Long lastReplId;
-          if (previousHiveDumpPath == null) {
+          if (previousValidHiveDumpPath == null) {
             lastReplId = bootStrapDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
           } else {
-            work.setEventFrom(getEventFromPreviousDumpMetadata(previousHiveDumpPath));
+            work.setEventFrom(getEventFromPreviousDumpMetadata(previousValidHiveDumpPath));
             lastReplId = incrementalDump(hiveDumpRoot, dmd, cmRoot, hiveDb);
           }
           work.setResultValues(Arrays.asList(currentDumpPath.toUri().toString(), String.valueOf(lastReplId)));
@@ -166,6 +167,16 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return 0;
   }
 
+  private Path getCurrentDumpPath(Path dumpRoot) throws IOException {
+    Path previousDumpPath = getPreviousDumpPath(dumpRoot);
+    if (previousDumpPath != null && !validDump(previousDumpPath) && shouldResumePreviousDump(previousDumpPath)) {
+      //Resume previous dump
+      return previousDumpPath;
+    } else {
+      return new Path(dumpRoot, getNextDumpDir());
+    }
+  }
+
   private void initiateDataCopyTasks() throws SemanticException, IOException {
     TaskTracker taskTracker = new TaskTracker(conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS));
     List<Task<?>> childTasks = new ArrayList<>();
@@ -183,7 +194,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   private void finishRemainingTasks() throws SemanticException, IOException {
     prepareReturnValues(work.getResultValues());
     Path dumpAckFile = new Path(work.getCurrentDumpPath(),
-            ReplUtils.REPL_HIVE_BASE_DIR + File.separator + ReplUtils.DUMP_ACKNOWLEDGEMENT);
+            ReplUtils.REPL_HIVE_BASE_DIR + File.separator
+                    + ReplAck.DUMP_ACKNOWLEDGEMENT.toString());
     Utils.create(dumpAckFile, conf);
     deleteAllPreviousDumpMeta(work.getCurrentDumpPath());
   }
@@ -233,7 +245,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return 0L;
   }
 
-  private Path getPreviousDumpMetadataPath(Path dumpRoot) throws IOException {
+  private Path getPreviousValidDumpMetadataPath(Path dumpRoot) throws IOException {
     FileStatus latestValidStatus = null;
     FileSystem fs = dumpRoot.getFileSystem(conf);
     if (fs.exists(dumpRoot)) {
@@ -241,8 +253,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       for (FileStatus status : statuses) {
         LOG.info("Evaluating previous dump dir path:{}", status.getPath());
         if (latestValidStatus == null) {
-          latestValidStatus = validDump(fs, status.getPath()) ? status : null;
-        } else if (validDump(fs, status.getPath())
+          latestValidStatus = validDump(status.getPath()) ? status : null;
+        } else if (validDump(status.getPath())
                 && status.getModificationTime() > latestValidStatus.getModificationTime()) {
           latestValidStatus = status;
         }
@@ -254,10 +266,14 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return latestDumpDir;
   }
 
-  private boolean validDump(FileSystem fs, Path dumpDir) throws IOException {
+  private boolean validDump(Path dumpDir) throws IOException {
     //Check if it was a successful dump
-    Path hiveDumpDir = new Path(dumpDir, ReplUtils.REPL_HIVE_BASE_DIR);
-    return fs.exists(new Path(hiveDumpDir, ReplUtils.DUMP_ACKNOWLEDGEMENT));
+    if (dumpDir != null) {
+      FileSystem fs = dumpDir.getFileSystem(conf);
+      Path hiveDumpDir = new Path(dumpDir, ReplUtils.REPL_HIVE_BASE_DIR);
+      return fs.exists(new Path(hiveDumpDir, ReplAck.DUMP_ACKNOWLEDGEMENT.toString()));
+    }
+    return false;
   }
 
   private boolean shouldDump(Path previousDumpPath) throws IOException {
@@ -267,7 +283,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       return true;
     } else {
       FileSystem fs = previousDumpPath.getFileSystem(conf);
-      return fs.exists(new Path(previousDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT));
+      return fs.exists(new Path(previousDumpPath, LOAD_ACKNOWLEDGEMENT.toString()));
     }
   }
 
@@ -471,8 +487,10 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
             // Dump the table to be bootstrapped if required.
             if (shouldBootstrapDumpTable(table)) {
               HiveWrapper.Tuple<Table> tableTuple = new HiveWrapper(hiveDb, dbName).table(table);
+              Path dbDataRoot = new Path(dbRoot, EximUtil.DATA_PATH_NAME);
               managedTableCopyPaths.addAll(
-                      dumpTable(dbName, tableName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId,
+                      dumpTable(dbName, tableName, validTxnList,
+                              dbRoot, dbDataRoot, bootDumpBeginReplId,
                               hiveDb, tableTuple));
             }
             if (tableList != null && isTableSatifiesConfig(table)) {
@@ -611,16 +629,20 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     List<String> tableList;
 
     LOG.info("Bootstrap Dump for db {}", work.dbNameOrPattern);
+    List<DirCopyWork> extTableCopyWorks = new ArrayList<>();
+    List<ManagedTableCopyPath> managedTableCopyPaths = new ArrayList<>();
     long timeoutInMs = HiveConf.getTimeVar(conf,
             HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
     long waitUntilTime = System.currentTimeMillis() + timeoutInMs;
-
     String validTxnList = getValidTxnListForReplDump(hiveDb, waitUntilTime);
-    List<DirCopyWork> extTableCopyWorks = new ArrayList<>();
-    List<ManagedTableCopyPath> managedTableCopyPaths = new ArrayList<>();
+    Path metadataPath = new Path(dumpRoot, EximUtil.METADATA_PATH_NAME);
+    if (shouldResumePreviousDump(dmd)) {
+      //clear the metadata. We need to rewrite the metadata as the write id list will be changed
+      //We can't reuse the previous write id as it might be invalid due to compaction
+      metadataPath.getFileSystem(conf).delete(metadataPath, true);
+    }
     for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       LOG.debug("Dumping db: " + dbName);
-
       // TODO : Currently we don't support separate table list for each database.
       tableList = work.replScope.includeAllTables() ? null : new ArrayList<>();
       Database db = hiveDb.getDatabase(dbName);
@@ -634,8 +656,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
               Utils.getAllTables(hiveDb, dbName, work.replScope).size(),
               hiveDb.getAllFunctions().size());
       replLogger.startLog();
-      Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId, hiveDb);
-      dumpFunctionMetadata(dbName, dumpRoot, hiveDb);
+      Path dbRoot = dumpDbMetadata(dbName, metadataPath, bootDumpBeginReplId, hiveDb);
+      Path dbDataRoot = new Path(new Path(dumpRoot, EximUtil.DATA_PATH_NAME), dbName);
+      dumpFunctionMetadata(dbName, dbRoot, hiveDb);
 
       String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
       Exception caught = null;
@@ -653,7 +676,8 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
               LOG.debug("Adding table {} to external tables list", tblName);
               extTableLocations.addAll(writer.dataLocationDump(tableTuple.object));
             }
-            managedTableCopyPaths.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, dumpRoot, bootDumpBeginReplId,
+            managedTableCopyPaths.addAll(dumpTable(dbName, tblName, validTxnList, dbRoot, dbDataRoot,
+                    bootDumpBeginReplId,
                     hiveDb, tableTuple));
           } catch (InvalidTableException te) {
             // Bootstrap dump shouldn't fail if the table is dropped/renamed while dumping it.
@@ -677,11 +701,11 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
             throw e;
           } else {
             LOG.error("failed to reset the db state for " + uniqueKey
-                + " on failure of repl dump", e);
+                    + " on failure of repl dump", e);
             throw caught;
           }
         }
-        if(caught != null) {
+        if (caught != null) {
           throw caught;
         }
       }
@@ -689,21 +713,36 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     }
     Long bootDumpEndReplId = currentNotificationId(hiveDb);
     LOG.info("Preparing to return {},{}->{}",
-        dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
+            dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
     dmd.setDump(DumpType.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
     dmd.write();
+
     work.setDirCopyIterator(extTableCopyWorks.iterator());
     work.setManagedTableCopyPathIterator(managedTableCopyPaths.iterator());
     return bootDumpBeginReplId;
   }
 
+  private boolean shouldResumePreviousDump(DumpMetaData dumpMetaData) {
+    try {
+      return dumpMetaData.getEventFrom() != null;
+    } catch (Exception e) {
+      LOG.info("No previous dump present");
+      return false;
+    }
+  }
+
+  private boolean shouldResumePreviousDump(Path dumpPath) {
+    Path hiveDumpPath = new Path(dumpPath, ReplUtils.REPL_HIVE_BASE_DIR);
+    return shouldResumePreviousDump(new DumpMetaData(hiveDumpPath, conf));
+  }
+
   long currentNotificationId(Hive hiveDb) throws TException {
     return hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
   }
 
-  Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) throws Exception {
-    Path dbRoot = getBootstrapDbRoot(dumpRoot, dbName, false);
+  Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hiveDb) throws Exception {
     // TODO : instantiating FS objects are generally costly. Refactor
+    Path dbRoot = getBootstrapDbRoot(metadataRoot, dbName, false);
     FileSystem fs = dbRoot.getFileSystem(conf);
     Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
     HiveWrapper.Tuple<Database> database = new HiveWrapper(hiveDb, dbName, lastReplId).database();
@@ -711,12 +750,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return dbRoot;
   }
 
-  List<ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, Path dumproot,
-                                  long lastReplId, Hive hiveDb, HiveWrapper.Tuple<Table> tuple) throws Exception {
+  List<ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRootMetadata,
+                                       Path dbRootData, long lastReplId, Hive hiveDb,
+                                       HiveWrapper.Tuple<Table> tuple) throws Exception {
     LOG.info("Bootstrap Dump for table " + tblName);
     TableSpec tableSpec = new TableSpec(tuple.object);
     TableExport.Paths exportPaths =
-        new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true);
+        new TableExport.Paths(work.astRepresentationForErrorMsg, dbRootMetadata, dbRootData, tblName, conf, true);
     String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
     tuple.replicationSpec.setIsReplace(true);  // by default for all other objects this is false
     if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
@@ -827,8 +867,26 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     }
   }
 
-  void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) throws Exception {
-    Path functionsRoot = new Path(new Path(dumpRoot, dbName), ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
+  private Path getPreviousDumpPath(Path dumpRoot) throws IOException {
+    FileSystem fs = dumpRoot.getFileSystem(conf);
+    if (fs.exists(dumpRoot)) {
+      FileStatus[] statuses = fs.listStatus(dumpRoot);
+      if (statuses.length > 0) {
+        FileStatus latestValidStatus = statuses[0];
+        for (FileStatus status : statuses) {
+          LOG.info("Evaluating previous dump dir path:{}", status.getPath());
+          if (status.getModificationTime() > latestValidStatus.getModificationTime()) {
+            latestValidStatus = status;
+          }
+        }
+        return latestValidStatus.getPath();
+      }
+    }
+    return null;
+  }
+
+  void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) throws Exception {
+    Path functionsRoot = new Path(dbMetadataRoot, ReplUtils.FUNCTIONS_ROOT_DIR_NAME);
     List<String> functionNames = hiveDb.getFunctions(dbName, "*");
     for (String functionName : functionNames) {
       HiveWrapper.Tuple<Function> tuple = functionTuple(functionName, dbName, hiveDb);
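
The hunks above swap the acknowledgement file-name constants from ReplUtils for the new ReplAck
enum added by this commit (its body is not reproduced in this mail). Going by its usage, with
DUMP_ACKNOWLEDGEMENT.toString() and LOAD_ACKNOWLEDGEMENT.toString() resolving to the marker file
names formerly defined in ReplUtils, a minimal sketch of such an enum could look like the
following; treat it as an assumption, not the committed file:

    package org.apache.hadoop.hive.ql.exec.repl;

    /**
     * Illustrative sketch only: acknowledgement markers written at the end of
     * REPL DUMP and REPL LOAD. The file names are the constants this patch
     * removes from ReplUtils.
     */
    public enum ReplAck {
      DUMP_ACKNOWLEDGEMENT("_finished_dump"),
      LOAD_ACKNOWLEDGEMENT("_finished_load");

      private final String ack;

      ReplAck(String ack) {
        this.ack = ack;
      }

      @Override
      public String toString() {
        return ack;
      }
    }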
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
index 3427b59..a593555 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadTask.java
@@ -45,7 +45,6 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.table.TableContext;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
-import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.exec.repl.util.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -65,6 +64,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.LoadDatabase.AlterDatabase;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 
 public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
   private final static int ZERO_TASKS = 0;
@@ -316,7 +316,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
             || (!work.isIncrementalLoad() && !work.hasBootstrapLoadTasks())) {
       //All repl load tasks are executed and status is 0, create the task to add the acknowledgement
       AckWork replLoadAckWork = new AckWork(
-              new Path(work.dumpDirectory, ReplUtils.LOAD_ACKNOWLEDGEMENT));
+              new Path(work.dumpDirectory, LOAD_ACKNOWLEDGEMENT.toString()));
       Task<AckWork> loadAckWorkTask = TaskFactory.get(replLoadAckWork, conf);
       if (this.childTasks.isEmpty()) {
         this.childTasks.add(loadAckWorkTask);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
index 474d8c2..56efa32 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplLoadWork.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.filesystem.Constrain
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadEventsIterator;
 import org.apache.hadoop.hive.ql.exec.repl.incremental.IncrementalLoadTasksBuilder;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.session.LineageState;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -91,8 +92,10 @@ public class ReplLoadWork implements Serializable {
         this.constraintsIterator = null;
       }
     } else {
-      this.bootstrapIterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, true, hiveConf);
-      this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
+      this.bootstrapIterator = new BootstrapEventsIterator(new Path(dumpDirectory, EximUtil.METADATA_PATH_NAME)
+              .toString(), dbNameToLoadIn, true, hiveConf);
+      this.constraintsIterator = new ConstraintEventsIterator(
+              new Path(dumpDirectory, EximUtil.METADATA_PATH_NAME).toString(), hiveConf);
       incrementalLoadTasksBuilder = null;
     }
   }
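
Both bootstrap iterators are now rooted at the metadata subtree of the dump rather than at the dump
root, matching the split into a metadata tree and a data tree that the ReplDumpTask hunks above set
up. A sketch of how the per-database and per-table paths are assembled, with an illustrative dump
root (only the constants and the nesting are taken from this patch):

    import org.apache.hadoop.fs.Path;

    public final class DumpLayoutSketch {
      public static void main(String[] args) {
        Path hiveDumpRoot = new Path("/user/hive/repl/dump-1/hive");       // illustrative root
        Path metadataRoot = new Path(hiveDumpRoot, "metadata");            // EximUtil.METADATA_PATH_NAME
        Path dataRoot = new Path(hiveDumpRoot, "data");                    // EximUtil.DATA_PATH_NAME
        Path dbMetadataRoot = new Path(metadataRoot, "repldb");            // db _metadata and functions
        Path dbDataRoot = new Path(dataRoot, "repldb");                    // per-table data
        Path tableMetadata = new Path(new Path(dbMetadataRoot, "t1"), "_metadata");
        Path tableDataDir = new Path(dbDataRoot, "t1");                    // files and partition dirs
        System.out.println(tableMetadata + "\n" + tableDataDir);
      }
    }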
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
index 10732b0..05ef274 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
@@ -42,4 +42,6 @@ public interface TableEvent extends BootstrapEvent {
    * Exposing the FileSystem implementation outside which is what it should NOT do.
    */
   Path metadataPath();
+
+  Path dataPath();
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
index 1af6a4c..5bbe20c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/BootstrapEventsIterator.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.ReplicationState;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.BootstrapEvent;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.load.log.BootstrapLoadLogger;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 
@@ -82,8 +81,11 @@ public class BootstrapEventsIterator implements Iterator<BootstrapEvent> {
           throws IOException {
     Path path = new Path(dumpDirectory);
     FileSystem fileSystem = path.getFileSystem(hiveConf);
+    if (!fileSystem.exists(path)) {
+      throw new IllegalArgumentException("No data to load in path " + dumpDirectory);
+    }
     FileStatus[] fileStatuses =
-        fileSystem.listStatus(new Path(dumpDirectory), ReplUtils.getBootstrapDirectoryFilter(fileSystem));
+        fileSystem.listStatus(path, ReplUtils.getBootstrapDirectoryFilter(fileSystem));
     if ((fileStatuses == null) || (fileStatuses.length == 0)) {
       throw new IllegalArgumentException("No data to load in path " + dumpDirectory);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
index 72baee6..a311f7a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/DatabaseEventsIterator.java
@@ -57,7 +57,7 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
     this.hiveConf = hiveConf;
     FileSystem fileSystem = dbLevelPath.getFileSystem(hiveConf);
     // this is only there for the use case where we are doing table only replication and not database level
-    if (!fileSystem.exists(new Path(dbLevelPath + Path.SEPARATOR + EximUtil.METADATA_NAME))) {
+    if (!fileSystem.exists(new Path(dbLevelPath, EximUtil.METADATA_NAME))) {
       databaseEventProcessed = true;
     }
 
@@ -129,7 +129,8 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
             continue;
           }
           if (next.getPath().toString().endsWith(EximUtil.METADATA_NAME)) {
-            String replacedString = next.getPath().toString().replace(dbLevelPath.toString(), "");
+            String replacedString = next.getPath().toString()
+                    .replace(dbLevelPath.toString(), "");
             List<String> filteredNames = Arrays.stream(replacedString.split(Path.SEPARATOR))
                 .filter(StringUtils::isNotBlank)
                 .collect(Collectors.toList());
@@ -174,7 +175,15 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
       LOG.debug("functions directory: {}", next.toString());
       return postProcessing(new FSFunctionEvent(next));
     }
-    return postProcessing(new FSTableEvent(hiveConf, next.toString()));
+    return postProcessing(new FSTableEvent(hiveConf, next.toString(),
+            new Path(getDbLevelDataPath(), next.getName()).toString()));
+  }
+
+  private Path getDbLevelDataPath() {
+    if (dbLevelPath.toString().contains(Path.SEPARATOR + ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME + Path.SEPARATOR)) {
+      return new Path(dbLevelPath, EximUtil.DATA_PATH_NAME);
+    }
+    return new Path(new Path(dbLevelPath.getParent().getParent(), EximUtil.DATA_PATH_NAME), dbLevelPath.getName());
   }
 
   private BootstrapEvent postProcessing(BootstrapEvent bootstrapEvent) {
@@ -187,11 +196,14 @@ class DatabaseEventsIterator implements Iterator<BootstrapEvent> {
   private BootstrapEvent eventForReplicationState() {
     if (replicationState.partitionState != null) {
       BootstrapEvent
-          bootstrapEvent = new FSPartitionEvent(hiveConf, previous.toString(), replicationState);
+          bootstrapEvent = new FSPartitionEvent(hiveConf, previous.toString(),
+              new Path(getDbLevelDataPath(), previous.getName()).toString(),
+              replicationState);
       replicationState = null;
       return bootstrapEvent;
     } else if (replicationState.lastTableReplicated != null) {
-      FSTableEvent event = new FSTableEvent(hiveConf, previous.toString());
+      FSTableEvent event = new FSTableEvent(hiveConf, previous.toString(),
+              new Path(new Path(dbLevelPath, EximUtil.DATA_PATH_NAME), previous.getName()).toString());
       replicationState = null;
       return event;
     }
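
The data-path derivation in getDbLevelDataPath() above mirrors that layout: for a plain bootstrap
the db-level metadata directory sits two levels below the dump root, so the matching data directory
is reached via getParent().getParent(). A small worked example with illustrative paths (the real
dbLevelPath comes from the dump being loaded):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.parse.EximUtil;

    public final class DbDataPathSketch {
      public static void main(String[] args) {
        Path dbLevelPath = new Path("/user/hive/repl/dump-1/hive/metadata/repldb");
        Path dataPath = new Path(
            new Path(dbLevelPath.getParent().getParent(), EximUtil.DATA_PATH_NAME),
            dbLevelPath.getName());
        // Prints /user/hive/repl/dump-1/hive/data/repldb
        System.out.println(dataPath);
        // For tables bootstrapped during an incremental load (paths containing
        // ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME) the data directory is instead
        // nested directly under dbLevelPath.
      }
    }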
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
index a79f5b7..2d82408 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
@@ -34,9 +34,9 @@ public class FSPartitionEvent implements PartitionEvent {
   private final ReplicationState replicationState;
   private final TableEvent tableEvent;
 
-  FSPartitionEvent(HiveConf hiveConf, String metadataDir,
+  FSPartitionEvent(HiveConf hiveConf, String metadataDir, String dataDir,
       ReplicationState replicationState) {
-    tableEvent = new FSTableEvent(hiveConf, metadataDir);
+    tableEvent = new FSTableEvent(hiveConf, metadataDir, dataDir);
     this.replicationState = replicationState;
   }
 
@@ -87,4 +87,9 @@ public class FSPartitionEvent implements PartitionEvent {
   public Path metadataPath() {
     return tableEvent.metadataPath();
   }
+
+  @Override
+  public Path dataPath() {
+    return tableEvent.dataPath();
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index 6d38c03..cd3d619 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -52,16 +52,19 @@ import java.util.Map;
 import static org.apache.hadoop.hive.ql.util.HiveStrictManagedMigration.getHiveUpdater;
 
 public class FSTableEvent implements TableEvent {
-  private final Path fromPath;
+  private final Path fromPathMetadata;
+  private final Path fromPathData;
   private final MetaData metadata;
   private final HiveConf hiveConf;
 
-  FSTableEvent(HiveConf hiveConf, String metadataDir) {
+  FSTableEvent(HiveConf hiveConf, String metadataDir, String dataDir) {
     try {
       URI fromURI = EximUtil.getValidatedURI(hiveConf, PlanUtils.stripQuotes(metadataDir));
-      fromPath = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+      fromPathMetadata = new Path(fromURI.getScheme(), fromURI.getAuthority(), fromURI.getPath());
+      URI fromURIData = EximUtil.getValidatedURI(hiveConf, PlanUtils.stripQuotes(dataDir));
+      fromPathData = new Path(fromURIData.getScheme(), fromURIData.getAuthority(), fromURIData.getPath());
       FileSystem fs = FileSystem.get(fromURI, hiveConf);
-      metadata = EximUtil.readMetaData(fs, new Path(fromPath, EximUtil.METADATA_NAME));
+      metadata = EximUtil.readMetaData(fs, new Path(fromPathMetadata, EximUtil.METADATA_NAME));
       this.hiveConf = hiveConf;
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -82,7 +85,12 @@ public class FSTableEvent implements TableEvent {
 
   @Override
   public Path metadataPath() {
-    return fromPath;
+    return fromPathMetadata;
+  }
+
+  @Override
+  public Path dataPath() {
+    return fromPathData;
   }
 
   /**
@@ -150,7 +158,7 @@ public class FSTableEvent implements TableEvent {
     //TODO: if partitions are loaded lazily via the iterator then we will have to avoid conversion of everything here as it defeats the purpose.
     for (Partition partition : metadata.getPartitions()) {
       // TODO: this should ideally not create AddPartitionDesc per partition
-      AlterTableAddPartitionDesc partsDesc = addPartitionDesc(fromPath, tblDesc, partition);
+      AlterTableAddPartitionDesc partsDesc = addPartitionDesc(fromPathMetadata, tblDesc, partition);
       descs.add(partsDesc);
     }
     return descs;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 05a590a..b98f1f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.HiveTableName;
 import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -57,7 +56,6 @@ import org.datanucleus.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -243,7 +241,7 @@ public class LoadPartitions {
 
     Task<?> copyTask = ReplCopyTask.getLoadCopyTask(
         event.replicationSpec(),
-        new Path(sourceWarehousePartitionLocation, EximUtil.DATA_PATH_NAME),
+        new Path(event.dataPath() + Path.SEPARATOR + getPartitionName(sourceWarehousePartitionLocation)),
         stagingDir,
         context.hiveConf, false
     );
@@ -272,6 +270,12 @@ public class LoadPartitions {
     return ptnRootTask;
   }
 
+  private String getPartitionName(Path partitionMetadataFullPath) {
+    //Get the partition name by stripping the metadata base path.
+    //Needed to locate the corresponding data path.
+    return partitionMetadataFullPath.toString().substring(event.metadataPath().toString().length());
+  }
+
   /**
    * This will create the move of partition data from temp path to actual path
    */
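
The new getPartitionName() strips the table's metadata base path from the partition location
recorded in the dumped metadata, leaving the partition sub-path; appending that to event.dataPath()
locates the copied partition files under the new data tree. A rough illustration with made-up
paths, assuming the metadata/data split introduced by this patch:

    import org.apache.hadoop.fs.Path;

    public final class PartitionDataPathSketch {
      public static void main(String[] args) {
        // Illustrative values; the real paths come from the dump metadata.
        Path metadataPath = new Path("/user/hive/repl/dump-1/hive/metadata/repldb/t1");  // event.metadataPath()
        Path partitionMetadataFullPath = new Path(metadataPath, "p=2020");
        String partitionName =
            partitionMetadataFullPath.toString().substring(metadataPath.toString().length());  // "/p=2020"
        Path dataPath = new Path("/user/hive/repl/dump-1/hive/data/repldb/t1");          // event.dataPath()
        // Path normalizes the duplicate separator introduced by the leading "/".
        Path copySource = new Path(dataPath + Path.SEPARATOR + partitionName);
        System.out.println(copySource);  // .../hive/data/repldb/t1/p=2020
      }
    }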
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index 82a3031..bb20687 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -223,7 +222,7 @@ public class LoadTable {
     if (shouldCreateLoadTableTask) {
       LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table");
       Task<?> loadTableTask = loadTableTask(table, replicationSpec, new Path(tblDesc.getLocation()),
-              event.metadataPath());
+              event.dataPath());
       parentTask.addDependentTask(loadTableTask);
     }
     tracker.addTask(tblRootTask);
@@ -272,7 +271,7 @@ public class LoadTable {
 
   private Task<?> loadTableTask(Table table, ReplicationSpec replicationSpec, Path tgtPath,
       Path fromURI) {
-    Path dataPath = new Path(fromURI, EximUtil.DATA_PATH_NAME);
+    Path dataPath = fromURI;
     Path tmpPath = tgtPath;
 
     // if move optimization is enabled, copy the files directly to the target path. No need to create the staging dir.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 211c3f0..939cbc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -94,10 +94,6 @@ public class ReplUtils {
   // Configuration to enable/disable dumping ACID tables. Used only for testing and shouldn't be
   // seen in production or in case of tests other than the ones where it's required.
   public static final String REPL_DUMP_INCLUDE_ACID_TABLES = "hive.repl.dump.include.acid.tables";
-  //Acknowledgement for repl dump complete
-  public static final String DUMP_ACKNOWLEDGEMENT = "_finished_dump";
-  //Acknowledgement for repl load complete
-  public static final String LOAD_ACKNOWLEDGEMENT = "_finished_load";
   /**
    * Bootstrap REPL LOAD operation type on the examined object based on ckpt state.
    */
@@ -240,7 +236,8 @@ public class ReplUtils {
       try {
         return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.INC_BOOTSTRAP_ROOT_DIR_NAME)
                 && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME)
-                && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME);
+                && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME)
+                && !p.getName().equalsIgnoreCase(EximUtil.METADATA_PATH_NAME);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -251,7 +248,8 @@ public class ReplUtils {
     return p -> {
       try {
         return fs.isDirectory(p) && !p.getName().equalsIgnoreCase(ReplUtils.REPL_TABLE_LIST_DIR_NAME)
-                && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME);
+                && !p.getName().equalsIgnoreCase(EximUtil.DATA_PATH_NAME)
+                && !p.getName().equalsIgnoreCase(EximUtil.METADATA_PATH_NAME);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index bc90ea1..5ada55f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -74,6 +74,7 @@ public class EximUtil {
   public static final String METADATA_NAME = "_metadata";
   public static final String FILES_NAME = "_files";
   public static final String DATA_PATH_NAME = "data";
+  public static final String METADATA_PATH_NAME = "metadata";
 
   private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 8802139..c4ff070 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.ReplAck;
 import org.apache.hadoop.hive.ql.exec.repl.ReplDumpWork;
 import org.apache.hadoop.hive.ql.exec.repl.ReplLoadWork;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
@@ -54,6 +55,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_ENABLE_MOVE_OPTIMIZATION;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_MOVE_OPTIMIZED_FILE_SCHEMES;
+import static org.apache.hadoop.hive.ql.exec.repl.ReplAck.LOAD_ACKNOWLEDGEMENT;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_DBNAME;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPLACE;
 import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_REPL_CONFIG;
@@ -424,8 +426,9 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           }
         }
         Path hiveDumpPath = new Path(latestUpdatedStatus.getPath(), ReplUtils.REPL_HIVE_BASE_DIR);
-        if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.DUMP_ACKNOWLEDGEMENT))
-                && !loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, ReplUtils.LOAD_ACKNOWLEDGEMENT))) {
+        if (loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath,
+                ReplAck.DUMP_ACKNOWLEDGEMENT.toString()))
+                && !loadPathBase.getFileSystem(conf).exists(new Path(hiveDumpPath, LOAD_ACKNOWLEDGEMENT.toString()))) {
           return hiveDumpPath;
         }
       }
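
Together with the ReplDumpTask changes, the two marker files define the checkpoint contract: the
source writes the dump acknowledgement when a dump completes, the target writes the load
acknowledgement once it has consumed it, and a dump directory is offered to REPL LOAD (and re-used
rather than re-created by a re-run of REPL DUMP) only while the first marker exists and the second
does not. A condensed sketch of that check, with the marker names spelled out and the surrounding
wiring simplified:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class CheckpointCheckSketch {
      // Sketch only; marker names are the constants this patch moves out of ReplUtils.
      static Path loadableDump(Path hiveDumpPath, Configuration conf) throws java.io.IOException {
        FileSystem fs = hiveDumpPath.getFileSystem(conf);
        boolean dumpFinished = fs.exists(new Path(hiveDumpPath, "_finished_dump"));
        boolean alreadyLoaded = fs.exists(new Path(hiveDumpPath, "_finished_load"));
        // Offer the dump to REPL LOAD only while it is complete and not yet consumed.
        return (dumpFinished && !alreadyLoaded) ? hiveDumpPath : null;
      }
    }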
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
index c3b1081..73dc606 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/PartitionExport.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.PartitionIterable;
-import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.EximUtil.ManagedTableCopyPath;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.repl.dump.io.FileOperations;
@@ -118,12 +117,13 @@ class PartitionExport {
           // this the data copy
           List<Path> dataPathList = Utils.getDataPathList(partition.getDataLocation(),
                   forReplicationSpec, hiveConf);
-          Path rootDataDumpDir = paths.partitionExportDir(partitionName);
+          Path rootDataDumpDir = paths.partitionMetadataExportDir(partitionName);
           new FileOperations(dataPathList, rootDataDumpDir, distCpDoAsUser, hiveConf, mmCtx)
                   .export(isExportTask);
+          Path dataDumpDir = new Path(paths.dataExportRootDir(), partitionName);
           LOG.debug("Thread: {}, finish partition dump {}", threadName, partitionName);
           return new ManagedTableCopyPath(forReplicationSpec, partition.getDataLocation(),
-                  new Path(rootDataDumpDir, EximUtil.DATA_PATH_NAME));
+                  dataDumpDir);
         } catch (Exception e) {
           throw new RuntimeException(e.getMessage(), e);
         }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 683f3c0..b11afe8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -198,22 +198,29 @@ public class TableExport {
   public static class Paths {
     private final String astRepresentationForErrorMsg;
     private final HiveConf conf;
-    //variable access should not be done and use exportRootDir() instead.
-    private final Path _exportRootDir;
+    //metadataExportRootDir and dataExportRootDir should not be accessed directly;
+    // use metadataExportRootDir() and dataExportRootDir() instead.
+    private final Path metadataExportRootDir;
+    private final Path dataExportRootDir;
     private final FileSystem exportFileSystem;
-    private boolean writeData, exportRootDirCreated = false;
+    private boolean writeData, metadataExportRootDirCreated = false, dataExportRootDirCreated = false;
 
-    public Paths(String astRepresentationForErrorMsg, Path dbRoot, String tblName, HiveConf conf,
+    public Paths(String astRepresentationForErrorMsg, Path dbMetadataRoot, Path dbDataRoot,
+                 String tblName, HiveConf conf,
         boolean shouldWriteData) throws SemanticException {
       this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
       this.conf = conf;
       this.writeData = shouldWriteData;
-      Path tableRoot = new Path(dbRoot, tblName);
-      URI exportRootDir = EximUtil.getValidatedURI(conf, tableRoot.toUri().toString());
-      validateTargetDir(exportRootDir);
-      this._exportRootDir = new Path(exportRootDir);
+      Path tableRootForMetadataDump = new Path(dbMetadataRoot, tblName);
+      Path tableRootForDataDump = new Path(dbDataRoot, tblName);
+      URI metadataExportRootDirUri = EximUtil.getValidatedURI(conf, tableRootForMetadataDump.toUri().toString());
+      validateTargetDir(metadataExportRootDirUri);
+      URI dataExportRootDirUri = EximUtil.getValidatedURI(conf, tableRootForDataDump.toUri().toString());
+      validateTargetDataDir(dataExportRootDirUri);
+      this.metadataExportRootDir = new Path(metadataExportRootDirUri);
+      this.dataExportRootDir = new Path(dataExportRootDirUri);
       try {
-        this.exportFileSystem = this._exportRootDir.getFileSystem(conf);
+        this.exportFileSystem = this.metadataExportRootDir.getFileSystem(conf);
       } catch (IOException e) {
         throw new SemanticException(e);
       }
@@ -223,37 +230,58 @@ public class TableExport {
         boolean shouldWriteData) throws SemanticException {
       this.astRepresentationForErrorMsg = astRepresentationForErrorMsg;
       this.conf = conf;
-      this._exportRootDir = new Path(EximUtil.getValidatedURI(conf, path));
+      this.metadataExportRootDir = new Path(EximUtil.getValidatedURI(conf, path));
+      this.dataExportRootDir = new Path(new Path(EximUtil.getValidatedURI(conf, path)), EximUtil.DATA_PATH_NAME);
       this.writeData = shouldWriteData;
       try {
-        this.exportFileSystem = _exportRootDir.getFileSystem(conf);
+        this.exportFileSystem = metadataExportRootDir.getFileSystem(conf);
       } catch (IOException e) {
         throw new SemanticException(e);
       }
     }
 
-    Path partitionExportDir(String partitionName) throws SemanticException {
-      return exportDir(new Path(exportRootDir(), partitionName));
+    Path partitionMetadataExportDir(String partitionName) throws SemanticException {
+      return exportDir(new Path(metadataExportRootDir(), partitionName));
     }
 
     /**
-     * Access to the {@link #_exportRootDir} should only be done via this method
+     * Access to the {@link #metadataExportRootDir} should only be done via this method
      * since the creation of the directory is delayed until we figure out if we want
      * to write something or not. This is specifically important to prevent empty non-native
      * directories being created in repl dump.
      */
-    public Path exportRootDir() throws SemanticException {
-      if (!exportRootDirCreated) {
+    public Path metadataExportRootDir() throws SemanticException {
+      if (!metadataExportRootDirCreated) {
         try {
-          if (!exportFileSystem.exists(this._exportRootDir) && writeData) {
-            exportFileSystem.mkdirs(this._exportRootDir);
+          if (!exportFileSystem.exists(this.metadataExportRootDir) && writeData) {
+            exportFileSystem.mkdirs(this.metadataExportRootDir);
           }
-          exportRootDirCreated = true;
+          metadataExportRootDirCreated = true;
         } catch (IOException e) {
           throw new SemanticException(e);
         }
       }
-      return _exportRootDir;
+      return metadataExportRootDir;
+    }
+
+    /**
+     * Access to the {@link #dataExportRootDir} should only be done via this method
+     * since the creation of the directory is delayed until we figure out if we want
+     * to write something or not. This is specifically important to prevent empty non-native
+     * directories being created in repl dump.
+     */
+    public Path dataExportRootDir() throws SemanticException {
+      if (!dataExportRootDirCreated) {
+        try {
+          if (!exportFileSystem.exists(this.dataExportRootDir) && writeData) {
+            exportFileSystem.mkdirs(this.dataExportRootDir);
+          }
+          dataExportRootDirCreated = true;
+        } catch (IOException e) {
+          throw new SemanticException(e);
+        }
+      }
+      return dataExportRootDir;
     }
 
     private Path exportDir(Path exportDir) throws SemanticException {
@@ -269,7 +297,7 @@ public class TableExport {
     }
 
     private Path metaDataExportFile() throws SemanticException {
-      return new Path(exportRootDir(), EximUtil.METADATA_NAME);
+      return new Path(metadataExportRootDir(), EximUtil.METADATA_NAME);
     }
 
     /**
@@ -277,7 +305,7 @@ public class TableExport {
      * Partition's data export directory is created within the export semantics of partition.
      */
     private Path dataExportDir() throws SemanticException {
-      return exportDir(new Path(exportRootDir(), EximUtil.DATA_PATH_NAME));
+      return exportDir(dataExportRootDir());
     }
 
     /**
@@ -310,6 +338,30 @@ public class TableExport {
         throw new SemanticException(astRepresentationForErrorMsg, e);
       }
     }
+
+    /**
+     * This level of validation might not be required as the root directory in which we dump is
+     * different for each run, hence it should never already contain data.
+     */
+    private void validateTargetDataDir(URI rootDirExportFile) throws SemanticException {
+      try {
+        FileSystem fs = FileSystem.get(rootDirExportFile, conf);
+        Path toPath = new Path(rootDirExportFile.getScheme(), rootDirExportFile.getAuthority(),
+                rootDirExportFile.getPath());
+        try {
+          FileStatus tgt = fs.getFileStatus(toPath);
+          // target exists
+          if (!tgt.isDirectory()) {
+            throw new SemanticException(
+                    astRepresentationForErrorMsg + ": " + "Target is not a directory : "
+                            + rootDirExportFile);
+          }
+        } catch (FileNotFoundException ignored) {
+        }
+      } catch (IOException e) {
+        throw new SemanticException(astRepresentationForErrorMsg, e);
+      }
+    }
   }
 
   public static class AuthEntities {
@@ -343,7 +395,7 @@ public class TableExport {
           authEntities.inputs.add(new ReadEntity(tableSpec.tableHandle));
         }
       }
-      authEntities.outputs.add(toWriteEntity(paths.exportRootDir(), conf));
+      authEntities.outputs.add(toWriteEntity(paths.metadataExportRootDir(), conf));
     } catch (Exception e) {
       throw new SemanticException(e);
     }
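
With the constructor split above, the repl dump caller now hands TableExport.Paths separate
metadata and data roots per table, while plain EXPORT keeps the single-root constructor and writes
data under <root>/data as before. A condensed sketch of the two call shapes, assuming the
single-root constructor keeps its (error message, path, conf, writeData) signature; the paths and
the astRepr/conf arguments are illustrative:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.parse.repl.dump.TableExport;

    public final class ExportPathsSketch {
      static void show(String astRepr, HiveConf conf) throws SemanticException {
        // REPL DUMP shape, mirroring the dumpTable() change earlier in this patch.
        TableExport.Paths replPaths = new TableExport.Paths(astRepr,
            new Path("/user/hive/repl/dump-1/hive/metadata/repldb"),   // dbRootMetadata
            new Path("/user/hive/repl/dump-1/hive/data/repldb"),       // dbRootData
            "t1", conf, true);

        // EXPORT TABLE ... TO '<path>' shape: single root, data written under <path>/data.
        TableExport.Paths exportPaths = new TableExport.Paths(astRepr,
            "/tmp/export/t1", conf, true);
      }
    }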
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
index 81fac25..9973e9a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/TestReplDumpTask.java
@@ -84,11 +84,11 @@ public class TestReplDumpTask {
     }
 
     @Override
-    void dumpFunctionMetadata(String dbName, Path dumpRoot, Hive hiveDb) {
+    void dumpFunctionMetadata(String dbName, Path dbMetadataRoot, Hive hiveDb) {
     }
 
     @Override
-    Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId, Hive hiveDb) {
+    Path dumpDbMetadata(String dbName, Path metadataRoot, long lastReplId, Hive hiveDb) {
       return Mockito.mock(Path.class);
     }
 
@@ -128,8 +128,9 @@ public class TestReplDumpTask {
       private int tableDumpCount = 0;
 
       @Override
-      List<EximUtil.ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot,
-                                               Path replDataDir, long lastReplId, Hive hiveDb,
+      List<EximUtil.ManagedTableCopyPath> dumpTable(String dbName, String tblName, String validTxnList,
+                                                    Path dbRootMetadata, Path dbRootData,
+                                               long lastReplId, Hive hiveDb,
                                                HiveWrapper.Tuple<Table> tuple)
           throws Exception {
         tableDumpCount++;
@@ -146,7 +147,7 @@ public class TestReplDumpTask {
     );
 
     try {
-      task.bootStrapDump(mock(Path.class), null, mock(Path.class), hive);
+      task.bootStrapDump(new Path("mock"), null, mock(Path.class), hive);
     } finally {
       verifyStatic();
       Utils.resetDbBootstrapDumpState(same(hive), eq("default"), eq(dbRandomKey));