You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by an...@apache.org on 2020/03/09 03:27:12 UTC

[hive] branch master updated: HIVE-22954:Schedule Repl Load using Hive Scheduler (Aasha Medhi, reviewed by Anishek Agarwal / Pravin Kumar Sinha)

This is an automated email from the ASF dual-hosted git repository.

anishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b83836  HIVE-22954:Schedule Repl Load using Hive Scheduler (Aasha Medhi, reviewed by Anishek Agarwal / Pravin Kumar Sinha)
3b83836 is described below

commit 3b838362d3ba6300f30e830f02dd219e7005a3d7
Author: Anishek Agarwal <an...@gmail.com>
AuthorDate: Mon Mar 9 08:56:58 2020 +0530

    HIVE-22954:Schedule Repl Load using Hive Scheduler (Aasha Medhi, reviewed by Anishek Agarwal / Pravin Kumar Sinha)
---
 .../ql/parse/BaseReplicationAcrossInstances.java   |   2 +
 .../parse/BaseReplicationScenariosAcidTables.java  |   3 +
 .../hadoop/hive/ql/parse/ReplicationTestUtils.java |   4 +-
 .../apache/hadoop/hive/ql/parse/TestCopyUtils.java |   6 +-
 .../ql/parse/TestMetaStoreEventListenerInRepl.java |  13 +-
 .../ql/parse/TestReplicationOfHiveStreaming.java   |  61 ++---
 .../parse/TestReplicationOnHDFSEncryptedZones.java |   9 +-
 .../hive/ql/parse/TestReplicationScenarios.java    | 170 +++++++------
 .../parse/TestReplicationScenariosAcidTables.java  |  69 +++---
 ...estReplicationScenariosAcidTablesBootstrap.java |  32 +--
 .../TestReplicationScenariosAcrossInstances.java   | 268 ++++++---------------
 .../TestReplicationScenariosExternalTables.java    |  78 +++---
 ...icationScenariosExternalTablesMetaDataOnly.java |  46 ++--
 ...licationScenariosIncrementalLoadAcidTables.java |  10 +-
 .../parse/TestReplicationWithTableMigration.java   |  51 ++--
 .../parse/TestReplicationWithTableMigrationEx.java |  52 ++--
 .../parse/TestScheduledReplicationScenarios.java   |  52 ++--
 .../ql/parse/TestStatsReplicationScenarios.java    |  34 +--
 .../parse/TestTableLevelReplicationScenarios.java  |  16 +-
 .../hadoop/hive/ql/parse/WarehouseInstance.java    |  34 ++-
 .../java/org/apache/hive/jdbc/TestJdbcDriver2.java |  12 +-
 .../org/apache/hive/jdbc/TestJdbcWithMiniHS2.java  |   5 +-
 .../org/apache/hadoop/hive/ql/parse/HiveParser.g   |  14 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |  27 ++-
 .../hive/ql/parse/ReplicationSemanticAnalyzer.java | 150 ++++++++----
 .../hadoop/hive/ql/parse/TestParseUtils.java       |   2 +-
 .../ql/parse/TestReplicationSemanticAnalyzer.java  |  82 +------
 .../clientnegative/repl_dump_requires_admin.q      |   2 +-
 .../clientnegative/repl_load_requires_admin.q      |   4 +-
 .../queries/clientpositive/repl_load_old_version.q |  10 -
 .../clientnegative/repl_load_requires_admin.q.out  |   3 +-
 31 files changed, 595 insertions(+), 726 deletions(-)

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
index d321cca..b805b19 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationAcrossInstances.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.shims.Utils;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -57,6 +58,7 @@ public class BaseReplicationAcrossInstances {
     }};
     localOverrides.putAll(overrides);
     primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+    localOverrides.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
     replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
   }
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
index cb6bd2d..38580c1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/BaseReplicationScenariosAcidTables.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
 import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.DriverFactory;
@@ -87,6 +88,7 @@ public class BaseReplicationScenariosAcidTables {
     acidEnableConf.putAll(overrides);
 
     primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+    acidEnableConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
     replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
     HashMap<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
         put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
@@ -94,6 +96,7 @@ public class BaseReplicationScenariosAcidTables {
         put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
         put("hive.metastore.client.capability.check", "false");
     }};
+    overridesForHiveConf1.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
     replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
   }
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java
index 936acc4..a82bbad 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/ReplicationTestUtils.java
@@ -265,11 +265,11 @@ public class ReplicationTestUtils {
                                                               List<String> selectStmtList,
                                                   List<String[]> expectedValues, String lastReplId) throws Throwable {
     WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
     verifyResultsInReplica(replica, replicatedDbName, selectStmtList, expectedValues);
 
-    replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
     verifyResultsInReplica(replica, replicatedDbName, selectStmtList, expectedValues);
     return incrementalDump;
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
index 77a0d33..9648c72 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestCopyUtils.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.hive.shims.Utils;
@@ -95,6 +96,7 @@ public class TestCopyUtils {
       put(ConfVars.HIVE_DISTCP_DOAS_USER.varname, currentUser);
     }};
     primary = new WarehouseInstanceWithMR(LOG, miniDFSCluster, overridesForHiveConf);
+    overridesForHiveConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
     replica = new WarehouseInstanceWithMR(LOG, miniDFSCluster, overridesForHiveConf);
   }
 
