You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/05/02 06:45:38 UTC

[14/14] hive git commit: HIVE-18988: Support bootstrap replication of ACID tables (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Thejas M Nair)

HIVE-18988: Support bootstrap replication of ACID tables (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Thejas M Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e8651cb9
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e8651cb9
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e8651cb9

Branch: refs/heads/master
Commit: e8651cb970aadc2da3d080ddae5767650d69c660
Parents: 4fd8f03
Author: Sankar Hariappan <sa...@apache.org>
Authored: Wed May 2 12:15:01 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Wed May 2 12:15:01 2018 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |    5 +
 .../listener/DbNotificationListener.java        |    2 +-
 .../TestReplicationScenariosAcidTables.java     |  167 +
 .../java/org/apache/hadoop/hive/ql/Driver.java  |    1 +
 .../hadoop/hive/ql/exec/ReplCopyTask.java       |    8 +-
 .../apache/hadoop/hive/ql/exec/ReplTxnTask.java |   27 +-
 .../apache/hadoop/hive/ql/exec/ReplTxnWork.java |  106 -
 .../apache/hadoop/hive/ql/exec/TaskFactory.java |    1 +
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |   86 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpWork.java  |    4 +-
 .../ql/exec/repl/bootstrap/ReplLoadTask.java    |    9 +-
 .../ql/exec/repl/bootstrap/ReplLoadWork.java    |    8 +-
 .../exec/repl/bootstrap/events/TableEvent.java  |    3 +
 .../events/filesystem/FSPartitionEvent.java     |    6 +
 .../events/filesystem/FSTableEvent.java         |   16 +
 .../bootstrap/load/table/LoadPartitions.java    |   50 +-
 .../repl/bootstrap/load/table/LoadTable.java    |   51 +-
 .../exec/repl/bootstrap/load/util/Context.java  |    4 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |   36 +
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |   10 +
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java |    6 +
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |   17 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |    5 +-
 .../hive/ql/parse/LoadSemanticAnalyzer.java     |    2 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   |    7 +-
 .../hadoop/hive/ql/parse/ReplicationSpec.java   |   28 +-
 .../hadoop/hive/ql/parse/repl/CopyUtils.java    |   98 +-
 .../dump/BootStrapReplicationSpecFunction.java  |   10 +-
 .../hive/ql/parse/repl/dump/HiveWrapper.java    |    6 +-
 .../ql/parse/repl/dump/PartitionExport.java     |    6 +-
 .../hive/ql/parse/repl/dump/TableExport.java    |   14 +-
 .../hadoop/hive/ql/parse/repl/dump/Utils.java   |   10 +
 .../ql/parse/repl/dump/io/FileOperations.java   |   69 +-
 .../parse/repl/dump/io/FunctionSerializer.java  |    2 +-
 .../repl/load/message/AbortTxnHandler.java      |    2 +-
 .../repl/load/message/AllocWriteIdHandler.java  |    2 +-
 .../repl/load/message/CommitTxnHandler.java     |    2 +-
 .../parse/repl/load/message/OpenTxnHandler.java |    2 +-
 .../hadoop/hive/ql/plan/ImportTableDesc.java    |   11 +
 .../apache/hadoop/hive/ql/plan/ReplTxnWork.java |  124 +
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp  | 2632 +++++----
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.h    |  107 +
 .../ThriftHiveMetastore_server.skeleton.cpp     |    5 +
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 3305 +++++------
 .../gen/thrift/gen-cpp/hive_metastore_types.h   |   75 +
 .../metastore/api/AddDynamicPartitions.java     |   32 +-
 .../api/AllocateTableWriteIdsRequest.java       |   68 +-
 .../api/AllocateTableWriteIdsResponse.java      |   36 +-
 .../metastore/api/ClearFileMetadataRequest.java |   32 +-
 .../hive/metastore/api/ClientCapabilities.java  |   32 +-
 .../hive/metastore/api/CompactionRequest.java   |   44 +-
 .../hive/metastore/api/CreationMetadata.java    |   32 +-
 .../metastore/api/FindSchemasByColsResp.java    |   36 +-
 .../hive/metastore/api/FireEventRequest.java    |   32 +-
 .../metastore/api/GetAllFunctionsResponse.java  |   36 +-
 .../api/GetFileMetadataByExprRequest.java       |   32 +-
 .../api/GetFileMetadataByExprResult.java        |   48 +-
 .../metastore/api/GetFileMetadataRequest.java   |   32 +-
 .../metastore/api/GetFileMetadataResult.java    |   44 +-
 .../hive/metastore/api/GetTablesRequest.java    |   32 +-
 .../hive/metastore/api/GetTablesResult.java     |   36 +-
 .../metastore/api/GetValidWriteIdsRequest.java  |   32 +-
 .../metastore/api/GetValidWriteIdsResponse.java |   36 +-
 .../api/HeartbeatTxnRangeResponse.java          |   64 +-
 .../metastore/api/InsertEventRequestData.java   |   64 +-
 .../hadoop/hive/metastore/api/LockRequest.java  |   36 +-
 .../hive/metastore/api/Materialization.java     |   32 +-
 .../api/NotificationEventResponse.java          |   36 +-
 .../metastore/api/PutFileMetadataRequest.java   |   64 +-
 .../api/ReplTblWriteIdStateRequest.java         |  952 ++++
 .../hive/metastore/api/SchemaVersion.java       |   36 +-
 .../hive/metastore/api/ShowCompactResponse.java |   36 +-
 .../hive/metastore/api/ShowLocksResponse.java   |   36 +-
 .../hive/metastore/api/TableValidWriteIds.java  |   32 +-
 .../hive/metastore/api/ThriftHiveMetastore.java | 5145 ++++++++++--------
 .../hive/metastore/api/WMFullResourcePlan.java  |  144 +-
 .../api/WMGetAllResourcePlanResponse.java       |   36 +-
 .../WMGetTriggersForResourePlanResponse.java    |   36 +-
 .../api/WMValidateResourcePlanResponse.java     |   64 +-
 .../gen-php/metastore/ThriftHiveMetastore.php   | 2066 +++----
 .../src/gen/thrift/gen-php/metastore/Types.php  | 1028 ++--
 .../hive_metastore/ThriftHiveMetastore-remote   |    7 +
 .../hive_metastore/ThriftHiveMetastore.py       | 1482 ++---
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |  676 ++-
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |   31 +
 .../gen/thrift/gen-rb/thrift_hive_metastore.rb  |   52 +
 .../hadoop/hive/metastore/HiveMetaStore.java    |    5 +
 .../hive/metastore/HiveMetaStoreClient.java     |   27 +
 .../hadoop/hive/metastore/IMetaStoreClient.java |   19 +-
 .../hive/metastore/ReplChangeManager.java       |   63 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  326 +-
 .../hadoop/hive/metastore/txn/TxnStore.java     |   11 +-
 .../hadoop/hive/metastore/txn/TxnUtils.java     |   51 +-
 .../src/main/thrift/hive_metastore.thrift       |   10 +
 .../HiveMetaStoreClientPreCatalog.java          |   27 +
 .../hadoop/hive/common/ValidReadTxnList.java    |    3 -
 96 files changed, 12249 insertions(+), 8267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index df7a559..fe7b23f 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -469,6 +469,11 @@ public class HiveConf extends Configuration {
             + "metadata for acid tables which do not require the corresponding transaction \n"
             + "semantics to be applied on target. This can be removed when ACID table \n"
             + "replication is supported."),
+    REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT("hive.repl.bootstrap.dump.open.txn.timeout", "1h",
+        new TimeValidator(TimeUnit.HOURS),
+        "Indicates the timeout for all transactions which are opened before triggering bootstrap REPL DUMP. "
+            + "If these open transactions are not closed within the timeout value, then REPL DUMP will "
+            + "forcefully abort those transactions and continue with bootstrap dump."),
     //https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/TransparentEncryption.html#Running_as_the_superuser
     REPL_ADD_RAW_RESERVED_NAMESPACE("hive.repl.add.raw.reserved.namespace", false,
         "For TDE with same encryption keys on source and target, allow Distcp super user to access \n"

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 59d1e3a..7835691 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -240,7 +240,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
         FileStatus file = files[i];
         i++;
         return ReplChangeManager.encodeFileUri(file.getPath().toString(),
-            ReplChangeManager.checksumFor(file.getPath(), fs));
+            ReplChangeManager.checksumFor(file.getPath(), fs), null);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 2ad83b6..8ad507f 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -19,12 +19,21 @@ package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.shims.Utils;
 import org.junit.rules.TestName;
 import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -32,7 +41,9 @@ import org.junit.BeforeClass;
 import org.junit.AfterClass;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 
 /**
  * TestReplicationScenariosAcidTables - test replication for ACID tables
@@ -59,6 +70,9 @@ public class TestReplicationScenariosAcidTables {
         put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
         put("hive.support.concurrency", "true");
         put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+        put("hive.repl.dump.include.acid.tables", "true");
+        put("hive.metastore.client.capability.check", "false");
+        put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
     }};
     primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
     replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
@@ -66,6 +80,8 @@ public class TestReplicationScenariosAcidTables {
         put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
         put("hive.support.concurrency", "false");
         put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+        put("hive.repl.dump.include.acid.tables", "true");
+        put("hive.metastore.client.capability.check", "false");
     }};
     replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
   }
@@ -92,6 +108,157 @@ public class TestReplicationScenariosAcidTables {
   }
 
   @Test
+  public void testAcidTablesBootstrap() throws Throwable {
+    WarehouseInstance.Tuple bootstrapDump = primary
+            .run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into t1 values(1)")
+            .run("insert into t1 values(2)")
+            .run("create table t2 (place string) partitioned by (country string) clustered by(place) " +
+                    "into 3 buckets stored as orc tblproperties (\"transactional\"=\"true\")")
+            .run("insert into t2 partition(country='india') values ('bangalore')")
+            .run("insert into t2 partition(country='us') values ('austin')")
+            .run("insert into t2 partition(country='france') values ('paris')")
+            .run("alter table t2 add partition(country='italy')")
+            .run("create table t3 (rank int) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .run("insert into t3 values(11)")
+            .run("insert into t3 values(22)")
+            .run("create table t4 (id int)")
+            .run("insert into t4 values(111), (222)")
+            .run("create table t5 (id int) stored as orc ")
+            .run("insert into t5 values(1111), (2222)")
+            .run("alter table t5 set tblproperties (\"transactional\"=\"true\")")
+            .run("insert into t5 values(3333)")
+            .dump(primaryDbName, null);
+
+    replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"t1", "t2", "t3", "t4", "t5"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(bootstrapDump.lastReplicationId)
+            .run("select id from t1 order by id")
+            .verifyResults(new String[]{"1", "2"})
+            .run("select country from t2 order by country")
+            .verifyResults(new String[] {"france", "india", "us"})
+            .run("select rank from t3 order by rank")
+            .verifyResults(new String[] {"11", "22"})
+            .run("select id from t4 order by id")
+            .verifyResults(new String[] {"111", "222"})
+            .run("select id from t5 order by id")
+            .verifyResults(new String[] {"1111", "2222", "3333"});
+  }
+
+  @Test
+  public void testAcidTablesBootstrapWithOpenTxnsTimeout() throws Throwable {
+    // Open 5 txns
+    HiveConf primaryConf = primary.getConf();
+    TxnStore txnHandler = TxnUtils.getTxnStore(primary.getConf());
+    OpenTxnsResponse otResp = txnHandler.openTxns(new OpenTxnRequest(5, "u1", "localhost"));
+    List<Long> txns = otResp.getTxn_ids();
+    String txnIdRange = " txn_id >= " + txns.get(0) + " and txn_id <= " + txns.get(4);
+    Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
+            5, TxnDbUtil.countQueryAgent(primaryConf,
+                  "select count(*) from TXNS where txn_state = 'o' and " + txnIdRange));
+
+    // Create 2 tables: t1 is a non-partitioned full ACID table and t2 is a partitioned MM (insert-only) table.
+    primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into t1 values(1)")
+            .run("create table t2 (rank int) partitioned by (name string) tblproperties(\"transactional\"=\"true\", " +
+                    "\"transactional_properties\"=\"insert_only\")")
+            .run("insert into t2 partition(name='Bob') values(11)")
+            .run("insert into t2 partition(name='Carl') values(10)");
+    // Allocate write ids for both tables t1 and t2 for all txns
+    // t1=5+1(insert) and t2=5+2(insert)
+    AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(primaryDbName, "t1");
+    rqst.setTxnIds(txns);
+    txnHandler.allocateTableWriteIds(rqst);
+    rqst.setTableName("t2");
+    txnHandler.allocateTableWriteIds(rqst);
+    Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXN_TO_WRITE_ID"),
+            6, TxnDbUtil.countQueryAgent(primaryConf,
+                    "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + primaryDbName.toLowerCase()
+                            + "' and t2w_table = 't1'"));
+    Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXN_TO_WRITE_ID"),
+            7, TxnDbUtil.countQueryAgent(primaryConf,
+                    "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + primaryDbName.toLowerCase()
+                            + "' and t2w_table = 't2'"));
+
+    // Bootstrap dump with open txn timeout as 1s.
+    List<String> withConfigs = Arrays.asList(
+            "'hive.repl.bootstrap.dump.open.txn.timeout'='1s'");
+    WarehouseInstance.Tuple bootstrapDump = primary
+            .run("use " + primaryDbName)
+            .dump(primaryDbName, null, withConfigs);
+
+    // After bootstrap dump, all the opened txns should be aborted. Verify it.
+    Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
+            0, TxnDbUtil.countQueryAgent(primaryConf,
+                    "select count(*) from TXNS where txn_state = 'o' and " + txnIdRange));
+    Assert.assertEquals(TxnDbUtil.queryToString(primaryConf, "select * from TXNS"),
+            5, TxnDbUtil.countQueryAgent(primaryConf,
+                    "select count(*) from TXNS where txn_state = 'a' and " + txnIdRange));
+
+    // Verify the next write id
+    String[] nextWriteId = TxnDbUtil.queryToString(primaryConf, "select nwi_next from NEXT_WRITE_ID where "
+            + " nwi_database = '" + primaryDbName.toLowerCase() + "' and nwi_table = 't1'")
+            .split("\n");
+    Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 7L);
+    nextWriteId = TxnDbUtil.queryToString(primaryConf, "select nwi_next from NEXT_WRITE_ID where "
+            + " nwi_database = '" + primaryDbName.toLowerCase() + "' and nwi_table = 't2'")
+            .split("\n");
+    Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 8L);
+
+    // Bootstrap load which should also replicate the aborted write ids on both tables.
+    HiveConf replicaConf = replica.getConf();
+    replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+            .run("use " + replicatedDbName)
+            .run("show tables")
+            .verifyResults(new String[] {"t1", "t2"})
+            .run("repl status " + replicatedDbName)
+            .verifyResult(bootstrapDump.lastReplicationId)
+            .run("select id from t1")
+            .verifyResults(new String[]{"1"})
+            .run("select rank from t2 order by rank")
+            .verifyResults(new String[] {"10", "11"});
+
+    // Verify if HWM is properly set after REPL LOAD
+    nextWriteId = TxnDbUtil.queryToString(replicaConf, "select nwi_next from NEXT_WRITE_ID where "
+            + " nwi_database = '" + replicatedDbName.toLowerCase() + "' and nwi_table = 't1'")
+            .split("\n");
+    Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 7L);
+    nextWriteId = TxnDbUtil.queryToString(replicaConf, "select nwi_next from NEXT_WRITE_ID where "
+            + " nwi_database = '" + replicatedDbName.toLowerCase() + "' and nwi_table = 't2'")
+            .split("\n");
+    Assert.assertEquals(Long.parseLong(nextWriteId[1].trim()), 8L);
+
+    // Verify if all the aborted write ids are replicated to the replicated DB
+    Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from TXN_TO_WRITE_ID"),
+            5, TxnDbUtil.countQueryAgent(replicaConf,
+                    "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + replicatedDbName.toLowerCase()
+                            + "' and t2w_table = 't1'"));
+    Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from TXN_TO_WRITE_ID"),
+            5, TxnDbUtil.countQueryAgent(replicaConf,
+                    "select count(*) from TXN_TO_WRITE_ID where t2w_database = '" + replicatedDbName.toLowerCase()
+                            + "' and t2w_table = 't2'"));
+
+    // Verify that entries were added to COMPACTION_QUEUE for each table/partition
+    // t1-> 1 entry and t2-> 2 entries (1 per partition)
+    Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from COMPACTION_QUEUE"),
+            1, TxnDbUtil.countQueryAgent(replicaConf,
+                    "select count(*) from COMPACTION_QUEUE where cq_database = '" + replicatedDbName
+                            + "' and cq_table = 't1'"));
+    Assert.assertEquals(TxnDbUtil.queryToString(replicaConf, "select * from COMPACTION_QUEUE"),
+            2, TxnDbUtil.countQueryAgent(replicaConf,
+                    "select count(*) from COMPACTION_QUEUE where cq_database = '" + replicatedDbName
+                            + "' and cq_table = 't2'"));
+  }
+
+  @Test
   public void testOpenTxnEvent() throws Throwable {
     String tableName = testName.getMethodName();
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 41ad002..9f4e6f2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -591,6 +591,7 @@ public class Driver implements IDriver {
         setTriggerContext(queryId);
       }
 
+      ctx.setHiveTxnManager(queryTxnMgr);
       ctx.setStatsSource(statsSource);
       ctx.setCmd(command);
       ctx.setHDFSCleanup(true);

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index de270cf..24e7324 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -83,7 +83,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
       if (ReplChangeManager.isCMFileUri(fromPath, srcFs)) {
         String[] result = ReplChangeManager.getFileWithChksumFromURI(fromPath.toString());
         ReplChangeManager.FileInfo sourceInfo = ReplChangeManager
-            .getFileInfo(new Path(result[0]), result[1], conf);
+            .getFileInfo(new Path(result[0]), result[1], result[2], conf);
         if (FileUtils.copy(
             sourceInfo.getSrcFs(), sourceInfo.getSourcePath(),
             dstFs, toPath, false, false, conf)) {
@@ -130,7 +130,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
           console.printInfo("Copying file: " + oneSrc.getPath().toString());
           LOG.debug("ReplCopyTask :cp:{}=>{}", oneSrc.getPath(), toPath);
           srcFiles.add(new ReplChangeManager.FileInfo(oneSrc.getPath().getFileSystem(conf),
-                                                      oneSrc.getPath()));
+                                                      oneSrc.getPath(), null));
         }
       }
 
@@ -183,14 +183,14 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
     try (BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(fileListing)))) {
       // TODO : verify if skipping charset here is okay
 
-      String line = null;
+      String line;
       while ((line = br.readLine()) != null) {
         LOG.debug("ReplCopyTask :_filesReadLine: {}", line);
 
         String[] fileWithChksum = ReplChangeManager.getFileWithChksumFromURI(line);
         try {
           ReplChangeManager.FileInfo f = ReplChangeManager
-              .getFileInfo(new Path(fileWithChksum[0]), fileWithChksum[1], conf);
+              .getFileInfo(new Path(fileWithChksum[0]), fileWithChksum[1], fileWithChksum[2], conf);
           filePaths.add(f);
         } catch (MetaException e) {
           // issue warning for missing file and throw exception

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
index 2615072..5bbc25a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
+import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -46,18 +47,13 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
   @Override
   public int execute(DriverContext driverContext) {
     String replPolicy = work.getReplPolicy();
-    if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
-      Utilities.FILE_OP_LOGGER.trace("Executing ReplTxnTask " + work.getOperationType().toString() +
-              " for txn ids : " + work.getTxnIds().toString() + " replPolicy : " + replPolicy);
-    }
-
-    String tableName = work.getTableName() == null || work.getTableName().isEmpty() ? null : work.getTableName();
-    if (tableName != null) {
+    String tableName = work.getTableName();
+    ReplicationSpec replicationSpec = work.getReplicationSpec();
+    if ((tableName != null) && (replicationSpec != null)) {
       Table tbl;
       try {
         tbl = Hive.get().getTable(work.getDbName(), tableName);
-        ReplicationSpec replicationSpec = work.getReplicationSpec();
-        if (replicationSpec != null && !replicationSpec.allowReplacementInto(tbl.getParameters())) {
+        if (!replicationSpec.allowReplacementInto(tbl.getParameters())) {
           // if the event is already replayed, then no need to replay it again.
           LOG.debug("ReplTxnTask: Event is skipped as it is already replayed. Event Id: " +
                   replicationSpec.getReplicationState() + "Event Type: " + work.getOperationType());
@@ -75,7 +71,6 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
     try {
       HiveTxnManager txnManager = driverContext.getCtx().getHiveTxnManager();
       String user = UserGroupInformation.getCurrentUser().getUserName();
-      LOG.debug("Replaying " + work.getOperationType().toString() + " Event for policy " + replPolicy);
       switch(work.getOperationType()) {
       case REPL_OPEN_TXN:
         List<Long> txnIds = txnManager.replOpenTxn(replPolicy, work.getTxnIds(), user);
@@ -98,11 +93,17 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
       case REPL_ALLOC_WRITE_ID:
         assert work.getTxnToWriteIdList() != null;
         String dbName = work.getDbName();
-        String tblName = work.getTableName();
         List <TxnToWriteId> txnToWriteIdList = work.getTxnToWriteIdList();
-        txnManager.replAllocateTableWriteIdsBatch(dbName, tblName, replPolicy, txnToWriteIdList);
+        txnManager.replAllocateTableWriteIdsBatch(dbName, tableName, replPolicy, txnToWriteIdList);
         LOG.info("Replayed alloc write Id Event for repl policy: " + replPolicy + " db Name : " + dbName +
-                " txnToWriteIdList: " +txnToWriteIdList.toString() + " table name: " + tblName);
+                " txnToWriteIdList: " +txnToWriteIdList.toString() + " table name: " + tableName);
+        return 0;
+      case REPL_WRITEID_STATE:
+        txnManager.replTableWriteIdState(work.getValidWriteIdList(),
+                work.getDbName(), tableName, work.getPartNames());
+        LOG.info("Replicated WriteId state for DbName: " + work.getDbName()
+                + " TableName: " + tableName
+                + " ValidWriteIdList: " + work.getValidWriteIdList());
         return 0;
       default:
         LOG.error("Operation Type " + work.getOperationType() + " is not supported ");

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
deleted file mode 100644
index 530e9be..0000000
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.ql.exec;
-
-import java.io.Serializable;
-
-import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
-import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
-import org.apache.hadoop.hive.ql.plan.Explain;
-import org.apache.hadoop.hive.ql.plan.Explain.Level;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * ReplTxnTask.
- * Used for replaying the transaction related events.
- */
-@Explain(displayName = "Replication Transaction", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
-public class ReplTxnWork implements Serializable {
-  private static final long serialVersionUID = 1L;
-  private String dbName;
-  private String tableName;
-  private String replPolicy;
-  private List<Long> txnIds;
-  private List<TxnToWriteId> txnToWriteIdList;
-  private ReplicationSpec replicationSpec;
-
-  /**
-   * OperationType.
-   * Different kind of events supported for replaying.
-   */
-  public enum OperationType {
-    REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN, REPL_ALLOC_WRITE_ID
-  }
-
-  OperationType operation;
-
-  public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type,
-                     List<TxnToWriteId> txnToWriteIdList, ReplicationSpec replicationSpec) {
-    this.txnIds = txnIds;
-    this.dbName = dbName;
-    this.tableName = tableName;
-    this.operation = type;
-    this.replPolicy = replPolicy;
-    this.txnToWriteIdList = txnToWriteIdList;
-    this.replicationSpec = replicationSpec;
-  }
-
-  public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type,
-                     ReplicationSpec replicationSpec) {
-    this(replPolicy, dbName, tableName, txnIds, type, null, replicationSpec);
-  }
-
-  public ReplTxnWork(String replPolicy, String dbName, String tableName, Long txnId,
-                     OperationType type, ReplicationSpec replicationSpec) {
-    this(replPolicy, dbName, tableName, Collections.singletonList(txnId), type, null, replicationSpec);
-  }
-
-  public ReplTxnWork(String replPolicy, String dbName, String tableName, OperationType type,
-                     List<TxnToWriteId> txnToWriteIdList, ReplicationSpec replicationSpec) {
-    this(replPolicy, dbName, tableName, null, type, txnToWriteIdList, replicationSpec);
-  }
-
-  public List<Long> getTxnIds() {
-    return txnIds;
-  }
-
-  public String getDbName() {
-    return dbName;
-  }
-
-  public String getTableName()  {
-    return tableName;
-  }
-
-  public String getReplPolicy()  {
-    return replPolicy;
-  }
-
-  public OperationType getOperationType() {
-    return operation;
-  }
-
-  public List<TxnToWriteId> getTxnToWriteIdList() {
-    return txnToWriteIdList;
-  }
-
-  public ReplicationSpec getReplicationSpec() {
-    return replicationSpec;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
index 2da6b0f..3a107b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/TaskFactory.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.plan.ReplCopyWork;
+import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import org.apache.hadoop.hive.ql.plan.SparkWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index ce0757c..88d352b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -21,6 +21,8 @@ import com.google.common.primitives.Ints;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.Database;
@@ -39,6 +41,8 @@ import org.apache.hadoop.hive.metastore.messaging.event.filters.EventBoundaryFil
 import org.apache.hadoop.hive.metastore.messaging.event.filters.MessageFormatFilter;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.lockmgr.LockException;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
@@ -65,9 +69,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
@@ -90,6 +97,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     }
   }
 
