Posted to commits@hive.apache.org by sa...@apache.org on 2019/04/12 11:23:51 UTC

[hive] branch master updated: HIVE-21109: Support stats replication for ACID tables (Ashutosh Bapat, reviewed by Sankar Hariappan)

This is an automated email from the ASF dual-hosted git repository.

sankarh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new ec6af1b  HIVE-21109: Support stats replication for ACID tables (Ashutosh Bapat, reviewed by Sankar Hariappan)
ec6af1b is described below

commit ec6af1bc775b8bb8807e6bcdb86afb05b11ac833
Author: Ashutosh Bapat <ab...@cloudera.com>
AuthorDate: Fri Apr 12 16:53:09 2019 +0530

    HIVE-21109: Support stats replication for ACID tables (Ashutosh Bapat, reviewed by Sankar Hariappan)
    
    Signed-off-by: Sankar Hariappan <sa...@apache.org>
---
 .../ql/parse/TestStatsReplicationScenarios.java    | 405 ++++++++++++++++++---
 ...java => TestStatsReplicationScenariosACID.java} |  22 +-
 ...StatsReplicationScenariosACIDNoAutogather.java} |  20 +-
 ...r.java => TestStatsReplicationScenariosMM.java} |  20 +-
 ...stStatsReplicationScenariosMMNoAutogather.java} |  23 +-
 .../TestStatsReplicationScenariosMigration.java    |  73 ++++
 ...sReplicationScenariosMigrationNoAutogather.java |  73 ++++
 .../TestStatsReplicationScenariosNoAutogather.java |   7 +-
 .../hadoop/hive/ql/parse/WarehouseInstance.java    |  14 +-
 .../hadoop/hive/ql/ddl/table/CreateTableDesc.java  |  22 +-
 .../hive/ql/ddl/table/CreateTableOperation.java    |  17 +-
 .../hadoop/hive/ql/exec/ColumnStatsUpdateTask.java |  52 ++-
 .../org/apache/hadoop/hive/ql/exec/DDLTask.java    |  36 +-
 .../org/apache/hadoop/hive/ql/exec/MoveTask.java   |   9 +-
 .../apache/hadoop/hive/ql/exec/ReplCopyTask.java   |   7 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java     |   3 -
 .../bootstrap/events/filesystem/FSTableEvent.java  |  18 +-
 .../hadoop/hive/ql/exec/repl/util/ReplUtils.java   |  78 +++-
 .../org/apache/hadoop/hive/ql/metadata/Hive.java   | 106 ++++--
 .../org/apache/hadoop/hive/ql/metadata/Table.java  |   7 -
 .../hive/ql/parse/ImportSemanticAnalyzer.java      |  92 +++--
 .../hive/ql/parse/repl/dump/TableExport.java       |  11 +-
 .../repl/dump/events/UpdatePartColStatHandler.java |   7 -
 .../dump/events/UpdateTableColStatHandler.java     |   5 -
 .../repl/load/message/RenamePartitionHandler.java  |  36 +-
 .../repl/load/message/RenameTableHandler.java      |  13 +-
 .../load/message/UpdatePartColStatHandler.java     |  13 +-
 .../load/message/UpdateTableColStatHandler.java    |  16 +-
 .../hadoop/hive/ql/plan/AddPartitionDesc.java      |   5 +
 .../hadoop/hive/ql/plan/ColumnStatsUpdateWork.java |  10 +-
 .../hadoop/hive/ql/plan/ImportTableDesc.java       |  10 +-
 .../hadoop/hive/ql/plan/RenamePartitionDesc.java   |   2 +
 .../hadoop/hive/metastore/HiveMetaStore.java       |  71 +++-
 .../apache/hadoop/hive/metastore/ObjectStore.java  |  27 +-
 .../hadoop/hive/metastore/cache/CachedStore.java   |   2 +-
 .../ptest2/conf/deployed/master-mr2.properties     |  10 +-
 36 files changed, 1018 insertions(+), 324 deletions(-)
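
Note on the core of the change: on the replica, the column statistics dumped from the source are attached to the table object being created, and for a transactional table the write id under which they were gathered is carried along so that the metastore accepts them. A minimal sketch of that replica-side handling, using the colStats and replWriteId fields that appear in the CreateTableDesc hunk further down (illustrative only, not a verbatim excerpt of the patch):

    if (colStats != null) {
      // Reuse the statistics objects captured on the source instead of recomputing them.
      tbl.getTTable().setColStats(
              new ColumnStatistics(colStats.getStatsDesc(), colStats.getStatsObj()));
      // A transactional table's statistics are only valid for the write id they were gathered
      // under, so carry that write id along as well.
      if (replWriteId > 0) {
        tbl.getTTable().setWriteId(replWriteId);
      }
    }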

diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
index 1ec4498..94eb1ff 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenarios.java
@@ -20,10 +20,17 @@ package org.apache.hadoop.hive.ql.parse;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
+import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
 import org.apache.hadoop.hive.shims.Utils;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -43,6 +50,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 
 import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
 
@@ -60,35 +68,52 @@ public class TestStatsReplicationScenarios {
   private static HiveConf conf;
   private static boolean hasAutogather;
 
+  enum AcidTableKind {
+    FULL_ACID,
+    INSERT_ONLY
+  }
+
+  private static AcidTableKind acidTableKindToUse;
+
   @BeforeClass
   public static void classLevelSetup() throws Exception {
     Map<String, String> overrides = new HashMap<>();
     overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
         GzipJSONMessageEncoder.class.getCanonicalName());
 
-    internalBeforeClassSetup(overrides, TestReplicationScenarios.class, true);
+    internalBeforeClassSetup(overrides, overrides, TestReplicationScenarios.class, true, null);
   }
 
-  static void internalBeforeClassSetup(Map<String, String> overrides, Class clazz,
-                                       boolean autogather)
+  static void internalBeforeClassSetup(Map<String, String> primaryOverrides,
+                                       Map<String, String> replicaOverrides, Class clazz,
+                                       boolean autogather, AcidTableKind acidTableKind)
       throws Exception {
     conf = new HiveConf(clazz);
     conf.set("dfs.client.use.datanode.hostname", "true");
     conf.set("hadoop.proxyuser." + Utils.getUGI().getShortUserName() + ".hosts", "*");
     MiniDFSCluster miniDFSCluster =
         new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true).build();
-    Map<String, String> localOverrides = new HashMap<String, String>() {{
+    Map<String, String> additionalOverrides = new HashMap<String, String>() {{
         put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
         put(HiveConf.ConfVars.HIVE_IN_TEST_REPL.varname, "true");
       }};
-    localOverrides.putAll(overrides);
-    replica = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+    Map<String, String> overrides = new HashMap<>();
+
+    overrides.putAll(additionalOverrides);
+    overrides.putAll(replicaOverrides);
+    replica = new WarehouseInstance(LOG, miniDFSCluster, overrides);
 
     // Run with autogather false on primary if requested
     hasAutogather = autogather;
-    localOverrides.put(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname,
-                        autogather ? "true" : "false");
-    primary = new WarehouseInstance(LOG, miniDFSCluster, localOverrides);
+    additionalOverrides.put(HiveConf.ConfVars.HIVESTATSAUTOGATHER.varname,
+            autogather ? "true" : "false");
+    overrides.clear();
+    overrides.putAll(additionalOverrides);
+    overrides.putAll(primaryOverrides);
+    primary = new WarehouseInstance(LOG, miniDFSCluster, overrides);
+
+    // Use transactional tables
+    acidTableKindToUse = acidTableKind;
   }
 
   @AfterClass
@@ -128,7 +153,8 @@ public class TestStatsReplicationScenarios {
 
   private void verifyReplicatedStatsForTable(String tableName) throws Throwable {
     // Test column stats
-    Assert.assertEquals(primary.getTableColumnStatistics(primaryDbName, tableName),
+    Assert.assertEquals("Mismatching column statistics for  table " + tableName,
+                        primary.getTableColumnStatistics(primaryDbName, tableName),
                         replica.getTableColumnStatistics(replicatedDbName, tableName));
 
     // Test table level stats
@@ -136,8 +162,9 @@ public class TestStatsReplicationScenarios {
             collectStatsParams(replica.getTable(replicatedDbName, tableName).getParameters());
     Map<String, String> pParams =
             collectStatsParams(primary.getTable(primaryDbName, tableName).getParameters());
-    Assert.assertEquals(pParams, rParams);
+    Assert.assertEquals("Mismatch in stats parameters for table " + tableName, pParams, rParams);
 
+    primary.getTable(primaryDbName, tableName).getPartitionKeys();
     verifyReplicatedStatsForPartitionsOfTable(tableName);
   }
 
@@ -151,18 +178,24 @@ public class TestStatsReplicationScenarios {
       return;
     }
 
+    List<FieldSchema> partKeys = primary.getTable(primaryDbName, tableName).getPartitionKeys();
     for (Partition pPart : pParts) {
       Partition rPart = replica.getPartition(replicatedDbName, tableName,
               pPart.getValues());
 
       Map<String, String> rParams = collectStatsParams(rPart.getParameters());
       Map<String, String> pParams = collectStatsParams(pPart.getParameters());
-      Assert.assertEquals(pParams, rParams);
+      String partName = Warehouse.makePartName(partKeys, pPart.getValues());
+      Assert.assertEquals("Mismatch in stats parameters for partition " + partName + " of table " + tableName,
+                          pParams, rParams);
+
+      // Test partition column stats for the partition
+      Assert.assertEquals("Mismatching column statistics for partition " + partName + "of table " + tableName,
+                          primary.getPartitionColumnStatistics(primaryDbName, tableName, partName,
+                                  StatsSetupConst.getColumnsHavingStats(pParams)),
+                          replica.getPartitionColumnStatistics(replicatedDbName, tableName, partName,
+                                  StatsSetupConst.getColumnsHavingStats(rParams)));
     }
-
-    // Test partition column stats for all partitions
-    Assert.assertEquals(primary.getAllPartitionColumnStatistics(primaryDbName, tableName),
-                        replica.getAllPartitionColumnStatistics(replicatedDbName, tableName));
   }
 
   private void verifyNoStatsReplicationForMetadataOnly(String tableName) throws Throwable {
@@ -170,17 +203,18 @@ public class TestStatsReplicationScenarios {
     Assert.assertTrue(replica.getTableColumnStatistics(replicatedDbName, tableName).isEmpty());
 
     // When no data is replicated, the basic stats parameters for table should look as if it's a
-    // new table created on replica. Based on the create table rules the basic stats may be true
-    // or false. Either is fine with us so don't bother checking exact values.
+    // new table created on the replica, i.e. zero or null.
     Map<String, String> rParams =
             collectStatsParams(replica.getTable(replicatedDbName, tableName).getParameters());
-    Map<String, String> expectedFalseParams = new HashMap<>();
-    Map<String, String> expectedTrueParams = new HashMap<>();
-    StatsSetupConst.setStatsStateForCreateTable(expectedTrueParams,
-            replica.getTableColNames(replicatedDbName, tableName), StatsSetupConst.TRUE);
-    StatsSetupConst.setStatsStateForCreateTable(expectedFalseParams,
-            replica.getTableColNames(replicatedDbName, tableName), StatsSetupConst.FALSE);
-    Assert.assertTrue(rParams.equals(expectedFalseParams) || rParams.equals(expectedTrueParams));
+    for (String param : StatsSetupConst.SUPPORTED_STATS) {
+      String val = rParams.get(param);
+      Assert.assertTrue("parameter " + param + " of table " + tableName + " is expected to be " +
+              "null or 0", val == null || val.trim().equals("0"));
+    }
+
+    // As long as the above conditions are met, it doesn't matter whether the basic and column
+    // stats states are set to true or false. If they are false, the actual values are immaterial.
+    // If they are true, the values asserted above represent the correct state of no data.
 
     verifyNoPartitionStatsReplicationForMetadataOnly(tableName);
   }
@@ -196,7 +230,8 @@ public class TestStatsReplicationScenarios {
 
     // Partitions are not replicated in metadata only replication.
     List<Partition> rParts = replica.getAllPartitions(replicatedDbName, tableName);
-    Assert.assertTrue(rParts == null || rParts.isEmpty());
+    Assert.assertTrue("Partitions replicated in a metadata only dump",
+            rParts == null || rParts.isEmpty());
 
     // Test partition column stats for all partitions
     Map<String, List<ColumnStatisticsObj>> rPartColStats =
@@ -207,6 +242,18 @@ public class TestStatsReplicationScenarios {
     }
   }
 
+  private String getCreateTableProperties() {
+    if (acidTableKindToUse == AcidTableKind.FULL_ACID) {
+      return " stored as orc TBLPROPERTIES('transactional'='true')";
+    }
+
+    if (acidTableKindToUse == AcidTableKind.INSERT_ONLY) {
+      return " TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only')";
+    }
+
+    return "";
+  }
+
   private List<String> createBootStrapData() throws Throwable {
     // Unpartitioned table with data
     String simpleTableName = "sTable";
@@ -216,16 +263,19 @@ public class TestStatsReplicationScenarios {
     String ndTableName = "ndTable";
     // Partitioned table without data during bootstrap and hence no stats.
     String ndPartTableName = "ndPTable";
+    String tblCreateExtra = getCreateTableProperties();
 
     primary.run("use " + primaryDbName)
-            .run("create table " + simpleTableName + " (id int)")
+            .run("create table " + simpleTableName + " (id int)" + tblCreateExtra)
             .run("insert into " + simpleTableName + " values (1), (2)")
-            .run("create table " + partTableName + " (place string) partitioned by (country string)")
+            .run("create table " + partTableName + " (place string) partitioned by (country string)"
+                    + tblCreateExtra)
             .run("insert into " + partTableName + " partition(country='india') values ('bangalore')")
             .run("insert into " + partTableName + " partition(country='us') values ('austin')")
             .run("insert into " + partTableName + " partition(country='france') values ('paris')")
-            .run("create table " + ndTableName + " (str string)")
-            .run("create table " + ndPartTableName + " (val string) partitioned by (pk int)");
+            .run("create table " + ndTableName + " (str string)" + tblCreateExtra)
+            .run("create table " + ndPartTableName + " (val string) partitioned by (pk int)" +
+                    tblCreateExtra);
 
     List<String> tableNames = new ArrayList<>(Arrays.asList(simpleTableName, partTableName,
             ndTableName, ndPartTableName));
@@ -246,13 +296,14 @@ public class TestStatsReplicationScenarios {
    * Dumps primarydb on primary, loads it on replica as replicadb, verifies that the statistics
    * loaded are same as the ones on primary.
    * @param tableNames, names of tables on primary expected to be loaded
-   * @param lastReplicationId of the last dump, for incremental dump/load
    * @param parallelLoad, if true, parallel bootstrap load is used
    * @param metadataOnly, only metadata is dumped and loaded.
+   * @param lastReplicationId of the last dump, for incremental dump/load
+   * @param failRetry, if true, the load is failed once and then retried
    * @return lastReplicationId of the dump performed.
    */
   private String dumpLoadVerify(List<String> tableNames, String lastReplicationId,
-                                boolean parallelLoad, boolean metadataOnly)
+                                boolean parallelLoad, boolean metadataOnly, boolean failRetry)
           throws Throwable {
     List<String> withClauseList;
     // Parallel load works only for bootstrap.
@@ -269,15 +320,24 @@ public class TestStatsReplicationScenarios {
     WarehouseInstance.Tuple dumpTuple = primary.run("use " + primaryDbName)
             .dump(primaryDbName, lastReplicationId, withClauseList);
 
+
     // Load, if necessary changing configuration.
     if (parallelLoad) {
       replica.hiveConf.setBoolVar(HiveConf.ConfVars.EXECPARALLEL, true);
     }
 
-    replica.load(replicatedDbName, dumpTuple.dumpLocation)
-            .run("use " + replicatedDbName)
-            .run("show tables")
-            .verifyResults(tableNames.toArray(new String[1]));
+    // Fail the load when testing the failure and retry scenario. The bootstrap load is failed
+    // while setting the checkpoint for a table in the middle of the list of tables.
+    if (failRetry) {
+      if (lastReplicationId == null) {
+        failBootstrapLoad(dumpTuple, tableNames.size()/2);
+      } else {
+        failIncrementalLoad(dumpTuple);
+      }
+    }
+
+    // Load, possibly a retry
+    replica.load(replicatedDbName, dumpTuple.dumpLocation);
 
     // Metadata load may not load all the events.
     if (!metadataOnly) {
@@ -301,12 +361,108 @@ public class TestStatsReplicationScenarios {
     return dumpTuple.lastReplicationId;
   }
 
+  /**
+   * Run a bootstrap load that is expected to fail.
+   * @param tuple the location of the bootstrap dump
+   * @param failAfterNumTables number of tables to load before the failure is injected
+   */
+  private void failBootstrapLoad(WarehouseInstance.Tuple tuple, int failAfterNumTables) throws Throwable {
+    // Fail while setting the checkpoint directory property once failAfterNumTables tables have
+    // been loaded, so that we test the case when the bootstrap load fails after some but not all
+    // tables are loaded.
+    BehaviourInjection<CallerArguments, Boolean> callerVerifier
+            = new BehaviourInjection<CallerArguments, Boolean>() {
+      int cntTables = 0;
+      String prevTable = null;
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable CallerArguments args) {
+        if (prevTable == null ||
+                !prevTable.equalsIgnoreCase(args.tblName)) {
+          cntTables++;
+        }
+        prevTable = args.tblName;
+        if (args.dbName.equalsIgnoreCase(replicatedDbName) && cntTables > failAfterNumTables) {
+          injectionPathCalled = true;
+          LOG.warn("Verifier - DB : " + args.dbName + " TABLE : " + args.tblName);
+          return false;
+        }
+        return true;
+      }
+    };
+
+    InjectableBehaviourObjectStore.setAlterTableModifier(callerVerifier);
+    try {
+      replica.loadFailure(replicatedDbName, tuple.dumpLocation);
+      callerVerifier.assertInjectionsPerformed(true, false);
+    } finally {
+      InjectableBehaviourObjectStore.resetAlterTableModifier();
+    }
+  }
+
+  private void failIncrementalLoad(WarehouseInstance.Tuple dumpTuple) throws Throwable {
+    // Fail the add-notification call when the second update-table-column-stats event is
+    // encountered. Thus we test both successful and failed application of this event.
+    BehaviourInjection<NotificationEvent, Boolean> callerVerifier
+            = new BehaviourInjection<NotificationEvent, Boolean>() {
+      int cntEvents = 0;
+      @Override
+      public Boolean apply(NotificationEvent entry) {
+        cntEvents++;
+        if (entry.getEventType().equalsIgnoreCase(EventMessage.EventType.UPDATE_TABLE_COLUMN_STAT.toString()) &&
+            cntEvents > 1) {
+          injectionPathCalled = true;
+          LOG.warn("Verifier - DB: " + entry.getDbName()
+                  + " Table: " + entry.getTableName()
+                  + " Event: " + entry.getEventType());
+          return false;
+        }
+        return true;
+      }
+    };
+
+    InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
+    try {
+      replica.loadFailure(replicatedDbName, dumpTuple.dumpLocation);
+    } finally {
+      InjectableBehaviourObjectStore.resetAddNotificationModifier();
+    }
+    callerVerifier.assertInjectionsPerformed(true, false);
+
+    // Fail the add-notification call when the second update-partition-column-stats event is
+    // encountered. Thus we test both successful and failed application of this event.
+    callerVerifier = new BehaviourInjection<NotificationEvent, Boolean>() {
+      int cntEvents = 0;
+
+      @Override
+      public Boolean apply(NotificationEvent entry) {
+        cntEvents++;
+        if (entry.getEventType().equalsIgnoreCase(EventMessage.EventType.UPDATE_PARTITION_COLUMN_STAT.toString()) &&
+            cntEvents > 1) {
+          injectionPathCalled = true;
+          LOG.warn("Verifier - DB: " + entry.getDbName()
+                  + " Table: " + entry.getTableName()
+                  + " Event: " + entry.getEventType());
+          return false;
+        }
+        return true;
+      }
+    };
+
+    InjectableBehaviourObjectStore.setAddNotificationModifier(callerVerifier);
+    try {
+      replica.loadFailure(replicatedDbName, dumpTuple.dumpLocation);
+    } finally {
+      InjectableBehaviourObjectStore.resetAddNotificationModifier();
+    }
+    callerVerifier.assertInjectionsPerformed(true, false);
+  }
+
   private void createIncrementalData(List<String> tableNames) throws Throwable {
     // Annotations for this table are same as createBootStrapData
     String simpleTableName = "sTable";
     String partTableName = "pTable";
     String ndTableName = "ndTable";
     String ndPartTableName = "ndPTable";
+    String tblCreateExtra = getCreateTableProperties();
 
     Assert.assertTrue(tableNames.containsAll(Arrays.asList(simpleTableName, partTableName,
                                                          ndTableName, ndPartTableName)));
@@ -324,14 +480,15 @@ public class TestStatsReplicationScenarios {
             .run("insert into " + partTableName + "(country, place) values ('us', 'chicago')")
             // new partition
             .run("insert into " + partTableName + "(country, place) values ('australia', 'perth')")
-            .run("create table " + incTableName + " (config string, enabled boolean)")
+            .run("create table " + incTableName + " (config string, enabled boolean)" +
+                    tblCreateExtra)
             .run("insert into " + incTableName + " values ('conf1', true)")
             .run("insert into " + incTableName + " values ('conf2', false)")
             .run("insert into " + ndPartTableName + "(pk, val) values (1, 'one')")
             .run("insert into " + ndPartTableName + "(pk, val) values (1, 'another one')")
             .run("insert into " + ndPartTableName + "(pk, val) values (2, 'two')")
             .run("create table " + incPartTableName +
-                    "(val string) partitioned by (tvalue boolean)")
+                    "(val string) partitioned by (tvalue boolean)" + tblCreateExtra)
             .run("insert into " + incPartTableName + "(tvalue, val) values (true, 'true')")
             .run("insert into " + incPartTableName + "(tvalue, val) values (false, 'false')");
 
@@ -347,29 +504,183 @@ public class TestStatsReplicationScenarios {
     }
   }
 
-  private void testStatsReplicationCommon(boolean parallelBootstrap, boolean metadataOnly) throws Throwable {
+  private void applyDMLOperations(List<String> tableNames) throws Throwable {
+    // Descriptions of these tables are the same as in createBootStrapData.
+    String simpleTableName = "sTable";
+    String partTableName = "pTable";
+    String ndTableName = "ndTable";
+    String ndPartTableName = "ndPTable";
+    String incTableName = "iTable"; // New table
+    String tblCreateExtra = getCreateTableProperties();
+
+    Assert.assertTrue(tableNames.containsAll(Arrays.asList(simpleTableName, partTableName,
+            ndTableName, ndPartTableName, incTableName)));
+
+    String ctasTableName = "ctasTable"; // Table created through CTAS
+    String ctasPartTableName = "ctasPartTable"; // Table created through CTAS
+    // Tables created through import
+    String eximTableName = "eximTable";
+    String eximPartTableName = "eximPartTable";
+    // Tables created through load
+    String loadTableName = "loadTable";
+    String loadPartTableName = "loadPartTable";
+
+    String exportPath = "'hdfs:///tmp/" + primaryDbName + "/" + incTableName + "/'";
+    String exportPartPath = "'hdfs:///tmp/" + primaryDbName + "/" + partTableName + "/'";
+    String localDir = "./test.dat";
+    String inPath = localDir + "/000000_0";
+    String tableStorage = "";
+    if (acidTableKindToUse == AcidTableKind.FULL_ACID) {
+      tableStorage = "stored as orc";
+    }
+
+    primary.run("use " + primaryDbName)
+            // insert overwrite
+            .run("insert overwrite table " + simpleTableName + " values (5), (6), (7)")
+            .run("insert overwrite table " + partTableName + " partition (country='india') " +
+                    " values ('bombay')")
+            // truncate
+            .run("truncate table " + ndTableName)
+            .run("truncate table " + ndPartTableName + " partition (pk=1)")
+            // CTAS
+            .run("create table " + ctasTableName + " as select * from " + incTableName)
+            .run("create table " + ctasPartTableName + " as select * from " + partTableName)
+            // Import
+            .run("export table " + partTableName + " to " + exportPartPath)
+            .run("import table " + eximPartTableName + " from " + exportPartPath)
+            .run("export table " + incTableName + " to " + exportPath)
+            .run("import table " + eximTableName + " from " + exportPath)
+            // load
+            .run("insert overwrite local directory '" + localDir + "'" + tableStorage + " select " +
+                    "* from " + simpleTableName)
+            .run("create table " + loadTableName + " (id int)" + tblCreateExtra)
+            .run("load data local inpath '" + inPath + "' overwrite into table " + loadTableName)
+            .run("create table " + loadPartTableName + " (id int) partitioned by (key int) " + tblCreateExtra)
+            .run("load data local inpath '" + inPath + "' overwrite into table "
+                    + loadPartTableName + " partition (key=1)");
+
+    tableNames.add(ctasTableName);
+    tableNames.add(ctasPartTableName);
+    tableNames.add(eximTableName);
+    tableNames.add(eximPartTableName);
+    tableNames.add(loadTableName);
+    tableNames.add(loadPartTableName);
+
+    // Run analyze on each of the tables if statistics are not gathered automatically.
+    if (!hasAutogather) {
+      for (String name : tableNames) {
+        primary.run("use " + primaryDbName)
+                .run("analyze table " + name + " compute statistics for columns");
+      }
+    }
+  }
+
+  private void applyTransactionalDMLOperations(List<String> tableNames) throws Throwable {
+    // Descriptions of these tables are the same as in createBootStrapData.
+    String partTableName = "pTable";
+    String ndTableName = "ndTable";
+    String incTableName = "iTable";
+    String eximTableName = "eximTable";
+    String eximPartTableName = "eximPartTable";
+
+    Assert.assertTrue(tableNames.containsAll(Arrays.asList(partTableName, ndTableName,
+            eximPartTableName, eximTableName, incTableName)));
+
+    primary.run("update " + partTableName + " set place = 'mumbai' where place = 'bombay'")
+           .run("delete from " + partTableName + " where place = 'chicago'")
+            .run("merge into " + eximPartTableName + " as T using " + partTableName + " as U "
+                    + " on T.country = U.country "
+                    + " when matched and T.place != U.place then update set place = U.place"
+                    + " when not matched then insert values (U.country, U.place)")
+            .run("update " + incTableName + " set enabled = false where config = 'conf1'")
+            .run("merge into " + eximTableName + " as T using " + incTableName + " as U "
+                    + " on T.config = U.config"
+                    + " when matched and T.enabled != U.enabled then update set enabled = U.enabled"
+                    + " when not matched then insert values (U.config, U.enabled)")
+           .run("delete from " + ndTableName);
+
+    // Run analyze on each of the tables if statistics are not gathered automatically.
+    if (!hasAutogather) {
+      for (String name : tableNames) {
+        primary.run("use " + primaryDbName)
+                .run("analyze table " + name + " compute statistics for columns");
+      }
+    }
+  }
+
+  private void applyDDLOperations(List<String> tableNames) throws Throwable {
+    // Descriptions of these tables are the same as in createBootStrapData.
+    String simpleTableName = "sTable";
+    String partTableName = "pTable";
+    String incTableName = "iTable";
+    String ctasTableName = "ctasTable"; // Table created through CTAS
+
+    Assert.assertTrue(tableNames.containsAll(Arrays.asList(simpleTableName, partTableName,
+            incTableName, ctasTableName)));
+
+    String renamedTableName = "rnTable";
+
+    primary.run("use " + primaryDbName)
+            .run("alter table " + simpleTableName + " add columns (val int)")
+            .run("alter table " + incTableName + " change config configuration string")
+            .run("alter table " + ctasTableName + " rename to " + renamedTableName)
+            .run("alter table " + partTableName +
+                    " partition(country='us') rename to partition (country='usa')");
+
+    tableNames.remove(ctasTableName);
+    tableNames.add(renamedTableName);
+  }
+
+  private void testStatsReplicationCommon(boolean parallelBootstrap, boolean metadataOnly,
+                                          boolean failRetry) throws Throwable {
     List<String> tableNames = createBootStrapData();
     String lastReplicationId = dumpLoadVerify(tableNames, null, parallelBootstrap,
-            metadataOnly);
+            metadataOnly, failRetry);
 
     // Incremental dump
     createIncrementalData(tableNames);
     lastReplicationId = dumpLoadVerify(tableNames, lastReplicationId, parallelBootstrap,
-            metadataOnly);
+            metadataOnly, failRetry);
+
+    // Incremental dump with Insert overwrite operation
+    applyDMLOperations(tableNames);
+    lastReplicationId = dumpLoadVerify(tableNames, lastReplicationId, parallelBootstrap,
+            metadataOnly, false);
+
+    // Incremental dump with transactional DML operations
+    if (acidTableKindToUse == AcidTableKind.FULL_ACID) {
+      applyTransactionalDMLOperations(tableNames);
+      lastReplicationId = dumpLoadVerify(tableNames, lastReplicationId, parallelBootstrap,
+              metadataOnly, false);
+    }
+
+    // Incremental dump with DDL operations
+    applyDDLOperations(tableNames);
+    lastReplicationId = dumpLoadVerify(tableNames, lastReplicationId, parallelBootstrap,
+            metadataOnly, false);
+  }
+
+  @Test
+  public void testNonParallelBootstrapLoad() throws Throwable {
+    LOG.info("Testing " + testName.getClass().getName() + "." + testName.getMethodName());
+    testStatsReplicationCommon(false, false, false);
   }
 
   @Test
-  public void testForNonAcidTables() throws Throwable {
-    testStatsReplicationCommon(false, false);
+  public void testForParallelBootstrapLoad() throws Throwable {
+    LOG.info("Testing " + testName.getClass().getName() + "." + testName.getMethodName());
+    testStatsReplicationCommon(true, false, false);
   }
 
   @Test
-  public void testForNonAcidTablesParallelBootstrapLoad() throws Throwable {
-    testStatsReplicationCommon(true, false);
+  public void testMetadataOnlyDump() throws Throwable {
+    LOG.info("Testing " + testName.getClass().getName() + "." + testName.getMethodName());
+    testStatsReplicationCommon(false, true, false);
   }
 
   @Test
-  public void testNonAcidMetadataOnlyDump() throws Throwable {
-    testStatsReplicationCommon(false, true);
+  public void testRetryFailure() throws Throwable {
+    LOG.info("Testing " + testName.getClass().getName() + "." + testName.getMethodName());
+    testStatsReplicationCommon(false, false, true);
   }
 }
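
Aside on the failure-and-retry coverage added above: failBootstrapLoad() and failIncrementalLoad() both follow the same pattern of installing a BehaviourInjection on InjectableBehaviourObjectStore, running a load that is expected to fail part-way, resetting the injection, and then letting the normal load act as the retry. A condensed sketch of that shape, reusing names from the test above (not code from the patch itself):

    BehaviourInjection<NotificationEvent, Boolean> verifier =
            new BehaviourInjection<NotificationEvent, Boolean>() {
      int cntEvents = 0;

      @Override
      public Boolean apply(NotificationEvent entry) {
        if (++cntEvents > 1) {
          injectionPathCalled = true; // record that the failure really was injected
          return false;               // returning false fails the injected metastore call
        }
        return true;
      }
    };
    InjectableBehaviourObjectStore.setAddNotificationModifier(verifier);
    try {
      replica.loadFailure(replicatedDbName, dumpTuple.dumpLocation); // first attempt fails
    } finally {
      InjectableBehaviourObjectStore.resetAddNotificationModifier();
    }
    verifier.assertInjectionsPerformed(true, false);
    // The subsequent replica.load(...) in dumpLoadVerify() acts as the retry.
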
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosACID.java
similarity index 67%
copy from itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
copy to itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosACID.java
index 51f8dfb..ea42e0c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosACID.java
@@ -23,31 +23,31 @@ import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncod
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Tests statistics replication when statistics are collected using ANALYZE command.
+ * Tests statistics replication for ACID tables.
  */
-public class TestStatsReplicationScenariosNoAutogather extends TestStatsReplicationScenarios {
+public class TestStatsReplicationScenariosACID extends TestStatsReplicationScenarios {
   @Rule
   public final TestName testName = new TestName();
 
-  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
-  static WarehouseInstance primary;
-  private static WarehouseInstance replica;
-  private String primaryDbName, replicatedDbName;
-  private static HiveConf conf;
-
   @BeforeClass
   public static void classLevelSetup() throws Exception {
     Map<String, String> overrides = new HashMap<>();
     overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
         GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true");
+    overrides.put(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
+              "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+    overrides.put(MetastoreConf.ConfVars.CAPABILITY_CHECK.getHiveName(), "false");
+    overrides.put(HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT.varname, "1s");
+    overrides.put(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE.varname, "nonstrict");
+
 
-    internalBeforeClassSetup(overrides, TestReplicationScenarios.class, false);
+    internalBeforeClassSetup(overrides, overrides, TestStatsReplicationScenariosACID.class, true,
+            AcidTableKind.FULL_ACID);
   }
 }
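
For orientation, the subclasses only change configuration and the AcidTableKind; the base class then appends getCreateTableProperties() to every CREATE TABLE it issues. With the FULL_ACID setting configured above, the bootstrap data ends up being created roughly like this (sketch mirroring createBootStrapData() in the base class):

    String tblCreateExtra = " stored as orc TBLPROPERTIES('transactional'='true')";
    primary.run("use " + primaryDbName)
            // Full ACID table, so the later incremental cycles can exercise update/delete/merge.
            .run("create table sTable (id int)" + tblCreateExtra)
            .run("insert into sTable values (1), (2)");

With AcidTableKind.INSERT_ONLY (the MM test classes below) the suffix becomes " TBLPROPERTIES('transactional'='true', 'transactional_properties'='insert_only')" instead.
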
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosACIDNoAutogather.java
similarity index 67%
copy from itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
copy to itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosACIDNoAutogather.java
index 51f8dfb..bf744af 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosACIDNoAutogather.java
@@ -30,24 +30,26 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Tests statistics replication when statistics are collected using ANALYZE command.
+ * Tests statistics replication for ACID tables when statistics are collected using the ANALYZE
+ * command.
  */
-public class TestStatsReplicationScenariosNoAutogather extends TestStatsReplicationScenarios {
+public class TestStatsReplicationScenariosACIDNoAutogather extends TestStatsReplicationScenarios {
   @Rule
   public final TestName testName = new TestName();
 
-  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
-  static WarehouseInstance primary;
-  private static WarehouseInstance replica;
-  private String primaryDbName, replicatedDbName;
-  private static HiveConf conf;
-
   @BeforeClass
   public static void classLevelSetup() throws Exception {
     Map<String, String> overrides = new HashMap<>();
     overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
         GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true");
+    overrides.put(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
+              "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+    overrides.put(MetastoreConf.ConfVars.CAPABILITY_CHECK.getHiveName(),"false");
+    overrides.put(HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT.varname,"1s");
+    overrides.put(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE.varname, "nonstrict");
+
 
-    internalBeforeClassSetup(overrides, TestReplicationScenarios.class, false);
+    internalBeforeClassSetup(overrides, overrides,
+            TestStatsReplicationScenariosACIDNoAutogather.class, false, AcidTableKind.FULL_ACID);
   }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMM.java
similarity index 68%
copy from itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
copy to itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMM.java
index 51f8dfb..4dc6558 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMM.java
@@ -30,24 +30,26 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Tests statistics replication when statistics are collected using ANALYZE command.
+ * Tests statistics replication for MM (insert-only transactional) tables.
  */
-public class TestStatsReplicationScenariosNoAutogather extends TestStatsReplicationScenarios {
+public class TestStatsReplicationScenariosMM extends TestStatsReplicationScenarios {
   @Rule
   public final TestName testName = new TestName();
 
-  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
-  static WarehouseInstance primary;
-  private static WarehouseInstance replica;
-  private String primaryDbName, replicatedDbName;
-  private static HiveConf conf;
-
   @BeforeClass
   public static void classLevelSetup() throws Exception {
     Map<String, String> overrides = new HashMap<>();
     overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
         GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true");
+    overrides.put(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
+              "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+    overrides.put(MetastoreConf.ConfVars.CAPABILITY_CHECK.getHiveName(),"false");
+    overrides.put(HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT.varname,"1s");
+    overrides.put(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE.varname, "nonstrict");
+
 
-    internalBeforeClassSetup(overrides, TestReplicationScenarios.class, false);
+    internalBeforeClassSetup(overrides, overrides, TestStatsReplicationScenariosMM.class, true,
+            AcidTableKind.INSERT_ONLY);
   }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMMNoAutogather.java
similarity index 65%
copy from itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
copy to itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMMNoAutogather.java
index 51f8dfb..e1ee7b2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMMNoAutogather.java
@@ -23,31 +23,32 @@ import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncod
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
 
 /**
- * Tests statistics replication when statistics are collected using ANALYZE command.
+ * Tests statistics replication for MM (insert-only transactional) tables when statistics are
+ * collected using the ANALYZE command.
  */
-public class TestStatsReplicationScenariosNoAutogather extends TestStatsReplicationScenarios {
+public class TestStatsReplicationScenariosMMNoAutogather extends TestStatsReplicationScenarios {
   @Rule
   public final TestName testName = new TestName();
 
-  protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
-  static WarehouseInstance primary;
-  private static WarehouseInstance replica;
-  private String primaryDbName, replicatedDbName;
-  private static HiveConf conf;
-
   @BeforeClass
   public static void classLevelSetup() throws Exception {
     Map<String, String> overrides = new HashMap<>();
     overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
         GzipJSONMessageEncoder.class.getCanonicalName());
+    overrides.put(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "true");
+    overrides.put(HiveConf.ConfVars.HIVE_TXN_MANAGER.varname,
+              "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+    overrides.put(MetastoreConf.ConfVars.CAPABILITY_CHECK.getHiveName(), "false");
+    overrides.put(HiveConf.ConfVars.REPL_BOOTSTRAP_DUMP_OPEN_TXN_TIMEOUT.varname, "1s");
+    overrides.put(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE.varname, "nonstrict");
+    overrides.put("mapred.input.dir.recursive", "true");
+
 
-    internalBeforeClassSetup(overrides, TestReplicationScenarios.class, false);
+    internalBeforeClassSetup(overrides, overrides,
+            TestStatsReplicationScenariosMMNoAutogather.class, false, AcidTableKind.INSERT_ONLY);
   }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigration.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigration.java
new file mode 100644
index 0000000..49ad718
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigration.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests statistics replication when tables are migrated to the strict managed (transactional)
+ * format on the replica.
+ */
+public class TestStatsReplicationScenariosMigration extends TestStatsReplicationScenarios {
+  @Rule
+  public final TestName testName = new TestName();
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    Map<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+            GzipJSONMessageEncoder.class.getCanonicalName());
+
+    Map<String, String> replicaConfigs = new HashMap<String, String>() {{
+      put("hive.support.concurrency", "true");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.exec.dynamic.partition.mode", "nonstrict");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("hive.strict.managed.tables", "true");
+    }};
+    replicaConfigs.putAll(overrides);
+
+    Map<String, String> primaryConfigs = new HashMap<String, String>() {{
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.exec.dynamic.partition.mode", "nonstrict");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("hive.support.concurrency", "false");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+      put("hive.strict.managed.tables", "false");
+    }};
+    primaryConfigs.putAll(overrides);
+
+    internalBeforeClassSetup(primaryConfigs, replicaConfigs,
+            TestStatsReplicationScenariosMigration.class, true, null);
+  }
+}
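
The migration tests are the reason internalBeforeClassSetup() now takes separate override maps for the primary and the replica: the primary runs with hive.strict.managed.tables=false and DummyTxnManager while the replica runs with strict managed tables and DbTxnManager, so the load exercises the migration path to transactional tables. A sketch of the asymmetric call, reusing the maps built above (the non-migration test classes simply pass the same map twice):

    internalBeforeClassSetup(primaryConfigs,   // non-strict managed tables, DummyTxnManager
                             replicaConfigs,   // strict managed tables, DbTxnManager
                             TestStatsReplicationScenariosMigration.class,
                             true,             // autogather statistics on the primary
                             null);            // tables are not created as ACID on the primary
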
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigrationNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigrationNoAutogather.java
new file mode 100644
index 0000000..3b05220
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosMigrationNoAutogather.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests statistics replication when tables are migrated to the strict managed (transactional)
+ * format on the replica and statistics are collected using the ANALYZE command.
+ */
+public class TestStatsReplicationScenariosMigrationNoAutogather extends TestStatsReplicationScenarios {
+  @Rule
+  public final TestName testName = new TestName();
+
+  @BeforeClass
+  public static void classLevelSetup() throws Exception {
+    Map<String, String> overrides = new HashMap<>();
+    overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
+            GzipJSONMessageEncoder.class.getCanonicalName());
+
+    Map<String, String> replicaConfigs = new HashMap<String, String>() {{
+      put("hive.support.concurrency", "true");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.exec.dynamic.partition.mode", "nonstrict");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("hive.strict.managed.tables", "true");
+    }};
+    replicaConfigs.putAll(overrides);
+
+    Map<String, String> primaryConfigs = new HashMap<String, String>() {{
+      put("hive.metastore.client.capability.check", "false");
+      put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+      put("hive.exec.dynamic.partition.mode", "nonstrict");
+      put("hive.strict.checks.bucketing", "false");
+      put("hive.mapred.mode", "nonstrict");
+      put("mapred.input.dir.recursive", "true");
+      put("hive.metastore.disallow.incompatible.col.type.changes", "false");
+      put("hive.support.concurrency", "false");
+      put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
+      put("hive.strict.managed.tables", "false");
+    }};
+    primaryConfigs.putAll(overrides);
+
+    internalBeforeClassSetup(primaryConfigs, replicaConfigs,
+            TestStatsReplicationScenariosMigrationNoAutogather.class, false, null);
+  }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
index 51f8dfb..2d7e9c7 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestStatsReplicationScenariosNoAutogather.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hive.ql.parse;
 
-import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.messaging.json.gzip.GzipJSONMessageEncoder;
 import org.junit.BeforeClass;
@@ -38,9 +37,6 @@ public class TestStatsReplicationScenariosNoAutogather extends TestStatsReplicat
 
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
   static WarehouseInstance primary;
-  private static WarehouseInstance replica;
-  private String primaryDbName, replicatedDbName;
-  private static HiveConf conf;
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
@@ -48,6 +44,7 @@ public class TestStatsReplicationScenariosNoAutogather extends TestStatsReplicat
     overrides.put(MetastoreConf.ConfVars.EVENT_MESSAGE_FACTORY.getHiveName(),
         GzipJSONMessageEncoder.class.getCanonicalName());
 
-    internalBeforeClassSetup(overrides, TestReplicationScenarios.class, false);
+    internalBeforeClassSetup(overrides, overrides, TestStatsReplicationScenariosNoAutogather.class,
+            false, null);
   }
 }
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index c76d30c..e9a63f8 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -483,19 +483,18 @@ public class WarehouseInstance implements Closeable {
    * Get statistics for given set of columns of a given table in the given database
    * @param dbName - the database where the table resides
    * @param tableName - tablename whose statistics are to be retrieved
-   * @param colNames - columns whose statistics is to be retrieved.
    * @return - list of ColumnStatisticsObj objects in the order of the specified columns
    */
   public Map<String, List<ColumnStatisticsObj>> getAllPartitionColumnStatistics(String dbName,
                                                                     String tableName) throws Exception {
     List<String> colNames = new ArrayList();
     client.getFields(dbName, tableName).forEach(fs -> colNames.add(fs.getName()));
-    return getAllPartitionColumnStatistics(dbName, tableName, colNames);
+    return client.getPartitionColumnStatistics(dbName, tableName,
+            client.listPartitionNames(dbName, tableName, (short) -1), colNames);
   }
 
   /**
-   * Get statistics for given set of columns for all the partitions of a given table in the given
-   * database.
+   * Get statistics for a given partition of the given table in the given database.
    * @param dbName - the database where the table resides
    * @param tableName - name of the partitioned table in the database
    * @param colNames - columns whose statistics is to be retrieved
@@ -503,12 +502,11 @@ public class WarehouseInstance implements Closeable {
    * ordered according to the given list of columns.
    * @throws Exception
    */
-  Map<String, List<ColumnStatisticsObj>> getAllPartitionColumnStatistics(String dbName,
-                                                                         String tableName,
-                                                                         List<String> colNames)
+  List<ColumnStatisticsObj> getPartitionColumnStatistics(String dbName, String tableName,
+                                                         String partName, List<String> colNames)
           throws Exception {
     return client.getPartitionColumnStatistics(dbName, tableName,
-            client.listPartitionNames(dbName, tableName, (short) -1), colNames);
+                                              Collections.singletonList(partName), colNames).get(0);
   }
 
   public List<Partition> getAllPartitions(String dbName, String tableName) throws Exception {
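
Usage note for the reworked WarehouseInstance helper: instead of fetching statistics for all partitions at once and comparing two large maps, the test now resolves each partition name and compares its column statistics individually, which yields a meaningful assertion message on mismatch. This mirrors the loop in verifyReplicatedStatsForPartitionsOfTable() earlier in the diff (shown here only as a usage sketch):

    String partName = Warehouse.makePartName(partKeys, pPart.getValues());
    Assert.assertEquals("Mismatching column statistics for partition " + partName + " of table " + tableName,
        primary.getPartitionColumnStatistics(primaryDbName, tableName, partName,
            StatsSetupConst.getColumnsHavingStats(pParams)),
        replica.getPartitionColumnStatistics(replicatedDbName, tableName, partName,
            StatsSetupConst.getColumnsHavingStats(rParams)));
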
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableDesc.java
index d9e58e9..ee32f4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableDesc.java
@@ -40,7 +40,6 @@ import org.apache.hadoop.hive.metastore.api.SQLForeignKey;
 import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.Utilities;
@@ -118,7 +117,7 @@ public class CreateTableDesc implements DDLDesc, Serializable {
   List<SQLNotNullConstraint> notNullConstraints;
   List<SQLDefaultConstraint> defaultConstraints;
   List<SQLCheckConstraint> checkConstraints;
-  private ColumnStatistics colStats;
+  private ColumnStatistics colStats;  // Column statistics carried from the source during replication
   private Long initialMmWriteId; // Initial MM write ID for CTAS and import.
   // The FSOP configuration for the FSOP that is going to write initial data during ctas.
   // This is not needed beyond compilation, so it is transient.
@@ -142,7 +141,7 @@ public class CreateTableDesc implements DDLDesc, Serializable {
       List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys,
       List<SQLUniqueConstraint> uniqueConstraints, List<SQLNotNullConstraint> notNullConstraints,
       List<SQLDefaultConstraint> defaultConstraints, List<SQLCheckConstraint> checkConstraints,
-      ColumnStatistics colStats) {
+      ColumnStatistics colStats, long writeId) {
 
     this(tableName, isExternal, isTemporary, cols, partCols,
         bucketCols, sortCols, numBuckets, fieldDelim, fieldEscape,
@@ -153,6 +152,7 @@ public class CreateTableDesc implements DDLDesc, Serializable {
 
     this.databaseName = databaseName;
     this.colStats = colStats;
+    this.replWriteId = writeId;
   }
 
   public CreateTableDesc(String databaseName, String tableName, boolean isExternal, boolean isTemporary,
@@ -174,7 +174,7 @@ public class CreateTableDesc implements DDLDesc, Serializable {
         outputFormat, location, serName, storageHandler, serdeProps,
         tblProps, ifNotExists, skewedColNames, skewedColValues,
         primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints,
-       null);
+       null, -1);
     this.partColNames = partColNames;
     this.isCTAS = isCTAS;
   }
@@ -900,14 +900,16 @@ public class CreateTableDesc implements DDLDesc, Serializable {
       colStatsDesc.setTableName(getTableName());
       colStatsDesc.setDbName(getDatabaseName());
       tbl.getTTable().setColStats(new ColumnStatistics(colStatsDesc, colStats.getStatsObj()));
+      // Statistics will have an associated write Id for a transactional table. We need it to
+      // update column statistics.
+      if (replWriteId > 0) {
+        tbl.getTTable().setWriteId(replWriteId);
+      }
     }
 
-    // The statistics for non-transactional tables will be obtained from the source. Do not
-    // reset those on replica.
-    if (replicationSpec != null && replicationSpec.isInReplicationScope() &&
-        !TxnUtils.isTransactionalTable(tbl.getTTable())) {
-      // Do nothing to the table statistics.
-    } else {
+    // When replicating, the statistics for a table are obtained from the source. Do not
+    // reset them on the replica.
+    if (replicationSpec == null || !replicationSpec.isInReplicationScope()) {
       if (!this.isCTAS && (tbl.getPath() == null || (tbl.isEmpty() && !isExternal()))) {
         if (!tbl.isPartitioned() && conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
           StatsSetupConst.setStatsStateForCreateTable(tbl.getTTable().getParameters(),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java
index 24373fe..7da3d26 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/CreateTableOperation.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.hooks.LineageInfo.DataContainer;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
@@ -93,27 +92,23 @@ public class CreateTableOperation extends DDLOperation {
 
   private void createTableReplaceMode(Table tbl, boolean replDataLocationChanged) throws HiveException {
     ReplicationSpec replicationSpec = desc.getReplicationSpec();
-    long writeId = 0;
+    Long writeId = 0L;
     EnvironmentContext environmentContext = null;
     if (replicationSpec != null && replicationSpec.isInReplicationScope()) {
       if (replicationSpec.isMigratingToTxnTable()) {
         // for migration we start the transaction and allocate write id in repl txn task for migration.
-        String writeIdPara = context.getConf().get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
-        if (writeIdPara == null) {
+        writeId = ReplUtils.getMigrationCurrentTblWriteId(context.getConf());
+        if (writeId == null) {
           throw new HiveException("DDLTask : Write id is not set in the config by open txn task for migration");
         }
-        writeId = Long.parseLong(writeIdPara);
       } else {
         writeId = desc.getReplWriteId();
       }
 
       // In case of replication statistics is obtained from the source, so do not update those
-      // on replica. Since we are not replicating statisics for transactional tables, do not do
-      // so for transactional tables right now.
-      if (!AcidUtils.isTransactionalTable(desc)) {
-        environmentContext = new EnvironmentContext();
-        environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
-      }
+      // on replica.
+      environmentContext = new EnvironmentContext();
+      environmentContext.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
     }
 
     // In replication flow, if table's data location is changed, then set the corresponding flag in
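
A small sketch of how the DO_NOT_UPDATE_STATS flag set above travels in the EnvironmentContext;
the receiver-side check is illustrative only and StatsFlagSketch is a made-up class name, not
part of this patch.

    import org.apache.hadoop.hive.common.StatsSetupConst;
    import org.apache.hadoop.hive.metastore.api.EnvironmentContext;

    final class StatsFlagSketch {
      // Sender side: mark replicated DDL so that stats are not recomputed on the replica.
      static EnvironmentContext doNotUpdateStatsContext() {
        EnvironmentContext ec = new EnvironmentContext();
        ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
        return ec;
      }

      // Receiver side (illustrative): how a consumer could honour the flag.
      static boolean statsUpdateDisabled(EnvironmentContext ec) {
        return ec != null && ec.isSetProperties()
            && StatsSetupConst.TRUE.equals(
                ec.getProperties().get(StatsSetupConst.DO_NOT_UPDATE_STATS));
      }
    }
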
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
index cf00d7b..9756191 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ColumnStatsUpdateTask.java
@@ -21,11 +21,14 @@ package org.apache.hadoop.hive.ql.exec;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
@@ -33,7 +36,6 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsData;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
 import org.apache.hadoop.hive.metastore.api.Date;
-import org.apache.hadoop.hive.metastore.api.Decimal;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;
 import org.apache.hadoop.hive.metastore.api.utils.DecimalUtils;
@@ -46,9 +48,11 @@ import org.apache.hadoop.hive.ql.CompilationOpContext;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -297,21 +301,41 @@ public class ColumnStatsUpdateTask extends Task<ColumnStatsUpdateWork> {
 
   private int persistColumnStats(Hive db) throws HiveException, MetaException, IOException {
     ColumnStatistics colStats = constructColumnStatsFromInput();
-    ColumnStatisticsDesc colStatsDesc = colStats.getStatsDesc();
-    // We do not support stats replication for a transactional table yet. If we are converting
-    // a non-transactional table to a transactional table during replication, we might get
-    // column statistics but we shouldn't update those.
-    if (work.getColStats() != null &&
-        AcidUtils.isTransactionalTable(getHive().getTable(colStatsDesc.getDbName(),
-                                                          colStatsDesc.getTableName()))) {
-      LOG.debug("Skipped updating column stats for table " +
-                TableName.getDbTable(colStatsDesc.getDbName(), colStatsDesc.getTableName()) +
-                " because it is converted to a transactional table during replication.");
-      return 0;
-    }
-
     SetPartitionsStatsRequest request =
             new SetPartitionsStatsRequest(Collections.singletonList(colStats));
+
+    // Set writeId and validWriteId list for replicated statistics. getColStats() returns a
+    // non-null value only during replication.
+    if (work.getColStats() != null) {
+      String dbName = colStats.getStatsDesc().getDbName();
+      String tblName = colStats.getStatsDesc().getTableName();
+      Table tbl = db.getTable(dbName, tblName);
+      long writeId = work.getWriteId();
+      // If the table is transactional on both source and target, we get a valid writeId
+      // associated with it. Otherwise it is a non-transactional table on the source migrated
+      // to a transactional table on the target, and we need to craft a valid writeId here.
+      if (AcidUtils.isTransactionalTable(tbl)) {
+        ValidWriteIdList writeIds;
+        if (work.getIsMigratingToTxn()) {
+          Long tmpWriteId = ReplUtils.getMigrationCurrentTblWriteId(conf);
+          if (tmpWriteId == null) {
+            throw new HiveException("DDLTask : Write id is not set in the config by open txn task for migration");
+          }
+          writeId = tmpWriteId;
+        }
+
+        // We need a valid writeId list to update column statistics for a transactional table. We
+        // do not have a valid writeId list which was used to update the column stats on the
+        // source. But we know for sure that the writeId associated with the stats was valid then
+        // (otherwise column stats update would have failed on the source). So use a valid
+        // transaction list with only that writeId and use it to update the stats.
+        writeIds = new ValidReaderWriteIdList(TableName.getDbTable(dbName, tblName), new long[0],
+                                              new BitSet(), writeId);
+        request.setValidWriteIdList(writeIds.toString());
+        request.setWriteId(writeId);
+      }
+    }
+
     db.setPartitionColumnStatistics(request);
     return 0;
   }
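
To make the single-writeId trick above concrete, here is a standalone sketch of the pattern
persistColumnStats() now follows for replicated statistics; replStatsRequest is a hypothetical
helper name and the calls mirror only what is visible in this hunk.

    import java.util.BitSet;
    import java.util.Collections;
    import org.apache.hadoop.hive.common.TableName;
    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
    import org.apache.hadoop.hive.metastore.api.SetPartitionsStatsRequest;

    static SetPartitionsStatsRequest replStatsRequest(ColumnStatistics colStats, long replWriteId) {
      SetPartitionsStatsRequest request =
          new SetPartitionsStatsRequest(Collections.singletonList(colStats));
      String fullTableName = TableName.getDbTable(colStats.getStatsDesc().getDbName(),
          colStats.getStatsDesc().getTableName());
      // A write-id list with no exceptions, no aborted bits and a high watermark equal to the
      // replicated writeId: it treats exactly that writeId as valid, which is all the metastore
      // needs in order to accept the statistics on the replica.
      ValidWriteIdList writeIds =
          new ValidReaderWriteIdList(fullTableName, new long[0], new BitSet(), replWriteId);
      request.setValidWriteIdList(writeIds.toString());
      request.setWriteId(replWriteId);
      return request;
    }
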
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
index a1d795f..7c5a47e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
@@ -1022,7 +1022,16 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
     }
     Partition part = db.getPartition(tbl, oldPartSpec, false);
     part.setValues(renamePartitionDesc.getNewPartSpec());
-    db.renamePartition(tbl, oldPartSpec, part);
+    long writeId = renamePartitionDesc.getWriteId();
+    if (renamePartitionDesc.getReplicationSpec() != null
+            && renamePartitionDesc.getReplicationSpec().isMigratingToTxnTable()) {
+      Long tmpWriteId = ReplUtils.getMigrationCurrentTblWriteId(conf);
+      if (tmpWriteId == null) {
+        throw new HiveException("DDLTask : Write id is not set in the config by open txn task for migration");
+      }
+      writeId = tmpWriteId;
+    }
+    db.renamePartition(tbl, oldPartSpec, part, writeId);
     Partition newPart = db.getPartition(tbl, renamePartitionDesc.getNewPartSpec(), false);
     work.getInputs().add(new ReadEntity(oldPart));
     // We've already obtained a lock on the table, don't lock the partition too
@@ -2460,11 +2469,32 @@ public class DDLTask extends Task<DDLWork> implements Serializable {
       }
       environmentContext.putToProperties(HiveMetaHook.ALTER_TABLE_OPERATION_TYPE, alterTbl.getOp().name());
       if (allPartitions == null) {
-        db.alterTable(alterTbl.getOldName(), tbl, alterTbl.getIsCascade(), environmentContext, true);
+        long writeId = alterTbl.getWriteId() != null ? alterTbl.getWriteId() : 0;
+        if (alterTbl.getReplicationSpec() != null &&
+                alterTbl.getReplicationSpec().isMigratingToTxnTable()) {
+          Long tmpWriteId = ReplUtils.getMigrationCurrentTblWriteId(conf);
+          if (tmpWriteId == null) {
+            throw new HiveException("DDLTask : Write id is not set in the config by open txn task for migration");
+          }
+          writeId = tmpWriteId;
+        }
+        db.alterTable(alterTbl.getOldName(), tbl, alterTbl.getIsCascade(), environmentContext,
+                true, writeId);
       } else {
         // Note: this is necessary for UPDATE_STATISTICS command, that operates via ADDPROPS (why?).
         //       For any other updates, we don't want to do txn check on partitions when altering table.
-        boolean isTxn = alterTbl.getPartSpec() != null && alterTbl.getOp() == AlterTableTypes.ADDPROPS;
+        boolean isTxn = false;
+        if (alterTbl.getPartSpec() != null && alterTbl.getOp() == AlterTableTypes.ADDPROPS) {
+          // ADDPROPS is used to add replication properties like repl.last.id, which isn't a
+          // transactional change. In case of replication, check for transactional properties
+          // explicitly.
+          Map<String, String> props = alterTbl.getProps();
+          if (alterTbl.getReplicationSpec() != null && alterTbl.getReplicationSpec().isInReplicationScope()) {
+            isTxn = (props.get(StatsSetupConst.COLUMN_STATS_ACCURATE) != null);
+          } else {
+            isTxn = true;
+          }
+        }
         db.alterPartitions(Warehouse.getQualifiedName(tbl.getTTable()), allPartitions, environmentContext, isTxn);
       }
       // Add constraints if necessary
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 1c2522d..15a266d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -284,9 +284,8 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
     }
 
     // If we are loading a table during replication, the stats will also be replicated
-    // and hence accurate if it's a non-transactional table. For transactional table we
-    // do not replicate stats yet.
-    return AcidUtils.isTransactionalTable(table.getParameters());
+    // and hence accurate. No need to reset those.
+    return false;
   }
 
   private final static class TaskInformation {
@@ -399,11 +398,11 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
         // for transactional table if write id is not set during replication from a cluster with STRICT_MANAGED set
         // to false then set it now.
         if (tbd.getWriteId() <= 0 && AcidUtils.isTransactionalTable(table.getParameters())) {
-          String writeId = conf.get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
+          Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf);
           if (writeId == null) {
             throw new HiveException("MoveTask : Write id is not set in the config by open txn task for migration");
           }
-          tbd.setWriteId(Long.parseLong(writeId));
+          tbd.setWriteId(writeId);
           tbd.setStmtId(driverContext.getCtx().getHiveTxnManager().getStmtIdAndIncrement());
         }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index c34f075..dcf569f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -206,15 +206,14 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
           // getDeleteDestIfExist returns true if it is repl load for replace/insert overwrite event and
           // hence need to create base directory. If false, then it is repl load for regular insert into or
           // load flow and hence just create delta directory.
-          String writeIdString = conf.get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
-          if (writeIdString == null) {
+          Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf);
+          if (writeId == null) {
             console.printError("ReplCopyTask : Write id is not set in the config by open txn task for migration");
             return 6;
           }
-          long writeId = Long.parseLong(writeIdString);
           // Set stmt id 0 for bootstrap load as the directory needs to be searched during incremental load to avoid any
           // duplicate copy from the source. Check HIVE-21197 for more detail.
-          int stmtId = (writeId == ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID) ?
+          int stmtId = (writeId.equals(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID)) ?
                   ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_STMT_ID :
                   driverContext.getCtx().getHiveTxnManager().getStmtIdAndIncrement();
           toPath = new Path(toPath, AcidUtils.baseOrDeltaSubdir(work.getDeleteDestIfExist(), writeId, writeId, stmtId));
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 3704344..eb5c18a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -367,9 +367,6 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       // added/modified by concurrent txns which are later than current txn. So, need to set last repl Id of this table
       // as bootstrap dump's last repl Id.
       tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId));
-
-      // For now we do not replicate stats for ACID table. So, wipe out column stats if any.
-      tableSpec.tableHandle.getTTable().unsetColStats();
     }
     MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
     new TableExport(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
index 22b6e98..27009f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/bootstrap/events/filesystem/FSTableEvent.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.ql.exec.repl.bootstrap.events.TableEvent;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -106,11 +107,10 @@ public class FSTableEvent implements TableEvent {
         // If the conversion is from non transactional to transactional table
         if (AcidUtils.isTransactionalTable(table)) {
           replicationSpec().setMigratingToTxnTable();
-          // There won't be any writeId associated with statistics on source non-transactional
-          // table. We will need to associate a cooked up writeId on target for those. But that's
-          // not done yet. Till then we don't replicate statistics for ACID table even if it's
-          // available on the source.
-          table.getTTable().unsetColStats();
+          // For migrated tables, associate the bootstrap writeId when replicating stats.
+          if (table.getTTable().isSetColStats()) {
+            table.getTTable().setWriteId(ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID);
+          }
         }
         if (TableType.EXTERNAL_TABLE.equals(table.getTableType())) {
           // since we have converted to an external table now after applying the migration rules the
@@ -196,15 +196,15 @@ public class FSTableEvent implements TableEvent {
       }
       partsDesc.setReplicationSpec(replicationSpec());
 
-      // Right now, we do not have a way of associating a writeId with statistics for a table
-      // converted to a transactional table if it was non-transactional on the source. So, do not
-      // update statistics for converted tables even if available on the source.
-      if (partition.isSetColStats() && !replicationSpec().isMigratingToTxnTable()) {
+      if (partition.isSetColStats()) {
         ColumnStatistics colStats = partition.getColStats();
         ColumnStatisticsDesc colStatsDesc = new ColumnStatisticsDesc(colStats.getStatsDesc());
         colStatsDesc.setTableName(tblDesc.getTableName());
         colStatsDesc.setDbName(tblDesc.getDatabaseName());
         partDesc.setColStats(new ColumnStatistics(colStatsDesc, colStats.getStatsObj()));
+        long writeId = replicationSpec().isMigratingToTxnTable() ?
+                ReplUtils.REPL_BOOTSTRAP_MIGRATION_BASE_WRITE_ID : partition.getWriteId();
+        partDesc.setWriteId(writeId);
       }
       return partsDesc;
     } catch (Exception e) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
index 072189b..fbdbbdd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/util/ReplUtils.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.ReplConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -34,6 +35,7 @@ import org.apache.hadoop.hive.ql.parse.DDLSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.ReplLogger;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
+import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
@@ -161,6 +163,33 @@ public class ReplUtils {
     return false;
   }
 
+  public static boolean isTableMigratingToTransactional(HiveConf conf,
+                                                 org.apache.hadoop.hive.metastore.api.Table tableObj)
+  throws TException, IOException {
+    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) &&
+            !AcidUtils.isTransactionalTable(tableObj) &&
+            TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE) {
+      //TODO : isPathOwnByHive is hard coded to true, need to get it from repl dump metadata.
+      HiveStrictManagedMigration.TableMigrationOption migrationOption =
+              HiveStrictManagedMigration.determineMigrationTypeAutomatically(tableObj, TableType.MANAGED_TABLE,
+                      null, conf, null, true);
+      return migrationOption == MANAGED;
+    }
+    return false;
+  }
+
+  private static void addOpenTxnTaskForMigration(String actualDbName, String actualTblName,
+                                            HiveConf conf,
+                                         UpdatedMetaDataTracker updatedMetaDataTracker,
+                                         List<Task<? extends Serializable>> taskList,
+                                         Task<? extends Serializable> childTask) {
+    Task<? extends Serializable> replTxnTask = TaskFactory.get(new ReplTxnWork(actualDbName, actualTblName,
+            ReplTxnWork.OperationType.REPL_MIGRATION_OPEN_TXN), conf);
+    replTxnTask.addDependentTask(childTask);
+    updatedMetaDataTracker.setNeedCommitTxn(true);
+    taskList.add(replTxnTask);
+  }
+
   public static List<Task<? extends Serializable>> addOpenTxnTaskForMigration(String actualDbName,
                                                                   String actualTblName, HiveConf conf,
                                                                   UpdatedMetaDataTracker updatedMetaDataTracker,
@@ -169,25 +198,36 @@ public class ReplUtils {
           throws IOException, TException {
     List<Task<? extends Serializable>> taskList = new ArrayList<>();
     taskList.add(childTask);
-    if (conf.getBoolVar(HiveConf.ConfVars.HIVE_STRICT_MANAGED_TABLES) && updatedMetaDataTracker != null &&
-            !AcidUtils.isTransactionalTable(tableObj) &&
-            TableType.valueOf(tableObj.getTableType()) == TableType.MANAGED_TABLE) {
-      //TODO : isPathOwnByHive is hard coded to true, need to get it from repl dump metadata.
-      HiveStrictManagedMigration.TableMigrationOption migrationOption =
-              HiveStrictManagedMigration.determineMigrationTypeAutomatically(tableObj, TableType.MANAGED_TABLE,
-                      null, conf, null, true);
-      if (migrationOption == MANAGED) {
-        //if conversion to managed table.
-        Task<? extends Serializable> replTxnTask = TaskFactory.get(new ReplTxnWork(actualDbName, actualTblName,
-                        ReplTxnWork.OperationType.REPL_MIGRATION_OPEN_TXN), conf);
-        replTxnTask.addDependentTask(childTask);
-        updatedMetaDataTracker.setNeedCommitTxn(true);
-        taskList.add(replTxnTask);
-      }
+    if (isTableMigratingToTransactional(conf, tableObj) && updatedMetaDataTracker != null) {
+      addOpenTxnTaskForMigration(actualDbName, actualTblName, conf, updatedMetaDataTracker,
+              taskList, childTask);
     }
     return taskList;
   }
 
+  public static List<Task<? extends Serializable>> addTasksForLoadingColStats(ColumnStatistics colStats,
+                                                                              HiveConf conf,
+                                                                              UpdatedMetaDataTracker updatedMetadata,
+                                                                              org.apache.hadoop.hive.metastore.api.Table tableObj,
+                                                                              long writeId)
+          throws IOException, TException {
+    List<Task<? extends Serializable>> taskList = new ArrayList<>();
+    boolean isMigratingToTxn = ReplUtils.isTableMigratingToTransactional(conf, tableObj);
+    ColumnStatsUpdateWork work = new ColumnStatsUpdateWork(colStats, isMigratingToTxn);
+    work.setWriteId(writeId);
+    Task<?> task = TaskFactory.get(work, conf);
+    taskList.add(task);
+    // If the table is going to be migrated to a transactional table, we will need to open
+    // and commit a transaction to associate a valid writeId with the statistics.
+    if (isMigratingToTxn) {
+      ReplUtils.addOpenTxnTaskForMigration(colStats.getStatsDesc().getDbName(),
+              colStats.getStatsDesc().getTableName(), conf, updatedMetadata, taskList,
+              task);
+    }
+
+    return taskList;
+  }
+
   // Path filters to filter only events (directories) excluding "_bootstrap"
   public static PathFilter getEventsDirectoryFilter(final FileSystem fs) {
     return p -> {
@@ -216,4 +256,12 @@ public class ReplUtils {
     envContext.putToProperties(ReplConst.REPL_DATA_LOCATION_CHANGED, ReplConst.TRUE);
     return envContext;
   }
+
+  public static Long getMigrationCurrentTblWriteId(HiveConf conf) {
+    String writeIdString = conf.get(ReplUtils.REPL_CURRENT_TBL_WRITE_ID);
+    if (writeIdString == null) {
+      return null;
+    }
+    return Long.parseLong(writeIdString);
+  }
 }
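
A short illustration of how the tasks above consume the new helper; requireMigrationWriteId is a
made-up wrapper that just consolidates the null check repeated in DDLTask, MoveTask and
ReplCopyTask.

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    static long requireMigrationWriteId(HiveConf conf) throws HiveException {
      // The open-txn task for migration is expected to have stored the allocated write id in the
      // config under ReplUtils.REPL_CURRENT_TBL_WRITE_ID before this point.
      Long writeId = ReplUtils.getMigrationCurrentTblWriteId(conf);
      if (writeId == null) {
        throw new HiveException("Write id is not set in the config by open txn task for migration");
      }
      return writeId;
    }
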
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 4350dc8..fdd3e46 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -42,6 +42,7 @@ import java.nio.ByteBuffer;
 import java.sql.SQLIntegrityConstraintViolationException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -665,6 +666,14 @@ public class Hive {
     alterTable(null, names[0], names[1], newTbl, cascade, environmentContext, transactional);
   }
 
+  public void alterTable(String fullyQlfdTblName, Table newTbl, boolean cascade,
+                         EnvironmentContext environmentContext, boolean transactional, long writeId)
+          throws HiveException {
+    String[] names = Utilities.getDbTableName(fullyQlfdTblName);
+    alterTable(null, names[0], names[1], newTbl, cascade, environmentContext, transactional,
+                writeId);
+  }
+
   public void alterTable(String catName, String dbName, String tblName, Table newTbl, boolean cascade,
                          EnvironmentContext environmentContext, boolean transactional) throws HiveException {
     alterTable(catName, dbName, tblName, newTbl, cascade, environmentContext, transactional, 0);
@@ -694,7 +703,14 @@ public class Hive {
       AcidUtils.TableSnapshot tableSnapshot = null;
       if (transactional) {
         if (replWriteId > 0) {
-          ValidWriteIdList writeIds = AcidUtils.getTableValidWriteIdListWithTxnList(conf, dbName, tblName);
+          // We need a valid writeId list for a transactional table modification. During
+          // replication we do not have a valid writeId list which was used to modify the table
+          // on the source. But we know for sure that the writeId associated with it was valid
+          // then (otherwise modification would have failed on the source). So use a valid
+          // transaction list with only that writeId.
+          ValidWriteIdList writeIds = new ValidReaderWriteIdList(TableName.getDbTable(dbName, tblName),
+                                                                  new long[0], new BitSet(),
+                                                                  replWriteId);
           tableSnapshot = new TableSnapshot(replWriteId, writeIds.writeToString());
         } else {
           // Make sure we pass in the names, so we can get the correct snapshot for rename table.
@@ -863,7 +879,8 @@ public class Hive {
    *          new partition
    * @throws HiveException
    */
-  public void renamePartition(Table tbl, Map<String, String> oldPartSpec, Partition newPart)
+  public void renamePartition(Table tbl, Map<String, String> oldPartSpec, Partition newPart,
+                              long replWriteId)
       throws HiveException {
     try {
       Map<String, String> newPartSpec = newPart.getSpec();
@@ -887,8 +904,21 @@ public class Hive {
       }
       String validWriteIds = null;
       if (AcidUtils.isTransactionalTable(tbl)) {
-        // Set table snapshot to api.Table to make it persistent.
-        TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
+        TableSnapshot tableSnapshot;
+        if (replWriteId > 0) {
+          // We need a valid writeId list for a transactional table modification. During
+          // replication we do not have a valid writeId list which was used to modify the table
+          // on the source. But we know for sure that the writeId associated with it was valid
+          // then (otherwise modification would have failed on the source). So use a valid
+          // transaction list with only that writeId.
+          ValidWriteIdList writeIds = new ValidReaderWriteIdList(TableName.getDbTable(tbl.getDbName(),
+                  tbl.getTableName()), new long[0], new BitSet(), replWriteId);
+          tableSnapshot = new TableSnapshot(replWriteId, writeIds.writeToString());
+        } else {
+          // Set table snapshot to api.Table to make it persistent.
+          tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
+        }
+
         if (tableSnapshot != null) {
           newPart.getTPartition().setWriteId(tableSnapshot.getWriteId());
           validWriteIds = tableSnapshot.getValidWriteIdList();
@@ -987,10 +1017,14 @@ public class Hive {
           tTbl.setPrivileges(principalPrivs);
         }
       }
-      // Set table snapshot to api.Table to make it persistent.
-      TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
-      if (tableSnapshot != null) {
-        tbl.getTTable().setWriteId(tableSnapshot.getWriteId());
+      // Set table snapshot to api.Table to make it persistent. A transactional table being
+      // replicated may have a valid write Id copied from the source. Use that instead of
+      // crafting one on the replica.
+      if (tTbl.getWriteId() <= 0) {
+        TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
+        if (tableSnapshot != null) {
+          tbl.getTTable().setWriteId(tableSnapshot.getWriteId());
+        }
       }
 
       if (primaryKeys == null && foreignKeys == null
@@ -2950,21 +2984,36 @@ private void constructOneLBLocationMap(FileStatus fSta,
     int size = addPartitionDesc.getPartitionCount();
     List<org.apache.hadoop.hive.metastore.api.Partition> in =
         new ArrayList<org.apache.hadoop.hive.metastore.api.Partition>(size);
-    AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
     long writeId;
     String validWriteIdList;
-    if (tableSnapshot != null && tableSnapshot.getWriteId() > 0) {
-      writeId = tableSnapshot.getWriteId();
-      validWriteIdList = tableSnapshot.getValidWriteIdList();
+
+    // In case of replication, get the writeId from the source and build a valid writeId list
+    // from it.
+    if (addPartitionDesc.getReplicationSpec().isInReplicationScope() &&
+        addPartitionDesc.getPartition(0).getWriteId() > 0) {
+      writeId = addPartitionDesc.getPartition(0).getWriteId();
+      // We need a valid writeId list for a transactional change. During replication we do not
+      // have a valid writeId list which was used for this on the source. But we know for sure
+      // that the writeId associated with it was valid then (otherwise the change would have
+      // failed on the source). So use a valid transaction list with only that writeId.
+      validWriteIdList = new ValidReaderWriteIdList(TableName.getDbTable(tbl.getDbName(),
+                                                                          tbl.getTableName()),
+                                                    new long[0], new BitSet(), writeId).writeToString();
     } else {
-      writeId = -1;
-      validWriteIdList = null;
+      AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
+      if (tableSnapshot != null && tableSnapshot.getWriteId() > 0) {
+        writeId = tableSnapshot.getWriteId();
+        validWriteIdList = tableSnapshot.getValidWriteIdList();
+      } else {
+        writeId = -1;
+        validWriteIdList = null;
+      }
     }
     for (int i = 0; i < size; ++i) {
       org.apache.hadoop.hive.metastore.api.Partition tmpPart =
           convertAddSpecToMetaPartition(tbl, addPartitionDesc.getPartition(i), conf);
-      if (tmpPart != null && tableSnapshot != null && tableSnapshot.getWriteId() > 0) {
-        tmpPart.setWriteId(tableSnapshot.getWriteId());
+      if (tmpPart != null && writeId > 0) {
+        tmpPart.setWriteId(writeId);
       }
       in.add(tmpPart);
     }
@@ -3004,12 +3053,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
           out.add(new Partition(tbl, outPart));
         }
         EnvironmentContext ec = new EnvironmentContext();
-        // In case of replication statistics is obtained from the source, so do not update those
-        // on replica. Since we are not replicating statistics for transactional tables, do not do
-        // so for a partition of a transactional table right now.
-        if (!AcidUtils.isTransactionalTable(tbl)) {
-          ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
-        }
+        // In case of replication, statistics are obtained from the source, so do not update
+        // them on the replica.
+        ec.putToProperties(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE);
         getMSC().alter_partitions(addPartitionDesc.getDbName(), addPartitionDesc.getTableName(),
             partsToAlter, ec, validWriteIdList, writeId);
 
@@ -3064,6 +3110,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
     if (addSpec.getColStats() != null) {
       part.setColStats(addSpec.getColStats());
+      // Statistics will have an associated write Id for a transactional table. We need it to
+      // update column statistics.
+      part.setWriteId(addSpec.getWriteId());
     }
     return part;
   }
@@ -5068,11 +5117,16 @@ private void constructOneLBLocationMap(FileStatus fSta,
     try {
       ColumnStatistics colStat = request.getColStats().get(0);
       ColumnStatisticsDesc statsDesc = colStat.getStatsDesc();
-      Table tbl = getTable(statsDesc.getDbName(), statsDesc.getTableName());
 
-      AcidUtils.TableSnapshot tableSnapshot  = AcidUtils.getTableSnapshot(conf, tbl, true);
-      request.setValidWriteIdList(tableSnapshot != null ? tableSnapshot.getValidWriteIdList() : null);
-      request.setWriteId(tableSnapshot != null ? tableSnapshot.getWriteId() : 0);
+      // In case of replication, the request already carries a valid writeId and a valid
+      // writeId list obtained from the source. Just use those.
+      if (request.getWriteId() <= 0 || request.getValidWriteIdList() == null) {
+        Table tbl = getTable(statsDesc.getDbName(), statsDesc.getTableName());
+        AcidUtils.TableSnapshot tableSnapshot = AcidUtils.getTableSnapshot(conf, tbl, true);
+        request.setValidWriteIdList(tableSnapshot != null ? tableSnapshot.getValidWriteIdList() : null);
+        request.setWriteId(tableSnapshot != null ? tableSnapshot.getWriteId() : 0);
+      }
+
       return getMSC().setPartitionColumnStatistics(request);
     } catch (Exception e) {
       LOG.debug(StringUtils.stringifyException(e));
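
A hedged distillation of the write-id selection performed in the add-partition path above;
chooseWriteId is a hypothetical helper name and simply restates the branch in this hunk: prefer
the writeId carried over from the source when in replication scope, otherwise fall back to a
locally acquired table snapshot.

    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hadoop.hive.ql.plan.AddPartitionDesc;

    static long chooseWriteId(AddPartitionDesc desc, AcidUtils.TableSnapshot snapshot) {
      // Replication: trust the writeId that arrived with the dumped partition.
      if (desc.getReplicationSpec().isInReplicationScope()
          && desc.getPartition(0).getWriteId() > 0) {
        return desc.getPartition(0).getWriteId();
      }
      // Otherwise use the snapshot taken on this cluster, or -1 when there is none.
      return (snapshot != null && snapshot.getWriteId() > 0) ? snapshot.getWriteId() : -1;
    }
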
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
index fb1c8d4..ae030d9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
@@ -1119,12 +1118,6 @@ public class Table implements Serializable {
    * table or during replication.
    */
   public void setStatsStateLikeNewTable() {
-    // We do not replicate statistics for
-    // an ACID Table right now, so don't touch them right now.
-    if (AcidUtils.isTransactionalTable(this)) {
-      return;
-    }
-
     if (isPartitioned()) {
       StatsSetupConst.setStatsStateForCreateTable(getParameters(), null,
               StatsSetupConst.FALSE);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index cb9584c..07b40c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -288,11 +287,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         //if the conversion is from non transactional to transactional table
         if (TxnUtils.isTransactionalTable(tblObj)) {
           replicationSpec.setMigratingToTxnTable();
-          // There won't be any writeId associated with statistics on source non-transactional
-          // table. We will need to associate a cooked up writeId on target for those. But that's
-          // not done yet. Till then we don't replicate statistics for ACID table even if it's
-          // available on the source.
-          tblObj.unsetColStats();
         }
         tblDesc = getBaseCreateTableDescFromTable(dbname, tblObj);
         if (TableType.valueOf(tblObj.getTableType()) == TableType.EXTERNAL_TABLE) {
@@ -311,11 +305,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     boolean inReplicationScope = false;
     if ((replicationSpec != null) && replicationSpec.isInReplicationScope()) {
       tblDesc.setReplicationSpec(replicationSpec);
-      // Statistics for a non-transactional table will be replicated separately. Don't bother
-      // with it here.
-      if (TxnUtils.isTransactionalTable(tblDesc.getTblProps())) {
-        StatsSetupConst.setBasicStatsState(tblDesc.getTblProps(), StatsSetupConst.FALSE);
-      }
       inReplicationScope = true;
       tblDesc.setReplWriteId(writeId);
       tblDesc.setOwnerName(tblObj.getOwner());
@@ -345,13 +334,6 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       AddPartitionDesc partsDesc =
               getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition,
                       replicationSpec, x.getConf());
-      if (inReplicationScope) {
-        // Statistics for a non-transactional table will be replicated separately. Don't bother
-        // with it here.
-        if (TxnUtils.isTransactionalTable(tblDesc.getTblProps())) {
-          StatsSetupConst.setBasicStatsState(partsDesc.getPartition(0).getPartParams(), StatsSetupConst.FALSE);
-        }
-      }
       partitionDescs.add(partsDesc);
     }
 
@@ -455,6 +437,9 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       partDesc.setLocation(new Path(fromPath,
               Warehouse.makePartName(tblDesc.getPartCols(), partition.getValues())).toString());
     }
+    if (tblDesc.getReplWriteId() != null) {
+      partDesc.setWriteId(tblDesc.getReplWriteId());
+    }
     return partsDesc;
   }
 
@@ -667,8 +652,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         copyTask = TaskFactory.get(new CopyWork(new Path(srcLocation), destPath, false));
       }
 
-      Task<?> addPartTask = TaskFactory.get(
-              new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
+      Task<?> addPartTask = null;
+      if (x.getEventType() != DumpType.EVENT_COMMIT_TXN) {
+        // During replication, by the time we are applying commit transaction event, we expect
+        // the partition/s to be already added or altered by previous events. So no need to
+        // create add partition event again.
+        addPartTask = TaskFactory.get(
+                new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
+      }
 
       MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(),
               null, null, false);
@@ -704,15 +695,20 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         if (x.getEventType() == DumpType.EVENT_INSERT) {
           copyTask.addDependentTask(TaskFactory.get(moveWork, x.getConf()));
         } else {
-          copyTask.addDependentTask(addPartTask);
+          if (addPartTask != null) {
+            copyTask.addDependentTask(addPartTask);
+          }
         }
         return copyTask;
       }
       Task<?> loadPartTask = TaskFactory.get(moveWork, x.getConf());
       copyTask.addDependentTask(loadPartTask);
-      addPartTask.addDependentTask(loadPartTask);
-      x.getTasks().add(copyTask);
-      return addPartTask;
+      if (addPartTask != null) {
+        addPartTask.addDependentTask(loadPartTask);
+        x.getTasks().add(copyTask);
+        return addPartTask;
+      }
+      return copyTask;
     }
   }
 
@@ -1225,19 +1221,19 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         lockType = WriteEntity.WriteType.DDL_SHARED;
       }
 
-      Task t = createTableTask(tblDesc, x);
       table = createNewTableMetadataObject(tblDesc, true);
 
+      List<Task<?>> dependentTasks = null;
       if (isPartitioned(tblDesc)) {
+        dependentTasks = new ArrayList<>(partitionDescs.size());
         for (AddPartitionDesc addPartitionDesc : partitionDescs) {
           addPartitionDesc.setReplicationSpec(replicationSpec);
           if (!replicationSpec.isMetadataOnly()) {
-            t.addDependentTask(
-                    addSinglePartition(tblDesc, table, wh, addPartitionDesc, replicationSpec, x,
-                            writeId, stmtId));
+            dependentTasks.add(addSinglePartition(tblDesc, table, wh, addPartitionDesc,
+                                                replicationSpec, x, writeId, stmtId));
           } else {
-            t.addDependentTask(alterSinglePartition(tblDesc, table, wh, addPartitionDesc,
-                    replicationSpec, null, x));
+            dependentTasks.add(alterSinglePartition(tblDesc, table, wh, addPartitionDesc,
+                                                  replicationSpec, null, x));
           }
           if (updatedMetadata != null) {
             updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
@@ -1247,17 +1243,37 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       } else if (!replicationSpec.isMetadataOnly()
               && !shouldSkipDataCopyInReplScope(tblDesc, replicationSpec)) {
         x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
-        t.addDependentTask(loadTable(fromURI, table, replicationSpec.isReplace(),
-                new Path(tblDesc.getLocation()), replicationSpec, x, writeId, stmtId));
+        dependentTasks = new ArrayList<>(1);
+        dependentTasks.add(loadTable(fromURI, table, replicationSpec.isReplace(),
+                                  new Path(tblDesc.getLocation()), replicationSpec,
+                                  x, writeId, stmtId));
       }
 
-      if (dropTblTask != null) {
-        // Drop first and then create
-        dropTblTask.addDependentTask(t);
-        x.getTasks().add(dropTblTask);
+      // During replication, by the time we replay a commit transaction event, the table should
+      // already have been created while replaying previous events. So there is no need to
+      // create the table again.
+      if (x.getEventType() != DumpType.EVENT_COMMIT_TXN) {
+        Task t = createTableTask(tblDesc, x);
+        if (dependentTasks != null) {
+          dependentTasks.forEach(task -> t.addDependentTask(task));
+        }
+        if (dropTblTask != null) {
+          // Drop first and then create
+          dropTblTask.addDependentTask(t);
+          x.getTasks().add(dropTblTask);
+        } else {
+          // Simply create
+          x.getTasks().add(t);
+        }
       } else {
-        // Simply create
-        x.getTasks().add(t);
+        // We should not need to create a drop table task when replaying a commit transaction
+        // event. That should have been done when replaying the create table event itself.
+        assert dropTblTask == null;
+
+        // Add all the tasks created above directly
+        if (dependentTasks != null) {
+          x.getTasks().addAll(dependentTasks);
+        }
       }
     } else {
       // If table of current event has partition flag different from existing table, it means, some
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
index 4cd4d70..c2e26f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/TableExport.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -109,16 +108,8 @@ public class TableExport {
           if (replicationSpec.isMetadataOnly()) {
             return null;
           } else {
-            // For transactional tables, we do not replicate statistics right now, so don't
-            // include statistics in Partition object as well.
-            boolean getColStats;
-            if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
-              getColStats = false;
-            } else {
-              getColStats = true;
-            }
             return new PartitionIterable(db, tableSpec.tableHandle, null, conf.getIntVar(
-                HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX), getColStats);
+                HiveConf.ConfVars.METASTORE_BATCH_RETRIEVE_MAX), true);
           }
         } else {
           // PARTITIONS specified - partitions inside tableSpec
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
index 79e1361..54fc7a6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdatePartColStatHandler.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.UpdatePartitionColumnStatMessage;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
@@ -53,12 +52,6 @@ class UpdatePartColStatHandler extends AbstractEventHandler<UpdatePartitionColum
       return;
     }
 
-    // For now we do not dump statistics for a transactional table since replicating the same is
-    // not supported.
-    if (AcidUtils.isTransactionalTable(tableObj)) {
-      return;
-    }
-
     if (!Utils.shouldReplicate(withinContext.replicationSpec, new Table(tableObj), true,
                               withinContext.hiveConf)) {
       return;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
index ca9af5e..62db959 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/UpdateTableColStatHandler.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.UpdateTableColumnStatMessage;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.dump.Utils;
@@ -46,10 +45,6 @@ class UpdateTableColStatHandler extends AbstractEventHandler<UpdateTableColumnSt
     if (withinContext.replicationSpec.isMetadataOnly()) {
       return;
     }
-    // For now we do not replicate the statistics for transactional tables.
-    if (AcidUtils.isTransactionalTable(qlMdTable)) {
-      return;
-    }
 
     DumpMetaData dmd = withinContext.createDmd(this);
     dmd.setPayload(eventMessageAsJSON);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
index 1125f69..9c66210 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenamePartitionHandler.java
@@ -18,15 +18,17 @@
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
 
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -44,26 +46,34 @@ public class RenamePartitionHandler extends AbstractMessageHandler {
     Map<String, String> newPartSpec = new LinkedHashMap<>();
     Map<String, String> oldPartSpec = new LinkedHashMap<>();
     String tableName = actualDbName + "." + actualTblName;
+    Table tableObj;
+    ReplicationSpec replicationSpec = context.eventOnlyReplicationSpec();
     try {
       Iterator<String> beforeIterator = msg.getPtnObjBefore().getValuesIterator();
       Iterator<String> afterIterator = msg.getPtnObjAfter().getValuesIterator();
-      for (FieldSchema fs : msg.getTableObj().getPartitionKeys()) {
+      tableObj = msg.getTableObj();
+      for (FieldSchema fs : tableObj.getPartitionKeys()) {
         oldPartSpec.put(fs.getName(), beforeIterator.next());
         newPartSpec.put(fs.getName(), afterIterator.next());
       }
+      if (ReplUtils.isTableMigratingToTransactional(context.hiveConf, tableObj)) {
+        replicationSpec.setMigratingToTxnTable();
+      }
+
+      RenamePartitionDesc renamePtnDesc = new RenamePartitionDesc(
+              tableName, oldPartSpec, newPartSpec, replicationSpec, null);
+      renamePtnDesc.setWriteId(msg.getWriteId());
+      Task<DDLWork> renamePtnTask = TaskFactory.get(
+          new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf);
+      context.log.debug("Added rename ptn task : {}:{}->{}",
+                        renamePtnTask.getId(), oldPartSpec, newPartSpec);
+      updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, newPartSpec);
+      return ReplUtils.addOpenTxnTaskForMigration(actualDbName, actualTblName,
+              context.hiveConf, updatedMetadata, renamePtnTask, tableObj);
     } catch (Exception e) {
       throw (e instanceof SemanticException)
-          ? (SemanticException) e
-          : new SemanticException("Error reading message members", e);
+              ? (SemanticException) e
+              : new SemanticException("Error reading message members", e);
     }
-
-    RenamePartitionDesc renamePtnDesc = new RenamePartitionDesc(
-            tableName, oldPartSpec, newPartSpec, context.eventOnlyReplicationSpec(), null);
-    Task<DDLWork> renamePtnTask = TaskFactory.get(
-        new DDLWork(readEntitySet, writeEntitySet, renamePtnDesc), context.hiveConf);
-    context.log.debug("Added rename ptn task : {}:{}->{}",
-                      renamePtnTask.getId(), oldPartSpec, newPartSpec);
-    updatedMetadata.set(context.dmd.getEventTo().toString(), actualDbName, actualTblName, newPartSpec);
-    return Collections.singletonList(renamePtnTask);
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
index ddf2ca1..53d9982 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/RenameTableHandler.java
@@ -21,13 +21,14 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.List;
 
 public class RenameTableHandler extends AbstractMessageHandler {
@@ -61,8 +62,13 @@ public class RenameTableHandler extends AbstractMessageHandler {
 
       String oldName = StatsUtils.getFullyQualifiedTableName(oldDbName, tableObjBefore.getTableName());
       String newName = StatsUtils.getFullyQualifiedTableName(newDbName, tableObjAfter.getTableName());
+      ReplicationSpec replicationSpec = context.eventOnlyReplicationSpec();
+      if (ReplUtils.isTableMigratingToTransactional(context.hiveConf, tableObjAfter)) {
+        replicationSpec.setMigratingToTxnTable();
+      }
       AlterTableDesc renameTableDesc = new AlterTableDesc(
-              oldName, newName, false, context.eventOnlyReplicationSpec());
+              oldName, newName, false, replicationSpec);
+      renameTableDesc.setWriteId(msg.getWriteId());
       Task<DDLWork> renameTableTask = TaskFactory.get(
           new DDLWork(readEntitySet, writeEntitySet, renameTableDesc), context.hiveConf);
       context.log.debug("Added rename table task : {}:{}->{}",
@@ -75,7 +81,8 @@ public class RenameTableHandler extends AbstractMessageHandler {
       // Note : edge-case here in interaction with table-level REPL LOAD, where that nukes out
       // tablesUpdated. However, we explicitly don't support repl of that sort, and error out above
       // if so. If that should ever change, this will need reworking.
-      return Collections.singletonList(renameTableTask);
+      return ReplUtils.addOpenTxnTaskForMigration(oldDbName, tableObjBefore.getTableName(),
+              context.hiveConf, updatedMetadata, renameTableTask, tableObjAfter);
     } catch (Exception e) {
       throw (e instanceof SemanticException)
           ? (SemanticException) e
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java
index 02e938e..cb85f7d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdatePartColStatHandler.java
@@ -21,12 +21,10 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.messaging.UpdatePartitionColumnStatMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
 
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -51,7 +49,12 @@ public class UpdatePartColStatHandler extends AbstractMessageHandler {
       updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName,
                   null);
     }
-    return Collections.singletonList(TaskFactory.get(new ColumnStatsUpdateWork(colStats),
-            context.hiveConf));
+
+    try {
+      return ReplUtils.addTasksForLoadingColStats(colStats, context.hiveConf, updatedMetadata,
+                                                  upcsm.getTableObject(), upcsm.getWriteId());
+    } catch(Exception e) {
+      throw new SemanticException(e);
+    }
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java
index 9a60de4..371429e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/UpdateTableColStatHandler.java
@@ -21,12 +21,10 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.messaging.UpdateTableColumnStatMessage;
 import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ColumnStatsUpdateWork;
 
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.List;
 
 /**
@@ -52,11 +50,11 @@ public class UpdateTableColStatHandler extends AbstractMessageHandler {
                     context.tableName, null);
         }
 
-      // TODO: For txn stats update, ColumnStatsUpdateTask.execute()->Hive
-      // .setPartitionColumnStatistics expects a valid writeId allocated by the current txn and
-      // also, there should be a table snapshot. But, it won't be there as update from
-      // ReplLoadTask which doesn't have a write id allocated. Need to check this further.
-        return Collections.singletonList(TaskFactory.get(new ColumnStatsUpdateWork(colStats),
-                context.hiveConf));
+        try {
+            return ReplUtils.addTasksForLoadingColStats(colStats, context.hiveConf, updatedMetadata,
+                    utcsm.getTableObject(), utcsm.getWriteId());
+        } catch(Exception e) {
+            throw new SemanticException(e);
+        }
     }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java
index 26cb217..8ea857e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/AddPartitionDesc.java
@@ -60,6 +60,7 @@ public class AddPartitionDesc extends DDLDesc implements Serializable {
     List<String> bucketCols = null;
     List<Order> sortCols = null;
     ColumnStatistics colStats = null;
+    long writeId = -1;
 
     public Map<String, String> getPartSpec() {
       return partSpec;
@@ -151,6 +152,10 @@ public class AddPartitionDesc extends DDLDesc implements Serializable {
     public ColumnStatistics getColStats() { return colStats; }
 
     public void setColStats(ColumnStatistics colStats) { this.colStats = colStats; }
+
+    public long getWriteId() { return writeId; }
+
+    public void setWriteId(long writeId) { this.writeId = writeId; }
   }
 
   private static final long serialVersionUID = 1L;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
index 1219b62..4a14246 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ColumnStatsUpdateWork.java
@@ -44,6 +44,8 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
   private final String colName;
   private final String colType;
   private final ColumnStatistics colStats;
+  private final boolean isMigratingToTxn; // Whether the table whose stats are being updated is
+                                          // going to be migrated to transactional during replication.
   private long writeId;
 
   public ColumnStatsUpdateWork(String partName,
@@ -59,10 +61,12 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
     this.colName = colName;
     this.colType = colType;
     this.colStats = null;
+    this.isMigratingToTxn = false;
   }
 
-  public ColumnStatsUpdateWork(ColumnStatistics colStats) {
+  public ColumnStatsUpdateWork(ColumnStatistics colStats, boolean isMigratingToTxn) {
     this.colStats = colStats;
+    this.isMigratingToTxn = isMigratingToTxn;
     this.partName = null;
     this.mapProp = null;
     this.dbName = null;
@@ -102,11 +106,15 @@ public class ColumnStatsUpdateWork implements Serializable, DDLDescWithWriteId {
 
   public ColumnStatistics getColStats() { return colStats; }
 
+  public boolean getIsMigratingToTxn() { return isMigratingToTxn; }
+
   @Override
   public void setWriteId(long writeId) {
     this.writeId = writeId;
   }
 
+  public long getWriteId() { return writeId; }
+
   @Override
   public String getFullTableName() {
     return dbName + "." + tableName;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
index 8db5d33..dd3af1b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ImportTableDesc.java
@@ -88,7 +88,8 @@ public class ImportTableDesc {
                 null,
             null,
             null,
-                table.getColStats());
+                table.getColStats(),
+                table.getTTable().getWriteId());
         this.createTblDesc.setStoredAsSubDirectories(table.getSd().isStoredAsSubDirectories());
         break;
       case VIEW:
@@ -382,4 +383,11 @@ public class ImportTableDesc {
         throw new RuntimeException("Invalid table type : " + getDescType());
     }
   }
+
+  public Long getReplWriteId() {
+    if (this.createTblDesc != null) {
+      return this.createTblDesc.getReplWriteId();
+    }
+    return -1L;
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java
index a4a31a5..b4edbfe 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/RenamePartitionDesc.java
@@ -127,6 +127,8 @@ public class RenamePartitionDesc extends DDLDesc implements Serializable, DDLDes
     this.writeId = writeId;
   }
 
+  public long getWriteId() { return writeId; }
+
   @Override
   public String getFullTableName() {
     return fqTableName;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 854c85f..f1983c5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -40,6 +40,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -78,6 +79,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.ZooKeeperHiveHelper;
 import org.apache.hadoop.hive.common.ZKDeRegisterWatcher;
 import org.apache.hadoop.hive.metastore.api.*;
@@ -1894,6 +1897,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
                                    List<SQLCheckConstraint> checkConstraints)
         throws AlreadyExistsException, MetaException,
         InvalidObjectException, NoSuchObjectException, InvalidInputException {
+
+      ColumnStatistics colStats = null;
+      // If the given table has column statistics, save it here. We will update it later.
+      // We don't want it to be part of the Table object being created, lest the create table
+      // event also carry the col stats, which we don't want.
+      if (tbl.isSetColStats()) {
+        colStats = tbl.getColStats();
+        tbl.unsetColStats();
+      }
+
       // To preserve backward compatibility throw MetaException in case of null database
       if (tbl.getDbName() == null) {
         throw new MetaException("Null database name is not allowed");
@@ -2128,13 +2141,23 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         }
       }
 
-      // If the table has column statistics, update it into the metastore. This feature is used
-      // by replication to replicate table level statistics.
-      if (tbl.isSetColStats()) {
-        // We do not replicate statistics for a transactional table right now and hence we do not
-        // expect a transactional table to have column statistics here. So passing null
-        // validWriteIds is fine for now.
-        updateTableColumnStatsInternal(tbl.getColStats(), null, tbl.getWriteId());
+      // If the table has column statistics, update them in the metastore. We need a valid
+      // writeId list to update column statistics for a transactional table. During bootstrap
+      // replication, where we use this feature, we do not have the writeId list that was used
+      // to update the stats on the source. But we know the writeId associated with the stats
+      // was valid then (otherwise the stats update would have failed on the source). So, craft
+      // a valid write id list containing only that writeId and use it to update the stats.
+      if (colStats != null) {
+        long writeId = tbl.getWriteId();
+        String validWriteIds = null;
+        if (writeId > 0) {
+          ValidWriteIdList validWriteIdList =
+                  new ValidReaderWriteIdList(TableName.getDbTable(tbl.getDbName(),
+                                                                  tbl.getTableName()),
+                                              new long[0], new BitSet(), writeId);
+          validWriteIds = validWriteIdList.toString();
+        }
+        updateTableColumnStatsInternal(colStats, validWriteIds, tbl.getWriteId());
       }
     }
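
The write-id handling added above reduces to a small pattern that also appears in the
partition path further below. A minimal sketch, assuming a hypothetical helper name
(craftSingleWriteIdList is not part of this patch):

    // Sketch only: build a write-id list in which the single writeId recorded with the
    // replicated stats is the high watermark, with no open or aborted write ids.
    // Imports assumed: java.util.BitSet, org.apache.hadoop.hive.common.TableName,
    // org.apache.hadoop.hive.common.ValidReaderWriteIdList,
    // org.apache.hadoop.hive.common.ValidWriteIdList.
    private static String craftSingleWriteIdList(String dbName, String tblName, long writeId) {
      if (writeId <= 0) {
        return null; // no write id recorded with the stats (e.g. non-transactional table)
      }
      ValidWriteIdList validWriteIdList =
          new ValidReaderWriteIdList(TableName.getDbTable(dbName, tblName),
                                     new long[0], new BitSet(), writeId);
      return validWriteIdList.toString();
    }

With such a helper, the inline code in this hunk is equivalent to calling
updateTableColumnStatsInternal(colStats,
    craftSingleWriteIdList(tbl.getDbName(), tbl.getTableName(), tbl.getWriteId()),
    tbl.getWriteId()).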
 
@@ -3476,6 +3499,9 @@ public class HiveMetaStore extends ThriftHiveMetastore {
       Map<String, String> transactionalListenerResponses = Collections.emptyMap();
       Database db = null;
 
+      List<ColumnStatistics> partsColStats = new ArrayList<>(parts.size());
+      List<Long> partsWriteIds = new ArrayList<>(parts.size());
+
       try {
         ms.openTransaction();
         tbl = ms.getTable(catName, dbName, tblName, null);
@@ -3495,6 +3521,16 @@ public class HiveMetaStore extends ThriftHiveMetastore {
         List<Partition> partitionsToAdd = new ArrayList<>(parts.size());
         List<FieldSchema> partitionKeys = tbl.getPartitionKeys();
         for (final Partition part : parts) {
+          // Collect partition column stats to be updated, if present. Partition objects passed
+          // down here during replication may carry statistics that need to be updated in the
+          // metastore. But we don't want the stats to be part of the Partition object while it
+          // is being created or altered, lest they become part of the notification event.
+          if (part.isSetColStats()) {
+            partsColStats.add(part.getColStats());
+            part.unsetColStats();
+            partsWriteIds.add(part.getWriteId());
+          }
+
           // Iterate through the partitions and validate them. If one of the partitions is
           // incorrect, an exception will be thrown before the threads which create the partition
           // folders are submitted. This way we can be sure that no partition and no partition
@@ -3538,11 +3574,24 @@ public class HiveMetaStore extends ThriftHiveMetastore {
           }
         }
 
-        // Update partition column statistics if available
-        for (Partition newPart : newParts) {
-          if (newPart.isSetColStats()) {
-            updatePartitonColStatsInternal(tbl, newPart.getColStats(), null, newPart.getWriteId());
+        // Update partition column statistics if available. We need a valid writeId list to
+        // update column statistics for a transactional table. During bootstrap replication,
+        // where we use this feature, we do not have the writeId list that was used to update
+        // the stats on the source. But we know the writeId associated with the stats was valid
+        // then (otherwise the stats update would have failed on the source). So, craft a valid
+        // write id list containing only that writeId and use it to update the stats.
+        int cnt = 0;
+        for (ColumnStatistics partColStats: partsColStats) {
+          long writeId = partsWriteIds.get(cnt++);
+          String validWriteIds = null;
+          if (writeId > 0) {
+            ValidWriteIdList validWriteIdList =
+                    new ValidReaderWriteIdList(TableName.getDbTable(tbl.getDbName(),
+                            tbl.getTableName()),
+                            new long[0], new BitSet(), writeId);
+            validWriteIds = validWriteIdList.toString();
           }
+          updatePartitonColStatsInternal(tbl, partColStats, validWriteIds, writeId);
         }
 
         success = ms.commitTransaction();
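
The per-partition loop above applies the same pattern once per saved (ColumnStatistics,
writeId) pair. With the hypothetical craftSingleWriteIdList helper sketched earlier, it
could be condensed to an indexed loop (a sketch, not part of this patch):

        // Sketch only: pair each saved ColumnStatistics with the write id captured with it.
        for (int i = 0; i < partsColStats.size(); i++) {
          ColumnStatistics partColStats = partsColStats.get(i);
          long writeId = partsWriteIds.get(i);
          updatePartitonColStatsInternal(tbl, partColStats,
              craftSingleWriteIdList(tbl.getDbName(), tbl.getTableName(), writeId), writeId);
        }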
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
index f9a4e48..391915a 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
@@ -4180,8 +4180,8 @@ public class ObjectStore implements RawStore, Configurable {
       boolean isToTxn = isTxn && !TxnUtils.isTransactionalTable(oldt.getParameters());
       if (!isToTxn && isTxn && areTxnStatsSupported) {
         // Transactional table is altered without a txn. Make sure there are no changes to the flag.
-        String errorMsg = verifyStatsChangeCtx(oldt.getParameters(), newTable.getParameters(),
-            newTable.getWriteId(), queryValidWriteIds, false);
+        String errorMsg = verifyStatsChangeCtx(TableName.getDbTable(name, dbname), oldt.getParameters(),
+                newTable.getParameters(), newTable.getWriteId(), queryValidWriteIds, false);
         if (errorMsg != null) {
           throw new MetaException(errorMsg);
         }
@@ -4238,8 +4238,8 @@ public class ObjectStore implements RawStore, Configurable {
    * Verifies that the stats JSON string is unchanged for alter table (txn stats).
    * @return Error message with the details of the change, or null if the value has not changed.
    */
-  public static String verifyStatsChangeCtx(Map<String, String> oldP, Map<String, String> newP,
-      long writeId, String validWriteIds, boolean isColStatsChange) {
+  public static String verifyStatsChangeCtx(String fullTableName, Map<String, String> oldP, Map<String, String> newP,
+                                            long writeId, String validWriteIds, boolean isColStatsChange) {
     if (validWriteIds != null && writeId > 0) return null; // We have txn context.
     String oldVal = oldP == null ? null : oldP.get(StatsSetupConst.COLUMN_STATS_ACCURATE);
     String newVal = newP == null ? null : newP.get(StatsSetupConst.COLUMN_STATS_ACCURATE);
@@ -4255,9 +4255,10 @@ public class ObjectStore implements RawStore, Configurable {
     // Some change to the stats state is being made; it can only be made with a write ID.
     // Note - we could do this:  if (writeId > 0 && (validWriteIds != null || !StatsSetupConst.areBasicStatsUptoDate(newP))) { return null;
     //       However the only way ID list can be absent is if WriteEntity wasn't generated for the alter, which is a separate bug.
-    return "Cannot change stats state for a transactional table without providing the transactional"
-        + " write state for verification (new write ID " + writeId + ", valid write IDs "
-        + validWriteIds + "; current state " + oldVal + "; new state " + newVal;
+    return "Cannot change stats state for a transactional table " + fullTableName + " without " +
+            "providing the transactional write state for verification (new write ID " +
+            writeId + ", valid write IDs " + validWriteIds + "; current state " + oldVal + "; new" +
+            " state " + newVal;
   }
 
   @Override
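
To illustrate the contract enforced by verifyStatsChangeCtx after this change (a hedged
sketch; the parameter values are invented and only the behaviour visible in this patch is
relied upon; imports assumed: java.util.HashMap, java.util.Map,
org.apache.hadoop.hive.common.StatsSetupConst):

    Map<String, String> oldParams = new HashMap<>();
    oldParams.put(StatsSetupConst.COLUMN_STATS_ACCURATE, "{\"BASIC_STATS\":\"true\"}");
    Map<String, String> newParams = new HashMap<>();   // stats state being wiped

    // writeId > 0 and a non-null valid write-id list => txn context present, returns null.
    String ok = ObjectStore.verifyStatsChangeCtx("db1.tbl1", oldParams, newParams,
        7L, "<serialized valid write-id list>", true);

    // No txn context while the stats state changes => expected to return the message built
    // above, which now names the table ("... for a transactional table db1.tbl1 ...").
    String err = ObjectStore.verifyStatsChangeCtx("db1.tbl1", oldParams, newParams,
        0L, null, true);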
@@ -4319,8 +4320,9 @@ public class ObjectStore implements RawStore, Configurable {
     boolean isTxn = TxnUtils.isTransactionalTable(table.getParameters());
     if (isTxn && areTxnStatsSupported) {
       // Transactional table is altered without a txn. Make sure there are no changes to the flag.
-      String errorMsg = verifyStatsChangeCtx(oldp.getParameters(), newPart.getParameters(),
-          newPart.getWriteId(), validWriteIds, false);
+      String errorMsg = verifyStatsChangeCtx(TableName.getDbTable(dbname, name),
+              oldp.getParameters(),
+              newPart.getParameters(), newPart.getWriteId(), validWriteIds, false);
       if (errorMsg != null) {
         throw new MetaException(errorMsg);
       }
@@ -8525,7 +8527,7 @@ public class ObjectStore implements RawStore, Configurable {
         if (!areTxnStatsSupported) {
           StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
         } else {
-          String errorMsg = verifyStatsChangeCtx(
+          String errorMsg = verifyStatsChangeCtx(TableName.getDbTable(dbname, name),
               oldt.getParameters(), newParams, writeId, validWriteIds, true);
           if (errorMsg != null) {
             throw new MetaException(errorMsg);
@@ -8620,8 +8622,9 @@ public class ObjectStore implements RawStore, Configurable {
         if (!areTxnStatsSupported) {
           StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
         } else {
-          String errorMsg = verifyStatsChangeCtx(
-              mPartition.getParameters(), newParams, writeId, validWriteIds, true);
+          String errorMsg = verifyStatsChangeCtx(TableName.getDbTable(statsDesc.getDbName(),
+                                                                      statsDesc.getTableName()),
+                  mPartition.getParameters(), newParams, writeId, validWriteIds, true);
           if (errorMsg != null) {
             throw new MetaException(errorMsg);
           }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index 3564efe..e366ebd 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -2067,7 +2067,7 @@ public class CachedStore implements RawStore, Configurable {
       if (!areTxnStatsSupported) {
         StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
       } else {
-        String errorMsg = ObjectStore.verifyStatsChangeCtx(
+        String errorMsg = ObjectStore.verifyStatsChangeCtx(TableName.getDbTable(dbName, tblName),
                 table.getParameters(), newParams, writeId, validWriteIds, true);
         if (errorMsg != null) {
           throw new MetaException(errorMsg);
diff --git a/testutils/ptest2/conf/deployed/master-mr2.properties b/testutils/ptest2/conf/deployed/master-mr2.properties
index 9166f4a..66c23ab 100644
--- a/testutils/ptest2/conf/deployed/master-mr2.properties
+++ b/testutils/ptest2/conf/deployed/master-mr2.properties
@@ -68,7 +68,15 @@ ut.service.batchSize=8
 
 unitTests.module.itests.hive-unit=itests.hive-unit
 ut.itests.hive-unit.batchSize=9
-ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 TestJdbcWithMiniHA TestJdbcWithMiniMr TestReplicationScenariosIncrementalLoadAcidTables TestReplIncrementalLoadAcidTablesWithJsonMessage TestReplicationScenarios TestReplWithJsonMessageFormat TestReplWithJsonMessageFormat
+ut.itests.hive-unit.skipBatching=TestAcidOnTezWithSplitUpdate TestAcidOnTez TestMTQueries \
+  TestCompactor TestSchedulerQueue TestOperationLoggingAPIWithTez TestSSL TestJdbcDriver2 \
+  TestJdbcWithMiniHA TestJdbcWithMiniMr TestReplicationScenariosIncrementalLoadAcidTables \
+  TestReplIncrementalLoadAcidTablesWithJsonMessage TestReplicationScenarios \
+  TestReplWithJsonMessageFormat \
+  TestStatsReplicationScenariosACIDNoAutogather TestStatsReplicationScenariosMMNoAutogather \
+  TestStatsReplicationScenariosACID TestStatsReplicationScenariosMM \
+  TestStatsReplicationScenariosMigrationNoAutogather TestStatsReplicationScenariosMigration \
+  TestStatsReplicationScenariosNoAutogather TestStatsReplicationScenarios
 
 unitTests.module.itests.qtest=itests.qtest
 ut.itests.qtest.batchSize=9