@@ -121,7 +123,7 @@ public class TestCopyUtils {
    */
   @Test
   public void testPrivilegedDistCpWithSameUserAsCurrentDoesNotTryToImpersonate() throws Throwable {
-    WarehouseInstance.Tuple tuple = primary
+    primary
         .run("use " + primaryDbName)
         .run("create table t1 (id int)")
         .run("insert into t1 values (1),(2),(3)")
@@ -132,7 +134,7 @@ public class TestCopyUtils {
       We have to do a comparision on the data of table t1 in replicated database because even though the file
       copy will fail due to impersonation failure the driver will return a success code 0. May be something to look at later
     */
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("select * from " + replicatedDbName + ".t1")
         .verifyResults(Arrays.asList("1", "2", "3", "12", "11", "13"));
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
index 7b0f634..703d16f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestMetaStoreEventListenerInRepl.java
@@ -84,6 +84,7 @@ public class TestMetaStoreEventListenerInRepl {
     }};
 
     primary = new WarehouseInstance(LOG, miniDFSCluster, conf);
+    conf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
     replica = new WarehouseInstance(LOG, miniDFSCluster, conf);
   }
 
@@ -172,26 +173,26 @@ public class TestMetaStoreEventListenerInRepl {
   @Test
   public void testReplEvents() throws Throwable {
     Map<String, Set<String>> eventsMap = prepareBootstrapData(primaryDbName);
-    WarehouseInstance.Tuple bootstrapDump = primary.run("use " + primaryDbName)
+    primary.run("use " + primaryDbName)
             .dump(primaryDbName);
-    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
     ReplMetaStoreEventListenerTestImpl.clearSanityData();
 
     eventsMap = prepareIncData(primaryDbName);
     LOG.info(testName.getMethodName() + ": first incremental dump and load.");
-    WarehouseInstance.Tuple incDump = primary.run("use " + primaryDbName)
+    primary.run("use " + primaryDbName)
             .dump(primaryDbName);
-    replica.load(replicatedDbName, incDump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
     ReplMetaStoreEventListenerTestImpl.clearSanityData();
 
     // Second incremental, after bootstrap
     eventsMap = prepareInc2Data(primaryDbName);
     LOG.info(testName.getMethodName() + ": second incremental dump and load.");
-    WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
+    primary.run("use " + primaryDbName)
             .dump(primaryDbName);
-    replica.load(replicatedDbName, inc2Dump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     ReplMetaStoreEventListenerTestImpl.checkEventSanity(eventsMap, replicatedDbName);
     ReplMetaStoreEventListenerTestImpl.clearSanityData();
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOfHiveStreaming.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOfHiveStreaming.java
index 5c8f902..62457b3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOfHiveStreaming.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOfHiveStreaming.java
@@ -91,6 +91,7 @@ public class TestReplicationOfHiveStreaming {
     acidEnableConf.putAll(overrides);
 
     primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+    acidEnableConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
     replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
   }
 
@@ -116,8 +117,8 @@ public class TestReplicationOfHiveStreaming {
 
   @Test
   public void testHiveStreamingUnpartitionedWithTxnBatchSizeAsOne() throws Throwable {
-    WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, bootstrapDump.dumpLocation);
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
 
     // Create an ACID table.
     String tblName = "alerts";
@@ -148,8 +149,8 @@ public class TestReplicationOfHiveStreaming {
     connection.commitTransaction();
 
     // Replicate the committed data which should be visible.
-    WarehouseInstance.Tuple incrDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("select msg from " + tblName + " order by msg")
             .verifyResults((new String[] {"val1", "val2"}));
@@ -160,8 +161,8 @@ public class TestReplicationOfHiveStreaming {
     connection.write("4,val4".getBytes());
 
     // Replicate events before committing txn. The uncommitted data shouldn't be seen.
-    incrDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("select msg from " + tblName + " order by msg")
             .verifyResults((new String[] {"val1", "val2"}));
@@ -169,8 +170,8 @@ public class TestReplicationOfHiveStreaming {
     connection.commitTransaction();
 
     // After commit, the data should be replicated and visible.
-    incrDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("select msg from " + tblName + " order by msg")
             .verifyResults((new String[] {"val1", "val2", "val3", "val4"}));
@@ -182,8 +183,8 @@ public class TestReplicationOfHiveStreaming {
     connection.abortTransaction();
 
     // Aborted data shouldn't be visible.
-    incrDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("select msg from " + tblName + " order by msg")
             .verifyResults((new String[] {"val1", "val2", "val3", "val4"}));
@@ -194,8 +195,8 @@ public class TestReplicationOfHiveStreaming {
 
   @Test
   public void testHiveStreamingStaticPartitionWithTxnBatchSizeAsOne() throws Throwable {
-    WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, bootstrapDump.dumpLocation);
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
 
     // Create an ACID table.
     String tblName = "alerts";
@@ -233,8 +234,8 @@ public class TestReplicationOfHiveStreaming {
     connection.commitTransaction();
 
     // Replicate the committed data which should be visible.
-    WarehouseInstance.Tuple incrDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg")
             .verifyResults((new String[] {"val1", "val2"}));
@@ -245,8 +246,8 @@ public class TestReplicationOfHiveStreaming {
     connection.write("4,val4".getBytes());
 
     // Replicate events before committing txn. The uncommitted data shouldn't be seen.
-    incrDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg")
             .verifyResults((new String[] {"val1", "val2"}));
@@ -254,8 +255,8 @@ public class TestReplicationOfHiveStreaming {
     connection.commitTransaction();
 
     // After commit, the data should be replicated and visible.
-    incrDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg")
             .verifyResults((new String[] {"val1", "val2", "val3", "val4"}));
@@ -267,8 +268,8 @@ public class TestReplicationOfHiveStreaming {
     connection.abortTransaction();
 
     // Aborted data shouldn't be visible.
-    incrDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg")
             .verifyResults((new String[] {"val1", "val2", "val3", "val4"}));
@@ -279,8 +280,8 @@ public class TestReplicationOfHiveStreaming {
 
   @Test
   public void testHiveStreamingDynamicPartitionWithTxnBatchSizeAsOne() throws Throwable {
-    WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, bootstrapDump.dumpLocation);
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
 
     // Create an ACID table.
     String tblName = "alerts";
@@ -315,8 +316,8 @@ public class TestReplicationOfHiveStreaming {
     connection.commitTransaction();
 
     // Replicate the committed data which should be visible.
-    WarehouseInstance.Tuple incrDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("select msg from " + tblName + " where continent='Asia' and country='China' order by msg")
             .verifyResults((new String[] {"val11"}))
@@ -329,8 +330,8 @@ public class TestReplicationOfHiveStreaming {
     connection.write("14,val14,Asia,India".getBytes());
 
     // Replicate events before committing txn. The uncommitted data shouldn't be seen.
-    incrDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg")
             .verifyResults((new String[] {"val12"}));
@@ -338,8 +339,8 @@ public class TestReplicationOfHiveStreaming {
     connection.commitTransaction();
 
     // After committing the txn, the data should be visible.
-    incrDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg")
             .verifyResults((new String[] {"val12", "val14"}))
@@ -353,8 +354,8 @@ public class TestReplicationOfHiveStreaming {
     connection.abortTransaction();
 
     // Aborted data should not be visible.
-    incrDump = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, incrDump.dumpLocation)
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("select msg from " + tblName + " where continent='Asia' and country='India' order by msg")
             .verifyResults((new String[] {"val12", "val14"}))
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
index 0a69d63..f6a33bc 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationOnHDFSEncryptedZones.java
@@ -99,6 +99,7 @@ public class TestReplicationOnHDFSEncryptedZones {
           put(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname, "false");
           put(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER.varname,
                   UserGroupInformation.getCurrentUser().getUserName());
+          put(HiveConf.ConfVars.REPLDIR.varname, primary.repldDir);
         }}, "test_key123");
 
     WarehouseInstance.Tuple tuple =
@@ -109,8 +110,8 @@ public class TestReplicationOnHDFSEncryptedZones {
             .dump(primaryDbName);
 
     replica
-        .run("repl load " + replicatedDbName + " from '" + tuple.dumpLocation
-                + "' with('hive.repl.add.raw.reserved.namespace'='true', "
+        .run("repl load " + primaryDbName + " into " + replicatedDbName
+                + " with('hive.repl.add.raw.reserved.namespace'='true', "
                 + "'hive.repl.replica.external.table.base.dir'='" + replica.externalTableWarehouseRoot + "', "
                 + "'distcp.options.pugpbx'='', 'distcp.options.skipcrccheck'='')")
         .run("use " + replicatedDbName)
@@ -140,8 +141,8 @@ public class TestReplicationOnHDFSEncryptedZones {
             .dump(primaryDbName);
 
     replica
-        .run("repl load " + replicatedDbName + " from '" + tuple.dumpLocation
-            + "' with('hive.repl.add.raw.reserved.namespace'='true')")
+        .run("repl load " + primaryDbName + " into " + replicatedDbName
+            + " with('hive.repl.add.raw.reserved.namespace'='true')")
         .run("use " + replicatedDbName)
         .run("repl status " + replicatedDbName)
         .verifyResult(tuple.lastReplicationId)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 003533a..b709ce7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -253,7 +253,7 @@ public class TestReplicationScenarios {
 
   private Tuple incrementalLoadAndVerify(String dbName, String replDbName) throws IOException {
     Tuple dump = replDumpDb(dbName);
-    loadAndVerify(replDbName, dump.dumpLocation, dump.lastReplId);
+    loadAndVerify(replDbName, dbName, dump.lastReplId);
     return dump;
   }
 
@@ -267,8 +267,8 @@ public class TestReplicationScenarios {
     return new Tuple(dumpLocation, lastReplId);
   }
 
-  private void loadAndVerify(String replDbName, String dumpLocation, String lastReplId) throws IOException {
-    run("REPL LOAD " + replDbName + " FROM '" + dumpLocation + "'", driverMirror);
+  private void loadAndVerify(String replDbName, String sourceDbNameOrPattern, String lastReplId) throws IOException {
+    run("REPL LOAD " + sourceDbNameOrPattern + " INTO " + replDbName, driverMirror);
     verifyRun("REPL STATUS " + replDbName, lastReplId, driverMirror);
     return;
   }
@@ -393,7 +393,7 @@ public class TestReplicationScenarios {
     assertEquals(false, hasMoveTask(task));
     assertEquals(true, hasPartitionTask(task));
 
-    loadAndVerify(dbNameReplica, dump.dumpLocation, dump.lastReplId);
+    loadAndVerify(dbNameReplica, dbName, dump.lastReplId);
 
     run("insert into table " + dbName + ".t2 partition(country='india') values ('delhi')", driver);
     dump = replDumpDb(dbName);
@@ -404,7 +404,7 @@ public class TestReplicationScenarios {
     assertEquals(true, hasMoveTask(task));
     assertEquals(true, hasPartitionTask(task));
 
-    loadAndVerify(dbNameReplica, dump.dumpLocation, dump.lastReplId);
+    loadAndVerify(dbNameReplica, dbName, dump.lastReplId);
 
     run("insert into table " + dbName + ".t2 partition(country='us') values ('sf')", driver);
     dump = replDumpDb(dbName);
@@ -458,7 +458,7 @@ public class TestReplicationScenarios {
     // Partition droppped after "repl dump"
     run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)", driver);
 
-    run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
     verifyRun("REPL STATUS " + replDbName, new String[] {replDumpId}, driverMirror);
 
     verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror);
@@ -495,7 +495,7 @@ public class TestReplicationScenarios {
     createDB(dbName + "_withtable", driverMirror);
     run("CREATE TABLE " + dbName + "_withtable.unptned(a string) STORED AS TEXTFILE", driverMirror);
     // Load using same dump to a DB with table. It should fail as DB is not empty.
-    verifyFail("REPL LOAD " + dbName + "_withtable FROM '" + replDumpLocn + "'", driverMirror);
+    verifyFail("REPL LOAD " + dbName + " INTO " + dbName + "_withtable ", driverMirror);
 
     // REPL STATUS should return NULL
     verifyRun("REPL STATUS " + dbName + "_withtable", nullReplId, driverMirror);
@@ -505,7 +505,7 @@ public class TestReplicationScenarios {
     run("CREATE TABLE " + dbName + "_withview.unptned(a string) STORED AS TEXTFILE", driverMirror);
     run("CREATE VIEW " + dbName + "_withview.view AS SELECT * FROM " + dbName + "_withview.unptned", driverMirror);
     // Load using same dump to a DB with view. It should fail as DB is not empty.
-    verifyFail("REPL LOAD " + dbName + "_withview FROM '" + replDumpLocn + "'", driverMirror);
+    verifyFail("REPL LOAD " + dbName + " INTO " + dbName + "_withview", driverMirror);
 
     // REPL STATUS should return NULL
     verifyRun("REPL STATUS " + dbName + "_withview", nullReplId, driverMirror);
@@ -566,7 +566,7 @@ public class TestReplicationScenarios {
     String replDumpLocn = getResult(0, 0, driver);
     String replDumpId = getResult(0, 1, true, driver);
     LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
-    run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
 
     // The ptned table should miss in target as the table was marked virtually as dropped
     verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror);
@@ -581,7 +581,7 @@ public class TestReplicationScenarios {
     String postDropReplDumpLocn = getResult(0,0, driver);
     String postDropReplDumpId = getResult(0,1,true,driver);
     LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId);
-    assert(run("REPL LOAD " + replDbName + " FROM '" + postDropReplDumpLocn + "'", true, driverMirror));
+    assert(run("REPL LOAD " + dbName + " INTO " + replDbName, true, driverMirror));
 
     verifyRun("SELECT * from " + replDbName + ".unptned", unptn_data, driverMirror);
     verifyIfTableNotExist(replDbName, "ptned", metaStoreClientMirror);
@@ -633,7 +633,7 @@ public class TestReplicationScenarios {
     String replDumpLocn = getResult(0, 0, driver);
     String replDumpId = getResult(0, 1, true, driver);
     LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
-    run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
 
     // All partitions should miss in target as it was marked virtually as dropped
     verifyRun("SELECT a from " + replDbName + ".ptned WHERE b=1", empty, driverMirror);
@@ -650,7 +650,7 @@ public class TestReplicationScenarios {
     String postDropReplDumpLocn = getResult(0,0,driver);
     String postDropReplDumpId = getResult(0,1,true,driver);
     LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId);
-    assert(run("REPL LOAD " + replDbName + " FROM '" + postDropReplDumpLocn + "'", true, driverMirror));
+    assert(run("REPL LOAD " + dbName + " INTO " + replDbName, true, driverMirror));
 
     verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClientMirror);
     verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("2")), metaStoreClientMirror);
@@ -872,7 +872,6 @@ public class TestReplicationScenarios {
 
     Tuple incrementalDump = replDumpDb(dbName);
     String incrementalDumpLocn = incrementalDump.dumpLocation;
-    replDumpId = incrementalDump.lastReplId;
 
     // Rename the event directories such a way that the length varies.
     // We will encounter create_table, truncate followed by insert.
@@ -906,7 +905,7 @@ public class TestReplicationScenarios {
     }
 
     // Load from modified dump event directories.
-    run("REPL LOAD " + replDbName + " FROM '" + incrementalDumpLocn + "'", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
   }
 
@@ -1110,7 +1109,7 @@ public class TestReplicationScenarios {
     // Drop partition after dump
     run("ALTER TABLE " + dbName + ".ptned_copy DROP PARTITION(b='1')", driver);
 
-    run("REPL LOAD " + replDbName + " FROM '" + postDropReplDumpLocn + "'", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
 
     Exception e = null;
     try {
@@ -1710,7 +1709,7 @@ public class TestReplicationScenarios {
     verifyFail("SELECT * FROM " + dbName + ".unptned_tmp", driver);
 
     // Dump all the events except DROP
-    loadAndVerify(replDbName, incrementalDump.dumpLocation, incrementalDump.lastReplId);
+    loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
 
     // Need to find the tables and data as drop is not part of this dump
     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
@@ -1760,7 +1759,7 @@ public class TestReplicationScenarios {
     verifyFail("SELECT * FROM " + dbName + ".ptned", driver);
 
     // Replicate all the events except DROP
-    loadAndVerify(replDbName, incrementalDump.dumpLocation, incrementalDump.lastReplId);
+    loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
 
     // Need to find the tables and data as drop is not part of this dump
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
@@ -1793,7 +1792,7 @@ public class TestReplicationScenarios {
     run("INSERT OVERWRITE TABLE " + dbName + ".unptned values('" + data_after_ovwrite[0] + "')", driver);
 
     // Replicate only one INSERT INTO operation on the table.
-    loadAndVerify(replDbName, incrementalDump.dumpLocation, incrementalDump.lastReplId);
+    loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
 
     // After Load from this dump, all target tables/partitions will have initial set of data but source will have latest data.
     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
@@ -1829,7 +1828,7 @@ public class TestReplicationScenarios {
     verifySetup("SELECT a from " + dbName + ".ptned where (b=2)", data_after_ovwrite, driver);
 
     // Replicate only 2 INSERT INTO operations.
-    loadAndVerify(replDbName, incrementalDump.dumpLocation, incrementalDump.lastReplId);
+    loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
     incrementalDump = replDumpDb(dbName);
 
     // After Load from this dump, all target tables/partitions will have initial set of data.
@@ -1837,7 +1836,7 @@ public class TestReplicationScenarios {
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
 
     // Replicate the remaining INSERT OVERWRITE operation on the table.
-    loadAndVerify(replDbName, incrementalDump.dumpLocation, incrementalDump.lastReplId);
+    loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
 
     // After load, shall see the overwritten data.
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
@@ -1932,7 +1931,7 @@ public class TestReplicationScenarios {
     run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_renamed", driver);
     run("ALTER TABLE " + dbName + ".ptned RENAME TO " + dbName + ".ptned_renamed", driver);
 
-    loadAndVerify(replDbName, incrementalDump.dumpLocation, incrementalDump.lastReplId);
+    loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
 
     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
@@ -1971,7 +1970,7 @@ public class TestReplicationScenarios {
 
     run("ALTER TABLE " + dbName + ".ptned PARTITION (b=2) RENAME TO PARTITION (b=10)", driver);
 
-    loadAndVerify(replDbName, incrementalDump.dumpLocation, incrementalDump.lastReplId);
+    loadAndVerify(replDbName, dbName, incrementalDump.lastReplId);
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=10) ORDER BY a", empty, driverMirror);
@@ -2339,20 +2338,20 @@ public class TestReplicationScenarios {
     run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data_load1[0] + "')", driver);
     verifyRun("SELECT a from " + dbName + ".unptned ORDER BY a", unptn_data_load1, driver);
 
-    run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
 
     // Dump and load only first insert (1 record)
-    loadAndVerify(replDbName, firstInsert.dumpLocation, firstInsert.lastReplId);
+    loadAndVerify(replDbName, dbName, firstInsert.lastReplId);
 
     verifyRun("SELECT a from " + dbName + "_dupe.unptned ORDER BY a", unptn_data_load1, driverMirror);
 
     // Dump and load only second insert (2 records)
 
-    loadAndVerify(replDbName, secondInsert.dumpLocation, secondInsert.lastReplId);
+    loadAndVerify(replDbName, dbName, secondInsert.lastReplId);
     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data_load2, driverMirror);
 
     // Dump and load only truncate (0 records)
-    loadAndVerify(replDbName, thirdTrunc.dumpLocation, thirdTrunc.lastReplId);
+    loadAndVerify(replDbName, dbName, thirdTrunc.lastReplId);
     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror);
 
     // Dump and load insert after truncate (1 record)
@@ -2361,7 +2360,7 @@ public class TestReplicationScenarios {
   }
 
   @Test
-  public void testIncrementalRepeatEventOnExistingObject() throws IOException {
+  public void testIncrementalRepeatEventOnExistingObject() throws IOException, InterruptedException {
     String testName = "incrementalRepeatEventOnExistingObject";
     String dbName = createDB(testName, driver);
     run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE", driver);
@@ -2383,46 +2382,46 @@ public class TestReplicationScenarios {
     run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver);
     Tuple replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // INSERT EVENT to partitioned table with dynamic ADD_PARTITION
     run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=1) values('" + ptn_data_1[0] + "')", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // ADD_PARTITION EVENT to partitioned table
     run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // INSERT EVENT to partitioned table on existing partition
     run("INSERT INTO TABLE " + dbName + ".ptned PARTITION(b=2) values('" + ptn_data_2[0] + "')", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // TRUNCATE_PARTITION EVENT on partitioned table
     run("TRUNCATE TABLE " + dbName + ".ptned PARTITION (b=1)", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // TRUNCATE_TABLE EVENT on unpartitioned table
     run("TRUNCATE TABLE " + dbName + ".unptned", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // CREATE_TABLE EVENT with multiple partitions
     run("CREATE TABLE " + dbName + ".unptned_tmp AS SELECT * FROM " + dbName + ".ptned", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // ADD_CONSTRAINT EVENT
     run("ALTER TABLE " + dbName + ".unptned_tmp ADD CONSTRAINT uk_unptned UNIQUE(a) disable", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // Replicate all the events happened so far
     for (Tuple currDump : incrementalDumpList) {
       // Load the incremental dump and ensure it does nothing and lastReplID remains same
-      loadAndVerify(replDbName, currDump.dumpLocation, currDump.lastReplId);
+      loadAndVerify(replDbName, dbName, currDump.lastReplId);
     }
     Tuple incrDump = incrementalLoadAndVerify(dbName, replDbName);
 
@@ -2432,18 +2431,15 @@ public class TestReplicationScenarios {
     verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=1) ORDER BY a", empty, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=2) ORDER BY a", ptn_data_2, driverMirror);
 
-    // Load each incremental dump from the list. Each dump have only one operation.
-    for (Tuple currDump : incrementalDumpList) {
-      // Load the incremental dump and ensure it does nothing and lastReplID remains same
-      loadAndVerify(replDbName, currDump.dumpLocation, incrDump.lastReplId);
+    // Load the incremental dump and ensure it does nothing and lastReplID remains same
+    loadAndVerify(replDbName, dbName, incrDump.lastReplId);
 
-      // Verify if the data are intact even after applying an applied event once again on existing objects
-      verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror);
-      verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty, driverMirror);
-      verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
-      verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=1) ORDER BY a", empty, driverMirror);
-      verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=2) ORDER BY a", ptn_data_2, driverMirror);
-    }
+    // Verify if the data are intact even after applying an applied event once again on existing objects
+    verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", empty, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".ptned where (b=2) ORDER BY a", ptn_data_2, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=1) ORDER BY a", empty, driverMirror);
+    verifyRun("SELECT a from " + replDbName + ".unptned_tmp where (b=2) ORDER BY a", ptn_data_2, driverMirror);
   }
 
   @Test
@@ -2468,76 +2464,76 @@ public class TestReplicationScenarios {
     run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')", driver);
     Tuple replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // INSERT EVENT to partitioned table with dynamic ADD_PARTITION
     run("INSERT INTO TABLE " + dbName + ".ptned partition(b=1) values('" + ptn_data_1[0] + "')", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // ADD_PARTITION EVENT to partitioned table
     run("ALTER TABLE " + dbName + ".ptned ADD PARTITION (b=2)", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // INSERT EVENT to partitioned table on existing partition
     run("INSERT INTO TABLE " + dbName + ".ptned partition(b=2) values('" + ptn_data_2[0] + "')", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // TRUNCATE_PARTITION EVENT on partitioned table
     run("TRUNCATE TABLE " + dbName + ".ptned PARTITION(b=1)", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // TRUNCATE_TABLE EVENT on unpartitioned table
     run("TRUNCATE TABLE " + dbName + ".unptned", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // CREATE_TABLE EVENT on partitioned table
     run("CREATE TABLE " + dbName + ".ptned_tmp (a string) PARTITIONED BY (b int) STORED AS TEXTFILE", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // INSERT EVENT to partitioned table with dynamic ADD_PARTITION
     run("INSERT INTO TABLE " + dbName + ".ptned_tmp partition(b=10) values('" + ptn_data_1[0] + "')", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // INSERT EVENT to partitioned table with dynamic ADD_PARTITION
     run("INSERT INTO TABLE " + dbName + ".ptned_tmp partition(b=20) values('" + ptn_data_2[0] + "')", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // DROP_PARTITION EVENT to partitioned table
     run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b=1)", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // RENAME_PARTITION EVENT to partitioned table
     run("ALTER TABLE " + dbName + ".ptned PARTITION (b=2) RENAME TO PARTITION (b=20)", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // RENAME_TABLE EVENT to unpartitioned table
     run("ALTER TABLE " + dbName + ".unptned RENAME TO " + dbName + ".unptned_new", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // ADD_CONSTRAINT EVENT
     run("ALTER TABLE " + dbName + ".ptned_tmp ADD CONSTRAINT uk_unptned UNIQUE(a) disable", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // DROP_TABLE EVENT to partitioned table
     run("DROP TABLE " + dbName + ".ptned_tmp", driver);
     replDump = replDumpDb(dbName);
     incrementalDumpList.add(replDump);
-
+    Thread.sleep(1000);
     // Load each incremental dump from the list. Each dump have only one operation.
     for (Tuple currDump : incrementalDumpList) {
       // Load the current incremental dump and ensure it does nothing and lastReplID remains same
-      loadAndVerify(replDbName, currDump.dumpLocation, currDump.lastReplId);
+      loadAndVerify(replDbName, dbName, currDump.lastReplId);
     }
     // Replicate all the events happened so far
     Tuple incrDump = incrementalLoadAndVerify(dbName, replDbName);
@@ -2551,19 +2547,17 @@ public class TestReplicationScenarios {
     verifyIfPartitionExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("20")), metaStoreClientMirror);
 
     // Load each incremental dump from the list. Each dump have only one operation.
-    for (Tuple currDump : incrementalDumpList) {
-      // Load the current incremental dump and ensure it does nothing and lastReplID remains same
-      loadAndVerify(replDbName, currDump.dumpLocation, incrDump.lastReplId);
-
-      // Verify if the data are intact even after applying an applied event once again on missing objects
-      verifyIfTableNotExist(replDbName, "unptned", metaStoreClientMirror);
-      verifyIfTableNotExist(replDbName, "ptned_tmp", metaStoreClientMirror);
-      verifyIfTableExist(replDbName, "unptned_new", metaStoreClientMirror);
-      verifyIfTableExist(replDbName, "ptned", metaStoreClientMirror);
-      verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClientMirror);
-      verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("2")), metaStoreClientMirror);
-      verifyIfPartitionExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("20")), metaStoreClientMirror);
-    }
+    // Load the current incremental dump and ensure it does nothing and lastReplID remains same
+    loadAndVerify(replDbName, dbName, incrDump.lastReplId);
+
+    // Verify if the data are intact even after applying an applied event once again on missing objects
+    verifyIfTableNotExist(replDbName, "unptned", metaStoreClientMirror);
+    verifyIfTableNotExist(replDbName, "ptned_tmp", metaStoreClientMirror);
+    verifyIfTableExist(replDbName, "unptned_new", metaStoreClientMirror);
+    verifyIfTableExist(replDbName, "ptned", metaStoreClientMirror);
+    verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("1")), metaStoreClientMirror);
+    verifyIfPartitionNotExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("2")), metaStoreClientMirror);
+    verifyIfPartitionExist(replDbName, "ptned", new ArrayList<>(Arrays.asList("20")), metaStoreClientMirror);
   }
 
   @Test
@@ -2661,14 +2655,14 @@ public class TestReplicationScenarios {
     // Replicate all the events happened so far. It should fail as the data files missing in
     // original path and not available in CM as well.
     Tuple incrDump = replDumpDb(dbName);
-    verifyFail("REPL LOAD " + replDbName + " FROM '" + incrDump.dumpLocation + "'", driverMirror);
+    verifyFail("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
 
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", empty, driverMirror);
     verifyFail("SELECT a from " + replDbName + ".ptned_tmp where (b=1) ORDER BY a", driverMirror);
 
     // Move the files back to original data location
     assert(dataFs.rename(tmpLoc, ptnLoc));
-    loadAndVerify(replDbName, incrDump.dumpLocation, incrDump.lastReplId);
+    loadAndVerify(replDbName, dbName, incrDump.lastReplId);
 
     verifyRun("SELECT a from " + replDbName + ".ptned where (b=1) ORDER BY a", ptn_data_1, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".ptned_tmp where (b=1) ORDER BY a", ptn_data_1, driverMirror);
@@ -2890,7 +2884,7 @@ public class TestReplicationScenarios {
     String replDumpLocn = replDumpDb(dbName).dumpLocation;
     // Reset the driver
     driverMirror.close();
-    run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
     // Calling close() explicitly to clean up the staging dirs
     driverMirror.close();
     // Check result
@@ -2941,7 +2935,7 @@ public class TestReplicationScenarios {
     run("TRUNCATE TABLE " + dbName + ".unptned", driver);
 
     LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
-    run("REPL LOAD " + replDbName + " FROM '" + replDumpLocn + "'", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName, driverMirror);
 
     verifyRun("SELECT count(*) from " + replDbName + ".unptned", new String[]{"2"}, driverMirror);
   }
@@ -3147,7 +3141,7 @@ public class TestReplicationScenarios {
     fs.delete(path);
 
     try {
-      driverMirror.run("REPL LOAD " + dbName + " FROM '" + dumpLocation + "'");
+      driverMirror.run("REPL LOAD " + dbName + " INTO " + dbName);
       assert false;
     } catch (CommandProcessorException e) {
       assertTrue(e.getResponseCode() == ErrorMsg.REPL_FILE_MISSING_FROM_SRC_AND_CM_PATH.getErrorCode());
@@ -3287,8 +3281,8 @@ public class TestReplicationScenarios {
 
     String replDbName = dbName + "_replica";
     Tuple dump = replDumpDb(dbName);
-    run("REPL LOAD " + replDbName + " FROM '" + dump.dumpLocation +
-            "' with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName +
+            " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
     verifyRun("REPL STATUS " + replDbName, dump.lastReplId, driverMirror);
 
     run(" use " + replDbName, driverMirror);
@@ -3308,8 +3302,7 @@ public class TestReplicationScenarios {
     String dbName = createDB(testName, driver);
     String replDbName = dbName + "_replica";
 
-    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
-    String replDumpId = bootstrapDump.lastReplId;
+    bootstrapLoadAndVerify(dbName, replDbName);
 
     String[] unptn_data = new String[] { "eleven", "twelve" };
 
@@ -3322,10 +3315,9 @@ public class TestReplicationScenarios {
     verifySetup("SELECT * from " + dbName + ".unptned_late ORDER BY a", unptn_data, driver);
 
     Tuple incrementalDump = replDumpDb(dbName);
-    run("REPL LOAD " + replDbName + " FROM '" + incrementalDump.dumpLocation +
-            "' with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName +
+            " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
     verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror);
-    replDumpId = incrementalDump.lastReplId;
 
     verifyRun("SELECT a from " + replDbName + ".unptned ORDER BY a", unptn_data, driverMirror);
     verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data, driverMirror);
@@ -3340,8 +3332,8 @@ public class TestReplicationScenarios {
     verifySetup("SELECT a from " + dbName + ".unptned", data_after_ovwrite, driver);
 
     incrementalDump = replDumpDb(dbName);
-    run("REPL LOAD " + replDbName + " FROM '" + incrementalDump.dumpLocation +
-            "' with ('hive.repl.enable.move.optimization'='true')", driverMirror);
+    run("REPL LOAD " + dbName + " INTO " + replDbName +
+            " with ('hive.repl.enable.move.optimization'='true')", driverMirror);
     verifyRun("REPL STATUS " + replDbName, incrementalDump.lastReplId, driverMirror);
 
     verifyRun("SELECT a from " + replDbName + ".unptned_late ORDER BY a", unptn_data_after_ins, driverMirror);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index f3a1abb..2854045 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -51,6 +51,7 @@ import java.util.Collections;
 import java.util.Map;
 
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+import static org.junit.Assert.assertEquals;
 
 /**
  * TestReplicationScenariosAcidTables - test replication for ACID tables.
@@ -90,6 +91,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     acidEnableConf.putAll(overrides);
 
     primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+    acidEnableConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
     replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
     Map<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
           put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
@@ -97,6 +99,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
           put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
           put("hive.metastore.client.capability.check", "false");
       }};
+    overridesForHiveConf1.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
     replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
   }
 
@@ -117,7 +120,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
   public void testAcidTablesBootstrap() throws Throwable {
     // Bootstrap
     WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null);
-    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true);
 
     // First incremental, after bootstrap
@@ -126,7 +129,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     LOG.info(testName.getMethodName() + ": first incremental dump and load.");
     WarehouseInstance.Tuple incDump = primary.run("use " + primaryDbName)
             .dump(primaryDbName);
-    replica.load(replicatedDbName, incDump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     verifyIncLoad(replicatedDbName, incDump.lastReplicationId);
 
     // Second incremental, after bootstrap
@@ -135,14 +138,14 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     LOG.info(testName.getMethodName() + ": second incremental dump and load.");
     WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
             .dump(primaryDbName);
-    replica.load(replicatedDbName, inc2Dump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
   }
 
   @Test
   public void testAcidTablesMoveOptimizationBootStrap() throws Throwable {
     WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName, null);
-    replica.load(replicatedDbName, bootstrapDump.dumpLocation,
+    replica.load(replicatedDbName, primaryDbName,
             Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
     verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, true);
   }
@@ -150,10 +153,10 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
   @Test
   public void testAcidTablesMoveOptimizationIncremental() throws Throwable {
     WarehouseInstance.Tuple bootstrapDump = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, bootstrapDump.dumpLocation,
+    replica.load(replicatedDbName, primaryDbName,
             Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
     WarehouseInstance.Tuple incrDump = prepareDataAndDump(primaryDbName, null);
-    replica.load(replicatedDbName, incrDump.dumpLocation,
+    replica.load(replicatedDbName, primaryDbName,
             Collections.singletonList("'hive.repl.enable.move.optimization'='true'"));
     verifyLoadExecution(replicatedDbName, incrDump.lastReplicationId, true);
   }
@@ -196,7 +199,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
 
     // Bootstrap load which should also replicate the aborted write ids on both tables.
     HiveConf replicaConf = replica.getConf();
-    replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("show tables")
             .verifyResults(new String[] {"t1", "t2"})
@@ -280,7 +283,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     }
 
     // Bootstrap dump has taken snapshot before concurrent tread performed write. So, it won't see data "2".
-    replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
             .verifyResult(bootstrapDump.lastReplicationId)
@@ -289,7 +292,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
 
     // Incremental should include the concurrent write of data "2" from another txn.
     WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
             .verifyResult(incrementalDump.lastReplicationId)
@@ -354,7 +357,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     }
 
     // Bootstrap dump has taken latest list of tables and hence won't see table t1 as it is dropped.
-    replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
             .verifyResult(bootstrapDump.lastReplicationId)
@@ -368,7 +371,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
             .run("insert into t1 values(100)")
             .dump(primaryDbName);
 
-    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
             .verifyResult(incrementalDump.lastReplicationId)
@@ -380,7 +383,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
   public void testOpenTxnEvent() throws Throwable {
     String tableName = testName.getMethodName();
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(bootStrapDump.lastReplicationId);
 
@@ -401,12 +404,12 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     primary.testEventCounts(primaryDbName, lastReplId, null, null, 22);
 
     // Test load
-    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(incrementalDump.lastReplicationId);
 
     // Test the idempotent behavior of Open and Commit Txn
-    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(incrementalDump.lastReplicationId);
   }
@@ -415,7 +418,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
   public void testAbortTxnEvent() throws Throwable {
     String tableNameFail = testName.getMethodName() + "Fail";
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(bootStrapDump.lastReplicationId);
 
@@ -429,12 +432,12 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
 
     WarehouseInstance.Tuple incrementalDump =
             primary.dump(primaryDbName);
-    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(incrementalDump.lastReplicationId);
 
     // Test the idempotent behavior of Abort Txn
-    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(incrementalDump.lastReplicationId);
   }
@@ -443,7 +446,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
   public void testTxnEventNonAcid() throws Throwable {
     String tableName = testName.getMethodName();
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
-    replicaNonAcid.load(replicatedDbName, bootStrapDump.dumpLocation)
+    replicaNonAcid.load(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(bootStrapDump.lastReplicationId);
 
@@ -460,7 +463,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
 
     WarehouseInstance.Tuple incrementalDump =
             primary.dump(primaryDbName);
-    replicaNonAcid.runFailure("REPL LOAD " + replicatedDbName + " FROM '" + incrementalDump.dumpLocation + "'")
+    replicaNonAcid.runFailure("REPL LOAD " + primaryDbName + " INTO " + replicatedDbName + "'")
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(bootStrapDump.lastReplicationId);
   }
@@ -499,7 +502,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
 
     List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'");
-    replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+    replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
     callerVerifier.assertInjectionsPerformed(true, false);
     InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
 
@@ -527,7 +530,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
 
     // Retry with same dump with which it was already loaded should resume the bootstrap load.
     // This time, it completes by adding just constraints for table t4.
-    replica.load(replicatedDbName, tuple.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     callerVerifier.assertInjectionsPerformed(true, false);
     InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
 
@@ -604,9 +607,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     String txnStrStart = "START TRANSACTION";
     String txnStrCommit = "COMMIT";
 
-    WarehouseInstance.Tuple incrementalDump;
     primary.run("alter database default set dbproperties ('repl.source.for' = '1, 2, 3')");
-    WarehouseInstance.Tuple bootStrapDump = primary.dump("`*`");
 
     primary.run("use " + primaryDbName)
           .run("create database " + dbName1 + " WITH DBPROPERTIES ( '" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
@@ -638,7 +639,7 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
           .verifyResults(resultArray)
           .run(txnStrCommit);
 
-    incrementalDump = primary.dump("`*`");
+    primary.dump("`*`");
 
     // Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM
     // we are not able to create multiple embedded derby instances for two different MetaStore instances.
@@ -646,20 +647,10 @@ public class TestReplicationScenariosAcidTables extends BaseReplicationScenarios
     primary.run("drop database " + dbName1 + " cascade");
     primary.run("drop database " + dbName2 + " cascade");
     //End of additional steps
-
-    replica.loadWithoutExplain("", bootStrapDump.dumpLocation)
-            .run("REPL STATUS default")
-            .verifyResult(bootStrapDump.lastReplicationId);
-
-    replica.loadWithoutExplain("", incrementalDump.dumpLocation)
-          .run("REPL STATUS " + dbName1)
-          .run("select key from " + dbName1 + "." + tableName + " order by key")
-          .verifyResults(resultArray)
-          .run("select key from " + dbName2 + "." + tableName + " order by key")
-          .verifyResults(resultArray);
-
-    replica.run("drop database " + primaryDbName + " cascade");
-    replica.run("drop database " + dbName1 + " cascade");
-    replica.run("drop database " + dbName2 + " cascade");
+    try {
+      replica.loadWithoutExplain("", "`*`");
+    } catch (SemanticException e) {
+      assertEquals("REPL LOAD * is not supported", e.getMessage());
+    }
   }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
index f5dd043..36841ba 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTablesBootstrap.java
@@ -68,7 +68,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
     WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName,
             dumpWithoutAcidClause);
     LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
-    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, false);
 
     // Take a incremental dump with acid table bootstrap
@@ -77,7 +77,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
     LOG.info(testName.getMethodName() + ": incremental dump and load dump with acid table bootstrap.");
     WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
             .dump(primaryDbName, dumpWithAcidBootstrapClause);
-    replica.load(replicatedDbName, incrementalDump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     verifyIncLoad(replicatedDbName, incrementalDump.lastReplicationId);
     // Ckpt should be set on bootstrapped tables.
     replica.verifyIfCkptSetForTables(replicatedDbName, acidTableNames, incrementalDump.dumpLocation);
@@ -90,7 +90,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
             "bootstrap.");
     WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
             .dump(primaryDbName);
-    replica.load(replicatedDbName, inc2Dump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
   }
 
@@ -99,7 +99,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
     WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName,
             dumpWithoutAcidClause);
     LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
-    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     verifyLoadExecution(replicatedDbName, bootstrapDump.lastReplicationId, false);
 
     prepareIncAcidData(primaryDbName);
@@ -129,7 +129,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
     try {
       LOG.info(testName.getMethodName()
               + ": loading first incremental dump with acid table bootstrap (will fail)");
-      replica.loadFailure(replicatedDbName, incDump.dumpLocation);
+      replica.loadFailure(replicatedDbName, primaryDbName);
       callerVerifier.assertInjectionsPerformed(true, false);
     } finally {
       InjectableBehaviourObjectStore.resetAlterTableModifier();
@@ -149,7 +149,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
     LOG.info(testName.getMethodName()
             + ": trying to load second incremental dump with wrong bootstrap dump "
             + " specified for cleaning ACID tables. Should fail.");
-    replica.loadFailure(replicatedDbName, inc2Dump.dumpLocation, loadWithClause);
+    replica.loadFailure(replicatedDbName, primaryDbName, loadWithClause);
 
     // Set previously failed bootstrap dump to clean-up. Now, new bootstrap should overwrite the old one.
     loadWithClause = Collections.singletonList(
@@ -159,7 +159,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
     LOG.info(testName.getMethodName()
             + ": trying to load second incremental dump with correct bootstrap dump "
             + "specified for cleaning ACID tables. Should succeed.");
-    replica.load(replicatedDbName, inc2Dump.dumpLocation, loadWithClause);
+    replica.load(replicatedDbName, primaryDbName, loadWithClause);
     verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
 
     // Once the REPL LOAD is successful, the this config should be unset or else, the subsequent REPL LOAD
@@ -170,7 +170,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
     LOG.info(testName.getMethodName()
             + ": trying to load second incremental dump (with acid bootstrap) again."
             + " Should succeed.");
-    replica.load(replicatedDbName, inc2Dump.dumpLocation, loadWithClause);
+    replica.load(replicatedDbName, primaryDbName, loadWithClause);
     verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
   }
 
@@ -178,7 +178,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
   public void retryIncBootstrapAcidFromDifferentDumpWithoutCleanTablesConfig() throws Throwable {
     WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName,
             dumpWithoutAcidClause);
-    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
 
     prepareIncAcidData(primaryDbName);
     prepareIncNonAcidData(primaryDbName);
@@ -186,10 +186,10 @@ public class TestReplicationScenariosAcidTablesBootstrap
             .dump(primaryDbName, dumpWithAcidBootstrapClause);
     WarehouseInstance.Tuple inc2Dump = primary.run("use " + primaryDbName)
             .dump(primaryDbName, dumpWithAcidBootstrapClause);
-    replica.load(replicatedDbName, incDump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
 
     // Re-bootstrapping from different bootstrap dump without clean tables config should fail.
-    replica.loadFailure(replicatedDbName, inc2Dump.dumpLocation, Collections.emptyList(),
+    replica.loadFailure(replicatedDbName, primaryDbName, Collections.emptyList(),
             ErrorMsg.REPL_BOOTSTRAP_LOAD_PATH_NOT_VALID.getErrorCode());
   }
 
@@ -199,7 +199,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
     WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName,
                                                                 dumpWithoutAcidClause);
     LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
-    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
 
     // Open concurrent transactions, create data for incremental and take an incremental dump
     // with ACID table bootstrap.
@@ -232,7 +232,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
     // tables t1 and t2
     HiveConf replicaConf = replica.getConf();
     LOG.info(testName.getMethodName() + ": loading incremental dump with ACID bootstrap.");
-    replica.load(replicatedDbName, incDump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     verifyIncLoad(replicatedDbName, incDump.lastReplicationId);
     // Verify if HWM is properly set after REPL LOAD
     verifyNextId(tables, replicatedDbName, replicaConf);
@@ -257,7 +257,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
     WarehouseInstance.Tuple bootstrapDump = prepareDataAndDump(primaryDbName,
                                                                 dumpWithoutAcidClause);
     LOG.info(testName.getMethodName() + ": loading dump without acid tables.");
-    replica.load(replicatedDbName, bootstrapDump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
 
     // Create incremental data for incremental load with bootstrap of ACID
     prepareIncNonAcidData(primaryDbName);
@@ -315,7 +315,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
     // write. So concurrent writes won't be dumped.
     LOG.info(testName.getMethodName() +
             ": loading incremental dump containing bootstrapped ACID tables.");
-    replica.load(replicatedDbName, incDump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     verifyIncLoad(replicatedDbName, incDump.lastReplicationId);
 
     // Next Incremental should include the concurrent writes
@@ -324,7 +324,7 @@ public class TestReplicationScenariosAcidTablesBootstrap
     WarehouseInstance.Tuple inc2Dump = primary.dump(primaryDbName);
     LOG.info(testName.getMethodName() +
             ": loading second normal incremental dump from event id = " + incDump.lastReplicationId);
-    replica.load(replicatedDbName, inc2Dump.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     verifyInc2Load(replicatedDbName, inc2Dump.lastReplicationId);
   }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index eb8a8995..ff1de9e 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -77,7 +77,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
   @Test
   public void testCreateFunctionIncrementalReplication() throws Throwable {
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("REPL STATUS " + replicatedDbName)
         .verifyResult(bootStrapDump.lastReplicationId);
 
@@ -89,7 +89,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
 
     WarehouseInstance.Tuple incrementalDump =
         primary.dump(primaryDbName);
-    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("REPL STATUS " + replicatedDbName)
         .verifyResult(incrementalDump.lastReplicationId)
         .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
@@ -97,7 +97,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
                                       replicatedDbName + ".testFunctionTwo" });
 
     // Test the idempotent behavior of CREATE FUNCTION
-    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("REPL STATUS " + replicatedDbName)
         .verifyResult(incrementalDump.lastReplicationId)
         .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
@@ -141,7 +141,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'",
             "'hive.in.repl.test.files.sorted'='true'");
     try {
-      replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+      replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
       callerVerifier.assertInjectionsPerformed(true, false);
     } finally {
       InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
@@ -175,7 +175,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     try {
       // Retry with same dump with which it was already loaded should resume the bootstrap load.
       // This time, it completes by adding just the function f2
-      replica.load(replicatedDbName, tuple.dumpLocation);
+      replica.load(replicatedDbName, primaryDbName);
       callerVerifier.assertInjectionsPerformed(true, false);
     } finally {
       InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
@@ -196,7 +196,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         + ".testFunctionAnother as 'hivemall.tools.string.StopwordUDF' "
         + "using jar  'ivy://io.github.myui:hivemall:0.4.0-2'");
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("REPL STATUS " + replicatedDbName)
         .verifyResult(bootStrapDump.lastReplicationId);
 
@@ -204,14 +204,14 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
 
     WarehouseInstance.Tuple incrementalDump =
         primary.dump(primaryDbName);
-    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("REPL STATUS " + replicatedDbName)
         .verifyResult(incrementalDump.lastReplicationId)
         .run("SHOW FUNCTIONS LIKE '%testfunctionanother%'")
         .verifyResult(null);
 
     // Test the idempotent behavior of DROP FUNCTION
-    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("REPL STATUS " + replicatedDbName)
         .verifyResult(incrementalDump.lastReplicationId)
         .run("SHOW FUNCTIONS LIKE '%testfunctionanother%'")
@@ -225,7 +225,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         + "using jar  'ivy://io.github.myui:hivemall:0.4.0-2'");
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
 
-    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
         .verifyResult(replicatedDbName + ".testFunction");
   }
@@ -241,7 +241,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
 
     WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
 
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
         .verifyResult(replicatedDbName + ".anotherFunction");
 
@@ -262,7 +262,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
   @Test
   public void testIncrementalCreateFunctionWithFunctionBinaryJarsOnHDFS() throws Throwable {
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(bootStrapDump.lastReplicationId);
 
@@ -275,7 +275,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
 
     WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
 
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("SHOW FUNCTIONS LIKE '" + replicatedDbName + "%'")
             .verifyResult(replicatedDbName + ".anotherFunction");
 
@@ -354,7 +354,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     List<String> withClause = Collections.singletonList(
         "'" + HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS.varname + "'='1'");
 
-    replica.load(replicatedDbName, tuple.dumpLocation, withClause)
+    replica.load(replicatedDbName, primaryDbName, withClause)
         .run("use " + replicatedDbName)
         .run("show tables")
         .verifyResults(new String[] { "t1", "t2", "t3" })
@@ -382,7 +382,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .dump(primaryDbName);
 
     replica.hiveConf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, true);
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("use " + replicatedDbName)
         .run("repl status " + replicatedDbName)
         .verifyResult(tuple.lastReplicationId)
@@ -403,7 +403,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .run("insert into table1 values (1,2)")
         .dump(primaryDbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("use " + replicatedDbName)
         .run("show tables")
         .verifyResults(new String[] { "acid_table", "table1" })
@@ -422,7 +422,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .run("insert into table1 values (1,2)")
         .dump(primaryDbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
-    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("use " + replicatedDbName)
         .run("show tables")
         .verifyResults(new String[] { "table1", "table2", "table3" })
@@ -440,7 +440,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
                 "repl dump " + primaryDbName + " with ('hive.repl.dump.metadata.only'='true')"
             );
 
-    replica.load(replicatedDbName, incrementalOneTuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("use " + replicatedDbName)
         .run("show tables")
         .verifyResults(new String[] { "renamed_table1", "table2", "table3", "table4" })
@@ -458,7 +458,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .dumpWithCommand("repl dump " + primaryDbName + " with ('hive.repl.dump.metadata.only'='true')"
         );
 
-    replica.load(replicatedDbName, secondIncremental.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("use " + replicatedDbName)
         .run("show tables")
         .verifyResults(new String[] { "table2", "table3", "table4" })
@@ -496,7 +496,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .run("insert into table1 values (1,2)")
         .dump(dbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, dbName)
             .run("use " + replicatedDbName)
             .run("show tables")
             .verifyResults(new String[]{"table1", "table2", "table3"})
@@ -510,7 +510,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .run("create table table4 (i int, j int)")
         .dump(dbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, dbName)
             .run("use " + replicatedDbName)
             .run("show tables")
             .verifyResults(new String[] { "renamed_table1", "table2", "table3", "table4" })
@@ -563,126 +563,12 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     // Reset ckpt and last repl ID keys to empty set for allowing bootstrap load
     replica.run("show databases")
         .verifyFailure(new String[] { primaryDbName, dbOne, dbTwo })
-        .run("alter database default set dbproperties ('hive.repl.ckpt.key'='', 'repl.last.id'='')")
-        .load("", tuple.dumpLocation)
-        .run("show databases")
-        .verifyResults(new String[] { "default", primaryDbName, dbOne, dbTwo })
-        .run("use " + primaryDbName)
-        .run("show tables")
-        .verifyResults(new String[] { "t1" })
-        .run("use " + dbOne)
-        .run("show tables")
-        .verifyResults(new String[] { "t1" })
-        .run("use " + dbTwo)
-        .run("show tables")
-        .verifyResults(new String[] { "t1" })
-        .verifyReplTargetProperty(primaryDbName)
-        .verifyReplTargetProperty(dbOne)
-        .verifyReplTargetProperty(dbTwo);
-
-    /*
-       Start of cleanup
-    */
-
-    replica.run("drop database " + primaryDbName + " cascade");
-    replica.run("drop database " + dbOne + " cascade");
-    replica.run("drop database " + dbTwo + " cascade");
-
-    /*
-       End of cleanup
-    */
-  }
-
-  @Test
-  public void testIncrementalDumpOfWarehouse() throws Throwable {
-    String randomOne = RandomStringUtils.random(10, true, false);
-    String randomTwo = RandomStringUtils.random(10, true, false);
-    String dbOne = primaryDbName + randomOne;
-    primary.run("alter database default set dbproperties ('" + SOURCE_OF_REPLICATION + "' = '1, 2, 3')");
-    WarehouseInstance.Tuple bootstrapTuple = primary
-        .run("use " + primaryDbName)
-        .run("create table t1 (i int, j int)")
-        .run("create database " + dbOne + " WITH DBPROPERTIES ( '" +
-                SOURCE_OF_REPLICATION + "' = '1,2,3')")
-        .run("use " + dbOne)
-        .run("create table t1 (i int, j int) partitioned by (load_date date) "
-            + "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ")
-        .dump("`*`", Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
-
-    String dbTwo = primaryDbName + randomTwo;
-    WarehouseInstance.Tuple incrementalTuple = primary
-        .run("create database " + dbTwo + " WITH DBPROPERTIES ( '" +
-                SOURCE_OF_REPLICATION + "' = '1,2,3')")
-        .run("use " + dbTwo)
-        .run("create table t1 (i int, j int)")
-        .run("use " + dbOne)
-        .run("create table t2 (a int, b int)")
-        .dump("`*`", Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
-
-    /*
-      Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM
-      we are not able to create multiple embedded derby instances for two different MetaStore instances.
-    */
-
-    primary.run("drop database " + primaryDbName + " cascade");
-    primary.run("drop database " + dbOne + " cascade");
-    primary.run("drop database " + dbTwo + " cascade");
-
-    /*
-      End of additional steps
-    */
-
-    // Reset ckpt and last repl ID keys to empty set for allowing bootstrap load
-    replica.run("show databases")
-        .verifyFailure(new String[] { primaryDbName, dbOne, dbTwo })
-        .run("alter database default set dbproperties ('hive.repl.ckpt.key'='', 'repl.last.id'='')")
-        .load("", bootstrapTuple.dumpLocation)
-        .run("show databases")
-        .verifyResults(new String[] { "default", primaryDbName, dbOne })
-        .run("use " + primaryDbName)
-        .run("show tables")
-        .verifyResults(new String[] { "t1" })
-        .run("use " + dbOne)
-        .run("show tables")
-        .verifyResults(new String[] { "t1" })
-        .verifyReplTargetProperty(primaryDbName)
-        .verifyReplTargetProperty(dbOne)
-        .verifyReplTargetProperty(dbTwo);
-
-    assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase("default").getParameters()));
-    assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(primaryDbName).getParameters()));
-    assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(dbOne).getParameters()));
-
-    replica.load("", incrementalTuple.dumpLocation)
-        .run("show databases")
-        .verifyResults(new String[] { "default", primaryDbName, dbOne, dbTwo })
-        .run("use " + dbTwo)
-        .run("show tables")
-        .verifyResults(new String[] { "t1" })
-        .run("use " + dbOne)
-        .run("show tables")
-        .verifyResults(new String[] { "t1", "t2" })
-        .verifyReplTargetProperty(primaryDbName)
-        .verifyReplTargetProperty(dbOne)
-        .verifyReplTargetProperty(dbTwo);
-
-    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase("default").getParameters()));
-    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(primaryDbName).getParameters()));
-    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(dbOne).getParameters()));
-    assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(dbTwo).getParameters()));
-
-    /*
-       Start of cleanup
-    */
-
-    replica.run("drop database " + primaryDbName + " cascade");
-    replica.run("drop database " + dbOne + " cascade");
-    replica.run("drop database " + dbTwo + " cascade");
-
-    /*
-       End of cleanup
-    */
-
+        .run("alter database default set dbproperties ('hive.repl.ckpt.key'='', 'repl.last.id'='')");
+    try {
+      replica.load("", "`*`");
+    } catch (SemanticException e) {
+      assertEquals("REPL LOAD * is not supported", e.getMessage());
+    }
   }
 
   @Test
@@ -702,7 +588,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .dump(primaryDbName);
 
     // Run load on primary itself
-    primary.load(replicatedDbName, bootstrapTuple.dumpLocation, withConfigs)
+    primary.load(replicatedDbName, primaryDbName, withConfigs)
             .status(replicatedDbName, withConfigs)
             .verifyResult(bootstrapTuple.lastReplicationId);
 
@@ -727,7 +613,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
                     .dump(primaryDbName, Collections.emptyList());
 
     // Run load on primary itself
-    primary.load(replicatedDbName, incrementalOneTuple.dumpLocation, withConfigs)
+    primary.load(replicatedDbName, primaryDbName, withConfigs)
             .status(replicatedDbName, withConfigs)
             .verifyResult(incrementalOneTuple.lastReplicationId);
 
@@ -756,7 +642,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .dump(primaryDbName, Collections.emptyList());
 
     // Run load on primary itself
-    primary.load(replicatedDbName, secondIncremental.dumpLocation, withConfigs)
+    primary.load(replicatedDbName, primaryDbName, withConfigs)
             .status(replicatedDbName, withConfigs)
             .verifyResult(secondIncremental.lastReplicationId);
 
@@ -790,7 +676,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName);
 
     // Bootstrap load in replica
-    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .status(replicatedDbName)
             .verifyResult(bootstrapTuple.lastReplicationId);
 
@@ -815,7 +701,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .dump(primaryDbName, Collections.emptyList());
 
     // First incremental load
-    replica.load(replicatedDbName, firstIncremental.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .status(replicatedDbName)
             .verifyResult(firstIncremental.lastReplicationId)
             .run("use " + replicatedDbName)
@@ -827,7 +713,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .verifyResults(new String[] {"1"});
 
     // Second incremental load
-    replica.load(replicatedDbName, secondIncremental.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .status(replicatedDbName)
             .verifyResult(secondIncremental.lastReplicationId)
             .run("use " + replicatedDbName)
@@ -845,7 +731,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName);
 
     // Bootstrap load in replica
-    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .status(replicatedDbName)
         .verifyResult(bootstrapTuple.lastReplicationId);
 
@@ -873,7 +759,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .dump(primaryDbName, Collections.emptyList());
 
     // First incremental load
-    replica.load(replicatedDbName, firstIncremental.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .status(replicatedDbName)
         .verifyResult(firstIncremental.lastReplicationId)
         .run("use " + replicatedDbName)
@@ -885,7 +771,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
         .verifyResults(new String[] { "3" });
 
     // Second incremental load
-    replica.load(replicatedDbName, secondIncremental.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .status(replicatedDbName)
         .verifyResult(secondIncremental.lastReplicationId)
         .run("use " + replicatedDbName)
@@ -938,7 +824,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .dump(primaryDbName, Collections.singletonList("'hive.repl.dump.metadata.only'='true'"));
 
     // Bootstrap load in replica
-    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .status(replicatedDbName)
             .verifyResult(bootstrapTuple.lastReplicationId)
             .run("use " + replicatedDbName)
@@ -963,13 +849,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
   public void testIncrementalDumpEmptyDumpDirectory() throws Throwable {
     WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
 
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .status(replicatedDbName)
             .verifyResult(tuple.lastReplicationId);
 
     tuple = primary.dump(primaryDbName, Collections.emptyList());
 
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .status(replicatedDbName)
             .verifyResult(tuple.lastReplicationId);
 
@@ -980,26 +866,26 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .dump(primaryDbName, Collections.emptyList());
 
     // Incremental load to existing database with empty dump directory should set the repl id to the last event at src.
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .status(replicatedDbName)
             .verifyResult(tuple.lastReplicationId);
 
     // Bootstrap load from an empty dump directory should return empty load directory error.
     tuple = primary.dump("someJunkDB", Collections.emptyList());
     try {
-      replica.runCommand("REPL LOAD someJunkDB from '" + tuple.dumpLocation + "'");
+      replica.runCommand("REPL LOAD someJunkDB into someJunkDB");
       assert false;
     } catch (CommandProcessorException e) {
       assertTrue(e.getMessage().toLowerCase().contains("semanticException no data to load in path".toLowerCase()));
     }
 
-    // Incremental load to non existing db should return database not exist error.
+    // Bootstrap load from an empty dump directory should return empty load directory error. Since we have repl status
+    //check on target
     tuple = primary.dump("someJunkDB");
     try {
-      replica.runCommand("REPL LOAD someJunkDB from '" + tuple.dumpLocation+"'");
+      replica.runCommand("REPL LOAD someJunkDB into someJunkDB ");
     } catch (CommandProcessorException e) {
-      assertTrue(e.getMessage().toLowerCase().contains(
-              "org.apache.hadoop.hive.ql.ddl.DDLTask. Database does not exist: someJunkDB".toLowerCase()));
+      assertTrue(e.getMessage().toLowerCase().contains("semanticException no data to load in path".toLowerCase()));
     }
 
     primary.run(" drop database if exists " + testDbName + " cascade");
@@ -1009,7 +895,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
   public void testIncrementalDumpMultiIteration() throws Throwable {
     WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName);
 
-    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .status(replicatedDbName)
             .verifyResult(bootstrapTuple.lastReplicationId);
 
@@ -1022,7 +908,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .run("insert into table3 partition(country='india') values(3)")
             .dump(primaryDbName, Collections.emptyList());
 
-    replica.load(replicatedDbName, incremental.dumpLocation,
+    replica.load(replicatedDbName, primaryDbName,
         Collections.singletonList("'hive.repl.approx.max.load.tasks'='10'"))
             .status(replicatedDbName)
             .verifyResult(incremental.lastReplicationId)
@@ -1047,7 +933,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     FileStatus[] fileStatus = fs.listStatus(path);
     int numEvents = fileStatus.length - 1; //one is metadata file
 
-    replica.load(replicatedDbName, incremental.dumpLocation,
+    replica.load(replicatedDbName, primaryDbName,
         Collections.singletonList("'hive.repl.approx.max.load.tasks'='1'"))
             .run("use " + replicatedDbName)
             .run("show tables")
@@ -1067,7 +953,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .dump(primaryDbName);
 
     // Bootstrap Repl A -> B
-    replica.load(replicatedDbName, tuplePrimary.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("repl status " + replicatedDbName)
             .verifyResult(tuplePrimary.lastReplicationId)
             .run("show tblproperties t1('custom.property')")
@@ -1079,12 +965,12 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
 
     // do a empty incremental load to allow dump of replicatedDbName
     WarehouseInstance.Tuple temp = primary.dump(primaryDbName, Collections.emptyList());
-    replica.load(replicatedDbName, temp.dumpLocation); // first successful incremental load.
+    replica.load(replicatedDbName, primaryDbName); // first successful incremental load.
 
     // Bootstrap Repl B -> C
     WarehouseInstance.Tuple tupleReplica = replica.dump(replicatedDbName);
     String replDbFromReplica = replicatedDbName + "_dupe";
-    replica.load(replDbFromReplica, tupleReplica.dumpLocation)
+    replica.load(replDbFromReplica, replicatedDbName)
             .run("use " + replDbFromReplica)
             .run("repl status " + replDbFromReplica)
             .verifyResult(tupleReplica.lastReplicationId)
@@ -1113,7 +999,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .dump(primaryDbName, Collections.emptyList());
 
     // Incremental Repl A -> B with alters on db/table/partition
-    WarehouseInstance.Tuple tupleReplicaInc = replica.load(replicatedDbName, tuplePrimaryInc.dumpLocation)
+    WarehouseInstance.Tuple tupleReplicaInc = replica.load(replicatedDbName, primaryDbName)
             .run("repl status " + replicatedDbName)
             .verifyResult(tuplePrimaryInc.lastReplicationId)
             .dump(replicatedDbName, Collections.emptyList());
@@ -1127,7 +1013,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     verifyIfCkptPropMissing(india.getParameters());
 
     // Incremental Repl B -> C with alters on db/table/partition
-    replica.load(replDbFromReplica, tupleReplicaInc.dumpLocation)
+    replica.load(replDbFromReplica, replicatedDbName)
             .run("use " + replDbFromReplica)
             .run("repl status " + replDbFromReplica)
             .verifyResult(tupleReplicaInc.lastReplicationId)
@@ -1158,7 +1044,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     // Bootstrap Repl A -> B and then export table t1
     String path = "hdfs:///tmp/" + replicatedDbName + "/";
     String exportPath = "'" + path + "1/'";
-    replica.load(replicatedDbName, tuplePrimary.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("repl status " + replicatedDbName)
             .verifyResult(tuplePrimary.lastReplicationId)
             .run("use " + replicatedDbName)
@@ -1199,7 +1085,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .run("insert into table t2 partition(country='us') values ('sfo')")
             .dump(primaryDbName);
 
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
             .verifyResult(tuple.lastReplicationId)
@@ -1212,12 +1098,12 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     replica.verifyIfCkptSet(replicatedDbName, tuple.dumpLocation);
 
     // Retry with same dump with which it was already loaded also fails.
-    replica.loadFailure(replicatedDbName, tuple.dumpLocation);
+    replica.loadFailure(replicatedDbName, primaryDbName);
 
     // Retry from same dump when the database is empty is also not allowed.
     replica.run("drop table t1")
             .run("drop table t2")
-            .loadFailure(replicatedDbName, tuple.dumpLocation);
+            .loadFailure(replicatedDbName, primaryDbName);
   }
 
   @Test
@@ -1257,7 +1143,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     // Trigger bootstrap dump which just creates table t1 and other tables (t2, t3) and constraints not loaded.
     List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'");
     try {
-      replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+      replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
       callerVerifier.assertInjectionsPerformed(true, false);
     } finally {
       InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
@@ -1293,7 +1179,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     try {
       // Retry with same dump with which it was already loaded should resume the bootstrap load.
       // This time, it fails when try to load the foreign key constraints. All other constraints are loaded.
-      replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+      replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
       callerVerifier.assertInjectionsPerformed(true, false);
     } finally {
       InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
@@ -1331,7 +1217,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     try {
       // Retry with same dump with which it was already loaded should resume the bootstrap load.
       // This time, it completes by adding just foreign key constraints for table t2.
-      replica.load(replicatedDbName, tuple.dumpLocation);
+      replica.load(replicatedDbName, primaryDbName);
       callerVerifier.assertInjectionsPerformed(true, false);
     } finally {
       InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
@@ -1382,7 +1268,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     // Make sure that there's some order in which the objects are loaded.
     List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'",
             "'hive.in.repl.test.files.sorted'='true'");
-    replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+    replica.loadFailure(replicatedDbName, primaryDbName, withConfigs);
     InjectableBehaviourObjectStore.setAlterPartitionsBehaviour(null); // reset the behaviour
     alterPartitionStub.assertInjectionsPerformed(true, false);
 
@@ -1414,7 +1300,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     try {
       // Retry with same dump with which it was already loaded should resume the bootstrap load.
       // This time, it completes by adding remaining partitions and function.
-      replica.load(replicatedDbName, tuple.dumpLocation);
+      replica.load(replicatedDbName, primaryDbName);
       callerVerifier.assertInjectionsPerformed(false, false);
     } finally {
       InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
@@ -1441,7 +1327,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .dump(primaryDbName);
 
     testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2",
-            "ADD_PARTITION", tuple);
+            "ADD_PARTITION");
   }
 
   @Test
@@ -1454,8 +1340,8 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .run("insert into table t2 partition(country='india') values ('bangalore')")
             .run("create table t1 (place string) partitioned by (country string)")
             .dump(primaryDbName);
-    replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
-    replica.load(replicatedDbName_CM, tuple.dumpLocation, withConfigs);
+    replica.load(replicatedDbName, primaryDbName, withConfigs);
+    replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
     replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
         .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
 
@@ -1463,7 +1349,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
             .run("insert overwrite table t1 select * from t2")
             .dump(primaryDbName, Collections.emptyList());
 
-    testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "ADD_PARTITION", tuple);
+    testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t1", "ADD_PARTITION");
   }
 
   @Test
@@ -1471,24 +1357,24 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     List<String> withConfigs =
         Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
     String replicatedDbName_CM = replicatedDbName + "_CM";
-    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
+    primary.run("use " + primaryDbName)
             .run("create table t2 (place string) partitioned by (country string)")
             .run("ALTER TABLE t2 ADD PARTITION (country='india')")
             .dump(primaryDbName);
-    replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
-    replica.load(replicatedDbName_CM, tuple.dumpLocation, withConfigs);
+    replica.load(replicatedDbName, primaryDbName, withConfigs);
+    replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
     replica.run("alter database " + replicatedDbName + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
         .run("alter database " + replicatedDbName_CM + " set DBPROPERTIES ('" + SOURCE_OF_REPLICATION + "' = '1,2,3')");
 
-    tuple = primary.run("use " + primaryDbName)
+    WarehouseInstance.Tuple tuple = primary.run("use " + primaryDbName)
             .run("insert into table t2 partition(country='india') values ('bangalore')")
             .dump(primaryDbName, Collections.emptyList());
 
-    testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", "INSERT", tuple);
+    testMoveOptimization(primaryDbName, replicatedDbName, replicatedDbName_CM, "t2", "INSERT");
   }
 
   private void testMoveOptimization(String primaryDb, String replicaDb, String replicatedDbName_CM,
-                                    String tbl,  String eventType, WarehouseInstance.Tuple tuple) throws Throwable {
+                                    String tbl,  String eventType) throws Throwable {
     List<String> withConfigs =
         Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
 
@@ -1510,13 +1396,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
 
     InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
     try {
-      replica.loadFailure(replicaDb, tuple.dumpLocation, withConfigs);
+      replica.loadFailure(replicaDb, primaryDbName, withConfigs);
     } finally {
       InjectableBehaviourObjectStore.resetAddNotificationModifier();
     }
 
     callerVerifier.assertInjectionsPerformed(true, false);
-    replica.load(replicaDb, tuple.dumpLocation, withConfigs);
+    replica.load(replicaDb, primaryDbName, withConfigs);
 
     replica.run("use " + replicaDb)
             .run("select country from " + tbl + " where country == 'india'")
@@ -1527,13 +1413,13 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
 
     InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
     try {
-      replica.loadFailure(replicatedDbName_CM, tuple.dumpLocation, withConfigs);
+      replica.loadFailure(replicatedDbName_CM, primaryDbName, withConfigs);
     } finally {
       InjectableBehaviourObjectStore.resetAddNotificationModifier();
     }
 
     callerVerifier.assertInjectionsPerformed(true, false);
-    replica.load(replicatedDbName_CM, tuple.dumpLocation, withConfigs);
+    replica.load(replicatedDbName_CM, primaryDbName, withConfigs);
 
     replica.run("use " + replicatedDbName_CM)
             .run("select country from " + tbl + " where country == 'india'")
@@ -1573,7 +1459,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     // again from start.
     InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier);
     try {
-      replica.loadFailure(replicatedDbName, tuple.dumpLocation);
+      replica.loadFailure(replicatedDbName, primaryDbName);
       callerVerifier.assertInjectionsPerformed(true, false);
     } finally {
       InjectableBehaviourObjectStore.resetAlterTableModifier();
@@ -1583,7 +1469,7 @@ public class TestReplicationScenariosAcrossInstances extends BaseReplicationAcro
     // is loaded before t2. So that scope is set to table in first iteration for table t1. In the next iteration, it
     // loads only remaining partitions of t2, so that the table tracker has no tasks.
     List<String> withConfigs = Arrays.asList("'hive.in.repl.test.files.sorted'='true'");
-    replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
+    replica.load(replicatedDbName, primaryDbName, withConfigs);
 
     replica.run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
index 740f229..df304c2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTables.java
@@ -108,7 +108,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     assertFalse(primary.miniDFSCluster.getFileSystem()
         .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("repl status " + replicatedDbName)
         .verifyResult(tuple.lastReplicationId)
         .run("use " + replicatedDbName)
@@ -128,7 +128,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     assertFalse(primary.miniDFSCluster.getFileSystem()
         .exists(new Path(tuple.dumpLocation, FILE_NAME)));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't3'")
         .verifyFailure(new String[] { "t3" })
@@ -155,7 +155,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
 
     List<String> withClauseOptions = externalTableBasePathWithClause();
 
-    replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions)
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
         .run("use " + replicatedDbName)
         .run("show tables like 't1'")
         .verifyResult("t1")
@@ -184,7 +184,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     assertExternalFileInfo(Arrays.asList("t1", "t2", "t3", "t4"),
         new Path(tuple.dumpLocation, FILE_NAME));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions)
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
         .run("use " + replicatedDbName)
         .run("show tables like 't3'")
         .verifyResult("t3")
@@ -250,13 +250,13 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             "'distcp.options.update'=''"
     );
 
-    WarehouseInstance.Tuple bootstrapTuple = primary.run("use " + primaryDbName)
+    primary.run("use " + primaryDbName)
         .run("create external table a (i int, j int) "
             + "row format delimited fields terminated by ',' "
             + "location '" + externalTableLocation.toUri() + "'")
         .dump(primaryDbName);
 
-    replica.load(replicatedDbName, bootstrapTuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables like 'a'")
         .verifyResults(Collections.singletonList("a"))
@@ -271,10 +271,10 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
       outputStream.write("13,21\n".getBytes());
     }
 
-    WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)")
+    primary.run("create table b (i int)")
         .dump(primaryDbName);
 
-    replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("select i From a")
         .verifyResults(new String[] { "1", "13" })
         .run("select j from a")
@@ -283,11 +283,11 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     // alter table location to something new.
     externalTableLocation =
         new Path("/" + testName.getMethodName() + "/" + primaryDbName + "/new_location/a/");
-    incrementalTuple = primary.run("use " + primaryDbName)
+    primary.run("use " + primaryDbName)
         .run("alter table a set location '" + externalTableLocation + "'")
         .dump(primaryDbName);
 
-    replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("select i From a")
         .verifyResults(Collections.emptyList());
@@ -313,7 +313,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     assertExternalFileInfo(Collections.singletonList("t2"),
         new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't2'")
         .verifyResults(new String[] { "t2" })
@@ -337,7 +337,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     assertExternalFileInfo(Collections.singletonList("t2"),
         new Path(tuple.dumpLocation, FILE_NAME));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("select distinct(country) from t2")
         .verifyResults(new String[] { "india", "australia" })
@@ -357,12 +357,12 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
       outputStream.write("paris".getBytes());
     }
 
-    tuple = primary.run("use " + primaryDbName)
+    primary.run("use " + primaryDbName)
         .run("ALTER TABLE t2 ADD PARTITION (country='france') LOCATION '" + customPartitionLocation
             .toString() + "'")
         .dump(primaryDbName);
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("select place from t2 where country='france'")
         .verifyResults(new String[] { "paris" })
@@ -372,11 +372,11 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     String tmpLocation = "/tmp/" + System.nanoTime();
     primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation), new FsPermission("777"));
 
-    tuple = primary.run("use " + primaryDbName)
+    primary.run("use " + primaryDbName)
         .run("alter table t2 partition (country='france') set location '" + tmpLocation + "'")
         .dump(primaryDbName);
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("select place from t2 where country='france'")
         .verifyResults(new String[] {})
@@ -391,19 +391,19 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     String tmpLocation2 = "/tmp/" + System.nanoTime() + "_2";
     primary.miniDFSCluster.getFileSystem().mkdirs(new Path(tmpLocation2), new FsPermission("777"));
 
-    tuple = primary.run("use " + primaryDbName)
+    primary.run("use " + primaryDbName)
             .run("insert into table t2 partition(country='france') values ('lyon')")
             .run("alter table t2 set location '" + tmpLocation2 + "'")
             .dump(primaryDbName);
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause);
+    replica.load(replicatedDbName, primaryDbName, loadWithClause);
     assertTablePartitionLocation(primaryDbName + ".t2", replicatedDbName + ".t2");
   }
 
   @Test
   public void externalTableIncrementalReplication() throws Throwable {
     WarehouseInstance.Tuple tuple = primary.dumpWithCommand("repl dump " + primaryDbName);
-    replica.load(replicatedDbName, tuple.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
 
     Path externalTableLocation =
             new Path("/" + testName.getMethodName() + "/t1/");
@@ -433,7 +433,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     }
 
     List<String> loadWithClause = externalTableBasePathWithClause();
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't1'")
         .verifyResult("t1")
@@ -454,7 +454,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     tuple = primary.dump(primaryDbName);
     assertExternalFileInfo(Collections.singletonList("t1"), new Path(tuple.dumpLocation, FILE_NAME));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables like 't1'")
             .verifyResult("t1")
@@ -475,7 +475,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
         .run("alter table t1 drop partition (country='us')")
         .dump(primaryDbName);
 
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("select * From t1")
         .verifyResults(new String[] {})
         .verifyReplTargetProperty(replicatedDbName);
@@ -507,7 +507,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     assertFalse(primary.miniDFSCluster.getFileSystem()
             .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .status(replicatedDbName)
             .verifyResult(tuple.lastReplicationId)
             .run("use " + replicatedDbName)
@@ -547,7 +547,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     tblPath = new Path(dbPath, "t3");
     assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .status(replicatedDbName)
             .verifyResult(tuple.lastReplicationId)
             .run("use " + replicatedDbName)
@@ -601,7 +601,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .run("create table t3 as select * from t1")
             .dump(primaryDbName, dumpWithClause);
 
-    replica.load(replicatedDbName, tupleBootstrapWithoutExternal.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .status(replicatedDbName)
             .verifyResult(tupleBootstrapWithoutExternal.lastReplicationId)
             .run("use " + replicatedDbName)
@@ -639,7 +639,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     // In the retry, these half baked tables should be dropped and bootstrap should be successful.
     InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier);
     try {
-      replica.loadFailure(replicatedDbName, tupleIncWithExternalBootstrap.dumpLocation, loadWithClause);
+      replica.loadFailure(replicatedDbName, primaryDbName, loadWithClause);
       callerVerifier.assertInjectionsPerformed(true, false);
     } finally {
       InjectableBehaviourObjectStore.resetAlterTableModifier();
@@ -658,7 +658,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     // So, REPL LOAD fails.
     loadWithClause.add("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
             + tupleBootstrapWithoutExternal.dumpLocation + "'");
-    replica.loadFailure(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause);
+    replica.loadFailure(replicatedDbName, primaryDbName, loadWithClause);
     loadWithClause.remove("'" + REPL_CLEAN_TABLES_FROM_BOOTSTRAP_CONFIG + "'='"
             + tupleBootstrapWithoutExternal.dumpLocation + "'");
 
@@ -668,7 +668,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
 
     // Verify if bootstrapping with same dump is idempotent and return same result
     for (int i = 0; i < 2; i++) {
-      replica.load(replicatedDbName, tupleNewIncWithExternalBootstrap.dumpLocation, loadWithClause)
+      replica.load(replicatedDbName, primaryDbName, loadWithClause)
               .run("use " + replicatedDbName)
               .run("show tables like 't1'")
               .verifyFailure(new String[]{"t1"})
@@ -719,7 +719,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .run("insert into table t1 values (1)")
             .dump(primaryDbName, dumpWithClause);
 
-    replica.load(replicatedDbName, tupleBootstrap.dumpLocation, loadWithClause);
+    replica.load(replicatedDbName, primaryDbName, loadWithClause);
 
     // Insert a row into "t1" and create another external table using data from "t1".
     primary.run("use " + primaryDbName)
@@ -758,7 +758,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
 
     // The newly inserted data "2" should be missing in table "t1". But, table t2 should exist and have
     // inserted data.
-    replica.load(replicatedDbName, tupleInc.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .run("use " + replicatedDbName)
             .run("select id from t1 order by id")
             .verifyResult("1")
@@ -778,14 +778,14 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .run("insert into table t1 values (2)")
             .dump(primaryDbName, dumpWithClause);
 
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .status(replicatedDbName)
             .verifyResult(tuple.lastReplicationId);
 
     // This looks like an empty dump but it has the ALTER TABLE event created by the previous
     // dump. We need it here so that the next dump won't have any events.
     WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, dumpWithClause);
-    replica.load(replicatedDbName, incTuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .status(replicatedDbName)
             .verifyResult(incTuple.lastReplicationId);
 
@@ -800,7 +800,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
                         Long.valueOf(inc2Tuple.lastReplicationId).longValue());
 
     // Incremental load to existing database with empty dump directory should set the repl id to the last event at src.
-    replica.load(replicatedDbName, inc2Tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .status(replicatedDbName)
             .verifyResult(inc2Tuple.lastReplicationId);
   }
@@ -820,7 +820,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .run("insert into table t2 values (1)")
             .dump(primaryDbName, dumpWithClause);
 
-    replica.load(replicatedDbName, bootstrapDump.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .status(replicatedDbName)
             .verifyResult(bootstrapDump.lastReplicationId)
             .run("use " + replicatedDbName)
@@ -833,7 +833,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     // This looks like an empty dump but it has the ALTER TABLE event created by the previous
     // dump. We need it here so that the next dump won't have any events.
     WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, incTuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .status(replicatedDbName)
             .verifyResult(incTuple.lastReplicationId);
 
@@ -843,7 +843,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
     WarehouseInstance.Tuple inc2Tuple = primary.run("use " + primaryDbName)
             .dump(primaryDbName, dumpWithClause);
 
-    replica.load(replicatedDbName, inc2Tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .status(replicatedDbName)
             .verifyResult(inc2Tuple.lastReplicationId)
             .run("use " + replicatedDbName)
@@ -869,7 +869,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .run("insert into table t2_constraints partition(country='france') values ('paris')")
             .dump(primaryDbName);
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .run("repl status " + replicatedDbName)
             .verifyResult(tuple.lastReplicationId)
             .run("use " + replicatedDbName)
@@ -881,7 +881,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .verifyResults(new String[] {"1", "2"})
             .verifyReplTargetProperty(replicatedDbName);
 
-    tuple = primary.run("use " + primaryDbName)
+    primary.run("use " + primaryDbName)
             .run("create external table t3_bootstrap (id int)")
             .run("insert into table t3_bootstrap values (10)")
             .run("insert into table t3_bootstrap values (20)")
@@ -890,7 +890,7 @@ public class TestReplicationScenariosExternalTables extends BaseReplicationAcros
             .run("insert into table t4_tables values (20)")
             .dump(primaryDbName);
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables like 't3_bootstrap'")
             .verifyResults(new String[] {"t3_bootstrap"})
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
index e9d0162..bf691f3 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosExternalTablesMetaDataOnly.java
@@ -107,7 +107,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
     assertFalse(primary.miniDFSCluster.getFileSystem()
         .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("repl status " + replicatedDbName)
         .verifyResult(tuple.lastReplicationId)
         .run("use " + replicatedDbName)
@@ -127,7 +127,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
     assertFalse(primary.miniDFSCluster.getFileSystem()
         .exists(new Path(tuple.dumpLocation, FILE_NAME)));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't3'")
         .verifyFailure(new String[] {"t3"})
@@ -153,7 +153,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
 
     List<String> withClauseOptions = externalTableBasePathWithClause();
 
-    replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions)
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
         .run("use " + replicatedDbName)
         .run("show tables like 't1'")
         .verifyResult("t1")
@@ -178,7 +178,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
     // verify that the external table info is written correctly for incremental
     assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, withClauseOptions)
+    replica.load(replicatedDbName, primaryDbName, withClauseOptions)
         .run("use " + replicatedDbName)
         .run("show tables like 't3'")
         .verifyResult("t3")
@@ -217,7 +217,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
             + "location '" + externalTableLocation.toUri() + "'")
         .dump(primaryDbName);
 
-    replica.load(replicatedDbName, bootstrapTuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables like 'a'")
         .verifyResults(Collections.singletonList("a"))
@@ -233,7 +233,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
     WarehouseInstance.Tuple incrementalTuple = primary.run("create table b (i int)")
         .dump(primaryDbName);
 
-    replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("select i From a")
         .verifyResults(new String[] {})
         .run("select j from a")
@@ -246,7 +246,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
         .run("alter table a set location '" + externalTableLocation + "'")
         .dump(primaryDbName);
 
-    replica.load(replicatedDbName, incrementalTuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("select i From a")
         .verifyResults(Collections.emptyList());
@@ -270,7 +270,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
 
     assertFalseExternalFileInfo(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't2'")
         .verifyResults(new String[] {"t2"})
@@ -291,7 +291,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
 
     assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("select distinct(country) from t2")
         .verifyResults(new String[] {})
@@ -316,7 +316,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
             .toString() + "'")
         .dump(primaryDbName);
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("select place from t2 where country='france'")
         .verifyResults(new String[] {})
@@ -330,7 +330,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
         .run("alter table t2 partition (country='france') set location '" + tmpLocation + "'")
         .dump(primaryDbName);
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("select place from t2 where country='france'")
         .verifyResults(new String[] {})
@@ -346,13 +346,13 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
             .run("alter table t2 set location '" + tmpLocation2 + "'")
             .dump(primaryDbName);
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause);
+    replica.load(replicatedDbName, primaryDbName, loadWithClause);
   }
 
   @Test
   public void externalTableIncrementalReplication() throws Throwable {
     WarehouseInstance.Tuple tuple = primary.dumpWithCommand("repl dump " + primaryDbName);
-    replica.load(replicatedDbName, tuple.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
 
     Path externalTableLocation =
             new Path("/" + testName.getMethodName() + "/t1/");
@@ -382,7 +382,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
     }
 
     List<String> loadWithClause = externalTableBasePathWithClause();
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
         .run("use " + replicatedDbName)
         .run("show tables like 't1'")
         .verifyResult("t1")
@@ -403,7 +403,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
     tuple = primary.dump(primaryDbName);
     assertFalseExternalFileInfo(new Path(tuple.dumpLocation, FILE_NAME));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables like 't1'")
             .verifyResult("t1")
@@ -424,7 +424,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
         .run("alter table t1 drop partition (country='us')")
         .dump(primaryDbName);
 
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
         .run("select * From t1")
         .verifyResults(new String[] {})
         .verifyReplTargetProperty(replicatedDbName);
@@ -455,7 +455,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
     assertFalse(primary.miniDFSCluster.getFileSystem()
             .exists(new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME)));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .status(replicatedDbName)
             .verifyResult(tuple.lastReplicationId)
             .run("use " + replicatedDbName)
@@ -496,7 +496,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
     tblPath = new Path(dbPath, "t3");
     assertTrue(primary.miniDFSCluster.getFileSystem().exists(tblPath));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .status(replicatedDbName)
             .verifyResult(tuple.lastReplicationId)
             .run("use " + replicatedDbName)
@@ -542,7 +542,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
             .run("insert into table t1 values (1)")
             .dump(primaryDbName, dumpWithClause);
 
-    replica.load(replicatedDbName, tupleBootstrap.dumpLocation, loadWithClause);
+    replica.load(replicatedDbName, primaryDbName, loadWithClause);
 
     // Insert a row into "t1" and create another external table using data from "t1".
     primary.run("use " + primaryDbName)
@@ -580,7 +580,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
 
     // The newly inserted data "2" should be missing in table "t1". But, table t2 should exist and have
     // inserted data.
-    replica.load(replicatedDbName, tupleInc.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .run("use " + replicatedDbName)
             .run("select id from t1 order by id")
             .verifyResult(null)
@@ -600,14 +600,14 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
             .run("insert into table t1 values (2)")
             .dump(primaryDbName, dumpWithClause);
 
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .status(replicatedDbName)
             .verifyResult(tuple.lastReplicationId);
 
     // This looks like an empty dump but it has the ALTER TABLE event created by the previous
     // dump. We need it here so that the next dump won't have any events.
     WarehouseInstance.Tuple incTuple = primary.dump(primaryDbName, dumpWithClause);
-    replica.load(replicatedDbName, incTuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .status(replicatedDbName)
             .verifyResult(incTuple.lastReplicationId);
 
@@ -622,7 +622,7 @@ public class TestReplicationScenariosExternalTablesMetaDataOnly extends BaseRepl
                         Long.valueOf(inc2Tuple.lastReplicationId).longValue());
 
     // Incremental load to existing database with empty dump directory should set the repl id to the last event at src.
-    replica.load(replicatedDbName, inc2Tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, primaryDbName, loadWithClause)
             .status(replicatedDbName)
             .verifyResult(inc2Tuple.lastReplicationId);
   }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
index 15cb985..bcab190 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosIncrementalLoadAcidTables.java
@@ -85,6 +85,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
 
     acidConfs.putAll(overrides);
     primary = new WarehouseInstance(LOG, miniDFSCluster, acidConfs);
+    acidConfs.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
     replica = new WarehouseInstance(LOG, miniDFSCluster, acidConfs);
     Map<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
         put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
@@ -93,6 +94,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
         put("hive.metastore.client.capability.check", "false");
         put("hive.stats.autogather", "false");
     }};
+    overridesForHiveConf1.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
     replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
   }
 
@@ -124,7 +126,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
   @Test
   public void testAcidTableIncrementalReplication() throws Throwable {
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(bootStrapDump.lastReplicationId);
     List<String> selectStmtList = new ArrayList<>();
@@ -209,7 +211,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
 
     WarehouseInstance.Tuple incrementalDump;
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(bootStrapDump.lastReplicationId);
 
@@ -217,7 +219,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
             tableName, null, false, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT);
     incrementalDump = primary.dump(primaryDbName);
     primary.run("drop table " + primaryDbName + "." + tableName);
-    replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
     verifyResultsInReplicaInt(Lists.newArrayList("select count(*) from " + tableName,
                                               "select count(*) from " + tableName + "_nopart"),
@@ -227,7 +229,7 @@ public class TestReplicationScenariosIncrementalLoadAcidTables {
             tableNameMM, null, true, ReplicationTestUtils.OperationType.REPL_TEST_ACID_INSERT);
     incrementalDump = primary.dump(primaryDbName);
     primary.run("drop table " + primaryDbName + "." + tableNameMM);
-    replica.loadWithoutExplain(replicatedDbName, incrementalDump.dumpLocation)
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName).verifyResult(incrementalDump.lastReplicationId);
     verifyResultsInReplicaInt(Lists.newArrayList("select count(*) from " + tableNameMM,
             "select count(*) from " + tableNameMM + "_nopart"),
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
index 7fa23b1..3eab045 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigration.java
@@ -100,7 +100,6 @@ public class TestReplicationWithTableMigration {
       put("hive.metastore.disallow.incompatible.col.type.changes", "false");
       put("hive.strict.managed.tables", "true");
     }};
-    replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs);
 
     HashMap<String, String> configsForPrimary = new HashMap<String, String>() {{
       put("fs.defaultFS", fs.getUri().toString());
@@ -118,6 +117,8 @@ public class TestReplicationWithTableMigration {
     }};
     configsForPrimary.putAll(overrideConfigs);
     primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary);
+    hiveConfigs.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs);
   }
 
   private static Path createAvroSchemaFile(FileSystem fs, Path testPath) throws IOException {
@@ -305,7 +306,7 @@ public class TestReplicationWithTableMigration {
     return tablePath;
   }
 
-  private void loadWithFailureInAddNotification(String tbl, String dumpLocation) throws Throwable {
+  private void loadWithFailureInAddNotification(String tbl) throws Throwable {
     BehaviourInjection<CallerArguments, Boolean> callerVerifier
             = new BehaviourInjection<CallerArguments, Boolean>() {
       @Nullable
@@ -326,7 +327,7 @@ public class TestReplicationWithTableMigration {
     };
     InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
     try {
-      replica.loadFailure(replicatedDbName, dumpLocation);
+      replica.loadFailure(replicatedDbName, primaryDbName);
     } finally {
       InjectableBehaviourObjectStore.resetCallerVerifier();
     }
@@ -336,49 +337,49 @@ public class TestReplicationWithTableMigration {
   @Test
   public void testBootstrapLoadMigrationManagedToAcid() throws Throwable {
     WarehouseInstance.Tuple tuple = prepareDataAndDump(primaryDbName, null);
-    replica.load(replicatedDbName, tuple.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
   }
 
   @Test
   public void testIncrementalLoadMigrationManagedToAcid() throws Throwable {
     WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, tuple.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId);
-    replica.load(replicatedDbName, tuple.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
   }
 
   @Test
   public void testIncrementalLoadMigrationManagedToAcidFailure() throws Throwable {
     WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, tuple.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId);
-    loadWithFailureInAddNotification("tacid", tuple.dumpLocation);
+    loadWithFailureInAddNotification("tacid");
     replica.run("use " + replicatedDbName)
             .run("show tables like tacid")
             .verifyResult(null);
-    replica.load(replicatedDbName, tuple.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
   }
 
   @Test
   public void testIncrementalLoadMigrationManagedToAcidFailurePart() throws Throwable {
     WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, tuple.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId);
-    loadWithFailureInAddNotification("tacidpart", tuple.dumpLocation);
+    loadWithFailureInAddNotification("tacidpart");
     replica.run("use " + replicatedDbName)
             .run("show tables like tacidpart")
             .verifyResult(null);
-    replica.load(replicatedDbName, tuple.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
   }
 
   @Test
   public void testIncrementalLoadMigrationManagedToAcidAllOp() throws Throwable {
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, bootStrapDump.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(bootStrapDump.lastReplicationId);
     List<String> selectStmtList = new ArrayList<>();
@@ -418,13 +419,13 @@ public class TestReplicationWithTableMigration {
             .run("insert into avro_tbl partition (country='india') values ('another', 13)")
             .dump(primaryDbName);
 
-    replica.load(replicatedDbName, bootstrap.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     Path dataLocation = assertTablePath(replicatedDbName, "avro_tbl");
 
     WarehouseInstance.Tuple incremental = primary.run("use " + primaryDbName)
             .run("drop table avro_tbl")
             .dump(primaryDbName);
-    replica.load(replicatedDbName, incremental.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
 
     // After drop, the external table data location should be auto deleted as it is converted one.
     assertFalse(replica.miniDFSCluster.getFileSystem().exists(dataLocation));
@@ -433,15 +434,15 @@ public class TestReplicationWithTableMigration {
   @Test
   public void testIncConvertedExternalTableAutoDeleteDataDirOnDrop() throws Throwable {
     WarehouseInstance.Tuple bootstrap = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, bootstrap.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
 
-    WarehouseInstance.Tuple incremental = primary.run("use " + primaryDbName)
+    primary.run("use " + primaryDbName)
             .run("create table avro_tbl ROW FORMAT SERDE "
                     + "'org.apache.hadoop.hive.serde2.avro.AvroSerDe' stored as avro "
                     + "tblproperties ('avro.schema.url'='" + avroSchemaFile.toUri().toString() + "')")
             .run("insert into avro_tbl values ('str', 13)")
             .dump(primaryDbName);
-    replica.load(replicatedDbName, incremental.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
 
     // Data location is valid and is under default external warehouse directory.
     Table avroTable = replica.getTable(replicatedDbName, "avro_tbl");
@@ -449,10 +450,10 @@ public class TestReplicationWithTableMigration {
     Path dataLocation = new Path(avroTable.getSd().getLocation());
     assertTrue(replica.miniDFSCluster.getFileSystem().exists(dataLocation));
 
-    incremental = primary.run("use " + primaryDbName)
+    primary.run("use " + primaryDbName)
             .run("drop table avro_tbl")
             .dump(primaryDbName);
-    replica.load(replicatedDbName, incremental.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
 
     // After drop, the external table data location should be auto deleted as it is converted one.
     assertFalse(replica.miniDFSCluster.getFileSystem().exists(dataLocation));
@@ -463,7 +464,7 @@ public class TestReplicationWithTableMigration {
     List<String> withConfigs =
             Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
     WarehouseInstance.Tuple tuple = prepareDataAndDump(primaryDbName, null);
-    replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
+    replica.load(replicatedDbName, primaryDbName, withConfigs);
     verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
   }
 
@@ -472,9 +473,9 @@ public class TestReplicationWithTableMigration {
     List<String> withConfigs =
             Collections.singletonList("'hive.repl.enable.move.optimization'='true'");
     WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, tuple.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     tuple = prepareDataAndDump(primaryDbName, tuple.lastReplicationId);
-    replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
+    replica.load(replicatedDbName, primaryDbName, withConfigs);
     verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
   }
 
@@ -514,7 +515,7 @@ public class TestReplicationWithTableMigration {
             .run("create table texternal (id int) ")
             .run("insert into texternal values (1)")
             .dump(primaryDbName);
-    replica.load(replicatedDbName, tuple.dumpLocation)
+    replica.load(replicatedDbName, primaryDbName)
             .run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
             .verifyResult(tuple.lastReplicationId)
@@ -558,7 +559,7 @@ public class TestReplicationWithTableMigration {
     withConfigs.add("'hive.repl.include.external.tables'='true'");
     withConfigs.add("'hive.distcp.privileged.doAs' = '" + UserGroupInformation.getCurrentUser().getUserName() + "'");
     tuple = primary.dump(primaryDbName, withConfigs);
-    replica.load(replicatedDbName, tuple.dumpLocation, withConfigs);
+    replica.load(replicatedDbName, primaryDbName, withConfigs);
     replica.run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
             .verifyResult(tuple.lastReplicationId)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java
index 425c8cb..1b4833c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationWithTableMigrationEx.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
 import org.apache.hadoop.hive.metastore.api.CurrentNotificationEventId;
 import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hive.hcatalog.listener.DbNotificationListener;
@@ -80,7 +81,6 @@ public class TestReplicationWithTableMigrationEx {
       put("hive.strict.managed.tables", "true");
       put("hive.metastore.transactional.event.listeners", "");
     }};
-    replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs);
 
     HashMap<String, String> configsForPrimary = new HashMap<String, String>() {{
       put("fs.defaultFS", fs.getUri().toString());
@@ -96,6 +96,8 @@ public class TestReplicationWithTableMigrationEx {
     }};
     configsForPrimary.putAll(overrideConfigs);
     primary = new WarehouseInstance(LOG, miniDFSCluster, configsForPrimary);
+    hiveConfigs.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, hiveConfigs);
   }
 
   @AfterClass
@@ -186,13 +188,13 @@ public class TestReplicationWithTableMigrationEx {
 
     // dump with operation after last repl id is fetched.
     WarehouseInstance.Tuple tuple =  dumpWithLastEventIdHacked(2);
-    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
     verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
     assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
 
     // next incremental dump
     tuple = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
     verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
     assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
   }
@@ -203,13 +205,13 @@ public class TestReplicationWithTableMigrationEx {
 
     // dump with operation after last repl id is fetched.
     WarehouseInstance.Tuple tuple =  dumpWithLastEventIdHacked(4);
-    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
     verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
     assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
 
     // next incremental dump
     tuple = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
     verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
     assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
   }
@@ -221,13 +223,13 @@ public class TestReplicationWithTableMigrationEx {
             .run("create table t1 (i int, j int)")
             .dump(primaryDbName+".'t1'");
     replica.run("create database " + replicatedDbName);
-    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
     assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
 
     tuple = primary.run("use " + primaryDbName)
             .run("insert into t1 values (1, 2)")
             .dump(primaryDbName+".'t1'");
-    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
     assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
   }
 
@@ -243,7 +245,7 @@ public class TestReplicationWithTableMigrationEx {
 
     // dump with operation after last repl id is fetched.
     WarehouseInstance.Tuple tuple =  dumpWithLastEventIdHacked(2);
-    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
     replica.run("use " + replicatedDbName)
             .run("show tables")
             .verifyResults(new String[] {"tacid"})
@@ -257,7 +259,7 @@ public class TestReplicationWithTableMigrationEx {
 
     // next incremental dump
     tuple = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
     replica.run("use " + replicatedDbName)
             .run("show tables")
             .verifyResults(new String[] {"tacid"})
@@ -270,7 +272,7 @@ public class TestReplicationWithTableMigrationEx {
     assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
   }
 
-  private void loadWithFailureInAddNotification(String tbl, String dumpLocation) throws Throwable {
+  private void loadWithFailureInAddNotification(String tbl) throws Throwable {
     BehaviourInjection<InjectableBehaviourObjectStore.CallerArguments, Boolean> callerVerifier
             = new BehaviourInjection<InjectableBehaviourObjectStore.CallerArguments, Boolean>() {
       @Nullable
@@ -294,7 +296,7 @@ public class TestReplicationWithTableMigrationEx {
     try {
       List<String> withClause = Collections.singletonList("'hive.metastore.transactional.event.listeners'='"
               + DbNotificationListener.class.getCanonicalName() + "'");
-      replica.loadFailure(replicatedDbName, dumpLocation, withClause);
+      replica.loadFailure(replicatedDbName, primaryDbName, withClause);
     } finally {
       InjectableBehaviourObjectStore.resetCallerVerifier();
     }
@@ -307,7 +309,7 @@ public class TestReplicationWithTableMigrationEx {
 
     // dump with operation after last repl id is fetched.
     WarehouseInstance.Tuple tuple =  dumpWithLastEventIdHacked(4);
-    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
     verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
     assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
     assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(primaryDbName).getParameters()));
@@ -317,15 +319,15 @@ public class TestReplicationWithTableMigrationEx {
            .run("create table tbl_temp (fld int)")
             .dump(primaryDbName);
 
-    loadWithFailureInAddNotification("tbl_temp", tuple.dumpLocation);
+    loadWithFailureInAddNotification("tbl_temp");
     Database replDb = replica.getDatabase(replicatedDbName);
     assertTrue(ReplUtils.isFirstIncPending(replDb.getParameters()));
     assertFalse(ReplUtils.isFirstIncPending(primary.getDatabase(primaryDbName).getParameters()));
     assertTrue(replDb.getParameters().get("dummy_key").equalsIgnoreCase("dummy_val"));
 
     // next incremental dump
-    tuple = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
     assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
   }
 
@@ -337,13 +339,13 @@ public class TestReplicationWithTableMigrationEx {
 
     // dump with operation after last repl id is fetched.
     WarehouseInstance.Tuple tuple =  dumpWithLastEventIdHacked(4);
-    replica.load(replicatedDbName, tuple.dumpLocation, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
     verifyLoadExecution(replicatedDbName, tuple.lastReplicationId);
     assertTrue(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
 
     // next incremental dump
     tuple = primary.dump(primaryDbName);
-    replica.load(replicatedDbName, tuple.dumpLocation, withClause);
+    replica.load(replicatedDbName, primaryDbName, withClause);
     assertFalse(ReplUtils.isFirstIncPending(replica.getDatabase(replicatedDbName).getParameters()));
   }
 
@@ -382,21 +384,21 @@ public class TestReplicationWithTableMigrationEx {
 
     // test bootstrap
     alterUserName("hive");
-    WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
     verifyUserName("hive");
 
     // test incremental
     alterUserName("hive1");
-    tuple = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
     verifyUserName("hive1");
   }
 
   @Test
   public void testOnwerPropagationInc() throws Throwable {
-    WarehouseInstance.Tuple tuple = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
 
     primary.run("use " + primaryDbName)
             .run("create table tbl_own (fld int)")
@@ -409,8 +411,8 @@ public class TestReplicationWithTableMigrationEx {
 
     // test incremental when table is getting created in the same load
     alterUserName("hive");
-    tuple = primary.dump(primaryDbName);
-    replica.loadWithoutExplain(replicatedDbName, tuple.dumpLocation);
+    primary.dump(primaryDbName);
+    replica.loadWithoutExplain(replicatedDbName, primaryDbName);
     verifyUserName("hive");
   }
 
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
index c51bec1..afb53b8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestScheduledReplicationScenarios.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
@@ -76,16 +75,9 @@ public class TestScheduledReplicationScenarios extends BaseReplicationScenariosA
       }};
 
     acidEnableConf.putAll(overrides);
-
     primary = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
+    acidEnableConf.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
     replica = new WarehouseInstance(LOG, miniDFSCluster, acidEnableConf);
-    Map<String, String> overridesForHiveConf1 = new HashMap<String, String>() {{
-          put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
-          put("hive.support.concurrency", "false");
-          put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
-          put("hive.metastore.client.capability.check", "false");
-      }};
-    replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
   }
 
   @Before
@@ -97,16 +89,15 @@ public class TestScheduledReplicationScenarios extends BaseReplicationScenariosA
   public void tearDown() throws Throwable {
     primary.run("drop database if exists " + primaryDbName + " cascade");
     replica.run("drop database if exists " + replicatedDbName + " cascade");
-    replicaNonAcid.run("drop database if exists " + replicatedDbName + " cascade");
     primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
   @Test
-  public void testAcidTablesBootstrapIncr() throws Throwable {
+  public void testAcidTablesReplLoadBootstrapIncr() throws Throwable {
     // Bootstrap
     primary.run("use " + primaryDbName)
             .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
-                      "tblproperties (\"transactional\"=\"true\")")
+                    "tblproperties (\"transactional\"=\"true\")")
             .run("insert into t1 values(1)")
             .run("insert into t1 values(2)");
     try (ScheduledQueryExecutionService schqS =
@@ -116,36 +107,53 @@ public class TestScheduledReplicationScenarios extends BaseReplicationScenariosA
       primary.run("create scheduled query s1 every 10 minutes as repl dump " + primaryDbName);
       primary.run("alter scheduled query s1 execute");
       Thread.sleep(6000);
-      Path dumpRoot = new Path(primary.hiveConf.getVar(HiveConf.ConfVars.REPLDIR), primaryDbName.toLowerCase());
-      Path currdumpRoot = new Path(dumpRoot, String.valueOf(next));
-      replica.load(replicatedDbName, currdumpRoot.toString());
+      replica.run("create scheduled query s2 every 10 minutes as repl load " + primaryDbName + " INTO "
+              + replicatedDbName);
+      replica.run("alter scheduled query s2 execute");
+      Thread.sleep(20000);
       replica.run("use " + replicatedDbName)
               .run("show tables like 't1'")
               .verifyResult("t1")
               .run("select id from t1 order by id")
               .verifyResults(new String[]{"1", "2"});
 
-    // First incremental, after bootstrap
-
+      // First incremental, after bootstrap
       primary.run("use " + primaryDbName)
-            .run("insert into t1 values(3)")
-            .run("insert into t1 values(4)");
+              .run("insert into t1 values(3)")
+              .run("insert into t1 values(4)");
       next++;
       ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
       primary.run("alter scheduled query s1 execute");
       Thread.sleep(20000);
-      Path incrdumpRoot = new Path(dumpRoot, String.valueOf(next));
-      replica.load(replicatedDbName, incrdumpRoot.toString());
+      replica.run("alter scheduled query s2 execute");
+      Thread.sleep(20000);
+      replica.run("use " + replicatedDbName)
+              .run("show tables like 't1'")
+              .verifyResult("t1")
+              .run("select id from t1 order by id")
+              .verifyResults(new String[]{"1", "2", "3", "4"});
+
+      // Second incremental
+      primary.run("use " + primaryDbName)
+              .run("insert into t1 values(5)")
+              .run("insert into t1 values(6)");
+      next++;
+      ReplDumpWork.injectNextDumpDirForTest(String.valueOf(next));
+      primary.run("alter scheduled query s1 execute");
+      Thread.sleep(30000);
+      replica.run("alter scheduled query s2 execute");
+      Thread.sleep(30000);
       replica.run("use " + replicatedDbName)
               .run("show tables like 't1'")
               .verifyResult("t1")
               .run("select id from t1 order by id")
-              .verifyResults(new String[]{"1", "2", "3", "4"})
+              .verifyResults(new String[]{"1", "2", "3", "4", "5", "6"})
               .run("drop table t1");
 
 
     } finally {
       primary.run("drop scheduled query s1");
+      replica.run("drop scheduled query s2");
     }
   }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
index 44a3805..b2733d1 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
@@ -97,20 +97,21 @@ public class TestStatsReplicationScenarios {
         put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
         put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
       }};
-    Map<String, String> overrides = new HashMap<>();
+    Map<String, String> replicatedOverrides = new HashMap<>();
 
-    overrides.putAll(additionalOverrides);
-    overrides.putAll(replicaOverrides);
-    replica = new WarehouseInstance(LOG, miniDFSCluster, overrides);
+    replicatedOverrides.putAll(additionalOverrides);
+    replicatedOverrides.putAll(replicaOverrides);
 
     // Run with autogather false on primary if requested
+    Map<String, String> sourceOverrides = new HashMap<>();
     hasAutogather = autogather;
     additionalOverrides.put(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname,
             autogather ? "true" : "false");
-    overrides.clear();
-    overrides.putAll(additionalOverrides);
-    overrides.putAll(primaryOverrides);
-    primary = new WarehouseInstance(LOG, miniDFSCluster, overrides);
+    sourceOverrides.putAll(additionalOverrides);
+    sourceOverrides.putAll(primaryOverrides);
+    primary = new WarehouseInstance(LOG, miniDFSCluster, sourceOverrides);
+    replicatedOverrides.put(MetastoreConf.ConfVars.REPLDIR.getHiveName(), primary.repldDir);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, replicatedOverrides);
 
     // Use transactional tables
     acidTableKindToUse = acidTableKind;
@@ -330,14 +331,14 @@ public class TestStatsReplicationScenarios {
     // checkpoint for a table in the middle of list of tables.
     if (failRetry) {
       if (lastReplicationId == null) {
-        failBootstrapLoad(dumpTuple, tableNames.size()/2);
+        failBootstrapLoad(tableNames.size()/2);
       } else {
-        failIncrementalLoad(dumpTuple);
+        failIncrementalLoad();
       }
     }
 
     // Load, possibly a retry
-    replica.load(replicatedDbName, dumpTuple.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
 
     // Metadata load may not load all the events.
     if (!metadataOnly) {
@@ -363,9 +364,8 @@ public class TestStatsReplicationScenarios {
 
   /**
    * Run a bootstrap that will fail.
-   * @param tuple the location of bootstrap dump
    */
-  private void failBootstrapLoad(WarehouseInstance.Tuple tuple, int failAfterNumTables) throws Throwable {
+  private void failBootstrapLoad(int failAfterNumTables) throws Throwable {
     // fail setting ckpt directory property for the second table so that we test the case when
     // bootstrap load fails after some but not all tables are loaded.
     BehaviourInjection<CallerArguments, Boolean> callerVerifier
@@ -391,14 +391,14 @@ public class TestStatsReplicationScenarios {
 
     InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier);
     try {
-      replica.loadFailure(replicatedDbName, tuple.dumpLocation);
+      replica.loadFailure(replicatedDbName, primaryDbName);
       callerVerifier.assertInjectionsPerformed(true, false);
     } finally {
       InjectableBehaviourObjectStore.resetAlterTableModifier();
     }
   }
 
-  private void failIncrementalLoad(WarehouseInstance.Tuple dumpTuple) throws Throwable {
+  private void failIncrementalLoad() throws Throwable {
     // fail add notification when second update table stats event is encountered. Thus we
     // test successful application as well as failed application of this event.
     BehaviourInjection<NotificationEvent, Boolean> callerVerifier
@@ -421,7 +421,7 @@ public class TestStatsReplicationScenarios {
 
     InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
     try {
-      replica.loadFailure(replicatedDbName, dumpTuple.dumpLocation);
+      replica.loadFailure(replicatedDbName, primaryDbName);
     } finally {
       InjectableBehaviourObjectStore.resetAddNotificationModifier();
     }
@@ -449,7 +449,7 @@ public class TestStatsReplicationScenarios {
 
     InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
     try {
-      replica.loadFailure(replicatedDbName, dumpTuple.dumpLocation);
+      replica.loadFailure(replicatedDbName, primaryDbName);
     } finally {
       InjectableBehaviourObjectStore.resetAddNotificationModifier();
     }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
index 15b6c3d..0c44100 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestTableLevelReplicationScenarios.java
@@ -163,7 +163,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     // If the policy contains '.'' means its table level replication.
     verifyTableListForPolicy(tuple.dumpLocation, replPolicy.contains(".'") ? expectedTables : null);
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, replPolicy, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables")
             .verifyResults(expectedTables)
@@ -207,7 +207,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     // If the policy contains '.'' means its table level replication.
     verifyTableListForPolicy(tuple.dumpLocation, replPolicy.contains(".'") ? expectedTables : null);
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, replPolicy, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables")
             .verifyResults(expectedTables)
@@ -310,7 +310,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
   public void testBasicIncrementalWithIncludeList() throws Throwable {
     WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName)
             .dump(primaryDbName);
-    replica.load(replicatedDbName, tupleBootstrap.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
 
     String[] originalNonAcidTables = new String[] {"t1", "t2"};
     String[] originalFullAcidTables = new String[] {"t3", "t4"};
@@ -329,7 +329,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
   public void testBasicIncrementalWithIncludeAndExcludeList() throws Throwable {
     WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName)
             .dump(primaryDbName);
-    replica.load(replicatedDbName, tupleBootstrap.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
 
     String[] originalTables = new String[] {"t1", "t11", "t2", "t3", "t111"};
     createTables(originalTables, CreateTableType.NON_ACID);
@@ -373,7 +373,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     String replPolicy = primaryDbName;
     WarehouseInstance.Tuple tupleBootstrap = primary.run("use " + primaryDbName)
             .dump(primaryDbName);
-    replica.load(replicatedDbName, tupleBootstrap.dumpLocation);
+    replica.load(replicatedDbName, primaryDbName);
     String lastReplId = tupleBootstrap.lastReplicationId;
     for (String oldReplPolicy : invalidReplPolicies) {
       failed = false;
@@ -504,7 +504,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"),
             new Path(new Path(tuple.dumpLocation, primaryDbName.toLowerCase()), FILE_NAME));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, replPolicy, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables")
             .verifyResults(replicatedTables)
@@ -543,7 +543,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     ReplicationTestUtils.assertExternalFileInfo(primary, Arrays.asList("a2"),
             new Path(tuple.dumpLocation, FILE_NAME));
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, replPolicy, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables")
             .verifyResults(incrementalReplicatedTables)
@@ -695,7 +695,7 @@ public class TestTableLevelReplicationScenarios extends BaseReplicationScenarios
     // Verify if the expected tables are bootstrapped.
     verifyBootstrapDirInIncrementalDump(tuple.dumpLocation, bootstrappedTables);
 
-    replica.load(replicatedDbName, tuple.dumpLocation, loadWithClause)
+    replica.load(replicatedDbName, replPolicy, loadWithClause)
             .run("use " + replicatedDbName)
             .run("show tables")
             .verifyResults(incrementalReplicatedTables)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index f1eba52..498d59c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -304,21 +304,29 @@ public class WarehouseInstance implements Closeable {
     return this;
   }
 
-  WarehouseInstance load(String replicatedDbName, String dumpLocation) throws Throwable {
-    run("EXPLAIN REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+  WarehouseInstance load(String replicatedDbName, String primaryDbName) throws Throwable {
+    StringBuilder replCommand = new StringBuilder("REPL LOAD " + primaryDbName);
+    if (!StringUtils.isEmpty(replicatedDbName)) {
+      replCommand.append(" INTO " + replicatedDbName);
+    }
+    run("EXPLAIN " + replCommand.toString());
     printOutput();
-    run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    run(replCommand.toString());
     return this;
   }
 
-  WarehouseInstance loadWithoutExplain(String replicatedDbName, String dumpLocation) throws Throwable {
-    run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "' with ('hive.exec.parallel'='true')");
+  WarehouseInstance loadWithoutExplain(String replicatedDbName, String primaryDbName) throws Throwable {
+    StringBuilder replCommand = new StringBuilder("REPL LOAD " + primaryDbName);
+    if (!StringUtils.isEmpty(replicatedDbName)) {
+      replCommand.append(" INTO " + replicatedDbName);
+    }
+    run(replCommand.toString() + " with ('hive.exec.parallel'='true')");
     return this;
   }
 
-  WarehouseInstance load(String replicatedDbName, String dumpLocation, List<String> withClauseOptions)
+  WarehouseInstance load(String replicatedDbName, String primaryDbName, List<String> withClauseOptions)
           throws Throwable {
-    String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
+    String replLoadCmd = "REPL LOAD " + primaryDbName + " INTO " + replicatedDbName;
     if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
       replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
     }
@@ -338,23 +346,23 @@ public class WarehouseInstance implements Closeable {
     return run(replStatusCmd);
   }
 
-  WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation) throws Throwable {
-    loadFailure(replicatedDbName, dumpLocation, null);
+  WarehouseInstance loadFailure(String replicatedDbName, String primaryDbName) throws Throwable {
+    loadFailure(replicatedDbName, primaryDbName, null);
     return this;
   }
 
-  WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation, List<String> withClauseOptions)
+  WarehouseInstance loadFailure(String replicatedDbName, String primaryDbName, List<String> withClauseOptions)
           throws Throwable {
-    String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
+    String replLoadCmd = "REPL LOAD " + primaryDbName + " INTO " + replicatedDbName;
     if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
       replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
     }
     return runFailure(replLoadCmd);
   }
 
-  WarehouseInstance loadFailure(String replicatedDbName, String dumpLocation, List<String> withClauseOptions,
+  WarehouseInstance loadFailure(String replicatedDbName, String primaryDbName, List<String> withClauseOptions,
                                 int errorCode) throws Throwable {
-    String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";
+    String replLoadCmd = "REPL LOAD " + primaryDbName + " INTO " + replicatedDbName;
     if ((withClauseOptions != null) && !withClauseOptions.isEmpty()) {
       replLoadCmd += " WITH (" + StringUtils.join(withClauseOptions, ",") + ")";
     }
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index 914147f..dbe282d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -2850,8 +2850,6 @@ public class TestJdbcDriver2 {
       ResultSet replDumpRslt = stmt.executeQuery("repl dump " + primaryDb +
               " with ('hive.repl.rootdir' = '" + replDir + "')");
       assertTrue(replDumpRslt.next());
-      String dumpLocation = replDumpRslt.getString(1);
-      String lastReplId = replDumpRslt.getString(2);
       List<String> logs = stmt.getQueryLog(false, 10000);
       stmt.close();
       LOG.info("Query_Log for Bootstrap Dump");
@@ -2865,7 +2863,8 @@ public class TestJdbcDriver2 {
 
       // Bootstrap load
       stmt = (HiveStatement) con.createStatement();
-      stmt.execute("repl load " + replicaDb + " from '" + dumpLocation + "'");
+      stmt.execute("repl load " + primaryDb + " into " + replicaDb +
+              " with ('hive.repl.rootdir' = '" + replDir + "')");
       logs = stmt.getQueryLog(false, 10000);
       stmt.close();
       LOG.info("Query_Log for Bootstrap Load");
@@ -2889,8 +2888,6 @@ public class TestJdbcDriver2 {
       replDumpRslt = stmt.executeQuery("repl dump " + primaryDb +
               " with ('hive.repl.rootdir' = '" + replDir + "')");
       assertTrue(replDumpRslt.next());
-      dumpLocation = replDumpRslt.getString(1);
-      lastReplId = replDumpRslt.getString(2);
       logs = stmt.getQueryLog(false, 10000);
       stmt.close();
       LOG.info("Query_Log for Incremental Dump");
@@ -2904,7 +2901,8 @@ public class TestJdbcDriver2 {
 
       // Incremental load
       stmt = (HiveStatement) con.createStatement();
-      stmt.execute("repl load " + replicaDb + " from '" + dumpLocation + "'");
+      stmt.execute("repl load " + primaryDb + " into " + replicaDb +
+              " with ('hive.repl.rootdir' = '" + replDir + "')");
       logs = stmt.getQueryLog(false, 10000);
       LOG.info("Query_Log for Incremental Load");
       verifyFetchedLog(logs, expectedIncrementalLoadLogs);
@@ -3105,7 +3103,7 @@ public class TestJdbcDriver2 {
 
     try {
       // invalid load path
-      stmt.execute("repl load default1 from '/tmp/junk'");
+      stmt.execute("repl load default into default1");
     } catch(SQLException e){
       assertTrue(e.getErrorCode() == ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getErrorCode());
     }
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
index a7c905b..7fa6796 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java
@@ -29,6 +29,7 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
 import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.DriverManager;
@@ -43,6 +44,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Base64;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
@@ -1453,7 +1455,8 @@ public class TestJdbcWithMiniHS2 {
         TestJdbcWithMiniHS2.class.getCanonicalName().toLowerCase().replace('.', '_') + "_"
             + System.currentTimeMillis();
     String testPathName = System.getProperty("test.warehouse.dir", "/tmp") + Path.SEPARATOR + tid;
-    Path testPath = new Path(testPathName + Path.SEPARATOR + testDbName);
+    Path testPath = new Path(testPathName + Path.SEPARATOR
+            + Base64.getEncoder().encodeToString(testDbName.toLowerCase().getBytes(StandardCharsets.UTF_8)));
     FileSystem fs = testPath.getFileSystem(new HiveConf());
     Statement stmt = conDefault.createStatement();
     try {
diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
index 949e57b..3dcd60e 100644
--- a/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
+++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g
@@ -924,12 +924,8 @@ replDumpStatement
       : KW_REPL KW_DUMP
         (dbPolicy=replDbPolicy)
         (KW_REPLACE oldDbPolicy=replDbPolicy)?
-        (KW_FROM (eventId=Number)
-          (KW_TO (rangeEnd=Number))?
-          (KW_LIMIT (batchSize=Number))?
-        )?
         (KW_WITH replConf=replConfigs)?
-    -> ^(TOK_REPL_DUMP $dbPolicy ^(TOK_REPLACE $oldDbPolicy)? ^(TOK_FROM $eventId (TOK_TO $rangeEnd)? (TOK_LIMIT $batchSize)?)? $replConf?)
+    -> ^(TOK_REPL_DUMP $dbPolicy ^(TOK_REPLACE $oldDbPolicy)? $replConf?)
     ;
 
 replDbPolicy
@@ -943,10 +939,10 @@ replLoadStatement
 @init { pushMsg("Replication load statement", state); }
 @after { popMsg(state); }
       : KW_REPL KW_LOAD
-        (dbName=identifier)?
-        KW_FROM (path=StringLiteral)
-        (KW_WITH replConf=replConfigs)?
-      -> ^(TOK_REPL_LOAD $path ^(TOK_DBNAME $dbName)? $replConf?)
+      (sourceDbPolicy=replDbPolicy)
+      (KW_INTO dbName=identifier)?
+      (KW_WITH replConf=replConfigs)?
+      -> ^(TOK_REPL_LOAD $sourceDbPolicy ^(TOK_DBNAME $dbName)? $replConf?)
       ;
 
 replConfigs
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index f5eea15..fd06968 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -76,14 +76,14 @@ import javax.security.auth.login.LoginException;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.Comparator;
+import java.util.Base64;
 import java.util.ArrayList;
+import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import static org.apache.hadoop.hive.ql.exec.repl.ReplExternalTables.Writer;
 
@@ -121,7 +121,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   public int execute() {
     try {
       Hive hiveDb = getHive();
-      Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR), work.dbNameOrPattern.toLowerCase());
+      Path dumpRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
+              Base64.getEncoder().encodeToString(work.dbNameOrPattern.toLowerCase()
+                      .getBytes(StandardCharsets.UTF_8.name())));
       Path currentDumpPath = new Path(dumpRoot, getNextDumpDir());
       DumpMetaData dmd = new DumpMetaData(currentDumpPath, conf);
       // Initialize ReplChangeManager instance since we will require it to encode file URI.
@@ -147,14 +149,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   private Long getEventFromPreviousDumpMetadata(Path dumpRoot) throws IOException, SemanticException {
     FileStatus[] statuses = dumpRoot.getFileSystem(conf).listStatus(dumpRoot);
     if (statuses.length > 0) {
-      //sort based on last modified. Recent one is at the top
-      Arrays.sort(statuses, new Comparator<FileStatus>() {
-        public int compare(FileStatus f1, FileStatus f2) {
-          return Long.compare(f2.getModificationTime(), f1.getModificationTime());
+      FileStatus latestUpdatedStatus = statuses[0];
+      for (FileStatus status : statuses) {
+        if (status.getModificationTime() > latestUpdatedStatus.getModificationTime()) {
+          latestUpdatedStatus = status;
         }
-      });
-      FileStatus recentDump = statuses[0];
-      DumpMetaData dmd = new DumpMetaData(recentDump.getPath(), conf);
+      }
+      DumpMetaData dmd = new DumpMetaData(latestUpdatedStatus.getPath(), conf);
       if (dmd.isIncrementalDump()) {
         return dmd.getEventTo();
       }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 2243cb6..703eb11 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.parse;
 
 import org.antlr.runtime.tree.Tree;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -44,10 +45,14 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Base64;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.List;
-import java.util.Map;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVEQUERYID;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY;
@@ -68,8 +73,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   private ReplScope replScope = new ReplScope();
   private ReplScope oldReplScope = null;
 
-  // Base path for REPL LOAD
-  private String path;
+  // Source DB Name for REPL LOAD
+  private String sourceDbNameOrPattern;
   // Added conf member to set the REPL command specific config entries without affecting the configs
   // of any other queries running in the session
   private HiveConf conf;
@@ -77,6 +82,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   // By default, this will be same as that of super class BaseSemanticAnalyzer. But need to obtain again
   // if the Hive configs are received from WITH clause in REPL LOAD or REPL STATUS commands.
   private Hive db;
+  private boolean isTargetAlreadyLoaded;
 
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
@@ -280,18 +286,20 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   }
 
   // REPL LOAD
-  private void initReplLoad(ASTNode ast) throws SemanticException {
-    path = PlanUtils.stripQuotes(ast.getChild(0).getText());
+  private void initReplLoad(ASTNode ast) throws HiveException {
+    sourceDbNameOrPattern = PlanUtils.stripQuotes(ast.getChild(0).getText());
     int numChildren = ast.getChildCount();
     for (int i = 1; i < numChildren; i++) {
       ASTNode childNode = (ASTNode) ast.getChild(i);
       switch (childNode.getToken().getType()) {
-        case TOK_DBNAME:
-          replScope.setDbName(PlanUtils.stripQuotes(childNode.getChild(0).getText()));
-          break;
-        case TOK_REPL_CONFIG:
-          setConfigs((ASTNode) childNode.getChild(0));
-          break;
+      case TOK_DBNAME:
+        replScope.setDbName(PlanUtils.stripQuotes(childNode.getChild(0).getText()));
+        break;
+      case TOK_REPL_CONFIG:
+        setConfigs((ASTNode) childNode.getChild(0));
+        break;
+      case TOK_REPL_TABLES: //Accept TOK_REPL_TABLES for table level repl.Needn't do anything as dump path needs db only
+        break;
         default:
           throw new SemanticException("Unrecognized token in REPL LOAD statement.");
       }
@@ -341,25 +349,18 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
    *    36/
    */
   private void analyzeReplLoad(ASTNode ast) throws SemanticException {
-    initReplLoad(ast);
+    try {
+      initReplLoad(ast);
+    } catch (HiveException e) {
+      throw new SemanticException(e);
+    }
 
     // For analyze repl load, we walk through the dir structure available in the path,
     // looking at each db, and then each table, and then setting up the appropriate
     // import job in its place.
     try {
-      assert(path != null);
-      Path loadPath = new Path(path);
-      final FileSystem fs = loadPath.getFileSystem(conf);
-
-      // Make fully qualified path for further use.
-      loadPath = fs.makeQualified(loadPath);
-
-      if (!fs.exists(loadPath)) {
-        // supposed dump path does not exist.
-        LOG.error("File not found " + loadPath.toUri().toString());
-        throw new FileNotFoundException(ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getMsg());
-      }
-
+      assert(sourceDbNameOrPattern != null);
+      Path loadPath = getCurrentLoadPath();
       // Ths config is set to make sure that in case of s3 replication, move is skipped.
       try {
         Warehouse wh = new Warehouse(conf);
@@ -387,27 +388,79 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       // At this point, all dump dirs should contain a _dumpmetadata file that
       // tells us what is inside that dumpdir.
 
-      DumpMetaData dmd = new DumpMetaData(loadPath, conf);
+      //If repl status of target is greater than dumps, don't do anything as the load for the latest dump is done
+      if (!isTargetAlreadyLoaded) {
+        DumpMetaData dmd = new DumpMetaData(loadPath, conf);
 
-      boolean evDump = false;
-      // we will decide what hdfs locations needs to be copied over here as well.
-      if (dmd.isIncrementalDump()) {
-        LOG.debug("{} contains an incremental dump", loadPath);
-        evDump = true;
-      } else {
-        LOG.debug("{} contains an bootstrap dump", loadPath);
+        boolean evDump = false;
+        // we will decide what hdfs locations needs to be copied over here as well.
+        if (dmd.isIncrementalDump()) {
+          LOG.debug("{} contains an incremental dump", loadPath);
+          evDump = true;
+        } else {
+          LOG.debug("{} contains an bootstrap dump", loadPath);
+        }
+        ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
+                dmd.getReplScope(),
+                queryState.getLineageState(), evDump, dmd.getEventTo(),
+                dirLocationsToCopy(loadPath, evDump));
+        rootTasks.add(TaskFactory.get(replLoadWork, conf));
       }
-      ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), replScope.getDbName(),
-              dmd.getReplScope(),
-              queryState.getLineageState(), evDump, dmd.getEventTo(),
-          dirLocationsToCopy(loadPath, evDump));
-      rootTasks.add(TaskFactory.get(replLoadWork, conf));
     } catch (Exception e) {
       // TODO : simple wrap & rethrow for now, clean up with error codes
       throw new SemanticException(e.getMessage(), e);
     }
   }
 
+  private Path getCurrentLoadPath() throws IOException, SemanticException {
+    Path loadPathBase = new Path(conf.getVar(HiveConf.ConfVars.REPLDIR),
+            Base64.getEncoder().encodeToString(sourceDbNameOrPattern.toLowerCase()
+            .getBytes(StandardCharsets.UTF_8.name())));
+    final FileSystem fs = loadPathBase.getFileSystem(conf);
+
+    // Make fully qualified path for further use.
+    loadPathBase = fs.makeQualified(loadPathBase);
+
+    if (!fs.exists(loadPathBase)) {
+      // supposed dump path does not exist.
+      LOG.error("File not found " + loadPathBase.toUri().toString());
+      throw new FileNotFoundException(ErrorMsg.REPL_LOAD_PATH_NOT_FOUND.getMsg());
+    }
+    FileStatus[] statuses = loadPathBase.getFileSystem(conf).listStatus(loadPathBase);
+    if (statuses.length > 0) {
+      //sort based on last modified. Recent one is at the end
+      Arrays.sort(statuses, new Comparator<FileStatus>() {
+        public int compare(FileStatus f1, FileStatus f2) {
+          return Long.compare(f1.getModificationTime(), f2.getModificationTime());
+        }
+      });
+      if (replScope.getDbName() != null) {
+        String currentReplStatusOfTarget
+                = getReplStatus(replScope.getDbName());
+        if (currentReplStatusOfTarget == null) { //bootstrap
+          return statuses[0].getPath();
+        } else {
+          DumpMetaData latestDump = new DumpMetaData(statuses[statuses.length - 1].getPath(), conf);
+          if (Long.parseLong(currentReplStatusOfTarget.trim()) >= latestDump.getEventTo()) {
+            isTargetAlreadyLoaded = true;
+          } else {
+            for (FileStatus status : statuses) {
+              DumpMetaData dmd = new DumpMetaData(status.getPath(), conf);
+              if (dmd.isIncrementalDump()
+                      && Long.parseLong(currentReplStatusOfTarget.trim()) < dmd.getEventTo()) {
+                return status.getPath();
+              }
+            }
+          }
+        }
+      } else {
+        //If dbname is null(in case of repl load *), can't get repl status of target, return unsupported
+        throw new UnsupportedOperationException("REPL LOAD * is not supported");
+      }
+    }
+    return null;
+  }
+
   private List<DirCopyWork> dirLocationsToCopy(Path loadPath, boolean isIncrementalPhase)
       throws HiveException, IOException {
     List<DirCopyWork> list = new ArrayList<>();
@@ -466,10 +519,15 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   private void analyzeReplStatus(ASTNode ast) throws SemanticException {
     initReplStatus(ast);
-
     String dbNameOrPattern = replScope.getDbName();
-    String replLastId = null;
+    String replLastId = getReplStatus(dbNameOrPattern);
+    prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
+    setFetchTask(createFetchTask("last_repl_id#string"));
+    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {} using configuration {}",
+        replLastId, ctx.getResFile(), conf);
+  }
 
+  private String getReplStatus(String dbNameOrPattern) throws SemanticException {
     try {
       // Checking for status of a db
       Database database = db.getDatabase(dbNameOrPattern);
@@ -477,18 +535,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         inputs.add(new ReadEntity(database));
         Map<String, String> params = database.getParameters();
         if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
-          replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+          return params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
         }
       }
     } catch (HiveException e) {
       throw new SemanticException(e); // TODO : simple wrap & rethrow for now, clean up with error
-                                      // codes
+      // codes
     }
-
-    prepareReturnValues(Collections.singletonList(replLastId), "last_repl_id#string");
-    setFetchTask(createFetchTask("last_repl_id#string"));
-    LOG.debug("ReplicationSemanticAnalyzer.analyzeReplStatus: writing repl.last.id={} out to {}",
-        replLastId, ctx.getResFile(), conf);
+    return null;
   }
 
   private void prepareReturnValues(List<String> values, String schema) throws SemanticException {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
index e606713..e91a7ed 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestParseUtils.java
@@ -78,7 +78,7 @@ public class TestParseUtils {
           {"LOAD DATA LOCAL INPATH './examples/files/kv.txt' " +
              "  OVERWRITE INTO TABLE a", TxnType.DEFAULT},
 
-          {"REPL LOAD a from './examples/files/kv.txt'", TxnType.DEFAULT},
+          {"REPL LOAD a INTO a", TxnType.DEFAULT},
           {"REPL DUMP a", TxnType.DEFAULT},
           {"REPL STATUS a", TxnType.DEFAULT},
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
index 48b9883..81ab01d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestReplicationSemanticAnalyzer.java
@@ -112,40 +112,6 @@ public class TestReplicationSemanticAnalyzer {
       assertDatabase(2, root);
       assertTableName(root);
     }
-
-    @Test
-    public void parseFromEventId() throws ParseException {
-      ASTNode root = parse("repl dump testDb.'test_table' from 100");
-      assertDatabase(3, root);
-      assertTableName(root);
-      assertFromEvent(1, root);
-    }
-
-    @Test
-    public void parseToEventId() throws ParseException {
-      ASTNode root = parse("repl dump testDb.'test_table' from 100 to 200");
-      assertDatabase(3, root);
-      assertTableName(root);
-      ASTNode fromClauseRootNode = assertFromEvent(3, root);
-      assertToEventId(fromClauseRootNode);
-    }
-
-    @Test
-    public void parseLimit() throws ParseException {
-      ASTNode root = parse("repl dump testDb.'test_table' from 100 to 200 limit 10");
-      assertDatabase(3, root);
-      assertTableName(root);
-      ASTNode fromClauseRootNode = assertFromEvent(5, root);
-      assertToEventId(fromClauseRootNode);
-
-      ASTNode child = (ASTNode) fromClauseRootNode.getChild(3);
-      assertEquals("TOK_LIMIT", child.getText());
-      assertEquals(0, child.getChildCount());
-
-      child = (ASTNode) fromClauseRootNode.getChild(4);
-      assertEquals("10", child.getText());
-      assertEquals(0, child.getChildCount());
-    }
   }
 
   public static class ReplDumpWithClause {
@@ -165,66 +131,26 @@ public class TestReplicationSemanticAnalyzer {
       assertTableName(root);
       assertWithClause(root, 2);
     }
-
-    @Test
-    public void parseFromEventId() throws ParseException {
-      ASTNode root = parse("repl dump testDb.'test_table' from 100 "
-          + "with ('key.1'='value.1','key.2'='value.2')");
-      assertDatabase(4, root);
-      assertTableName(root);
-      assertFromEvent(1, root);
-      assertWithClause(root, 3);
-    }
-
-    @Test
-    public void parseToEventId() throws ParseException {
-      ASTNode root = parse("repl dump testDb.'test_table' from 100 to 200 "
-          + "with ('key.1'='value.1','key.2'='value.2')");
-      assertDatabase(4, root);
-      assertTableName(root);
-      ASTNode fromClauseRootNode = assertFromEvent(3, root);
-      assertToEventId(fromClauseRootNode);
-      assertWithClause(root, 3);
-    }
-
-    @Test
-    public void parseLimit() throws ParseException {
-      ASTNode root = parse("repl dump testDb.'test_table' from 100 to 200 limit 10 "
-          + "with ('key.1'='value.1','key.2'='value.2')");
-      assertDatabase(4, root);
-      assertTableName(root);
-      ASTNode fromClauseRootNode = assertFromEvent(5, root);
-      assertToEventId(fromClauseRootNode);
-      assertWithClause(root, 3);
-
-      ASTNode child = (ASTNode) fromClauseRootNode.getChild(3);
-      assertEquals("TOK_LIMIT", child.getText());
-      assertEquals(0, child.getChildCount());
-
-      child = (ASTNode) fromClauseRootNode.getChild(4);
-      assertEquals("10", child.getText());
-      assertEquals(0, child.getChildCount());
-    }
   }
 
   public static class ReplLoad {
 
     @Test
     public void parseFromLocation() throws ParseException {
-      ASTNode root = parse("repl load  from '/some/location/in/hdfs/'");
+      ASTNode root = parse("repl load testDbName");
       assertFromLocation(1, root);
     }
 
     @Test
     public void parseTargetDbName() throws ParseException {
-      ASTNode root = parse("repl load targetTestDbName from '/some/location/in/hdfs/'");
+      ASTNode root = parse("repl load testDbName into targetTestDbName");
       assertFromLocation(2, root);
       assertTargetDatabaseName(root);
     }
 
     @Test
     public void parseWithClause() throws ParseException {
-      ASTNode root = parse("repl load targetTestDbName from '/some/location/in/hdfs/'"
+      ASTNode root = parse("repl load testDbName into targetTestDbName"
           + " with ('mapred.job.queue.name'='repl','hive.repl.approx.max.load.tasks'='100')");
       assertFromLocation(3, root);
       assertTargetDatabaseName(root);
@@ -251,7 +177,7 @@ public class TestReplicationSemanticAnalyzer {
       assertEquals("TOK_REPL_LOAD", root.getText());
       assertEquals(expectedNumberOfChildren, root.getChildCount());
       ASTNode child = (ASTNode) root.getChild(0);
-      assertEquals("'/some/location/in/hdfs/'", child.getText());
+      assertEquals("testDbName", child.getText());
       assertEquals(0, child.getChildCount());
     }
 
diff --git a/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q b/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q
index e1a6153..c227b68 100644
--- a/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q
+++ b/ql/src/test/queries/clientnegative/repl_dump_requires_admin.q
@@ -26,7 +26,7 @@ show role grant user hive_admin_user;
 show tables test_repldump_adminpriv;
 repl dump test_repldump_adminpriv;
 
-dfs -rmr  ${system:test.tmp.dir}/hrepl/test_repldump_adminpriv/next;
+dfs -rmr  ${system:test.tmp.dir}/hrepl/dGVzdF9yZXBsZHVtcF9hZG1pbnByaXY=/next;
 
 set user.name=ruser1;
 show tables test_repldump_adminpriv;
diff --git a/ql/src/test/queries/clientnegative/repl_load_requires_admin.q b/ql/src/test/queries/clientnegative/repl_load_requires_admin.q
index 921b50b..0e52e9d 100644
--- a/ql/src/test/queries/clientnegative/repl_load_requires_admin.q
+++ b/ql/src/test/queries/clientnegative/repl_load_requires_admin.q
@@ -29,10 +29,10 @@ show tables test_replload_adminpriv_src;
 repl dump test_replload_adminpriv_src;
 
 -- repl load as admin should succeed
-repl load test_replload_adminpriv_tgt1 from '${system:test.tmp.dir}/hrepl/test_replload_adminpriv_src/next/';
+repl load test_replload_adminpriv_src into test_replload_adminpriv_tgt1;
 show tables test_replload_adminpriv_tgt1;
 
 set user.name=ruser1;
 
 -- repl load as non-admin should fail
-repl load test_replload_adminpriv_tgt2 from '${system:test.tmp.dir}/hrepl/test_replload_adminpriv_src/next';
+repl load test_replload_adminpriv_src into test_replload_adminpriv_tgt2;
diff --git a/ql/src/test/queries/clientpositive/repl_load_old_version.q b/ql/src/test/queries/clientpositive/repl_load_old_version.q
deleted file mode 100644
index 11ed75d..0000000
--- a/ql/src/test/queries/clientpositive/repl_load_old_version.q
+++ /dev/null
@@ -1,10 +0,0 @@
-set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
-REPL LOAD test_db from '../../data/files/repl_dump' with ('hive.exec.parallel'='false');
-use test_db;
-show tables;
-select * from tbl1 order by fld;
-select * from tbl2 order by fld;
-select * from tbl3 order by fld;
-select * from tbl4 order by fld;
-select * from tbl5 order by fld;
-select * from tbl6 order by fld1;
diff --git a/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out b/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out
index 1499c39..28f9d23 100644
--- a/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out
+++ b/ql/src/test/results/clientnegative/repl_load_requires_admin.q.out
@@ -68,8 +68,9 @@ POSTHOOK: query: repl dump test_replload_adminpriv_src
 POSTHOOK: type: REPLDUMP
 POSTHOOK: Input: database:test_replload_adminpriv_src
 #### A masked pattern was here ####
+PREHOOK: query: repl load test_replload_adminpriv_src into test_replload_adminpriv_tgt1
 PREHOOK: type: REPLLOAD
-#### A masked pattern was here ####
+POSTHOOK: query: repl load test_replload_adminpriv_src into test_replload_adminpriv_tgt1
 POSTHOOK: type: REPLLOAD
 PREHOOK: query: show tables test_replload_adminpriv_tgt1
 PREHOOK: type: SHOWTABLES