+  private static long sleepTime = 60000;
   private Logger LOG = LoggerFactory.getLogger(ReplDumpTask.class);
   private ReplLogger replLogger;
 
@@ -204,20 +212,21 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     // bootstrap case
     Hive hiveDb = getHive();
     Long bootDumpBeginReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+    String validTxnList = getValidTxnListForReplDump(hiveDb);
     for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
       replLogger = new BootstrapDumpLogger(dbName, dumpRoot.toString(),
               Utils.getAllTables(getHive(), dbName).size(),
               getHive().getAllFunctions().size());
       replLogger.startLog();
-      Path dbRoot = dumpDbMetadata(dbName, dumpRoot);
+      Path dbRoot = dumpDbMetadata(dbName, dumpRoot, bootDumpBeginReplId);
       dumpFunctionMetadata(dbName, dumpRoot);
 
       String uniqueKey = Utils.setDbBootstrapDumpState(hiveDb, dbName);
       for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
         LOG.debug(
             "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
-        dumpTable(dbName, tblName, dbRoot);
+        dumpTable(dbName, tblName, validTxnList, dbRoot);
         dumpConstraintMetadata(dbName, tblName, dbRoot);
       }
       Utils.resetDbBootstrapDumpState(hiveDb, dbName, uniqueKey);
