You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/07/03 10:02:55 UTC

[16/16] hive git commit: HIVE-19267: Replicate ACID/MM tables write operations (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

HIVE-19267: Replicate ACID/MM tables write operations (Mahesh Kumar Behera, reviewed by Sankar Hariappan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f519db7e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f519db7e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f519db7e

Branch: refs/heads/master
Commit: f519db7eafacb4b4d2d9fe2a9e10e908d8077224
Parents: 285a9b4
Author: Sankar Hariappan <sa...@apache.org>
Authored: Tue Jul 3 15:32:05 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Tue Jul 3 15:32:05 2018 +0530

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |  209 +-
 .../listener/DummyRawStoreFailEvent.java        |   15 +
 .../listener/TestDbNotificationListener.java    |    5 +
 .../hive/ql/parse/TestReplicationScenarios.java |   72 -
 .../TestReplicationScenariosAcidTables.java     |  602 ++-
 ...TestReplicationScenariosAcrossInstances.java |   15 +-
 .../hadoop/hive/ql/parse/WarehouseInstance.java |    5 +
 .../metastore/SynchronizedMetaStoreClient.java  |    5 +
 .../apache/hadoop/hive/ql/exec/MoveTask.java    |    6 +-
 .../hadoop/hive/ql/exec/ReplCopyTask.java       |    5 +-
 .../apache/hadoop/hive/ql/exec/ReplTxnTask.java |   31 +-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  |    4 +-
 .../IncrementalLoadTasksBuilder.java            |   73 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java |   37 +-
 .../hadoop/hive/ql/io/HiveInputFormat.java      |   24 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |   18 +-
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java |    7 +-
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |   10 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  167 +-
 .../hadoop/hive/ql/metadata/HiveUtils.java      |   11 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |   83 +-
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java  |    8 +-
 .../ql/parse/UpdateDeleteSemanticAnalyzer.java  |   16 +-
 .../hadoop/hive/ql/parse/repl/CopyUtils.java    |    2 +-
 .../hadoop/hive/ql/parse/repl/dump/Utils.java   |    4 -
 .../repl/dump/events/CommitTxnHandler.java      |  125 +-
 .../ql/parse/repl/dump/events/EventHandler.java |   23 +-
 .../parse/repl/dump/events/InsertHandler.java   |    4 +
 .../parse/repl/load/UpdatedMetaDataTracker.java |  124 +-
 .../repl/load/message/AbortTxnHandler.java      |    7 +-
 .../repl/load/message/AllocWriteIdHandler.java  |    2 +-
 .../repl/load/message/CommitTxnHandler.java     |   78 +-
 .../parse/repl/load/message/MessageHandler.java |    8 +-
 .../parse/repl/load/message/OpenTxnHandler.java |    7 +-
 .../apache/hadoop/hive/ql/plan/MoveWork.java    |   12 +-
 .../apache/hadoop/hive/ql/plan/ReplTxnWork.java |   15 +
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp  | 2675 +++++++-----
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.h    |  126 +
 .../ThriftHiveMetastore_server.skeleton.cpp     |    5 +
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 3905 ++++++++++--------
 .../gen/thrift/gen-cpp/hive_metastore_types.h   |  218 +-
 .../metastore/api/AddDynamicPartitions.java     |   32 +-
 .../api/AllocateTableWriteIdsRequest.java       |   68 +-
 .../api/AllocateTableWriteIdsResponse.java      |   36 +-
 .../metastore/api/ClearFileMetadataRequest.java |   32 +-
 .../hive/metastore/api/ClientCapabilities.java  |   32 +-
 .../hive/metastore/api/CommitTxnRequest.java    |  168 +-
 .../hive/metastore/api/CompactionRequest.java   |   44 +-
 .../hive/metastore/api/CreationMetadata.java    |   32 +-
 .../metastore/api/FindSchemasByColsResp.java    |   36 +-
 .../hive/metastore/api/FireEventRequest.java    |   32 +-
 .../metastore/api/GetAllFunctionsResponse.java  |   36 +-
 .../api/GetFileMetadataByExprRequest.java       |   32 +-
 .../api/GetFileMetadataByExprResult.java        |   48 +-
 .../metastore/api/GetFileMetadataRequest.java   |   32 +-
 .../metastore/api/GetFileMetadataResult.java    |   44 +-
 .../hive/metastore/api/GetTablesRequest.java    |   32 +-
 .../hive/metastore/api/GetTablesResult.java     |   36 +-
 .../metastore/api/GetValidWriteIdsRequest.java  |   32 +-
 .../metastore/api/GetValidWriteIdsResponse.java |   36 +-
 .../api/HeartbeatTxnRangeResponse.java          |   64 +-
 .../metastore/api/InsertEventRequestData.java   |  227 +-
 .../hadoop/hive/metastore/api/LockRequest.java  |   36 +-
 .../hive/metastore/api/Materialization.java     |   32 +-
 .../api/NotificationEventResponse.java          |   36 +-
 .../metastore/api/PutFileMetadataRequest.java   |   64 +-
 .../api/ReplTblWriteIdStateRequest.java         |   32 +-
 .../hive/metastore/api/SchemaVersion.java       |   36 +-
 .../hive/metastore/api/ShowCompactResponse.java |   36 +-
 .../hive/metastore/api/ShowLocksResponse.java   |   36 +-
 .../hive/metastore/api/TableValidWriteIds.java  |   32 +-
 .../hive/metastore/api/ThriftHiveMetastore.java | 3468 ++++++++++------
 .../hive/metastore/api/WMFullResourcePlan.java  |  144 +-
 .../api/WMGetAllResourcePlanResponse.java       |   36 +-
 .../WMGetTriggersForResourePlanResponse.java    |   36 +-
 .../api/WMValidateResourcePlanResponse.java     |   64 +-
 .../hive/metastore/api/WriteEventInfo.java      | 1012 +++++
 .../api/WriteNotificationLogRequest.java        |  949 +++++
 .../api/WriteNotificationLogResponse.java       |  283 ++
 .../gen-php/metastore/ThriftHiveMetastore.php   | 1630 ++++----
 .../src/gen/thrift/gen-php/metastore/Types.php  | 1630 +++++---
 .../hive_metastore/ThriftHiveMetastore-remote   |    7 +
 .../hive_metastore/ThriftHiveMetastore.py       | 1139 ++---
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |  933 +++--
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |   86 +-
 .../gen/thrift/gen-rb/thrift_hive_metastore.rb  |   54 +
 .../hadoop/hive/metastore/HiveMetaStore.java    |   86 +
 .../hive/metastore/HiveMetaStoreClient.java     |   10 +-
 .../hadoop/hive/metastore/IMetaStoreClient.java |   16 +-
 .../hive/metastore/MetaStoreEventListener.java  |   12 +
 .../metastore/MetaStoreListenerNotifier.java    |    6 +
 .../hadoop/hive/metastore/ObjectStore.java      |   60 +
 .../apache/hadoop/hive/metastore/RawStore.java  |   14 +
 .../hive/metastore/ReplChangeManager.java       |   10 +-
 .../hive/metastore/cache/CachedStore.java       |   12 +
 .../hive/metastore/events/AcidWriteEvent.java   |   91 +
 .../metastore/messaging/AcidWriteMessage.java   |   50 +
 .../metastore/messaging/CommitTxnMessage.java   |   23 +
 .../hive/metastore/messaging/EventMessage.java  |    3 +-
 .../messaging/MessageDeserializer.java          |    9 +
 .../metastore/messaging/MessageFactory.java     |   12 +
 .../messaging/json/JSONAcidWriteMessage.java    |  150 +
 .../messaging/json/JSONCommitTxnMessage.java    |   95 +
 .../messaging/json/JSONMessageDeserializer.java |    9 +
 .../messaging/json/JSONMessageFactory.java      |    8 +
 .../model/MTxnWriteNotificationLog.java         |  123 +
 .../hive/metastore/tools/SQLGenerator.java      |    9 +
 .../hadoop/hive/metastore/txn/TxnDbUtil.java    |   28 +
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  187 +-
 .../hadoop/hive/metastore/txn/TxnStore.java     |   11 +
 .../hadoop/hive/metastore/utils/FileUtils.java  |   12 +-
 .../src/main/resources/package.jdo              |   35 +
 .../main/sql/derby/hive-schema-3.1.0.derby.sql  |   15 +
 .../main/sql/derby/hive-schema-4.0.0.derby.sql  |   15 +
 .../sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql  |    1 -
 .../sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql  |   16 +
 .../main/sql/mssql/hive-schema-3.1.0.mssql.sql  |   17 +
 .../main/sql/mssql/hive-schema-4.0.0.mssql.sql  |   17 +
 .../sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql  |   16 +
 .../main/sql/mysql/hive-schema-3.0.0.mysql.sql  |    1 -
 .../main/sql/mysql/hive-schema-3.1.0.mysql.sql  |   16 +
 .../main/sql/mysql/hive-schema-4.0.0.mysql.sql  |   16 +
 .../sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql  |    4 +-
 .../sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql  |   16 +
 .../sql/oracle/hive-schema-3.0.0.oracle.sql     |    1 -
 .../sql/oracle/hive-schema-3.1.0.oracle.sql     |   15 +
 .../sql/oracle/hive-schema-4.0.0.oracle.sql     |   15 +
 .../oracle/upgrade-2.3.0-to-3.0.0.oracle.sql    |    4 +-
 .../oracle/upgrade-3.0.0-to-3.1.0.oracle.sql    |   16 +
 .../sql/postgres/hive-schema-3.0.0.postgres.sql |    2 -
 .../sql/postgres/hive-schema-3.1.0.postgres.sql |   15 +
 .../sql/postgres/hive-schema-4.0.0.postgres.sql |   15 +
 .../upgrade-3.0.0-to-3.1.0.postgres.sql         |   16 +
 .../src/main/thrift/hive_metastore.thrift       |   30 +-
 .../DummyRawStoreControlledCommit.java          |   11 +
 .../DummyRawStoreForJdoConnection.java          |   10 +
 .../HiveMetaStoreClientPreCatalog.java          |   10 +-
 137 files changed, 15896 insertions(+), 7205 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 6321f9b..717cc8a 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -23,6 +23,7 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -75,11 +76,14 @@ import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
 import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
 import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
 import org.apache.hadoop.hive.metastore.tools.SQLGenerator;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -269,10 +273,16 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     public PartitionFiles next() {
       try {
         Partition p = partitionIter.next();
-        List<String> files = Lists.newArrayList(new FileIterator(p.getSd().getLocation()));
+        Iterator<String> fileIterator;
+        // For transactional tables, the actual file copy is done by the ACID write event during replay of the commit txn.
+        if (!TxnUtils.isTransactionalTable(t)) {
+          List<String> files = Lists.newArrayList(new FileIterator(p.getSd().getLocation()));
+          fileIterator = files.iterator();
+        } else {
+          fileIterator = Collections.emptyIterator();
+        }
         PartitionFiles partitionFiles =
-            new PartitionFiles(Warehouse.makePartName(t.getPartitionKeys(), p.getValues()),
-            files.iterator());
+            new PartitionFiles(Warehouse.makePartName(t.getPartitionKeys(), p.getValues()), fileIterator);
         return partitionFiles;
       } catch (MetaException e) {
         throw new RuntimeException(e);
@@ -414,10 +424,15 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   class FileChksumIterator implements Iterator<String> {
     private List<String> files;
     private List<String> chksums;
+    private List<String> subDirs;
     int i = 0;
     FileChksumIterator(List<String> files, List<String> chksums) {
+      this(files, chksums, null);
+    }
+    FileChksumIterator(List<String> files, List<String> chksums, List<String> subDirs) {
       this.files = files;
       this.chksums = chksums;
+      this.subDirs = subDirs;
     }
     @Override
     public boolean hasNext() {
@@ -428,7 +443,8 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     public String next() {
       String result;
       try {
-        result = ReplChangeManager.encodeFileUri(files.get(i), chksums != null ? chksums.get(i) : null, null);
+        result = ReplChangeManager.encodeFileUri(files.get(i), chksums != null ? chksums.get(i) : null,
+                subDirs != null ? subDirs.get(i) : null);
       } catch (IOException e) {
         // File operations failed
         LOG.error("Encoding file URI failed with error " + e.getMessage());
@@ -623,6 +639,23 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     }
   }
 
+  @Override
+  public void onAcidWrite(AcidWriteEvent acidWriteEvent, Connection dbConn, SQLGenerator sqlGenerator)
+          throws MetaException {
+    AcidWriteMessage msg = msgFactory.buildAcidWriteMessage(acidWriteEvent,
+            new FileChksumIterator(acidWriteEvent.getFiles(), acidWriteEvent.getChecksums(),
+                    acidWriteEvent.getSubDirs()));
+    NotificationEvent event = new NotificationEvent(0, now(), EventType.ACID_WRITE.toString(), msg.toString());
+    event.setMessageFormat(msgFactory.getMessageFormat());
+    event.setDbName(acidWriteEvent.getDatabase());
+    event.setTableName(acidWriteEvent.getTable());
+    try {
+      addWriteNotificationLog(event, acidWriteEvent, dbConn, sqlGenerator, msg);
+    } catch (SQLException e) {
+      throw new MetaException("Unable to add write notification log " + StringUtils.stringifyException(e));
+    }
+  }
+
   private int now() {
     long millis = System.currentTimeMillis();
     millis /= 1000;
@@ -634,12 +667,133 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     return (int)millis;
   }
 
+  /**
+   * Close statement instance.
+   * @param stmt statement instance.
+   */
+  private static void closeStmt(Statement stmt) {
+    try {
+      if (stmt != null && !stmt.isClosed()) {
+        stmt.close();
+      }
+    } catch (SQLException e) {
+      LOG.warn("Failed to close statement " + e.getMessage());
+    }
+  }
+
+  /**
+   * Close the ResultSet.
+   * @param rs may be {@code null}
+   */
+  private static void close(ResultSet rs) {
+    try {
+      if (rs != null && !rs.isClosed()) {
+        rs.close();
+      }
+    } catch(SQLException ex) {
+      LOG.warn("Failed to close result set " + ex.getMessage());
+    }
+  }
+
+  private long getNextNLId(Statement stmt, SQLGenerator sqlGenerator, String sequence)
+          throws SQLException, MetaException {
+    String s = sqlGenerator.addForUpdateClause("select \"NEXT_VAL\" from " +
+            "\"SEQUENCE_TABLE\" where \"SEQUENCE_NAME\" = " + quoteString(sequence));
+    LOG.debug("Going to execute query <" + s + ">");
+    ResultSet rs = null;
+    try {
+      rs = stmt.executeQuery(s);
+      if (!rs.next()) {
+        throw new MetaException("Transaction database not properly configured, can't find next NL id.");
+      }
+
+      long nextNLId = rs.getLong(1);
+      long updatedNLId = nextNLId + 1;
+      s = "update \"SEQUENCE_TABLE\" set \"NEXT_VAL\" = " + updatedNLId + " where \"SEQUENCE_NAME\" = " +
+              quoteString(sequence);
+      LOG.debug("Going to execute update <" + s + ">");
+      stmt.executeUpdate(s);
+      return nextNLId;
+    }finally {
+      close(rs);
+    }
+  }
+
+  private void addWriteNotificationLog(NotificationEvent event, AcidWriteEvent acidWriteEvent, Connection dbConn,
+                                 SQLGenerator sqlGenerator, AcidWriteMessage msg) throws MetaException, SQLException {
+    LOG.debug("DbNotificationListener: adding write notification log for : {}", event.getMessage());
+    assert ((dbConn != null) && (sqlGenerator != null));
+
+    Statement stmt =null;
+    ResultSet rs = null;
+    String dbName = acidWriteEvent.getDatabase();
+    String tblName = acidWriteEvent.getTable();
+    String partition = acidWriteEvent.getPartition();
+    String tableObj = msg.getTableObjStr();
+    String partitionObj = msg.getPartitionObjStr();
+    String files = ReplChangeManager.joinWithSeparator(msg.getFiles());
+
+    try {
+      stmt = dbConn.createStatement();
+      if (sqlGenerator.getDbProduct() == MYSQL) {
+        stmt.execute("SET @@session.sql_mode=ANSI_QUOTES");
+      }
+
+      String s = sqlGenerator.addForUpdateClause("select \"WNL_FILES\", \"WNL_ID\" from" +
+                      " \"TXN_WRITE_NOTIFICATION_LOG\" " +
+                      "where \"WNL_DATABASE\" = " + quoteString(dbName) +
+                      "and \"WNL_TABLE\" = " + quoteString(tblName) +  " and \"WNL_PARTITION\" = " +
+                      quoteString(partition) + " and \"WNL_TXNID\" = " + Long.toString(acidWriteEvent.getTxnId()));
+      LOG.debug("Going to execute query <" + s + ">");
+      rs = stmt.executeQuery(s);
+      if (!rs.next()) {
+        // If rs is empty then no lock is taken, and thus it cannot cause a deadlock.
+        long nextNLId = getNextNLId(stmt, sqlGenerator,
+                "org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog");
+        s = "insert into \"TXN_WRITE_NOTIFICATION_LOG\" (\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\"," +
+                " \"WNL_DATABASE\", \"WNL_TABLE\"," +
+                " \"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\", \"WNL_FILES\", \"WNL_EVENT_TIME\")" +
+                " values (" + nextNLId
+                + "," + acidWriteEvent.getTxnId() +  "," + acidWriteEvent.getWriteId()+  "," +
+                quoteString(dbName)+  "," +  quoteString(tblName)+  "," + quoteString(partition)+  "," +
+                quoteString(tableObj)+  "," + quoteString(partitionObj) +  "," +  quoteString(files)+
+                "," +  now() + ")";
+        LOG.info("Going to execute insert <" + s + ">");
+        stmt.execute(sqlGenerator.addEscapeCharacters(s));
+      } else {
+        String existingFiles = rs.getString(1);
+        if (existingFiles.contains(sqlGenerator.addEscapeCharacters(files))) {
+          // If the list of files is already present, there is no need to update it again. This can happen when
+          // the same operation is retried against the metastore.
+          LOG.info("file list " + files + " already present");
+          return;
+        }
+        long nlId = rs.getLong(2);
+        files = ReplChangeManager.joinWithSeparator(Lists.newArrayList(files, existingFiles));
+        s = "update \"TXN_WRITE_NOTIFICATION_LOG\" set \"WNL_TABLE_OBJ\" = " +  quoteString(tableObj) + "," +
+                " \"WNL_PARTITION_OBJ\" = " + quoteString(partitionObj) + "," +
+                " \"WNL_FILES\" = " + quoteString(files) + "," +
+                " \"WNL_EVENT_TIME\" = " + now() +
+                " where \"WNL_ID\" = " + nlId;
+        LOG.info("Going to execute update <" + s + ">");
+        stmt.executeUpdate(sqlGenerator.addEscapeCharacters(s));
+      }
+    } catch (SQLException e) {
+      LOG.warn("failed to add write notification log" + e.getMessage());
+      throw e;
+    } finally {
+      closeStmt(stmt);
+      close(rs);
+    }
+  }
+
   static String quoteString(String input) {
     return "'" + input + "'";
   }
 
   private void addNotificationLog(NotificationEvent event, ListenerEvent listenerEvent, Connection dbConn,
                                   SQLGenerator sqlGenerator) throws MetaException, SQLException {
+    LOG.debug("DbNotificationListener: adding notification log for : {}", event.getMessage());
     if ((dbConn == null) || (sqlGenerator == null)) {
       LOG.info("connection or sql generator is not set so executing sql via DN");
       process(event, listenerEvent);
@@ -669,22 +823,8 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
       LOG.debug("Going to execute update <" + s + ">");
       stmt.executeUpdate(s);
 
-      s = sqlGenerator.addForUpdateClause("select \"NEXT_VAL\" from " +
-              "\"SEQUENCE_TABLE\" where \"SEQUENCE_NAME\" = " +
-              " 'org.apache.hadoop.hive.metastore.model.MNotificationLog'");
-      LOG.debug("Going to execute query <" + s + ">");
-      rs = stmt.executeQuery(s);
-      if (!rs.next()) {
-        throw new MetaException("failed to get next NEXT_VAL from SEQUENCE_TABLE");
-      }
-
-      long nextNLId = rs.getLong(1);
-      long updatedNLId = nextNLId + 1;
-      s = "update \"SEQUENCE_TABLE\" set \"NEXT_VAL\" = " + updatedNLId + " where \"SEQUENCE_NAME\" = " +
-
-              " 'org.apache.hadoop.hive.metastore.model.MNotificationLog'";
-      LOG.debug("Going to execute update <" + s + ">");
-      stmt.executeUpdate(s);
+      long nextNLId = getNextNLId(stmt, sqlGenerator,
+              "org.apache.hadoop.hive.metastore.model.MNotificationLog");
 
       List<String> insert = new ArrayList<>();
 
@@ -712,20 +852,8 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
       LOG.warn("failed to add notification log" + e.getMessage());
       throw e;
     } finally {
-      if (stmt != null && !stmt.isClosed()) {
-        try {
-          stmt.close();
-        } catch (SQLException e) {
-          LOG.warn("Failed to close statement " + e.getMessage());
-        }
-      }
-      if (rs != null && !rs.isClosed()) {
-        try {
-          rs.close();
-        } catch (SQLException e) {
-          LOG.warn("Failed to close result set " + e.getMessage());
-        }
-      }
+      closeStmt(stmt);
+      close(rs);
     }
   }
 
@@ -742,12 +870,12 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
         event.getMessage());
     HMSHandler.getMSForConf(conf).addNotificationEvent(event);
 
-      // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
-      if (event.isSetEventId()) {
-        listenerEvent.putParameter(
-            MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
-            Long.toString(event.getEventId()));
-      }
+    // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
+    if (event.isSetEventId()) {
+      listenerEvent.putParameter(
+          MetaStoreEventListenerConstants.DB_NOTIFICATION_EVENT_ID_KEY_NAME,
+          Long.toString(event.getEventId()));
+    }
   }
 
   private static class CleanerThread extends Thread {
@@ -768,6 +896,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
       while (true) {
         try {
           rs.cleanNotificationEvents(ttl);
+          rs.cleanWriteNotificationEvents(ttl);
         } catch (Exception ex) {
           //catching exceptions here makes sure that the thread doesn't die in case of unexpected
           //exceptions

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
index abf67a8..b4b118e 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/DummyRawStoreFailEvent.java
@@ -89,6 +89,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownTableException;
 import org.apache.hadoop.hive.metastore.api.WMMapping;
 import org.apache.hadoop.hive.metastore.api.WMPool;
 import org.apache.hadoop.hive.metastore.api.WMNullablePool;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
 import org.apache.thrift.TException;
@@ -880,6 +881,20 @@ public class DummyRawStoreFailEvent implements RawStore, Configurable {
   }
 
   @Override
+  public void cleanWriteNotificationEvents(int olderThan) {
+    if (!shouldEventSucceed) {
+      //throw exception to simulate an issue with cleaner thread
+      throw new RuntimeException("Dummy exception while cleaning write notifications");
+    }
+    objectStore.cleanWriteNotificationEvents(olderThan);
+  }
+
+  @Override
+  public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException {
+    return objectStore.getAllWriteEventInfo(txnId, dbName, tableName);
+  }
+
+  @Override
   public CurrentNotificationEventId getCurrentNotificationEventId() {
     return objectStore.getCurrentNotificationEventId();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index eef917e..82429e3 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -75,6 +75,7 @@ import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
 import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
 import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
@@ -238,6 +239,10 @@ public class TestDbNotificationListener {
     public void onAllocWriteId(AllocWriteIdEvent allocWriteIdEvent) throws MetaException {
       pushEventId(EventType.ALLOC_WRITE_ID, allocWriteIdEvent);
     }
+
+    public void onAcidWrite(AcidWriteEvent acidWriteEvent) throws MetaException {
+      pushEventId(EventType.ACID_WRITE, acidWriteEvent);
+    }
   }
 
   @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
index 46c623d..c82a933 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenarios.java
@@ -2833,78 +2833,6 @@ public class TestReplicationScenarios {
     verifyRun("SELECT max(a) from " + replDbName + ".ptned2 where b=1", new String[]{"8"}, driverMirror);
   }
 
-  // TODO: This test should be removed once ACID tables replication is supported.
-  @Test
-  public void testSkipTables() throws Exception {
-    String testName = "skipTables";
-    String dbName = createDB(testName, driver);
-    String replDbName = dbName + "_dupe";
-
-    // TODO: this is wrong; this test sets up dummy txn manager and so it cannot create ACID tables.
-    //       If I change it to use proper txn manager, the setup for some tests hangs.
-    //       This used to work by accident, now this works due a test flag. The test needs to be fixed.
-    // Create table
-    run("CREATE TABLE " + dbName + ".acid_table (key int, value int) PARTITIONED BY (load_date date) " +
-        "CLUSTERED BY(key) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
-    run("CREATE TABLE " + dbName + ".mm_table (key int, value int) PARTITIONED BY (load_date date) " +
-        "CLUSTERED BY(key) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true'," +
-        " 'transactional_properties'='insert_only')", driver);
-    verifyIfTableExist(dbName, "acid_table", metaStoreClient);
-    verifyIfTableExist(dbName, "mm_table", metaStoreClient);
-
-    // Bootstrap test
-    Tuple bootstrapDump = bootstrapLoadAndVerify(dbName, replDbName);
-    String replDumpId = bootstrapDump.lastReplId;
-    verifyIfTableNotExist(replDbName, "acid_table", metaStoreClientMirror);
-    verifyIfTableNotExist(replDbName, "mm_table", metaStoreClientMirror);
-
-    // Test alter table
-    run("ALTER TABLE " + dbName + ".acid_table RENAME TO " + dbName + ".acid_table_rename", driver);
-    verifyIfTableExist(dbName, "acid_table_rename", metaStoreClient);
-
-    // Dummy create table command to mark proper last repl ID after dump
-    run("CREATE TABLE " + dbName + ".dummy (a int)", driver);
-
-    // Perform REPL-DUMP/LOAD
-    Tuple incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName);
-    replDumpId = incrementalDump.lastReplId;
-    verifyIfTableNotExist(replDbName, "acid_table_rename", metaStoreClientMirror);
-
-    // Create another table for incremental repl verification
-    run("CREATE TABLE " + dbName + ".acid_table_incremental (key int, value int) PARTITIONED BY (load_date date) " +
-        "CLUSTERED BY(key) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
-    run("CREATE TABLE " + dbName + ".mm_table_incremental (key int, value int) PARTITIONED BY (load_date date) " +
-        "CLUSTERED BY(key) INTO 2 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true'," +
-        " 'transactional_properties'='insert_only')", driver);
-    verifyIfTableExist(dbName, "acid_table_incremental", metaStoreClient);
-    verifyIfTableExist(dbName, "mm_table_incremental", metaStoreClient);
-
-    // Dummy insert into command to mark proper last repl ID after dump
-    run("INSERT INTO " + dbName + ".dummy values(1)", driver);
-
-    // Perform REPL-DUMP/LOAD
-    incrementalDump = incrementalLoadAndVerify(dbName, replDumpId, replDbName);
-    replDumpId = incrementalDump.lastReplId;
-    verifyIfTableNotExist(replDbName, "acid_table_incremental", metaStoreClientMirror);
-    verifyIfTableNotExist(replDbName, "mm_table_incremental", metaStoreClientMirror);
-
-    // Test adding a constraint
-    run("ALTER TABLE " + dbName + ".acid_table_incremental ADD CONSTRAINT key_pk PRIMARY KEY (key) DISABLE NOVALIDATE", driver);
-    try {
-      List<SQLPrimaryKey> pks = metaStoreClient.getPrimaryKeys(new PrimaryKeysRequest(dbName, "acid_table_incremental"));
-      assertEquals(pks.size(), 1);
-    } catch (TException te) {
-      assertNull(te);
-    }
-
-    // Dummy insert into command to mark proper last repl ID after dump
-    run("INSERT INTO " + dbName + ".dummy values(2)", driver);
-
-    // Perform REPL-DUMP/LOAD
-    incrementalLoadAndVerify(dbName, replDumpId, replDbName);
-    verifyIfTableNotExist(replDbName, "acid_table_incremental", metaStoreClientMirror);
-  }
-
   @Test
   public void testDeleteStagingDir() throws IOException {
     String testName = "deleteStagingDir";

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 86c0405..8c683cf 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.junit.rules.TestName;
+
 import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,6 +54,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import javax.annotation.Nullable;
+import java.util.Collections;
+import com.google.common.collect.Lists;
 
 /**
  * TestReplicationScenariosAcidTables - test replication for ACID tables
@@ -66,8 +69,13 @@ public class TestReplicationScenariosAcidTables {
 
   protected static final Logger LOG = LoggerFactory.getLogger(TestReplicationScenarios.class);
   private static WarehouseInstance primary, replica, replicaNonAcid;
-  private String primaryDbName, replicatedDbName;
   private static HiveConf conf;
+  private String primaryDbName, replicatedDbName, primaryDbNameExtra;
+  /** Kind of write operation that insertRecordsIntoDB performs on the primary warehouse. */
+  private enum OperationType {
+    /** Create the table (and its "_nopart" twin) and insert five rows. */
+    REPL_TEST_ACID_INSERT,
+    /** Create a target table and fill it with "insert into ... select". */
+    REPL_TEST_ACID_INSERT_SELECT,
+    /** Create the target table with "create table ... as select". */
+    REPL_TEST_ACID_CTAS,
+    /** Pre-populate a target table and then replace its content with "insert overwrite". */
+    REPL_TEST_ACID_INSERT_OVERWRITE,
+    /** Export the source table and import it under the target name. */
+    REPL_TEST_ACID_INSERT_IMPORT,
+    /** Write the source rows to a local directory and load them with "LOAD DATA LOCAL". */
+    REPL_TEST_ACID_INSERT_LOADLOCAL,
+    /** Presumably fills the target from a union query - TODO confirm against insertRecordsIntoDB. */
+    REPL_TEST_ACID_INSERT_UNION
+  }
 
   @BeforeClass
   public static void classLevelSetup() throws Exception {
@@ -80,9 +88,13 @@ public class TestReplicationScenariosAcidTables {
         put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
         put("hive.support.concurrency", "true");
         put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
-        put("hive.repl.dump.include.acid.tables", "true");
         put("hive.metastore.client.capability.check", "false");
         put("hive.repl.bootstrap.dump.open.txn.timeout", "1s");
+        put("hive.exec.dynamic.partition.mode", "nonstrict");
+        put("hive.strict.checks.bucketing", "false");
+        put("hive.mapred.mode", "nonstrict");
+        put("mapred.input.dir.recursive", "true");
+        put("hive.metastore.disallow.incompatible.col.type.changes", "false");
     }};
     primary = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
     replica = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf);
@@ -90,7 +102,6 @@ public class TestReplicationScenariosAcidTables {
         put("fs.defaultFS", miniDFSCluster.getFileSystem().getUri().toString());
         put("hive.support.concurrency", "false");
         put("hive.txn.manager", "org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager");
-        put("hive.repl.dump.include.acid.tables", "true");
         put("hive.metastore.client.capability.check", "false");
     }};
     replicaNonAcid = new WarehouseInstance(LOG, miniDFSCluster, overridesForHiveConf1);
@@ -109,6 +120,9 @@ public class TestReplicationScenariosAcidTables {
     replicatedDbName = "replicated_" + primaryDbName;
     primary.run("create database " + primaryDbName + " WITH DBPROPERTIES ( '" +
             SOURCE_OF_REPLICATION + "' = '1,2,3')");
+    primaryDbNameExtra = primaryDbName+"_extra";
+    primary.run("create database " + primaryDbNameExtra + " WITH DBPROPERTIES ( '" +
+            SOURCE_OF_REPLICATION + "' = '1,2,3')");
   }
 
   @After
@@ -116,6 +130,7 @@ public class TestReplicationScenariosAcidTables {
     primary.run("drop database if exists " + primaryDbName + " cascade");
     replica.run("drop database if exists " + replicatedDbName + " cascade");
     replicaNonAcid.run("drop database if exists " + replicatedDbName + " cascade");
+    primary.run("drop database if exists " + primaryDbName + "_extra cascade");
   }
 
   @Test
@@ -482,4 +497,585 @@ public class TestReplicationScenariosAcidTables {
     primary.run("DROP TABLE " + dbName + ".normal");
     primary.run("drop database " + dbName);
   }
+
+  /**
+   * Bootstraps the replica from the primary database, performs a series of ACID/MM write
+   * operations on the primary, and checks that a single incremental dump/load cycle makes the
+   * replica return the expected rows for every registered query.
+   */
+  @Test
+  public void testAcidTableIncrementalReplication() throws Throwable {
+    WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null);
+    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(bootstrapTuple.lastReplicationId);
+
+    // Each append* helper writes to the primary and records a query for the replica together
+    // with the rows that query must return; entry i of one list belongs to entry i of the other.
+    List<String> replicaQueries = new ArrayList<>();
+    List<String[]> expectedRows = new ArrayList<>();
+
+    appendInsert(replicaQueries, expectedRows);
+    appendDelete(replicaQueries, expectedRows);
+    appendUpdate(replicaQueries, expectedRows);
+    appendTruncate(replicaQueries, expectedRows);
+    appendInsertIntoFromSelect(replicaQueries, expectedRows);
+    appendMerge(replicaQueries, expectedRows);
+    appendCreateAsSelect(replicaQueries, expectedRows);
+    appendImport(replicaQueries, expectedRows);
+    appendInsertOverwrite(replicaQueries, expectedRows);
+    // appendLoadLocal(replicaQueries, expectedRows) is deliberately not invoked; see its TODO.
+    appendInsertUnion(replicaQueries, expectedRows);
+    appendMultiStatementTxn(replicaQueries, expectedRows);
+    appendMultiStatementTxnUpdateDelete(replicaQueries, expectedRows);
+
+    verifyIncrementalLoad(replicaQueries, expectedRows, bootstrapTuple.lastReplicationId);
+  }
+
+  /**
+   * Populates a transactional table and an MM table on the primary and registers, for each of
+   * them, a key query that is expected to return keys 1..5 on the replica.
+   */
+  private void appendInsert(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+    String baseName = testName.getMethodName() + "testInsert";
+    for (boolean isMM : new boolean[] {false, true}) {
+      String table = isMM ? baseName + "_MM" : baseName;
+      insertRecords(table, null, isMM, OperationType.REPL_TEST_ACID_INSERT);
+      selectStmtList.add("select key from " + table + " order by key");
+      expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+    }
+  }
+
+  private void appendDelete(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+    String tableName = testName.getMethodName() + "testDelete";
+    insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+    deleteRecords(tableName);
+    selectStmtList.add("select count(*) from " + tableName);
+    expectedValues.add(new String[] {"0"});
+  }
+
+  private void appendUpdate(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+    String tableName = testName.getMethodName() + "testUpdate";
+    insertRecords(tableName, null, false, OperationType.REPL_TEST_ACID_INSERT);
+    updateRecords(tableName);
+    selectStmtList.add("select value from " + tableName + " order by value");
+    expectedValues.add(new String[] {"1", "100", "100", "100", "100"});
+  }
+
+  /**
+   * For a transactional table and then an MM table: populates it on the primary, truncates it,
+   * and registers a row-count query that is expected to return 0 on the replica.
+   */
+  private void appendTruncate(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+    String baseName = testName.getMethodName() + "testTruncate";
+    for (boolean isMM : new boolean[] {false, true}) {
+      String table = isMM ? baseName + "_MM" : baseName;
+      insertRecords(table, null, isMM, OperationType.REPL_TEST_ACID_INSERT);
+      truncateTable(primaryDbName, table);
+      selectStmtList.add("select count(*) from " + table);
+      expectedValues.add(new String[] {"0"});
+    }
+  }
+
+  /**
+   * For the transactional and then the MM variant: populates a source table, copies it into a
+   * target table via the INSERT_SELECT operation, and registers key queries for both tables,
+   * each expected to return keys 1..5 on the replica.
+   */
+  private void appendInsertIntoFromSelect(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+    String methodName = testName.getMethodName();
+    String[] sourceTables = {methodName + "testInsertIntoFromSelect", methodName + "testInsertIntoFromSelect" + "_MM"};
+    String[] targetTables = {methodName + "_Select", methodName + "_SelectMM"};
+    boolean[] mmFlags = {false, true};
+
+    for (int i = 0; i < mmFlags.length; i++) {
+      insertRecords(sourceTables[i], null, mmFlags[i], OperationType.REPL_TEST_ACID_INSERT);
+      insertRecords(sourceTables[i], targetTables[i], mmFlags[i], OperationType.REPL_TEST_ACID_INSERT_SELECT);
+      for (String table : new String[] {sourceTables[i], targetTables[i]}) {
+        selectStmtList.add("select key from " + table + " order by key");
+        expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+      }
+    }
+  }
+
+  /**
+   * Runs insertForMerge for a pair of tables on the primary and registers two queries with the
+   * rows expected on the replica: the last_update_user column of the first table and the ID
+   * column of the second one.
+   */
+  private void appendMerge(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+    String methodName = testName.getMethodName();
+    String targetTable = methodName + "testMerge";
+    String mergeTable = methodName + "_Merge";
+
+    insertForMerge(targetTable, mergeTable, false);
+
+    String[] expectedUsers = {"creation", "creation", "creation", "creation", "creation",
+            "creation", "creation", "merge_update", "merge_insert", "merge_insert"};
+    selectStmtList.add("select last_update_user from " + targetTable + " order by last_update_user");
+    expectedValues.add(expectedUsers);
+
+    String[] expectedIds = {"1", "4", "7", "8", "8", "11"};
+    selectStmtList.add("select ID from " + mergeTable + " order by ID");
+    expectedValues.add(expectedIds);
+  }
+
+  /**
+   * For the transactional and then the MM variant: populates a source table, derives a second
+   * table from it via the CTAS operation, and registers key queries for both tables, each
+   * expected to return keys 1..5 on the replica.
+   */
+  private void appendCreateAsSelect(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+    String methodName = testName.getMethodName();
+    String[] sourceTables = {methodName + "testCreateAsSelect", methodName + "testCreateAsSelect" + "_MM"};
+    String[] ctasTables = {methodName + "_CTAS", methodName + "_CTASMM"};
+    boolean[] mmFlags = {false, true};
+
+    for (int i = 0; i < mmFlags.length; i++) {
+      insertRecords(sourceTables[i], null, mmFlags[i], OperationType.REPL_TEST_ACID_INSERT);
+      insertRecords(sourceTables[i], ctasTables[i], mmFlags[i], OperationType.REPL_TEST_ACID_CTAS);
+      for (String table : new String[] {sourceTables[i], ctasTables[i]}) {
+        selectStmtList.add("select key from " + table + " order by key");
+        expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+      }
+    }
+  }
+
+  /**
+   * For the transactional and then the MM variant: populates a source table, recreates it under
+   * a second name via the export/import operation, and registers key queries for both tables,
+   * each expected to return keys 1..5 on the replica.
+   */
+  private void appendImport(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+    String methodName = testName.getMethodName();
+    String[] sourceTables = {methodName + "testImport", methodName + "testImport" + "_MM"};
+    String[] importedTables = {methodName + "_Import", methodName + "_ImportMM"};
+    boolean[] mmFlags = {false, true};
+
+    for (int i = 0; i < mmFlags.length; i++) {
+      insertRecords(sourceTables[i], null, mmFlags[i], OperationType.REPL_TEST_ACID_INSERT);
+      insertRecords(sourceTables[i], importedTables[i], mmFlags[i], OperationType.REPL_TEST_ACID_INSERT_IMPORT);
+      for (String table : new String[] {sourceTables[i], importedTables[i]}) {
+        selectStmtList.add("select key from " + table + " order by key");
+        expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+      }
+    }
+  }
+
+  /**
+   * For the transactional and then the MM variant: populates a source table, overwrites a
+   * second table with its content via the INSERT_OVERWRITE operation, and registers key queries
+   * for both tables, each expected to return keys 1..5 on the replica.
+   */
+  private void appendInsertOverwrite(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+    String methodName = testName.getMethodName();
+    String[] sourceTables = {methodName + "testInsertOverwrite", methodName + "testInsertOverwrite" + "_MM"};
+    String[] overwrittenTables = {methodName + "_OW", methodName + "_OWMM"};
+    boolean[] mmFlags = {false, true};
+
+    for (int i = 0; i < mmFlags.length; i++) {
+      insertRecords(sourceTables[i], null, mmFlags[i], OperationType.REPL_TEST_ACID_INSERT);
+      insertRecords(sourceTables[i], overwrittenTables[i], mmFlags[i], OperationType.REPL_TEST_ACID_INSERT_OVERWRITE);
+      for (String table : new String[] {sourceTables[i], overwrittenTables[i]}) {
+        selectStmtList.add("select key from " + table + " order by key");
+        expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+      }
+    }
+  }
+
+  // TODO: find out why loading into an ACID table from a local path fails; until then the
+  // call to this helper in testAcidTableIncrementalReplication stays commented out.
+  /**
+   * For the transactional and then the MM variant: populates a source table, fills a second
+   * table via the LOADLOCAL operation, and registers key queries for both tables, each expected
+   * to return keys 1..5 on the replica.
+   */
+  private void appendLoadLocal(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+    String methodName = testName.getMethodName();
+    String[] sourceTables = {methodName + "testLoadLocal", methodName + "testLoadLocal" + "_MM"};
+    String[] loadedTables = {methodName + "_LL", methodName + "_LLMM"};
+    boolean[] mmFlags = {false, true};
+
+    for (int i = 0; i < mmFlags.length; i++) {
+      insertRecords(sourceTables[i], null, mmFlags[i], OperationType.REPL_TEST_ACID_INSERT);
+      insertRecords(sourceTables[i], loadedTables[i], mmFlags[i], OperationType.REPL_TEST_ACID_INSERT_LOADLOCAL);
+      for (String table : new String[] {sourceTables[i], loadedTables[i]}) {
+        selectStmtList.add("select key from " + table + " order by key");
+        expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+      }
+    }
+  }
+
+  /**
+   * For the transactional and then the MM variant: populates a source table, fills a second
+   * table via the INSERT_UNION operation, and registers key queries for the source table, the
+   * union table and the "_nopart" twin of each. Source tables are expected to hold keys 1..5
+   * once, union tables to hold every key twice.
+   */
+  private void appendInsertUnion(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+    String methodName = testName.getMethodName();
+    String[] sourceTables = {methodName + "testInsertUnion", methodName + "testInsertUnion" + "_MM"};
+    String[] unionTables = {methodName + "_UNION", methodName + "_UNIONMM"};
+    boolean[] mmFlags = {false, true};
+    String[] singleKeys = new String[]{"1", "2", "3", "4", "5"};
+    String[] doubledKeys = new String[]{"1", "1", "2", "2", "3", "3", "4", "4", "5", "5"};
+
+    for (int i = 0; i < mmFlags.length; i++) {
+      insertRecords(sourceTables[i], null, mmFlags[i], OperationType.REPL_TEST_ACID_INSERT);
+      insertRecords(sourceTables[i], unionTables[i], mmFlags[i], OperationType.REPL_TEST_ACID_INSERT_UNION);
+      // Partitioned pair first, then the non-partitioned twins.
+      for (String suffix : new String[] {"", "_nopart"}) {
+        selectStmtList.add("select key from " + sourceTables[i] + suffix + " order by key");
+        expectedValues.add(singleKeys);
+        selectStmtList.add("select key from " + unionTables[i] + suffix + " order by key");
+        expectedValues.add(doubledKeys);
+      }
+    }
+  }
+
+  /**
+   * Populates a transactional table and an MM table inside an explicit START TRANSACTION /
+   * COMMIT block on the primary and registers, for each, a key query that is expected to
+   * return keys 1..5 on the replica.
+   */
+  private void appendMultiStatementTxn(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable {
+    String acidTable = testName.getMethodName() + "testMultiStatementTxn";
+    String mmTable = acidTable + "_MM";
+    String[] insertedKeys = new String[]{"1", "2", "3", "4", "5"};
+    String acidProperty = "'transactional'='true'";
+
+    insertIntoDB(primaryDbName, acidTable, acidProperty, insertedKeys, true);
+    selectStmtList.add("select key from " + acidTable + " order by key");
+    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+
+    String mmProperty = setMMtableProperty(acidProperty);
+    insertIntoDB(primaryDbName, mmTable, mmProperty, insertedKeys, true);
+    selectStmtList.add("select key from " + mmTable + " order by key");
+    expectedValues.add(new String[]{"1", "2", "3", "4", "5"});
+  }
+
+  /**
+   * Populates two transactional tables inside explicit transactions on the primary, then
+   * updates the first and empties the second. Registers a value query (post-update values) for
+   * the first table and a row-count query (expected 0) for the second.
+   */
+  private void appendMultiStatementTxnUpdateDelete(List<String> selectStmtList, List<String[]> expectedValues)
+          throws Throwable {
+    String methodName = testName.getMethodName();
+    String updatedTable = methodName + "testMultiStatementTxnUpdate";
+    String deletedTable = methodName + "testMultiStatementTxnDelete";
+    String[] insertedKeys = new String[]{"1", "2", "3", "4", "5"};
+    String acidProperty = "'transactional'='true'";
+
+    insertIntoDB(primaryDbName, updatedTable, acidProperty, insertedKeys, true);
+    updateRecords(updatedTable);
+    String[] updatedValues = {"1", "100", "100", "100", "100"};
+    selectStmtList.add("select value from " + updatedTable + " order by value");
+    expectedValues.add(updatedValues);
+
+    insertIntoDB(primaryDbName, deletedTable, acidProperty, insertedKeys, true);
+    deleteRecords(deletedTable);
+    String[] emptyCount = {"0"};
+    selectStmtList.add("select count(*) from " + deletedTable);
+    expectedValues.add(emptyCount);
+  }
+
+  /**
+   * Checks that an incremental dump taken before the source tables are truncated on the primary
+   * still loads all five rows on the replica, first for a transactional table and then for an
+   * MM table. Presumably the truncated files are served from change management (hence "CM");
+   * confirm against the warehouse configuration.
+   */
+  @Test
+  public void testReplCM() throws Throwable {
+    String acidTable = testName.getMethodName();
+    String mmTable = testName.getMethodName() + "_MM";
+    String[] fiveRows = new String[]{"5"};
+
+    WarehouseInstance.Tuple bootstrapTuple = primary.dump(primaryDbName, null);
+    replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(bootstrapTuple.lastReplicationId);
+
+    // Transactional table: dump first, truncate on the primary afterwards, then load.
+    insertRecords(acidTable, null, false, OperationType.REPL_TEST_ACID_INSERT);
+    WarehouseInstance.Tuple dumpTuple = primary.dump(primaryDbName, bootstrapTuple.lastReplicationId);
+    truncateTable(primaryDbName, acidTable);
+    replica.loadWithoutExplain(replicatedDbName, dumpTuple.dumpLocation)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(dumpTuple.lastReplicationId);
+    List<String> acidQueries = Lists.newArrayList("select count(*) from " + acidTable,
+            "select count(*) from " + acidTable + "_nopart");
+    verifyResultsInReplica(acidQueries, Lists.newArrayList(fiveRows, fiveRows));
+
+    // MM table: same sequence.
+    // NOTE(review): this dump also starts from the bootstrap id rather than from the previous
+    // incremental dump's id - confirm that re-dumping the earlier events is intended.
+    insertRecords(mmTable, null, true, OperationType.REPL_TEST_ACID_INSERT);
+    dumpTuple = primary.dump(primaryDbName, bootstrapTuple.lastReplicationId);
+    truncateTable(primaryDbName, mmTable);
+    replica.loadWithoutExplain(replicatedDbName, dumpTuple.dumpLocation)
+            .run("REPL STATUS " + replicatedDbName)
+            .verifyResult(dumpTuple.lastReplicationId);
+    List<String> mmQueries = Lists.newArrayList("select count(*) from " + mmTable,
+            "select count(*) from " + mmTable + "_nopart");
+    verifyResultsInReplica(mmQueries, Lists.newArrayList(fiveRows, fiveRows));
+  }
+
+  /**
+   * Replicates a single transaction that writes to tables in two different databases.
+   * Takes a warehouse-level ("`*`") bootstrap dump, creates two databases with one partitioned
+   * transactional table each, inserts five rows into both tables inside one START TRANSACTION /
+   * COMMIT block, takes a warehouse-level incremental dump, and checks that both tables return
+   * keys 1..5 on the replica after the bootstrap and incremental loads.
+   */
+  @Test
+  public void testMultiDBTxn() throws Throwable {
+    String tableName = testName.getMethodName();
+    String dbName1 = tableName + "_db1";
+    String dbName2 = tableName + "_db2";
+    String[] resultArray = new String[]{"1", "2", "3", "4", "5"};
+    String tableProperty = "'transactional'='true'";
+    String txnStrStart = "START TRANSACTION";
+    String txnStrCommit = "COMMIT";
+
+    WarehouseInstance.Tuple incrementalDump;
+    // The default database is part of the "`*`" dump, so it is marked as a replication source.
+    // NOTE(review): this property is not reset at the end of the test - confirm that is harmless.
+    primary.run("alter database default set dbproperties ('repl.source.for' = '1, 2, 3')");
+    WarehouseInstance.Tuple bootStrapDump = primary.dump("`*`", null);
+
+    // Create both databases and tables, then interleave inserts into the two databases within a
+    // single transaction; the results are checked on the primary before the commit.
+    primary.run("use " + primaryDbName)
+          .run("create database " + dbName1 + " WITH DBPROPERTIES ( '" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
+          .run("create database " + dbName2 + " WITH DBPROPERTIES ( '" + SOURCE_OF_REPLICATION + "' = '1,2,3')")
+          .run("CREATE TABLE " + dbName1 + "." + tableName + " (key int, value int) PARTITIONED BY (load_date date) " +
+                  "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+          .run("use " + dbName1)
+          .run("SHOW TABLES LIKE '" + tableName + "'")
+          .verifyResult(tableName)
+          .run("CREATE TABLE " + dbName2 + "." + tableName + " (key int, value int) PARTITIONED BY (load_date date) " +
+                  "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+          .run("use " + dbName2)
+          .run("SHOW TABLES LIKE '" + tableName + "'")
+          .verifyResult(tableName)
+          .run(txnStrStart)
+          .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-02') VALUES (5, 5)")
+          .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-01') VALUES (1, 1)")
+          .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-01') VALUES (2, 2)")
+          .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-01') VALUES (2, 2)")
+          .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-02') VALUES (3, 3)")
+          .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-02') VALUES (3, 3)")
+          .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-03') VALUES (4, 4)")
+          .run("INSERT INTO " + dbName1 + "." + tableName + " partition (load_date='2016-03-02') VALUES (5, 5)")
+          .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-01') VALUES (1, 1)")
+          .run("INSERT INTO " + dbName2 + "." + tableName + " partition (load_date='2016-03-03') VALUES (4, 4)")
+          .run("select key from " + dbName2 + "." + tableName + " order by key")
+          .verifyResults(resultArray)
+          .run("select key from " + dbName1 + "." + tableName + " order by key")
+          .verifyResults(resultArray)
+          .run(txnStrCommit);
+
+    incrementalDump = primary.dump("`*`", bootStrapDump.lastReplicationId);
+
+    // Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM
+    // we are not able to create multiple embedded derby instances for two different MetaStore instances.
+    // The source databases are therefore dropped before the warehouse-level load below.
+    primary.run("drop database " + primaryDbName + " cascade");
+    primary.run("drop database " + dbName1 + " cascade");
+    primary.run("drop database " + dbName2 + " cascade");
+    //End of additional steps
+
+    // An empty database name is passed for both loads, matching the "`*`" dumps taken above.
+    replica.loadWithoutExplain("", bootStrapDump.dumpLocation)
+            .run("REPL STATUS default")
+            .verifyResult(bootStrapDump.lastReplicationId);
+
+    // NOTE(review): the output of "REPL STATUS <dbName1>" below is not verified.
+    replica.loadWithoutExplain("", incrementalDump.dumpLocation)
+          .run("REPL STATUS " + dbName1)
+          .run("select key from " + dbName1 + "." + tableName + " order by key")
+          .verifyResults(resultArray)
+          .run("select key from " + dbName2 + "." + tableName + " order by key")
+          .verifyResults(resultArray);
+
+    // Clean up the databases created by the loads.
+    replica.run("drop database " + primaryDbName + " cascade");
+    replica.run("drop database " + dbName1 + " cascade");
+    replica.run("drop database " + dbName2 + " cascade");
+  }
+
+  /**
+   * Runs every query against the replicated database and verifies its output.
+   *
+   * @param selectStmtList queries to execute on the replica, in order
+   * @param expectedValues expected rows; entry {@code i} belongs to query {@code i}
+   * @throws IllegalArgumentException if the two lists differ in length
+   */
+  private void verifyResultsInReplica(List<String> selectStmtList, List<String[]> expectedValues) throws Throwable  {
+    // Fail fast on mismatched inputs: otherwise surplus expectations are silently ignored and
+    // surplus queries surface as an opaque IndexOutOfBoundsException.
+    if (selectStmtList.size() != expectedValues.size()) {
+      throw new IllegalArgumentException("Got " + selectStmtList.size() + " queries but "
+              + expectedValues.size() + " expected result sets");
+    }
+    for (int idx = 0; idx < selectStmtList.size(); idx++) {
+      replica.run("use " + replicatedDbName)
+              .run(selectStmtList.get(idx))
+              .verifyResults(expectedValues.get(idx));
+    }
+  }
+
+  /**
+   * Takes an incremental dump of the primary database starting at {@code lastReplId}, loads it
+   * into the replica twice, and after each load checks the replication status and the given
+   * queries. The second pass loads the very same dump again and expects identical results.
+   *
+   * @return the incremental dump that was loaded
+   */
+  private WarehouseInstance.Tuple verifyIncrementalLoad(List<String> selectStmtList,
+                                                  List<String[]> expectedValues, String lastReplId) throws Throwable {
+    WarehouseInstance.Tuple dumpTuple = primary.dump(primaryDbName, lastReplId);
+    for (int pass = 0; pass < 2; pass++) {
+      replica.loadWithoutExplain(replicatedDbName, dumpTuple.dumpLocation)
+              .run("REPL STATUS " + replicatedDbName)
+              .verifyResult(dumpTuple.lastReplicationId);
+      verifyResultsInReplica(selectStmtList, expectedValues);
+    }
+    return dumpTuple;
+  }
+
+  /** Deletes every row of {@code tableName} in the primary database and checks it is empty. */
+  private void deleteRecords(String tableName) throws Throwable {
+    String useDb = "use " + primaryDbName;
+    String deleteAll = "delete from " + tableName;
+    String countRows = "select count(*) from " + tableName;
+    primary.run(useDb).run(deleteAll).run(countRows).verifyResult("0");
+  }
+
+  /**
+   * Sets {@code value} to 100 for every row with {@code key >= 2} in the primary database and
+   * checks the resulting values. The expected output assumes the table holds the five rows
+   * (1,1)..(5,5) written by insertIntoDB.
+   */
+  private void updateRecords(String tableName) throws Throwable {
+    String useDb = "use " + primaryDbName;
+    String updateStmt = "update " + tableName + " set value = 100 where key >= 2";
+    String valueQuery = "select value from " + tableName + " order by value";
+    String[] updatedValues = {"1", "100", "100", "100", "100"};
+    primary.run(useDb).run(updateStmt).run(valueQuery).verifyResults(updatedValues);
+  }
+
+  /**
+   * Truncates {@code tableName} and its "_nopart" twin in {@code dbName} on the primary and
+   * checks that each of them is empty afterwards.
+   */
+  private void truncateTable(String dbName, String tableName) throws Throwable {
+    String noPartTable = tableName + "_nopart";
+    String emptyCount = "0";
+    primary.run("use " + dbName)
+            .run("truncate table " + tableName)
+            .run("select count(*) from " + tableName)
+            .verifyResult(emptyCount)
+            .run("truncate table " + noPartTable)
+            .run("select count(*) from " + noPartTable)
+            .verifyResult(emptyCount);
+  }
+
+  /**
+   * Runs verifyIncrementalLoad with key queries for {@code tableName} and, when
+   * {@code tableNameOp} is not null, for that table as well; the "_nopart" twin of every table
+   * is queried too. Each query is expected to return keys 1..5.
+   *
+   * @return the incremental dump produced by verifyIncrementalLoad
+   */
+  private WarehouseInstance.Tuple verifyLoad(String tableName, String tableNameOp, String lastReplId) throws Throwable {
+    String[] keys = new String[]{"1", "2", "3", "4", "5"};
+    String[] tables = (tableNameOp == null)
+            ? new String[] {tableName}
+            : new String[] {tableName, tableNameOp};
+
+    // Partitioned tables are queried first, followed by their non-partitioned twins.
+    List<String> queries = new ArrayList<>();
+    List<String[]> expected = new ArrayList<>();
+    for (String suffix : new String[] {"", "_nopart"}) {
+      for (String table : tables) {
+        queries.add("select key from " + table + suffix + " order by key");
+        expected.add(keys);
+      }
+    }
+    return verifyIncrementalLoad(queries, expected, lastReplId);
+  }
+
+  /**
+   * Creates a partitioned table and a non-partitioned "_nopart" twin in {@code dbName} on the
+   * primary, inserts five rows (keys 1..5) into the partitioned table, copies them into the
+   * twin, and verifies the keys of both tables.
+   *
+   * @param dbName        database to create the tables in
+   * @param tableName     name of the partitioned table; the twin is named {@code tableName + "_nopart"}
+   * @param tableProperty content of the TBLPROPERTIES clause used for both tables
+   * @param resultArray   keys expected from both tables once the inserts are done
+   * @param isTxn         when true, the inserts are wrapped in START TRANSACTION / COMMIT
+   */
+  private void insertIntoDB(String dbName, String tableName, String tableProperty, String[] resultArray, boolean isTxn)
+          throws Throwable {
+    String txnStrStart = "START TRANSACTION";
+    String txnStrCommit = "COMMIT";
+    if (!isTxn) {
+      // Without an explicit transaction the two slots in the chain below are filled with a
+      // harmless "use" statement so that the statement sequence stays the same.
+      txnStrStart = "use " + dbName; //dummy
+      txnStrCommit = "use " + dbName; //dummy
+    }
+    primary.run("use " + dbName);
+    primary.run("CREATE TABLE " + tableName + " (key int, value int) PARTITIONED BY (load_date date) " +
+            "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+            .run("SHOW TABLES LIKE '" + tableName + "'")
+            .verifyResult(tableName)
+            .run("CREATE TABLE " + tableName + "_nopart (key int, value int) " +
+                    "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+            .run("SHOW TABLES LIKE '" + tableName + "_nopart'")
+            // Previously the SHOW TABLES output for the twin was never checked.
+            .verifyResult(tableName + "_nopart")
+            .run("ALTER TABLE " + tableName + " ADD PARTITION (load_date='2016-03-03')")
+            .run(txnStrStart)
+            .run("INSERT INTO " + tableName + " partition (load_date='2016-03-01') VALUES (1, 1)")
+            .run("INSERT INTO " + tableName + " partition (load_date='2016-03-01') VALUES (2, 2)")
+            .run("INSERT INTO " + tableName + " partition (load_date='2016-03-02') VALUES (3, 3)")
+            .run("INSERT INTO " + tableName + " partition (load_date='2016-03-03') VALUES (4, 4)")
+            .run("INSERT INTO " + tableName + " partition (load_date='2016-03-02') VALUES (5, 5)")
+            .run("select key from " + tableName + " order by key")
+            .verifyResults(resultArray)
+            .run("INSERT INTO " + tableName + "_nopart (key, value) select key, value from " + tableName)
+            .run("select key from " + tableName + "_nopart" + " order by key")
+            .verifyResults(resultArray)
+            .run(txnStrCommit);
+  }
+
+  private void insertIntoDB(String dbName, String tableName, String tableProperty, String[] resultArray)
+          throws Throwable {
+    insertIntoDB(dbName, tableName, tableProperty, resultArray, false);
+  }
+
+  private void insertRecords(String tableName, String tableNameOp, boolean isMMTable,
+                             OperationType opType) throws Throwable {
+    insertRecordsIntoDB(primaryDbName, tableName, tableNameOp, isMMTable, opType);
+  }
+
+  private void insertRecordsIntoDB(String DbName, String tableName, String tableNameOp, boolean isMMTable,
+                             OperationType opType) throws Throwable { // runs the write scenario selected by opType on the primary
+    String[] resultArray = new String[]{"1", "2", "3", "4", "5"}; // keys expected in the result tables unless a case overrides it
+    String tableProperty = "'transactional'='true'";
+    if (isMMTable) {
+      tableProperty = setMMtableProperty(tableProperty); // extend the table properties for MM tables
+    }
+    primary.run("use " + DbName);
+
+    switch (opType) {
+      case REPL_TEST_ACID_INSERT: // delegates to insertIntoDB for both DbName and primaryDbNameExtra
+        insertIntoDB(DbName, tableName, tableProperty, resultArray);
+        insertIntoDB(primaryDbNameExtra, tableName, tableProperty, resultArray);
+        return; // skips the verification that follows the switch
+      case REPL_TEST_ACID_INSERT_OVERWRITE: // pre-populate the new tables, then overwrite them from tableName / tableName_nopart
+        primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
+              "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( "+ tableProperty + " )")
+        .run("INSERT INTO " + tableNameOp + " partition (load_date='2016-03-01') VALUES (2, 2)")
+        .run("INSERT INTO " + tableNameOp + " partition (load_date='2016-03-01') VALUES (10, 12)")
+        .run("INSERT INTO " + tableNameOp + " partition (load_date='2016-03-02') VALUES (11, 1)")
+        .run("select key from " + tableNameOp + " order by key")
+        .verifyResults(new String[]{"2", "10", "11"})
+        .run("insert overwrite table " + tableNameOp + " select * from " + tableName)
+        .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
+                "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( "+ tableProperty + " )")
+        .run("INSERT INTO " + tableNameOp + "_nopart VALUES (2, 2)")
+        .run("INSERT INTO " + tableNameOp + "_nopart VALUES (10, 12)")
+        .run("INSERT INTO " + tableNameOp + "_nopart VALUES (11, 1)")
+        .run("select key from " + tableNameOp + "_nopart" + " order by key")
+        .verifyResults(new String[]{"2", "10", "11"})
+        .run("insert overwrite table " + tableNameOp + "_nopart select * from " + tableName + "_nopart")
+        .run("select key from " + tableNameOp + "_nopart" + " order by key"); // result not checked here; the same query is re-run and verified after the switch
+        break;
+      case REPL_TEST_ACID_INSERT_SELECT: // create the new tables and fill them with insert ... select from the source tables
+        primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
+            "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + " )")
+        .run("insert into " + tableNameOp + " partition (load_date) select * from " + tableName)
+        .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
+                "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + " )")
+        .run("insert into " + tableNameOp + "_nopart select * from " + tableName + "_nopart");
+        break;
+      case REPL_TEST_ACID_INSERT_IMPORT: // export the source tables under hdfs:///tmp/<DbName>/ and import them as the new tables
+        String path = "hdfs:///tmp/" + DbName + "/";
+        String exportPath = "'" + path + tableName + "/'";
+        String exportPathNoPart = "'" + path + tableName + "_nopart/'";
+        primary.run("export table " + tableName + " to " + exportPath)
+        .run("import table " + tableNameOp + " from " + exportPath)
+        .run("export table " + tableName + "_nopart to " + exportPathNoPart)
+        .run("import table " + tableNameOp + "_nopart from " + exportPathNoPart);
+        break;
+      case REPL_TEST_ACID_CTAS: // create the new tables with create-table-as-select from the source tables
+        primary.run("create table " + tableNameOp + " as select * from " + tableName)
+                .run("create table " + tableNameOp + "_nopart as select * from " + tableName + "_nopart");
+        break;
+      case REPL_TEST_ACID_INSERT_LOADLOCAL: // write tableName to local ./test.dat, then LOAD DATA LOCAL ... OVERWRITE it into both new tables
+        primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
+              "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+        .run("SHOW TABLES LIKE '" + tableNameOp + "'")
+        .verifyResult(tableNameOp)
+        .run("INSERT OVERWRITE LOCAL DIRECTORY './test.dat' SELECT a.* FROM " + tableName + " a")
+        .run("LOAD DATA LOCAL INPATH './test.dat' OVERWRITE INTO TABLE " + tableNameOp +
+                " PARTITION (load_date='2008-08-15')")
+        .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
+                      "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+        .run("SHOW TABLES LIKE '" + tableNameOp + "_nopart'")
+        .verifyResult(tableNameOp + "_nopart")
+        .run("LOAD DATA LOCAL INPATH './test.dat' OVERWRITE INTO TABLE " + tableNameOp + "_nopart"); // reuses the dump of tableName, not tableName_nopart
+        break;
+      case REPL_TEST_ACID_INSERT_UNION: // insert overwrite from a UNION ALL of each source table with itself
+        primary.run("CREATE TABLE " + tableNameOp + " (key int, value int) PARTITIONED BY (load_date date) " +
+                "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+                .run("SHOW TABLES LIKE '" + tableNameOp + "'")
+                .verifyResult(tableNameOp)
+                .run("insert overwrite table " + tableNameOp + " partition (load_date) select * from " + tableName +
+                    " union all select * from " + tableName)
+                .run("CREATE TABLE " + tableNameOp + "_nopart (key int, value int) " +
+                "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ( " + tableProperty + ")")
+                .run("insert overwrite table " + tableNameOp + "_nopart select * from " + tableName +
+                        "_nopart union all select * from " + tableName + "_nopart");
+        resultArray = new String[]{"1", "2", "3", "4", "5", "1", "2", "3", "4", "5"}; // every key is expected twice in this case
+        break;
+      default: // any other operation type is a no-op
+        return;
+    }
+    primary.run("select key from " + tableNameOp + " order by key").verifyResults(resultArray); // verify tableNameOp
+    primary.run("select key from " + tableNameOp + "_nopart" + " order by key").verifyResults(resultArray); // verify tableNameOp_nopart
+  }
+
+  private String setMMtableProperty(String tableProperty) throws Throwable  { // returns tableProperty with the insert_only transactional_properties entry appended
+    return tableProperty.concat(", 'transactional_properties' = 'insert_only'");
+  }
+
+  private void insertForMerge(String tableName, String tableNameMerge, boolean isMMTable) throws Throwable  { // creates and fills both tables in primaryDbName, merges tableNameMerge into tableName and verifies the result
+    String tableProperty = "'transactional'='true'";
+    if (isMMTable) {
+      tableProperty = setMMtableProperty(tableProperty); // extend the table properties for MM tables
+    }
+    primary.run("use " + primaryDbName)
+        .run("CREATE TABLE " + tableName + "( ID int, TranValue string, last_update_user string) PARTITIONED BY " +
+                "(tran_date string) CLUSTERED BY (ID) into 5 buckets STORED AS ORC TBLPROPERTIES " +
+                " ( "+ tableProperty + " )")
+        .run("SHOW TABLES LIKE '" + tableName + "'")
+        .verifyResult(tableName)
+        .run("CREATE TABLE " + tableNameMerge + " ( ID int, TranValue string, tran_date string) STORED AS ORC ") // merge source; created without TBLPROPERTIES
+        .run("SHOW TABLES LIKE '" + tableNameMerge + "'")
+        .verifyResult(tableNameMerge)
+        .run("INSERT INTO " + tableName + " PARTITION (tran_date) VALUES (1, 'value_01', 'creation', '20170410')," +
+                " (2, 'value_02', 'creation', '20170410'), (3, 'value_03', 'creation', '20170410'), " +
+                " (4, 'value_04', 'creation', '20170410'), (5, 'value_05', 'creation', '20170413'), " +
+                " (6, 'value_06', 'creation', '20170413'), (7, 'value_07', 'creation', '20170413'),  " +
+                " (8, 'value_08', 'creation', '20170413'), (9, 'value_09', 'creation', '20170413'), " +
+                " (10, 'value_10','creation', '20170413')")
+        .run("select ID from " + tableName + " order by ID")
+        .verifyResults(new String[] {"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"})
+        .run("INSERT INTO " + tableNameMerge + " VALUES (1, 'value_01', '20170410'), " +
+                " (4, NULL, '20170410'), (7, 'value_77777', '20170413'), " +
+                " (8, NULL, '20170413'), (8, 'value_08', '20170415'), " +
+                "(11, 'value_11', '20170415')")
+        .run("select ID from " + tableNameMerge + " order by ID")
+        .verifyResults(new String[] {"1", "4", "7", "8", "8", "11"})
+        .run("MERGE INTO " + tableName + " AS T USING " + tableNameMerge + " AS S ON T.ID = S.ID and" +
+                " T.tran_date = S.tran_date WHEN MATCHED AND (T.TranValue != S.TranValue AND S.TranValue " +
+                " IS NOT NULL) THEN UPDATE SET TranValue = S.TranValue, last_update_user = " +
+                " 'merge_update' WHEN MATCHED AND S.TranValue IS NULL THEN DELETE WHEN NOT MATCHED " +
+                " THEN INSERT VALUES (S.ID, S.TranValue,'merge_insert', S.tran_date)") // update on a changed non-null value, delete on a null value, insert when unmatched
+        .run("select last_update_user from " + tableName + " order by last_update_user")
+        .verifyResults(new String[] {"creation", "creation", "creation", "creation", "creation",
+                "creation", "creation", "merge_update", "merge_insert", "merge_insert"}); // NOTE(review): expected values are not in "order by" order (merge_update listed before merge_insert) - confirm verifyResults ignores ordering
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index ff7f9bc..16c124c 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -321,8 +321,7 @@ public class TestReplicationScenariosAcrossInstances {
             "clustered by(key) into 2 buckets stored as orc tblproperties ('transactional'='true')")
         .run("create table table1 (i int, j int)")
         .run("insert into table1 values (1,2)")
-        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
-            "'hive.repl.dump.include.acid.tables'='true'"));
+        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
 
     replica.load(replicatedDbName, tuple.dumpLocation)
         .run("use " + replicatedDbName)
@@ -341,8 +340,7 @@ public class TestReplicationScenariosAcrossInstances {
         .run("create table table2 (a int, city string) partitioned by (country string)")
         .run("create table table3 (i int, j int)")
         .run("insert into table1 values (1,2)")
-        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
-            "'hive.repl.dump.include.acid.tables'='true'"));
+        .dump(primaryDbName, null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
 
     replica.load(replicatedDbName, bootstrapTuple.dumpLocation)
         .run("use " + replicatedDbName)
@@ -467,8 +465,7 @@ public class TestReplicationScenariosAcrossInstances {
                 SOURCE_OF_REPLICATION + "' = '1,2,3')")
         .run("use " + dbTwo)
         .run("create table t1 (i int, j int)")
-        .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
-            "'hive.repl.dump.include.acid.tables'='true'"));
+        .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
 
     /*
       Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM
@@ -527,8 +524,7 @@ public class TestReplicationScenariosAcrossInstances {
         .run("use " + dbOne)
         .run("create table t1 (i int, j int) partitioned by (load_date date) "
             + "clustered by(i) into 2 buckets stored as orc tblproperties ('transactional'='true') ")
-        .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'",
-            "'hive.repl.dump.include.acid.tables'='true'"));
+        .dump("`*`", null, Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
 
     String dbTwo = primaryDbName + randomTwo;
     WarehouseInstance.Tuple incrementalTuple = primary
@@ -539,8 +535,7 @@ public class TestReplicationScenariosAcrossInstances {
         .run("use " + dbOne)
         .run("create table t2 (a int, b int)")
         .dump("`*`", bootstrapTuple.lastReplicationId,
-            Arrays.asList("'hive.repl.dump.metadata.only'='true'",
-                "'hive.repl.dump.include.acid.tables'='true'"));
+            Arrays.asList("'hive.repl.dump.metadata.only'='true'"));
 
     /*
       Due to the limitation that we can only have one instance of Persistence Manager Factory in a JVM

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
index f666df1..1e3478d 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/WarehouseInstance.java
@@ -249,6 +249,11 @@ public class WarehouseInstance implements Closeable {
     return this;
   }
 
+  WarehouseInstance loadWithoutExplain(String replicatedDbName, String dumpLocation) throws Throwable { // runs a plain REPL LOAD of dumpLocation into replicatedDbName
+    run("REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'");
+    return this; // returns this instance so calls can be chained
+  }
+
   WarehouseInstance load(String replicatedDbName, String dumpLocation, List<String> withClauseOptions)
           throws Throwable {
     String replLoadCmd = "REPL LOAD " + replicatedDbName + " FROM '" + dumpLocation + "'";

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
index f87a6aa..2ba6d07 100644
--- a/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
+++ b/ql/src/java/org/apache/hadoop/hive/metastore/SynchronizedMetaStoreClient.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
 import org.apache.thrift.TException;
 
 
@@ -109,6 +110,10 @@ public final class SynchronizedMetaStoreClient {
     return client.fireListenerEvent(rqst);
   }
 
+  public synchronized void addWriteNotificationLog(WriteNotificationLogRequest rqst) throws TException { // synchronized pass-through to the wrapped client
+    client.addWriteNotificationLog(rqst);
+  }
+
   public synchronized void close() { // synchronized pass-through that closes the wrapped client
     client.close();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index 19097f5..bf7749d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -139,7 +139,11 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
       if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_INSERT_INTO_MULTILEVEL_DIRS)) {
         deletePath = createTargetPath(targetPath, tgtFs);
       }
-      Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false);
+      //For acid table incremental replication, just copy the content of staging directory to destination.
+      //No need to clean it.
+      if (work.isNeedCleanTarget()) {
+        Hive.clearDestForSubDirSrc(conf, targetPath, sourcePath, false);
+      }
       // Set isManaged to false as this is not load data operation for which it is needed.
       if (!Hive.moveFile(conf, sourcePath, targetPath, true, false, false)) {
         try {

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 3a7f1bc..d095de6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -151,10 +151,11 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
           continue;
         }
         String destFileName = srcFile.getCmPath().getName();
-        Path destFile = new Path(toPath, destFileName);
+        Path destRoot = CopyUtils.getCopyDestination(srcFile, toPath);
+        Path destFile = new Path(destRoot, destFileName);
         if (dstFs.exists(destFile)) {
           String destFileWithSourceName = srcFile.getSourcePath().getName();
-          Path newDestFile = new Path(toPath, destFileWithSourceName);
+          Path newDestFile = new Path(destRoot, destFileWithSourceName);
           boolean result = dstFs.rename(destFile, newDestFile);
           if (!result) {
             throw new IllegalStateException(

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
index 5bbc25a..c2953c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
@@ -60,8 +62,19 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
           return 0;
         }
       } catch (InvalidTableException e) {
-        LOG.info("Table does not exist so, ignoring the operation as it might be a retry(idempotent) case.");
-        return 0;
+        // In scenarios like import to mm tables, the alloc write id event is generated before create table event.
+        try {
+          Database database = Hive.get().getDatabase(work.getDbName());
+          if (!replicationSpec.allowReplacementInto(database.getParameters())) {
+            // if the event is already replayed, then no need to replay it again.
+            LOG.debug("ReplTxnTask: Event is skipped as it is already replayed. Event Id: " +
+                    replicationSpec.getReplicationState() + "Event Type: " + work.getOperationType());
+            return 0;
+          }
+        } catch (HiveException e1) {
+          LOG.error("Get database failed with exception " + e1.getMessage());
+          return 1;
+        }
       } catch (HiveException e) {
         LOG.error("Get table failed with exception " + e.getMessage());
         return 1;
@@ -85,10 +98,16 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
         }
         return 0;
       case REPL_COMMIT_TXN:
-        for (long txnId : work.getTxnIds()) {
-          txnManager.replCommitTxn(replPolicy, txnId);
-          LOG.info("Replayed CommitTxn Event for policy " + replPolicy + " with srcTxn " + txnId);
-        }
+        // Currently only one commit txn per event is supported.
+        assert (work.getTxnIds().size() == 1);
+
+        long txnId = work.getTxnIds().get(0);
+        CommitTxnRequest commitTxnRequest = new CommitTxnRequest(txnId);
+        commitTxnRequest.setReplPolicy(work.getReplPolicy());
+        commitTxnRequest.setWriteEventInfos(work.getWriteEventInfos());
+        txnManager.replCommitTxn(commitTxnRequest);
+        LOG.info("Replayed CommitTxn Event for replPolicy: " + replPolicy + " with srcTxn: " + txnId +
+                "WriteEventInfos: " + work.getWriteEventInfos());
         return 0;
       case REPL_ALLOC_WRITE_ID:
         assert work.getTxnToWriteIdList() != null;

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index e48657c..82ecad1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -199,7 +199,9 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         cmRoot,
         getHive(),
         conf,
-        getNewEventOnlyReplicationSpec(ev.getEventId())
+        getNewEventOnlyReplicationSpec(ev.getEventId()),
+        work.dbNameOrPattern,
+        work.tableNameOrPattern
     );
     EventHandler eventHandler = EventHandlerFactory.handlerFor(ev);
     eventHandler.handle(context);