@@ -257,17 +266,17 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return bootDumpBeginReplId;
   }
 
-  private Path dumpDbMetadata(String dbName, Path dumpRoot) throws Exception {
+  private Path dumpDbMetadata(String dbName, Path dumpRoot, long lastReplId) throws Exception {
     Path dbRoot = new Path(dumpRoot, dbName);
     // TODO : instantiating FS objects are generally costly. Refactor
     FileSystem fs = dbRoot.getFileSystem(conf);
     Path dumpPath = new Path(dbRoot, EximUtil.METADATA_NAME);
-    HiveWrapper.Tuple<Database> database = new HiveWrapper(getHive(), dbName).database();
+    HiveWrapper.Tuple<Database> database = new HiveWrapper(getHive(), dbName, lastReplId).database();
     EximUtil.createDbExportDump(fs, dumpPath, database.object, database.replicationSpec);
     return dbRoot;
   }
 
-  private void dumpTable(String dbName, String tblName, Path dbRoot) throws Exception {
+  private void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot) throws Exception {
     try {
       Hive db = getHive();
       HiveWrapper.Tuple<Table> tuple = new HiveWrapper(db, dbName).table(tblName);
@@ -276,6 +285,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
           new TableExport.Paths(work.astRepresentationForErrorMsg, dbRoot, tblName, conf, true);
       String distCpDoAsUser = conf.getVar(HiveConf.ConfVars.HIVE_DISTCP_DOAS_USER);
       tuple.replicationSpec.setIsReplace(true);  // by default for all other objects this is false
+      if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
+        tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList));
+      }
       new TableExport(exportPaths, tableSpec, tuple.replicationSpec, db, distCpDoAsUser, conf).write();
 
       replLogger.tableLog(tblName, tableSpec.tableHandle.getTableType());
@@ -286,6 +298,70 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     }
   }
 
+  private String getValidWriteIdList(String dbName, String tblName, String validTxnString) throws LockException {
+    if ((validTxnString == null) || validTxnString.isEmpty()) {
+      return null;
+    }
+    String fullTableName = AcidUtils.getFullTableName(dbName, tblName);
+    ValidWriteIdList validWriteIds = getTxnMgr()
+            .getValidWriteIds(Collections.singletonList(fullTableName), validTxnString)
+            .getTableValidWriteIdList(fullTableName);
+    return ((validWriteIds != null) ? validWriteIds.toString() : null);
+  }
+
+  private List<Long> getOpenTxns(ValidTxnList validTxnList) {
+    long[] invalidTxns = validTxnList.getInvalidTransactions();
+    List<Long> openTxns = new ArrayList<>();
+    for (long invalidTxn : invalidTxns) {
+      if (!validTxnList.isTxnAborted(invalidTxn)) {
+        openTxns.add(invalidTxn);
+      }
+    }
+    return openTxns;
+  }
+
+  private String getValidTxnListForReplDump(Hive hiveDb) throws HiveException {
+    // Key design point for REPL DUMP is to not have any txns older than the current txn in which the dump runs.
+    // This is needed to ensure that REPL DUMP doesn't copy any data files written by open txns, mainly
+    // for the streaming ingest case where one delta file can have data from committed/aborted/open txns.
+    // It may also cause data inconsistency if the on-going txns don't have corresponding open/write
+    // events captured, which means the catch-up incremental phase won't be able to replicate those txns.
+    // So, the logic is to wait for a configured amount of time to see if all open txns < current txn are
+    // aborted/committed. If not, then we forcefully abort those txns just like AcidHouseKeeperService.
+    ValidTxnList validTxnList = getTxnMgr().getValidTxns();
+    long timeoutInMs = HiveConf.getTimeVar(conf,
+            HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+    long waitUntilTime = System.currentTimeMillis() + timeoutInMs;
+    while (System.currentTimeMillis() < waitUntilTime) {
+      // If there are no txns which are open for the given ValidTxnList snapshot, then just return it.
+      if (getOpenTxns(validTxnList).isEmpty()) {
+        return validTxnList.toString();
+      }
+
+      // Wait for sleepTime (1 minute by default) and check again.
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        LOG.info("REPL DUMP thread sleep interrupted", e);
+      }
+      validTxnList = getTxnMgr().getValidTxns();
+    }
+
+    // After the timeout just force abort the open txns
+    List<Long> openTxns = getOpenTxns(validTxnList);
+    if (!openTxns.isEmpty()) {
+      hiveDb.abortTransactions(openTxns);
+      validTxnList = getTxnMgr().getValidTxns();
+      if (validTxnList.getMinOpenTxn() != null) {
+        openTxns = getOpenTxns(validTxnList);
+        LOG.warn("REPL DUMP unable to force abort all the open txns: {} after timeout due to unknown reasons. " +
+                "However, this is rare case that shouldn't happen.", openTxns);
+        throw new IllegalStateException("REPL DUMP triggered abort txns failed for unknown reasons.");
+      }
+    }
+    return validTxnList.toString();
+  }
+
   private ReplicationSpec getNewReplicationSpec(String evState, String objState,
       boolean isMetadataOnly) {
     return new ReplicationSpec(true, isMetadataOnly, evState, objState, false, true, true);

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
index 323c73d..61fa424 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpWork.java
@@ -39,8 +39,8 @@ public class ReplDumpWork implements Serializable {
   }
 
   public ReplDumpWork(String dbNameOrPattern, String tableNameOrPattern,
-      Long eventFrom, Long eventTo, String astRepresentationForErrorMsg, Integer maxEventLimit,
-      String resultTempPath) {
+                      Long eventFrom, Long eventTo, String astRepresentationForErrorMsg, Integer maxEventLimit,
+                      String resultTempPath) {
     this.dbNameOrPattern = dbNameOrPattern;
     this.tableNameOrPattern = tableNameOrPattern;
     this.eventFrom = eventFrom;

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
index 6f217cf..748d318 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadTask.java
@@ -72,8 +72,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
   protected int execute(DriverContext driverContext) {
     try {
       int maxTasks = conf.getIntVar(HiveConf.ConfVars.REPL_APPROX_MAX_LOAD_TASKS);
-      Context context = new Context(conf, getHive(), work.sessionStateLineageState,
-          work.currentTransactionId, driverContext.getCtx());
+      Context context = new Context(conf, getHive(), work.sessionStateLineageState, driverContext.getCtx());
       TaskTracker loadTaskTracker = new TaskTracker(maxTasks);
       /*
           for now for simplicity we are doing just one directory ( one database ), come back to use
@@ -127,7 +126,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
               new TableContext(dbTracker, work.dbNameToLoadIn, work.tableNameToLoadIn);
           TableEvent tableEvent = (TableEvent) next;
           LoadTable loadTable = new LoadTable(tableEvent, context, iterator.replLogger(),
-                                              tableContext, loadTaskTracker, getTxnMgr());
+                                              tableContext, loadTaskTracker);
           tableTracker = loadTable.tasks();
           if (!scope.database) {
             scope.rootTasks.addAll(tableTracker.tasks());
@@ -145,7 +144,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
           // for a table we explicitly try to load partitions as there is no separate partitions events.
           LoadPartitions loadPartitions =
               new LoadPartitions(context, iterator.replLogger(), loadTaskTracker, tableEvent,
-                      work.dbNameToLoadIn, tableContext, getTxnMgr());
+                      work.dbNameToLoadIn, tableContext);
           TaskTracker partitionsTracker = loadPartitions.tasks();
           partitionsPostProcessing(iterator, scope, loadTaskTracker, tableTracker,
               partitionsTracker);
@@ -163,7 +162,7 @@ public class ReplLoadTask extends Task<ReplLoadWork> implements Serializable {
               work.tableNameToLoadIn);
           LoadPartitions loadPartitions =
               new LoadPartitions(context, iterator.replLogger(), tableContext, loadTaskTracker,
-                      event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated(), getTxnMgr());
+                      event.asTableEvent(), work.dbNameToLoadIn, event.lastPartitionReplicated());
           /*
                the tableTracker here should be a new instance and not an existing one as this can
                only happen when we break in between loading partitions.

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
index 91ec93e..c1a9a62 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/ReplLoadWork.java
@@ -44,22 +44,20 @@ public class ReplLoadWork implements Serializable {
   taken care when using other methods.
   */
   final LineageState sessionStateLineageState;
-  public final long currentTransactionId;
 
   public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameToLoadIn,
-      String tableNameToLoadIn, LineageState lineageState, long currentTransactionId)
+      String tableNameToLoadIn, LineageState lineageState)
       throws IOException {
     this.tableNameToLoadIn = tableNameToLoadIn;
     sessionStateLineageState = lineageState;
-    this.currentTransactionId = currentTransactionId;
     this.iterator = new BootstrapEventsIterator(dumpDirectory, dbNameToLoadIn, hiveConf);
     this.constraintsIterator = new ConstraintEventsIterator(dumpDirectory, hiveConf);
     this.dbNameToLoadIn = dbNameToLoadIn;
   }
 
   public ReplLoadWork(HiveConf hiveConf, String dumpDirectory, String dbNameOrPattern,
-      LineageState lineageState, long currentTransactionId) throws IOException {
-    this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState, currentTransactionId);
+      LineageState lineageState) throws IOException {
+    this(hiveConf, dumpDirectory, dbNameOrPattern, null, lineageState);
   }
 
   public BootstrapEventsIterator iterator() {

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
index e817f5f..3dcc1d7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/TableEvent.java
@@ -31,6 +31,9 @@ public interface TableEvent extends BootstrapEvent {
   List<AddPartitionDesc> partitionDescriptions(ImportTableDesc tblDesc)
       throws SemanticException;
 
+  List<String> partitions(ImportTableDesc tblDesc)
+          throws SemanticException;
+
   ReplicationSpec replicationSpec();
 
   boolean shouldNotReplicate();

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
index ef73d89..ee804e8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSPartitionEvent.java
@@ -68,6 +68,12 @@ public class FSPartitionEvent implements PartitionEvent {
   }
 
   @Override
+  public List<String> partitions(ImportTableDesc tblDesc)
+          throws SemanticException {
+    return tableEvent.partitions(tblDesc);
+  }
+
+  @Override
   public ReplicationSpec replicationSpec() {
     return tableEvent.replicationSpec();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index cfd1640..0fabf5a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -88,6 +89,21 @@ public class FSTableEvent implements TableEvent {
     return descs;
   }
 
+  @Override
+  public List<String> partitions(ImportTableDesc tblDesc)
+          throws SemanticException {
+    List<String> partitions = new ArrayList<>();
+    try {
+      for (Partition partition : metadata.getPartitions()) {
+        String partName = Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues());
+        partitions.add(partName);
+      }
+    } catch (MetaException e) {
+      throw new SemanticException(e);
+    }
+    return partitions;
+  }
+
   private AddPartitionDesc partitionDesc(Path fromPath,
       ImportTableDesc tblDesc, Partition partition) throws SemanticException {
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
index 7a95716..df7f30d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadPartitions.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -45,10 +45,10 @@ import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
 import org.datanucleus.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -77,19 +78,16 @@ public class LoadPartitions {
 
   private final ImportTableDesc tableDesc;
   private Table table;
-  private final HiveTxnManager txnMgr;
 
   public LoadPartitions(Context context, ReplLogger replLogger, TaskTracker tableTracker,
                         TableEvent event, String dbNameToLoadIn,
-                        TableContext tableContext,
-                        HiveTxnManager txnMgr) throws HiveException, IOException {
-    this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null, txnMgr);
+                        TableContext tableContext) throws HiveException, IOException {
+    this(context, replLogger, tableContext, tableTracker, event, dbNameToLoadIn, null);
   }
 
   public LoadPartitions(Context context, ReplLogger replLogger, TableContext tableContext,
                         TaskTracker limiter, TableEvent event, String dbNameToLoadIn,
-                        AddPartitionDesc lastReplicatedPartition,
-                        HiveTxnManager txnMgr) throws HiveException, IOException {
+                        AddPartitionDesc lastReplicatedPartition) throws HiveException, IOException {
     this.tracker = new TaskTracker(limiter);
     this.event = event;
     this.context = context;
@@ -99,7 +97,6 @@ public class LoadPartitions {
 
     this.tableDesc = tableContext.overrideProperties(event.tableDesc(dbNameToLoadIn));
     this.table = ImportSemanticAnalyzer.tableIfExists(tableDesc, context.hiveDb);
-    this.txnMgr = txnMgr;
   }
 
   private String location() throws MetaException, HiveException {
@@ -141,9 +138,7 @@ public class LoadPartitions {
 
       if (table == null) {
         //new table
-
-        table = new Table(tableDesc.getDatabaseName(),
-            tableDesc.getTableName());
+        table = tableDesc.toTable(context.hiveConf);
         if (isPartitioned(tableDesc)) {
           updateReplicationState(initialReplicationState());
           if (!forNewTable().hasReplicationState()) {
@@ -154,7 +149,6 @@ public class LoadPartitions {
         }
       } else {
         // existing
-
         if (table.isPartitioned()) {
           List<AddPartitionDesc> partitionDescs = event.partitionDescriptions(tableDesc);
           if (!event.replicationSpec().isMetadataOnly() && !partitionDescs.isEmpty()) {
@@ -243,18 +237,24 @@ public class LoadPartitions {
   /**
    * This will create the move of partition data from temp path to actual path
    */
-  private Task<?> movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec,
-      Path tmpPath) {
-    // Note: this sets LoadFileType incorrectly for ACID; is that relevant for load?
-    //       See setLoadFileType and setIsAcidIow calls elsewhere for an example.
-    LoadTableDesc loadTableWork = new LoadTableDesc(
-        tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
-        event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
-        txnMgr.getCurrentTxnId()
-    );
-    loadTableWork.setInheritTableSpecs(false);
-    MoveWork work = new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
-    return TaskFactory.get(work, context.hiveConf);
+  private Task<?> movePartitionTask(Table table, AddPartitionDesc.OnePartitionDesc partSpec, Path tmpPath) {
+    MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false);
+    if (AcidUtils.isTransactionalTable(table)) {
+      LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
+              Collections.singletonList(tmpPath),
+              Collections.singletonList(new Path(partSpec.getLocation())),
+              true, null, null);
+      moveWork.setMultiFilesDesc(loadFilesWork);
+    } else {
+      LoadTableDesc loadTableWork = new LoadTableDesc(
+              tmpPath, Utilities.getTableDesc(table), partSpec.getPartSpec(),
+              event.replicationSpec().isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, 0L
+      );
+      loadTableWork.setInheritTableSpecs(false);
+      moveWork.setLoadTableWork(loadTableWork);
+    }
+
+    return TaskFactory.get(moveWork, context.hiveConf);
   }
 
   private Path locationOnReplicaWarehouse(Table table, AddPartitionDesc.OnePartitionDesc partSpec)

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
index ddb26e5..e2ec4af 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/table/LoadTable.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.TaskTracker;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.Context;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.load.util.PathUtils;
 import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
-import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ImportSemanticAnalyzer;
@@ -42,16 +42,18 @@ import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
-import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.TreeMap;
@@ -66,17 +68,15 @@ public class LoadTable {
   private final TableContext tableContext;
   private final TaskTracker tracker;
   private final TableEvent event;
-  private final HiveTxnManager txnMgr;
 
   public LoadTable(TableEvent event, Context context, ReplLogger replLogger,
-                   TableContext tableContext, TaskTracker limiter, HiveTxnManager txnMgr)
+                   TableContext tableContext, TaskTracker limiter)
       throws SemanticException, IOException {
     this.event = event;
     this.context = context;
     this.replLogger = replLogger;
     this.tableContext = tableContext;
     this.tracker = new TaskTracker(limiter);
-    this.txnMgr = txnMgr;
   }
 
   private void createTableReplLogTask(String tableName, TableType tableType) throws SemanticException {
@@ -189,9 +189,8 @@ public class LoadTable {
     }
   }
 
-  private void newTableTasks(ImportTableDesc tblDesc) throws SemanticException {
-    Table table;
-    table = new Table(tblDesc.getDatabaseName(), tblDesc.getTableName());
+  private void newTableTasks(ImportTableDesc tblDesc) throws Exception {
+    Table table = tblDesc.toTable(context.hiveConf);
     // Either we're dropping and re-creating, or the table didn't exist, and we're creating.
     Task<?> createTableTask =
         tblDesc.getCreateTableTask(new HashSet<>(), new HashSet<>(), context.hiveConf);
@@ -199,12 +198,22 @@ public class LoadTable {
       tracker.addTask(createTableTask);
       return;
     }
+
+    Task<?> parentTask = createTableTask;
+    if (event.replicationSpec().isTransactionalTableDump()) {
+      List<String> partNames = isPartitioned(tblDesc) ? event.partitions(tblDesc) : null;
+      ReplTxnWork replTxnWork = new ReplTxnWork(tblDesc.getDatabaseName(), tblDesc.getTableName(), partNames,
+              event.replicationSpec().getValidWriteIdList(), ReplTxnWork.OperationType.REPL_WRITEID_STATE);
+      Task<?> replTxnTask = TaskFactory.get(replTxnWork, context.hiveConf);
+      createTableTask.addDependentTask(replTxnTask);
+      parentTask = replTxnTask;
+    }
     if (!isPartitioned(tblDesc)) {
-      LOG.debug("adding dependent CopyWork/MoveWork for table");
+      LOG.debug("adding dependent ReplTxnTask/CopyWork/MoveWork for table");
       Task<?> loadTableTask =
           loadTableTask(table, event.replicationSpec(), new Path(tblDesc.getLocation()),
               event.metadataPath());
-      createTableTask.addDependentTask(loadTableTask);
+      parentTask.addDependentTask(loadTableTask);
     }
     tracker.addTask(createTableTask);
   }
@@ -229,14 +238,20 @@ public class LoadTable {
     Task<?> copyTask =
         ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath, context.hiveConf);
 
-    LoadTableDesc loadTableWork = new LoadTableDesc(
-        tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
-        replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
-        //todo: what is the point of this?  If this is for replication, who would have opened a txn?
-        txnMgr.getCurrentTxnId()
-    );
-    MoveWork moveWork =
-        new MoveWork(new HashSet<>(), new HashSet<>(), loadTableWork, null, false);
+    MoveWork moveWork = new MoveWork(new HashSet<>(), new HashSet<>(), null, null, false);
+    if (AcidUtils.isTransactionalTable(table)) {
+      LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
+              Collections.singletonList(tmpPath),
+              Collections.singletonList(tgtPath),
+              true, null, null);
+      moveWork.setMultiFilesDesc(loadFilesWork);
+    } else {
+      LoadTableDesc loadTableWork = new LoadTableDesc(
+              tmpPath, Utilities.getTableDesc(table), new TreeMap<>(),
+              replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING, 0L
+      );
+      moveWork.setLoadTableWork(loadTableWork);
+    }
     Task<?> loadTableTask = TaskFactory.get(moveWork, context.hiveConf);
     copyTask.addDependentTask(loadTableTask);
     return copyTask;

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
index 6fbc657..7eae1ea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/load/util/Context.java
@@ -36,18 +36,16 @@ public class Context {
   taken care when using other methods.
  */
   public final LineageState sessionStateLineageState;
-  public final long currentTransactionId;
 
 
   public Context(HiveConf hiveConf, Hive hiveDb,
-      LineageState lineageState, long currentTransactionId,
+      LineageState lineageState,
       org.apache.hadoop.hive.ql.Context nestedContext) throws MetaException {
     this.hiveConf = hiveConf;
     this.hiveDb = hiveDb;
     this.warehouse = new Warehouse(hiveConf);
     this.pathInfo = new PathInfo(hiveConf);
     sessionStateLineageState = lineageState;
-    this.currentTransactionId = currentTransactionId;
     this.nestedContext = nestedContext;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 445e126..183515a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -1831,6 +1832,41 @@ public class AcidUtils {
     return fileList;
   }
 
+  public static List<Path> getValidDataPaths(Path dataPath, Configuration conf, String validWriteIdStr)
+          throws IOException {
+    List<Path> pathList = new ArrayList<>();
+    if ((validWriteIdStr == null) || validWriteIdStr.isEmpty()) {
+      // In the non-ACID case, all files are in the base data path, so just return it.
+      pathList.add(dataPath);
+      return pathList;
+    }
+
+    // For ACID/MM tables, find the valid state with respect to the given ValidWriteIdList.
+    ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(validWriteIdStr);
+    Directory acidInfo = AcidUtils.getAcidState(dataPath, conf, validWriteIdList);
+
+    for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) {
+      pathList.add(hfs.getFileStatus().getPath());
+    }
+    for (ParsedDelta delta : acidInfo.getCurrentDirectories()) {
+      pathList.add(delta.getPath());
+    }
+    if (acidInfo.getBaseDirectory() != null) {
+      pathList.add(acidInfo.getBaseDirectory());
+    }
+    return pathList;
+  }
+
+  public static String getAcidSubDir(Path dataPath) {
+    String dataDir = dataPath.getName();
+    if (dataDir.startsWith(AcidUtils.BASE_PREFIX)
+            || dataDir.startsWith(AcidUtils.DELTA_PREFIX)
+            || dataDir.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+      return dataDir;
+    }
+    return null;
+  }
+
   public static boolean isAcidEnabled(HiveConf hiveConf) {
     String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
     boolean concurrency =  hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 76569d5..515c08b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -690,6 +690,16 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   }
 
   @Override
+  public void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames)
+          throws LockException {
+    try {
+      getMS().replTableWriteIdState(validWriteIdList, dbName, tableName, partNames);
+    } catch (TException e) {
+      throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+    }
+  }
+
+  @Override
   public void heartbeat() throws LockException {
     List<HiveLock> locks;
     if(isTxnOpen()) {

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index a74670b..ab9d67e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -235,6 +235,12 @@ class DummyTxnManager extends HiveTxnManagerImpl {
   }
 
   @Override
+  public void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames)
+          throws LockException {
+    // No-op
+  }
+
+  @Override
   public void heartbeat() throws LockException {
     // No-op
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index f239535..5f68e08 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -57,7 +57,7 @@ public interface HiveTxnManager {
    * @return The new transaction id.
    * @throws LockException in case of failure to start the transaction.
    */
-  List<Long> replOpenTxn(String replPolicy, List<Long> srcTxnIds, String user)  throws LockException;
+  List<Long> replOpenTxn(String replPolicy, List<Long> srcTxnIds, String user) throws LockException;
 
   /**
    * Commit the transaction in target cluster.
@@ -65,7 +65,7 @@ public interface HiveTxnManager {
    * @param srcTxnId The id of the transaction at the source cluster
    * @throws LockException in case of failure to commit the transaction.
    */
-  void replCommitTxn(String replPolicy, long srcTxnId)  throws LockException;
+  void replCommitTxn(String replPolicy, long srcTxnId) throws LockException;
 
  /**
    * Abort the transaction in target cluster.
@@ -73,7 +73,18 @@ public interface HiveTxnManager {
    * @param srcTxnId The id of the transaction at the source cluster
    * @throws LockException in case of failure to abort the transaction.
    */
-  void replRollbackTxn(String replPolicy, long srcTxnId)  throws LockException;
+  void replRollbackTxn(String replPolicy, long srcTxnId) throws LockException;
+
+ /**
+  * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark.
+  * @param validWriteIdList Snapshot of writeid list when the table/partition is dumped.
+  * @param dbName Database name
+  * @param tableName Table which is written.
+  * @param partNames List of partitions being written.
+  * @throws LockException in case of failure.
+  */
+  void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames)
+          throws LockException;
 
   /**
    * Get the lock manager.  This must be used rather than instantiating an

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index b850ddc..fa32807 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -327,8 +327,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     Long writeId = 0L; // Initialize with 0 for non-ACID and non-MM tables.
     int stmtId = 0;
-    if ((tableExists && AcidUtils.isTransactionalTable(table))
-            || (!tableExists && AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps()))) {
+    if (!replicationSpec.isInReplicationScope()
+            && ((tableExists && AcidUtils.isTransactionalTable(table))
+            || (!tableExists && AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps())))) {
       //if importing into existing transactional table or will create a new transactional table
       //(because Export was done from transactional table), need a writeId
       // Explain plan doesn't open a txn and hence no need to allocate write id.

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
index c07991d..8332bcc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
@@ -81,7 +81,7 @@ public class LoadSemanticAnalyzer extends BaseSemanticAnalyzer {
       }
     });
     if ((srcs != null) && srcs.length == 1) {
-      if (srcs[0].isDir()) {
+      if (srcs[0].isDirectory()) {
         srcs = fs.listStatus(srcs[0].getPath(), new PathFilter() {
           @Override
           public boolean accept(Path p) {

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 05eca1f..562f497 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -368,9 +368,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       }
 
       if ((!evDump) && (tblNameOrPattern != null) && !(tblNameOrPattern.isEmpty())) {
-        ReplLoadWork replLoadWork =
-            new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern, tblNameOrPattern,
-                queryState.getLineageState(), getTxnMgr().getCurrentTxnId());
+        ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
+                tblNameOrPattern, queryState.getLineageState());
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
         return;
       }
@@ -401,7 +400,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
 
         ReplLoadWork replLoadWork = new ReplLoadWork(conf, loadPath.toString(), dbNameOrPattern,
-            queryState.getLineageState(), getTxnMgr().getCurrentTxnId());
+                queryState.getLineageState());
         rootTasks.add(TaskFactory.get(replLoadWork, conf));
         //
         //        for (FileStatus dir : dirsInLoadPath) {

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index 824da21..7d901f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -43,6 +43,7 @@ public class ReplicationSpec {
   private boolean isNoop = false;
   private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in.
   private boolean isReplace = true; // default is that the import mode is insert overwrite
+  private String validWriteIdList = null; // WriteIds snapshot for replicating ACID/MM tables.
   private Type specType = Type.DEFAULT; // DEFAULT means REPL_LOAD or BOOTSTRAP_DUMP or EXPORT
 
   // Key definitions related to replication
@@ -52,7 +53,8 @@ public class ReplicationSpec {
     CURR_STATE_ID("repl.last.id"),
     NOOP("repl.noop"),
     LAZY("repl.lazy"),
-    IS_REPLACE("repl.is.replace")
+    IS_REPLACE("repl.is.replace"),
+    VALID_WRITEID_LIST("repl.valid.writeid.list")
     ;
     private final String keyName;
 
@@ -140,6 +142,7 @@ public class ReplicationSpec {
     this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
     this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString()));
     this.isReplace = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_REPLACE.toString()));
+    this.validWriteIdList = keyFetcher.apply(ReplicationSpec.KEY.VALID_WRITEID_LIST.toString());
   }
 
   /**
@@ -325,6 +328,27 @@ public class ReplicationSpec {
     this.isLazy = isLazy;
   }
 
+  /**
+   * @return the WriteIds snapshot for the current ACID/MM table being replicated
+   */
+  public String getValidWriteIdList() {
+    return validWriteIdList;
+  }
+
+  /**
+   * @param validWriteIdList WriteIds snapshot for the current ACID/MM table being replicated
+   */
+  public void setValidWriteIdList(String validWriteIdList) {
+    this.validWriteIdList = validWriteIdList;
+  }
+
+  /**
+   * @return whether the currently dumped replication object belongs to an ACID/MM table
+   */
+  public boolean isTransactionalTableDump() {
+    return (validWriteIdList != null);
+  }
+
   public String get(KEY key) {
     switch (key){
       case REPL_SCOPE:
@@ -346,6 +370,8 @@ public class ReplicationSpec {
         return String.valueOf(isLazy());
       case IS_REPLACE:
         return String.valueOf(isReplace());
+      case VALID_WRITEID_LIST:
+        return getValidWriteIdList();
     }
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 4e61280..529ea21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -66,43 +66,52 @@ public class CopyUtils {
   // Used by replication, copy files from source to destination. It is possible source file is
   // changed/removed during copy, so double check the checksum after copy,
   // if not match, copy again from cm
-  public void copyAndVerify(FileSystem destinationFs, Path destination,
+  public void copyAndVerify(FileSystem destinationFs, Path destRoot,
                     List<ReplChangeManager.FileInfo> srcFiles) throws IOException, LoginException {
-    Map<FileSystem, List<ReplChangeManager.FileInfo>> map = fsToFileMap(srcFiles);
-    for (Map.Entry<FileSystem, List<ReplChangeManager.FileInfo>> entry : map.entrySet()) {
+    Map<FileSystem, Map< Path, List<ReplChangeManager.FileInfo>>> map = fsToFileMap(srcFiles, destRoot);
+    for (Map.Entry<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> entry : map.entrySet()) {
       FileSystem sourceFs = entry.getKey();
-      List<ReplChangeManager.FileInfo> fileInfoList = entry.getValue();
-      boolean useRegularCopy = regularCopy(destinationFs, sourceFs, fileInfoList);
+      Map<Path, List<ReplChangeManager.FileInfo>> destMap = entry.getValue();
+      for (Map.Entry<Path, List<ReplChangeManager.FileInfo>> destMapEntry : destMap.entrySet()) {
+        Path destination = destMapEntry.getKey();
+        List<ReplChangeManager.FileInfo> fileInfoList = destMapEntry.getValue();
+        boolean useRegularCopy = regularCopy(destinationFs, sourceFs, fileInfoList);
 
-      doCopyRetry(sourceFs, fileInfoList, destinationFs, destination, useRegularCopy);
-
-      // Verify checksum, retry if checksum changed
-      List<ReplChangeManager.FileInfo> retryFileInfoList = new ArrayList<>();
-      for (ReplChangeManager.FileInfo srcFile : srcFiles) {
-        if(!srcFile.isUseSourcePath()) {
-          // If already use cmpath, nothing we can do here, skip this file
-          continue;
+        if (!destinationFs.exists(destination)
+                && !FileUtils.mkdir(destinationFs, destination, hiveConf)) {
+          LOG.error("Failed to create destination directory: " + destination);
+          throw new IOException("Destination directory creation failed");
         }
-        String sourceChecksumString = srcFile.getCheckSum();
-        if (sourceChecksumString != null) {
-          String verifySourceChecksumString;
-          try {
-            verifySourceChecksumString
-                    = ReplChangeManager.checksumFor(srcFile.getSourcePath(), sourceFs);
-          } catch (IOException e) {
-            // Retry with CM path
-            verifySourceChecksumString = null;
+        doCopyRetry(sourceFs, fileInfoList, destinationFs, destination, useRegularCopy);
+
+        // Verify checksum, retry if checksum changed
+        List<ReplChangeManager.FileInfo> retryFileInfoList = new ArrayList<>();
+        for (ReplChangeManager.FileInfo srcFile : srcFiles) {
+          if (!srcFile.isUseSourcePath()) {
+            // If already use cmpath, nothing we can do here, skip this file
+            continue;
           }
-          if ((verifySourceChecksumString == null)
-                  || !sourceChecksumString.equals(verifySourceChecksumString)) {
-            // If checksum does not match, likely the file is changed/removed, copy again from cm
-            srcFile.setIsUseSourcePath(false);
-            retryFileInfoList.add(srcFile);
+          String sourceChecksumString = srcFile.getCheckSum();
+          if (sourceChecksumString != null) {
+            String verifySourceChecksumString;
+            try {
+              verifySourceChecksumString
+                      = ReplChangeManager.checksumFor(srcFile.getSourcePath(), sourceFs);
+            } catch (IOException e) {
+              // Retry with CM path
+              verifySourceChecksumString = null;
+            }
+            if ((verifySourceChecksumString == null)
+                    || !sourceChecksumString.equals(verifySourceChecksumString)) {
+              // If checksum does not match, likely the file is changed/removed, copy again from cm
+              srcFile.setIsUseSourcePath(false);
+              retryFileInfoList.add(srcFile);
+            }
           }
         }
-      }
-      if (!retryFileInfoList.isEmpty()) {
-        doCopyRetry(sourceFs, retryFileInfoList, destinationFs, destination, useRegularCopy);
+        if (!retryFileInfoList.isEmpty()) {
+          doCopyRetry(sourceFs, retryFileInfoList, destinationFs, destination, useRegularCopy);
+        }
       }
     }
   }
@@ -212,7 +221,7 @@ public class CopyUtils {
     for (Map.Entry<FileSystem, List<Path>> entry : map.entrySet()) {
       final FileSystem sourceFs = entry.getKey();
       List<ReplChangeManager.FileInfo> fileList = Lists.transform(entry.getValue(),
-                                path -> { return new ReplChangeManager.FileInfo(sourceFs, path);});
+              path -> new ReplChangeManager.FileInfo(sourceFs, path, null));
       doCopyOnce(sourceFs, entry.getValue(),
                  destinationFs, destination,
                  regularCopy(destinationFs, sourceFs, fileList));
@@ -287,16 +296,33 @@ public class CopyUtils {
     return result;
   }
 
-  private Map<FileSystem, List<ReplChangeManager.FileInfo>> fsToFileMap(
-      List<ReplChangeManager.FileInfo> srcFiles) throws IOException {
-    Map<FileSystem, List<ReplChangeManager.FileInfo>> result = new HashMap<>();
+  // Create map of source file system to destination path to list of files to copy
+  private Map<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> fsToFileMap(
+      List<ReplChangeManager.FileInfo> srcFiles, Path destRoot) throws IOException {
+    Map<FileSystem, Map<Path, List<ReplChangeManager.FileInfo>>> result = new HashMap<>();
     for (ReplChangeManager.FileInfo file : srcFiles) {
       FileSystem fileSystem = file.getSrcFs();
       if (!result.containsKey(fileSystem)) {
-        result.put(fileSystem, new ArrayList<ReplChangeManager.FileInfo>());
+        result.put(fileSystem, new HashMap<>());
       }
-      result.get(fileSystem).add(file);
+      Path destination = getCopyDestination(file, destRoot);
+      if (!result.get(fileSystem).containsKey(destination)) {
+        result.get(fileSystem).put(destination, new ArrayList<>());
+      }
+      result.get(fileSystem).get(destination).add(file);
     }
     return result;
   }
+
+  private Path getCopyDestination(ReplChangeManager.FileInfo fileInfo, Path destRoot) {
+    if (fileInfo.getSubDir() == null) {
+      return destRoot;
+    }
+    String[] subDirs = fileInfo.getSubDir().split(Path.SEPARATOR);
+    Path destination = destRoot;
+    for (String subDir: subDirs) {
+      destination = new Path(destination, subDir);
+    }
+    return destination;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e8651cb9/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
index 5c1850c..4b2812e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/BootStrapReplicationSpecFunction.java
@@ -24,16 +24,18 @@ import org.apache.hadoop.hive.ql.parse.SemanticException;
 
 class BootStrapReplicationSpecFunction implements HiveWrapper.Tuple.Function<ReplicationSpec> {
   private final Hive db;
+  private final long currentNotificationId;
 
-  BootStrapReplicationSpecFunction(Hive db) {
+  BootStrapReplicationSpecFunction(Hive db, long currentNotificationId) {
     this.db = db;
+    this.currentNotificationId = currentNotificationId;
   }
 
   @Override
   public ReplicationSpec fromMetaStore() throws HiveException {
     try {
-      long currentNotificationId = db.getMSC()
-              .getCurrentNotificationEventId().getEventId();
+      long currentReplicationState = (this.currentNotificationId > 0)
+              ? this.currentNotificationId : db.getMSC().getCurrentNotificationEventId().getEventId();
       ReplicationSpec replicationSpec =
           new ReplicationSpec(
               true,
@@ -45,7 +47,7 @@ class BootStrapReplicationSpecFunction implements HiveWrapper.Tuple.Function<Rep
               false
           );
 
-      replicationSpec.setCurrentReplicationState(String.valueOf(currentNotificationId));
+      replicationSpec.setCurrentReplicationState(String.valueOf(currentReplicationState));
       return replicationSpec;
     } catch (Exception e) {
       throw new SemanticException(e);