Posted to commits@hive.apache.org by se...@apache.org on 2018/07/03 17:48:00 UTC

[32/46] hive git commit: HIVE-19267: Replicate ACID/MM tables write operations (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
index 9e0ce82..5e113c1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/incremental/IncrementalLoadTasksBuilder.java
@@ -179,7 +179,7 @@ public class IncrementalLoadTasksBuilder {
       Database database;
       try {
         database = Hive.get().getDatabase(dbName);
-        return isEventNotReplayed(database.getParameters(), dir, dumpType);
+        return database == null ? true : isEventNotReplayed(database.getParameters(), dir, dumpType);
       } catch (HiveException e) {
         //may be the db is getting created in this load
         log.debug("failed to get the database " + dbName);
@@ -255,50 +255,55 @@ public class IncrementalLoadTasksBuilder {
     return updateReplIdTask;
   }
 
-  private List<Task<? extends Serializable>> addUpdateReplStateTasks(boolean isDatabaseLoad,
-                                           UpdatedMetaDataTracker updatedMetadata,
-                                           List<Task<? extends Serializable>> importTasks) throws SemanticException {
-    String replState = updatedMetadata.getReplicationState();
-    String database = updatedMetadata.getDatabase();
-    String table = updatedMetadata.getTable();
-
-    // If no import tasks generated by the event or no table updated for table level load, then no
-    // need to update the repl state to any object.
-    if (importTasks.isEmpty() || (!isDatabaseLoad && (table == null))) {
-      log.debug("No objects need update of repl state: Either 0 import tasks or table level load");
+  private List<Task<? extends Serializable>> addUpdateReplStateTasks(
+          boolean isDatabaseLoad,
+          UpdatedMetaDataTracker updatedMetaDataTracker,
+          List<Task<? extends Serializable>> importTasks) throws SemanticException {
+    // If the event generated no import tasks, there is no need to update the repl state of any object.
+    if (importTasks.isEmpty()) {
+      log.debug("No objects need update of repl state: 0 import tasks");
       return importTasks;
     }
 
     // Create a barrier task for dependency collection of import tasks
-    Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork());
-
-    // Link import tasks to the barrier task which will in-turn linked with repl state update tasks
-    for (Task<? extends Serializable> t : importTasks){
-      t.addDependentTask(barrierTask);
-      log.debug("Added {}:{} as a precursor of barrier task {}:{}",
-              t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
-    }
-
+    Task<? extends Serializable> barrierTask = TaskFactory.get(new DependencyCollectionWork(), conf);
     List<Task<? extends Serializable>> tasks = new ArrayList<>();
     Task<? extends Serializable> updateReplIdTask;
 
-    // If any partition is updated, then update repl state in partition object
-    for (final Map<String, String> partSpec : updatedMetadata.getPartitions()) {
-      updateReplIdTask = tableUpdateReplStateTask(database, table, partSpec, replState, barrierTask);
-      tasks.add(updateReplIdTask);
+    for (UpdatedMetaDataTracker.UpdateMetaData updateMetaData : updatedMetaDataTracker.getUpdateMetaDataList()) {
+      String replState = updateMetaData.getReplState();
+      String dbName = updateMetaData.getDbName();
+      String tableName = updateMetaData.getTableName();
+      // If any partition is updated, then update repl state in partition object
+      for (final Map<String, String> partSpec : updateMetaData.getPartitionsList()) {
+        updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, partSpec, replState, barrierTask);
+        tasks.add(updateReplIdTask);
+      }
+
+      if (tableName != null) {
+        // If any table/partition is updated, then update repl state in table object
+        updateReplIdTask = tableUpdateReplStateTask(dbName, tableName, null, replState, barrierTask);
+        tasks.add(updateReplIdTask);
+      }
+
+      // For a table-level load, there is no need to update the replication state of the database
+      if (isDatabaseLoad) {
+        // If any table/partition is updated, then update repl state in db object
+        updateReplIdTask = dbUpdateReplStateTask(dbName, replState, barrierTask);
+        tasks.add(updateReplIdTask);
+      }
     }
 
-    if (table != null) {
-      // If any table/partition is updated, then update repl state in table object
-      updateReplIdTask = tableUpdateReplStateTask(database, table, null, replState, barrierTask);
-      tasks.add(updateReplIdTask);
+    if (tasks.isEmpty()) {
+      log.debug("No objects need update of repl state: 0 update tracker tasks");
+      return importTasks;
     }
 
-    // For table level load, need not update replication state for the database
-    if (isDatabaseLoad) {
-      // If any table/partition is updated, then update repl state in db object
-      updateReplIdTask = dbUpdateReplStateTask(database, replState, barrierTask);
-      tasks.add(updateReplIdTask);
+    // Link import tasks to the barrier task, which will in turn be linked with the repl state update tasks
+    for (Task<? extends Serializable> t : importTasks){
+      t.addDependentTask(barrierTask);
+      log.debug("Added {}:{} as a precursor of barrier task {}:{}",
+              t.getClass(), t.getId(), barrierTask.getClass(), barrierTask.getId());
     }
 
     // At least one task would have been added to update the repl state
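
For readers following the restructured method: each entry in the UpdatedMetaDataTracker now produces its own partition/table/db update tasks, all gated on one barrier that every import task feeds into. A minimal, self-contained sketch of that dependency pattern (placeholder SimpleTask type, not Hive's Task/TaskFactory):

    import java.util.ArrayList;
    import java.util.List;

    // Placeholder task type for illustration only; not Hive's Task class.
    class SimpleTask {
      final String name;
      final List<SimpleTask> dependents = new ArrayList<>();
      SimpleTask(String name) { this.name = name; }
      void addDependentTask(SimpleTask t) { dependents.add(t); }
    }

    public class BarrierSketch {
      public static void main(String[] args) {
        List<SimpleTask> importTasks = List.of(new SimpleTask("import-1"), new SimpleTask("import-2"));
        SimpleTask barrier = new SimpleTask("barrier");

        // One repl-state update task per updated partition/table/db, all gated on the barrier.
        List<SimpleTask> updateTasks = List.of(new SimpleTask("update-part"), new SimpleTask("update-table"));
        updateTasks.forEach(barrier::addDependentTask);

        // Every import task must finish before the barrier releases the update tasks.
        importTasks.forEach(t -> t.addDependentTask(barrier));
      }
    }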

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 7fce67f..16ba82e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -22,7 +22,6 @@ import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
 
 import java.io.IOException;
 import java.io.Serializable;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -71,21 +70,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-
-import java.io.IOException;
-import java.io.Serializable;
 import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Pattern;
-
-import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
-
 
 /**
  * Utilities that are shared by all of the ACID input and output formats. They
@@ -1907,6 +1892,28 @@ public class AcidUtils {
     return null;
   }
 
+  //Get the first level acid directory (if any) from a given path
+  public static String getFirstLevelAcidDirPath(Path dataPath, FileSystem fileSystem) throws IOException {
+    if (dataPath == null) {
+      return null;
+    }
+    String firstLevelAcidDir = getAcidSubDir(dataPath);
+    if (firstLevelAcidDir != null) {
+      return firstLevelAcidDir;
+    }
+
+    String acidDirPath = getFirstLevelAcidDirPath(dataPath.getParent(), fileSystem);
+    if (acidDirPath == null) {
+      return null;
+    }
+
+    // We need the path of the directory, so there is no need to append the file name
+    if (fileSystem.isDirectory(dataPath)) {
+      return acidDirPath + Path.SEPARATOR + dataPath.getName();
+    }
+    return acidDirPath;
+  }
+
   public static boolean isAcidEnabled(HiveConf hiveConf) {
     String txnMgr = hiveConf.getVar(HiveConf.ConfVars.HIVE_TXN_MANAGER);
     boolean concurrency =  hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY);
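
The new helper walks upward from a data file until it reaches a base/delta directory and returns that first-level ACID directory plus any intermediate sub-directories. A stand-alone sketch of the same walk, under the assumption that getAcidSubDir simply matches base_/delta_/delete_delta_ prefixes and with an injected "is directory" check instead of a Hadoop FileSystem:

    import java.nio.file.Path;
    import java.util.function.Predicate;
    import java.util.regex.Pattern;

    public class AcidDirSketch {
      private static final Pattern ACID_DIR = Pattern.compile("^(base|delta|delete_delta)_\\d+.*");

      static String firstLevelAcidDir(Path p, Predicate<Path> isDir) {
        if (p == null || p.getFileName() == null) {
          return null;
        }
        String name = p.getFileName().toString();
        if (ACID_DIR.matcher(name).matches()) {
          return name;
        }
        String parentAcid = firstLevelAcidDir(p.getParent(), isDir);
        if (parentAcid == null) {
          return null;
        }
        // Keep intermediate sub-directories (e.g. union sub-dirs); drop the file name itself.
        return isDir.test(p) ? parentAcid + "/" + name : parentAcid;
      }

      public static void main(String[] args) {
        Path bucketFile = Path.of("/warehouse/t1/delta_0000001_0000001_0000/bucket_00000");
        Predicate<Path> isDir = q -> !q.getFileName().toString().startsWith("bucket_");
        System.out.println(firstLevelAcidDir(bucketFile, isDir)); // delta_0000001_0000001_0000
      }
    }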

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index bcc0508..ec8527e 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -26,30 +26,32 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.StringInternUtils;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.apache.hive.common.util.Ref;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.FileUtils;
-import org.apache.hadoop.hive.common.JavaUtils;
-import org.apache.hadoop.hive.common.StringInternUtils;
-import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 import org.apache.hadoop.hive.llap.io.api.LlapIo;
 import org.apache.hadoop.hive.llap.io.api.LlapProxy;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner;
@@ -62,8 +64,6 @@ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
-import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc;
-import org.apache.hadoop.hive.ql.plan.VectorPartitionDesc.VectorMapOperatorReadType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -78,10 +78,7 @@ import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hive.common.util.Ref;
 import org.apache.hive.common.util.ReflectionUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * HiveInputFormat is a parameterized InputFormat which looks at the path name
@@ -460,8 +457,9 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
       InputFormat inputFormat, Class<? extends InputFormat> inputFormatClass, int splits,
       TableDesc table, List<InputSplit> result)
           throws IOException {
+    String tableName = table.getTableName();
     ValidWriteIdList validWriteIdList = AcidUtils.getTableValidWriteIdList(
-        conf, table.getTableName());
+        conf, tableName == null ? null : HiveStringUtils.normalizeIdentifier(tableName));
     ValidWriteIdList validMmWriteIdList = getMmValidWriteIds(conf, table, validWriteIdList);
 
     try {
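
The change above only normalizes the table name before consulting the valid write-id list, so a mixed-case name coming from the TableDesc still finds its entry. A toy illustration, assuming HiveStringUtils.normalizeIdentifier amounts to trim plus lower-case:

    import java.util.Map;

    public class NormalizeSketch {
      // Rough approximation of the normalization assumed here: trim + lower-case.
      static String normalize(String id) {
        return id == null ? null : id.trim().toLowerCase();
      }

      public static void main(String[] args) {
        Map<String, String> writeIdListsByTable =
            Map.of("default.acid_tbl", "<valid write-id list for default.acid_tbl>");
        // Without normalization, a mixed-case name from the TableDesc would miss the entry.
        System.out.println(writeIdListsByTable.get(normalize("Default.ACID_TBL")));
      }
    }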

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 4fd1d4e..78980fa 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
 import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.Context;
@@ -638,14 +639,15 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
   }
 
   @Override
-  public void replCommitTxn(String replPolicy, long srcTxnId) throws LockException {
+  public void replCommitTxn(CommitTxnRequest rqst) throws LockException {
     try {
-      getMS().replCommitTxn(srcTxnId, replPolicy);
+      getMS().replCommitTxn(rqst);
     } catch (NoSuchTxnException e) {
-      LOG.error("Metastore could not find " + JavaUtils.txnIdToString(srcTxnId));
-      throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(srcTxnId));
+      LOG.error("Metastore could not find " + JavaUtils.txnIdToString(rqst.getTxnid()));
+      throw new LockException(e, ErrorMsg.TXN_NO_SUCH_TRANSACTION, JavaUtils.txnIdToString(rqst.getTxnid()));
     } catch (TxnAbortedException e) {
-      LockException le = new LockException(e, ErrorMsg.TXN_ABORTED, JavaUtils.txnIdToString(srcTxnId), e.getMessage());
+      LockException le = new LockException(e, ErrorMsg.TXN_ABORTED,
+              JavaUtils.txnIdToString(rqst.getTxnid()), e.getMessage());
       LOG.error(le.getMessage());
       throw le;
     } catch (TException e) {
@@ -1013,7 +1015,11 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
     assert isTxnOpen();
     return stmtId++;
   }
-
+  @Override
+  public int getCurrentStmtId() {
+    assert isTxnOpen();
+    return stmtId;
+  }
   @Override
   public long getTableWriteId(String dbName, String tableName) throws LockException {
     assert isTxnOpen();
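
The new getCurrentStmtId() reads the statement-id counter without bumping it, unlike getStmtIdAndIncrement(). A toy counter mirroring the two accessors (not the real DbTxnManager):

    // Hypothetical counter for illustration; not the real transaction manager.
    public class StmtIdSketch {
      private int stmtId = 0;

      // Allocates a statement id (existing behaviour).
      int getStmtIdAndIncrement() {
        return stmtId++;
      }

      // Returns the current counter value without incrementing it (new accessor).
      int getCurrentStmtId() {
        return stmtId;
      }

      public static void main(String[] args) {
        StmtIdSketch mgr = new StmtIdSketch();
        int allocated = mgr.getStmtIdAndIncrement();                  // 0
        System.out.println(allocated + " " + mgr.getCurrentStmtId()); // "0 1"
      }
    }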

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index ab9d67e..1feddeb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.lockmgr;
 
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,6 +75,10 @@ class DummyTxnManager extends HiveTxnManagerImpl {
     return 0;
   }
   @Override
+  public int getCurrentStmtId() {
+    return  0;
+  }
+  @Override
   public long getTableWriteId(String dbName, String tableName) throws LockException {
     return 0L;
   }
@@ -220,7 +225,7 @@ class DummyTxnManager extends HiveTxnManagerImpl {
   }
 
   @Override
-  public void replCommitTxn(String replPolicy, long srcTxnId) throws LockException {
+  public void replCommitTxn(CommitTxnRequest rqst) throws LockException {
     // No-op
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index 5f68e08..9575552 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.lockmgr;
 
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.ql.Context;
@@ -61,11 +62,11 @@ public interface HiveTxnManager {
 
   /**
    * Commit the transaction in target cluster.
-   * @param replPolicy Replication policy to uniquely identify the source cluster.
-   * @param srcTxnId The id of the transaction at the source cluster
+   *
+   * @param rqst Commit transaction request having information related to commit txn and write events.
    * @throws LockException in case of failure to commit the transaction.
    */
-  void replCommitTxn(String replPolicy, long srcTxnId) throws LockException;
+  void replCommitTxn(CommitTxnRequest rqst) throws LockException;
 
  /**
    * Abort the transaction in target cluster.
@@ -295,6 +296,9 @@ public interface HiveTxnManager {
    */
   int getStmtIdAndIncrement();
 
+  // Returns the current stmt id without incrementing; used by an operation when the stmt id was allocated elsewhere.
+  int getCurrentStmtId();
+
   /**
    * Acquire the materialization rebuild lock for a given view. We need to specify the fully
    * qualified name of the materialized view and the open transaction ID so we can identify

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index c3809d8..c2ffe02 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -159,6 +159,7 @@ import org.apache.hadoop.hive.metastore.api.WMResourcePlan;
 import org.apache.hadoop.hive.metastore.api.WMTrigger;
 import org.apache.hadoop.hive.metastore.api.WMValidateResourcePlanResponse;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator;
@@ -1719,6 +1720,13 @@ public class Hive {
       List<Path> newFiles = Collections.synchronizedList(new ArrayList<Path>());
 
       perfLogger.PerfLogBegin("MoveTask", PerfLogger.FILE_MOVES);
+      
+      // If the config is set, the table is not temporary, and the partition being inserted into exists,
+      // capture the list of files added. For partitions that do not yet exist (insert overwrite to a new
+      // partition or dynamic partition inserts), the add partition event captures the list of files added.
+      if (areEventsForDmlNeeded(tbl, oldPart)) {
+        newFiles = Collections.synchronizedList(new ArrayList<Path>());
+      }
 
       // Note: the stats for ACID tables do not have any coordination with either Hive ACID logic
       //       like txn commits, time outs, etc.; nor the lower level sync in metastore pertaining
@@ -1731,8 +1739,8 @@ public class Hive {
           Utilities.FILE_OP_LOGGER.trace("not moving " + loadPath + " to " + newPartPath + " (MM)");
         }
         assert !isAcidIUDoperation;
-        if (areEventsForDmlNeeded(tbl, oldPart)) {
-          newFiles = listFilesCreatedByQuery(loadPath, writeId, stmtId);
+        if (newFiles != null) {
+          listFilesCreatedByQuery(loadPath, writeId, stmtId, isMmTableWrite ? isInsertOverwrite : false, newFiles);
         }
         if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
           Utilities.FILE_OP_LOGGER.trace("maybe deleting stuff from " + oldPartPath
@@ -1781,8 +1789,15 @@ public class Hive {
       // or dynamic partition inserts), the add partition event will capture the list of files added.
       // Generate an insert event only if inserting into an existing partition
       // When inserting into a new partition, the add partition event takes care of insert event
-      if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && (null != oldPart)) {
-        fireInsertEvent(tbl, partSpec, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
+      if ((null != oldPart) && (null != newFiles)) {
+        if (isTxnTable) {
+          addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
+        } else {
+          fireInsertEvent(tbl, partSpec, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
+        }
+      } else {
+        LOG.debug("No new files were created, and is not a replace, or we're inserting into a "
+                + "partition that does not exist yet. Skipping generating INSERT event.");
       }
 
       // column stats will be inaccurate
@@ -1852,6 +1867,12 @@ public class Hive {
           }
           throw e;
         }
+
+        // For an ACID table, add the acid_write event with the file list at load time itself, but
+        // it should be done after the partition is created.
+        if (isTxnTable && (null != newFiles)) {
+          addWriteNotificationLog(tbl, partSpec, newFiles, writeId);
+        }
       } else {
         setStatsPropAndAlterPartition(hasFollowingStatsTask, tbl, newTPart);
       }
@@ -1906,50 +1927,47 @@ public class Hive {
   }
 
   private boolean areEventsForDmlNeeded(Table tbl, Partition oldPart) {
-    return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null;
+    // For ACID IUD, add partition is a metadata-only operation, so the information about the newly
+    // added files needs to be recorded in the TXN_WRITE_NOTIFICATION_LOG table.
+    return conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() &&
+            ((null != oldPart) || AcidUtils.isTransactionalTable(tbl));
+  }
+
+  private void listFilesInsideAcidDirectory(Path acidDir, FileSystem srcFs, List<Path> newFiles) throws IOException {
+    // list all the files/directories in the path
+    FileStatus[] acidFiles;
+    acidFiles = srcFs.listStatus(acidDir);
+    if (acidFiles == null) {
+      LOG.debug("No files added by this query in: " + acidDir);
+      return;
+    }
+    for (FileStatus acidFile : acidFiles) {
+      // add only files to the list; recurse into sub-directories below.
+      if (!acidFile.isDirectory()) {
+        newFiles.add(acidFile.getPath());
+      } else {
+        listFilesInsideAcidDirectory(acidFile.getPath(), srcFs, newFiles);
+      }
+    }
   }
 
-  private List<Path> listFilesCreatedByQuery(Path loadPath, long writeId, int stmtId) throws HiveException {
-    List<Path> newFiles = new ArrayList<Path>();
-    final String filePrefix = AcidUtils.deltaSubdir(writeId, writeId, stmtId);
-    FileStatus[] srcs;
-    FileSystem srcFs;
+  private void listFilesCreatedByQuery(Path loadPath, long writeId, int stmtId,
+                                             boolean isInsertOverwrite, List<Path> newFiles) throws HiveException {
+    Path acidDir = new Path(loadPath, AcidUtils.baseOrDeltaSubdir(isInsertOverwrite, writeId, writeId, stmtId));
     try {
-      srcFs = loadPath.getFileSystem(conf);
-      srcs = srcFs.listStatus(loadPath);
+      FileSystem srcFs = loadPath.getFileSystem(conf);
+      if (srcFs.exists(acidDir) && srcFs.isDirectory(acidDir)){
+        // list out all the files in the path
+        listFilesInsideAcidDirectory(acidDir, srcFs, newFiles);
+      } else {
+        LOG.info("directory does not exist: " + acidDir);
+        return;
+      }
     } catch (IOException e) {
       LOG.error("Error listing files", e);
       throw new HiveException(e);
     }
-    if (srcs == null) {
-      LOG.info("No sources specified: " + loadPath);
-      return newFiles;
-    }
-    PathFilter subdirFilter = null;
-
-    // Note: just like the move path, we only do one level of recursion.
-    for (FileStatus src : srcs) {
-      if (src.isDirectory()) {
-        if (subdirFilter == null) {
-          subdirFilter = new PathFilter() {
-            @Override
-            public boolean accept(Path path) {
-              return path.getName().startsWith(filePrefix);
-            }
-          };
-        }
-        try {
-          for (FileStatus srcFile : srcFs.listStatus(src.getPath(), subdirFilter)) {
-            newFiles.add(srcFile.getPath());
-          }
-        } catch (IOException e) {
-          throw new HiveException(e);
-        }
-      } else if (src.getPath().getName().startsWith(filePrefix)) {
-        newFiles.add(src.getPath());
-      }
-    }
-    return newFiles;
+    return;
   }
 
   private void setStatsPropAndAlterPartition(boolean hasFollowingStatsTask, Table tbl,
@@ -2301,13 +2319,17 @@ private void constructOneLBLocationMap(FileStatus fSta,
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin("MoveTask", PerfLogger.LOAD_TABLE);
 
-    List<Path> newFiles = Collections.synchronizedList(new ArrayList<Path>());
+    List<Path> newFiles = null;
     Table tbl = getTable(tableName);
     assert tbl.getPath() != null : "null==getPath() for " + tbl.getTableName();
     boolean isTxnTable = AcidUtils.isTransactionalTable(tbl);
     boolean isMmTable = AcidUtils.isInsertOnlyTable(tbl);
     boolean isFullAcidTable = AcidUtils.isFullAcidTable(tbl);
 
+    if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
+      newFiles = Collections.synchronizedList(new ArrayList<Path>());
+    }
+
     // Note: this assumes both paths are qualified; which they are, currently.
     if ((isMmTable || isFullAcidTable) && loadPath.equals(tbl.getPath())) {
       /**
@@ -2319,7 +2341,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
         Utilities.FILE_OP_LOGGER.debug(
             "not moving " + loadPath + " to " + tbl.getPath() + " (MM)");
       }
-      newFiles = listFilesCreatedByQuery(loadPath, writeId, stmtId);
+
+      //new files list is required only for event notification.
+      if (newFiles != null) {
+        listFilesCreatedByQuery(loadPath, writeId, stmtId, isMmTable ? isInsertOverwrite : false, newFiles);
+      }
     } else {
       // Either a non-MM query, or a load into MM table from an external source.
       Path tblPath = tbl.getPath();
@@ -2390,10 +2416,10 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
     alterTable(tbl, environmentContext);
 
-    if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
-      fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
+    if (AcidUtils.isTransactionalTable(tbl)) {
+      addWriteNotificationLog(tbl, null, newFiles, writeId);
     } else {
-      fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), null);
+      fireInsertEvent(tbl, null, (loadFileType == LoadFileType.REPLACE_ALL), newFiles);
     }
 
     perfLogger.PerfLogEnd("MoveTask", PerfLogger.LOAD_TABLE);
@@ -2647,6 +2673,48 @@ private void constructOneLBLocationMap(FileStatus fSta,
     tpart.getSd().setLocation(partPath);
   }
 
+  private void addWriteNotificationLog(Table tbl, Map<String, String> partitionSpec,
+                                       List<Path> newFiles, Long writeId) throws HiveException {
+    if (!conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
+      LOG.debug("write notification log is ignored as dml event logging is disabled");
+      return;
+    }
+
+    if (tbl.isTemporary()) {
+      LOG.debug("write notification log is ignored as " + tbl.getTableName() + " is temporary : " + writeId);
+      return;
+    }
+
+    if (newFiles == null || newFiles.isEmpty()) {
+      LOG.debug("write notification log is ignored as file list is empty");
+      return;
+    }
+
+    LOG.debug("adding write notification log for operation " + writeId + " table " + tbl.getCompleteName() +
+                        " partition " + partitionSpec + " list of files " + newFiles);
+
+    try {
+      FileSystem fileSystem = tbl.getDataLocation().getFileSystem(conf);
+      Long txnId = SessionState.get().getTxnMgr().getCurrentTxnId();
+
+      InsertEventRequestData insertData = new InsertEventRequestData();
+      insertData.setReplace(true);
+
+      WriteNotificationLogRequest rqst = new WriteNotificationLogRequest(txnId, writeId,
+              tbl.getDbName(), tbl.getTableName(), insertData);
+      addInsertFileInformation(newFiles, fileSystem, insertData);
+
+      if (partitionSpec != null && !partitionSpec.isEmpty()) {
+        for (FieldSchema fs : tbl.getPartitionKeys()) {
+          rqst.addToPartitionVals(partitionSpec.get(fs.getName()));
+        }
+      }
+      getSynchronizedMSC().addWriteNotificationLog(rqst);
+    } catch (IOException | TException e) {
+      throw new HiveException(e);
+    }
+  }
+
   private void fireInsertEvent(Table tbl, Map<String, String> partitionSpec, boolean replace, List<Path> newFiles)
       throws HiveException {
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML)) {
@@ -2723,6 +2791,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
       InsertEventRequestData insertData) throws IOException {
     insertData.addToFilesAdded(p.toString());
     FileChecksum cksum = fileSystem.getFileChecksum(p);
+    String acidDirPath = AcidUtils.getFirstLevelAcidDirPath(p.getParent(), fileSystem);
     // File checksum is not implemented for local filesystem (RawLocalFileSystem)
     if (cksum != null) {
       String checksumString =
@@ -2732,6 +2801,11 @@ private void constructOneLBLocationMap(FileStatus fSta,
       // Add an empty checksum string for filesystems that don't generate one
       insertData.addToFilesAddedChecksum("");
     }
+
+    // acid dir will be present only for acid write operations.
+    if (acidDirPath != null) {
+      insertData.addToSubDirectoryList(acidDirPath);
+    }
   }
 
   public boolean dropPartition(String tblName, List<String> part_vals, boolean deleteData)
@@ -3690,7 +3764,6 @@ private void constructOneLBLocationMap(FileStatus fSta,
                   @Override
                   public Void call() throws HiveException {
                     SessionState.setCurrentSessionState(parentSession);
-                    final String group = srcStatus.getGroup();
                     try {
                       boolean success = false;
                       if (destFs instanceof DistributedFileSystem) {
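
listFilesInsideAcidDirectory recurses through the base/delta directory and collects only files, since bucket files can sit below intermediate sub-directories. A conceptually similar, self-contained walk over a local directory using java.nio (the patch itself uses the Hadoop FileSystem API):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;

    public class ListAcidFilesSketch {
      // Collect every regular file below the given ACID directory, skipping the directories themselves.
      static List<Path> listFiles(Path acidDir) throws IOException {
        try (Stream<Path> walk = Files.walk(acidDir)) {
          return walk.filter(Files::isRegularFile).collect(Collectors.toList());
        }
      }

      public static void main(String[] args) throws IOException {
        // e.g. /tmp/warehouse/t1/delta_0000001_0000001_0000 (hypothetical local path)
        System.out.println(listFiles(Path.of(args[0])));
      }
    }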

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
index f1c4d98..e04a0f3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.metadata;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -431,11 +432,19 @@ public final class HiveUtils {
 
   public static String getReplPolicy(String dbName, String tableName) {
     if ((dbName == null) || (dbName.isEmpty())) {
-      return null;
+      return "*.*";
     } else if ((tableName == null) || (tableName.isEmpty())) {
       return dbName.toLowerCase() + ".*";
     } else {
       return dbName.toLowerCase() + "." + tableName.toLowerCase();
     }
   }
+
+  public static Path getDumpPath(Path root, String dbName, String tableName) {
+    assert (dbName != null);
+    if ((tableName != null) && (!tableName.isEmpty())) {
+      return new Path(root, dbName + "." + tableName);
+    }
+    return new Path(root, dbName);
+  }
 }
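
After this change an empty database name maps to the wildcard policy "*.*" instead of null, and getDumpPath places a per-table dump under "<db>.<table>" (or just "<db>") below the dump root. The policy mapping, reproduced as a stand-alone snippet:

    public class ReplPolicySketch {
      static String replPolicy(String dbName, String tableName) {
        if (dbName == null || dbName.isEmpty()) {
          return "*.*";
        } else if (tableName == null || tableName.isEmpty()) {
          return dbName.toLowerCase() + ".*";
        }
        return dbName.toLowerCase() + "." + tableName.toLowerCase();
      }

      public static void main(String[] args) {
        System.out.println(replPolicy(null, null));      // *.*
        System.out.println(replPolicy("Sales", null));   // sales.*
        System.out.println(replPolicy("Sales", "Fact")); // sales.fact
      }
    }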

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index d34de61..eb594f8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.plan.ImportTableDesc;
 import org.apache.hadoop.hive.ql.plan.DDLWork;
 import org.apache.hadoop.hive.ql.plan.DropTableDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
+import org.apache.hadoop.hive.ql.plan.LoadMultiFilesDesc;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc.LoadFileType;
 import org.apache.hadoop.hive.ql.plan.MoveWork;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -249,9 +250,11 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       throw new HiveException(e);
     }
 
+    boolean inReplicationScope = false;
     if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
       tblDesc.setReplicationSpec(replicationSpec);
       StatsSetupConst.setBasicStatsState(tblDesc.getTblProps(), StatsSetupConst.FALSE);
+      inReplicationScope = true;
     }
 
     if (isExternalSet) {
@@ -275,7 +278,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     for (Partition partition : partitions) {
       // TODO: this should ideally not create AddPartitionDesc per partition
       AddPartitionDesc partsDesc = getBaseAddPartitionDescFromPartition(fromPath, dbname, tblDesc, partition);
-      if ((replicationSpec != null) && replicationSpec.isInReplicationScope()){
+      if (inReplicationScope){
         StatsSetupConst.setBasicStatsState(partsDesc.getPartition(0).getPartParams(), StatsSetupConst.FALSE);
       }
       partitionDescs.add(partsDesc);
@@ -335,13 +338,14 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       //if importing into existing transactional table or will create a new transactional table
       //(because Export was done from transactional table), need a writeId
       // Explain plan doesn't open a txn and hence no need to allocate write id.
-      if (x.getCtx().getExplainConfig() == null) {
+      // In replication flow, no need to allocate write id. It will be allocated using the alloc write id event.
+      if (x.getCtx().getExplainConfig() == null && !inReplicationScope) {
         writeId = txnMgr.getTableWriteId(tblDesc.getDatabaseName(), tblDesc.getTableName());
         stmtId = txnMgr.getStmtIdAndIncrement();
       }
     }
 
-    if (!replicationSpec.isInReplicationScope()) {
+    if (!inReplicationScope) {
       createRegularImportTasks(
           tblDesc, partitionDescs,
           isPartSpecSet, replicationSpec, table,
@@ -390,7 +394,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
     Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
     Path destPath = null, loadPath = null;
     LoadFileType lft;
-    if (AcidUtils.isTransactionalTable(table)) {
+    if (AcidUtils.isTransactionalTable(table) && !replicationSpec.isInReplicationScope()) {
       String mmSubdir = replace ? AcidUtils.baseDir(writeId)
           : AcidUtils.deltaSubdir(writeId, writeId, stmtId);
       destPath = new Path(tgtPath, mmSubdir);
@@ -428,13 +432,26 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       copyTask = TaskFactory.get(new CopyWork(dataPath, destPath, false));
     }
 
-    LoadTableDesc loadTableWork = new LoadTableDesc(
-        loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
-    loadTableWork.setStmtId(stmtId);
+    MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(), null, null, false);
+
+
+    if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(table)) {
+      LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
+              Collections.singletonList(destPath),
+              Collections.singletonList(tgtPath),
+              true, null, null);
+      moveWork.setMultiFilesDesc(loadFilesWork);
+      moveWork.setNeedCleanTarget(false);
+    } else {
+      LoadTableDesc loadTableWork = new LoadTableDesc(
+              loadPath, Utilities.getTableDesc(table), new TreeMap<>(), lft, writeId);
+      loadTableWork.setStmtId(stmtId);
+      moveWork.setLoadTableWork(loadTableWork);
+    }
+
     //if Importing into existing table, FileFormat is checked by
     // ImportSemanticAnalzyer.checked checkTable()
-    MoveWork mv = new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false);
-    Task<?> loadTableTask = TaskFactory.get(mv, x.getConf());
+    Task<?> loadTableTask = TaskFactory.get(moveWork, x.getConf());
     copyTask.addDependentTask(loadTableTask);
     x.getTasks().add(copyTask);
     return loadTableTask;
@@ -498,8 +515,10 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           + partSpecToString(partSpec.getPartSpec())
           + " with source location: " + srcLocation);
       Path tgtLocation = new Path(partSpec.getLocation());
-      Path destPath = !AcidUtils.isTransactionalTable(table.getParameters()) ?
-          x.getCtx().getExternalTmpPath(tgtLocation)
+      // In replication scope, the write id will be invalid
+      Boolean useStagingDirectory = !AcidUtils.isTransactionalTable(table.getParameters()) ||
+              replicationSpec.isInReplicationScope();
+      Path destPath =  useStagingDirectory ? x.getCtx().getExternalTmpPath(tgtLocation)
           : new Path(tgtLocation, AcidUtils.deltaSubdir(writeId, writeId, stmtId));
       Path moveTaskSrc =  !AcidUtils.isTransactionalTable(table.getParameters()) ? destPath : tgtLocation;
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
@@ -523,17 +542,29 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
       Task<?> addPartTask = TaskFactory.get(
               new DDLWork(x.getInputs(), x.getOutputs(), addPartitionDesc), x.getConf());
 
+      MoveWork moveWork = new MoveWork(x.getInputs(), x.getOutputs(),
+              null, null, false);
+
       // Note: this sets LoadFileType incorrectly for ACID; is that relevant for import?
       //       See setLoadFileType and setIsAcidIow calls elsewhere for an example.
-      LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
-          partSpec.getPartSpec(),
-          replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
-              writeId);
-      loadTableWork.setStmtId(stmtId);
-      loadTableWork.setInheritTableSpecs(false);
-      Task<?> loadPartTask = TaskFactory.get(
-              new MoveWork(x.getInputs(), x.getOutputs(), loadTableWork, null, false),
-              x.getConf());
+      if (replicationSpec.isInReplicationScope() && AcidUtils.isTransactionalTable(tblDesc.getTblProps())) {
+        LoadMultiFilesDesc loadFilesWork = new LoadMultiFilesDesc(
+                Collections.singletonList(destPath),
+                Collections.singletonList(tgtLocation),
+                true, null, null);
+        moveWork.setMultiFilesDesc(loadFilesWork);
+        moveWork.setNeedCleanTarget(false);
+      } else {
+        LoadTableDesc loadTableWork = new LoadTableDesc(moveTaskSrc, Utilities.getTableDesc(table),
+                partSpec.getPartSpec(),
+                replicationSpec.isReplace() ? LoadFileType.REPLACE_ALL : LoadFileType.OVERWRITE_EXISTING,
+                writeId);
+        loadTableWork.setStmtId(stmtId);
+        loadTableWork.setInheritTableSpecs(false);
+        moveWork.setLoadTableWork(loadTableWork);
+      }
+
+      Task<?> loadPartTask = TaskFactory.get(moveWork, x.getConf());
       copyTask.addDependentTask(loadPartTask);
       addPartTask.addDependentTask(loadPartTask);
       x.getTasks().add(copyTask);
@@ -1005,7 +1036,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
             t.addDependentTask(
                 addSinglePartition(fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
             if (updatedMetadata != null) {
-              updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+              updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
+                      addPartitionDesc.getPartition(0).getPartSpec());
             }
           }
         } else {
@@ -1057,13 +1089,15 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
               x.getTasks().add(addSinglePartition(
                   fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, x, writeId, stmtId));
               if (updatedMetadata != null) {
-                updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+                updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
+                        addPartitionDesc.getPartition(0).getPartSpec());
               }
             } else {
               x.getTasks().add(alterSinglePartition(
                       fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, null, x));
               if (updatedMetadata != null) {
-                updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+                updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
+                        addPartitionDesc.getPartition(0).getPartSpec());
               }
             }
           } else {
@@ -1078,7 +1112,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
                     fromURI, fs, tblDesc, table, wh, addPartitionDesc, replicationSpec, ptn, x));
               }
               if (updatedMetadata != null) {
-                updatedMetadata.addPartition(addPartitionDesc.getPartition(0).getPartSpec());
+                updatedMetadata.addPartition(table.getDbName(), table.getTableName(),
+                        addPartitionDesc.getPartition(0).getPartSpec());
               }
               if (lockType == WriteEntity.WriteType.DDL_NO_LOCK){
                 lockType = WriteEntity.WriteType.DDL_SHARED;

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 576f337..1271799 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7309,7 +7309,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         }
         try {
           if (ctx.getExplainConfig() != null) {
-            writeId = 0L; // For explain plan, txn won't be opened and doesn't make sense to allocate write id
+            writeId = null; // For explain plan, txn won't be opened and doesn't make sense to allocate write id
           } else {
             if (isMmTable) {
               writeId = txnMgr.getTableWriteId(dest_tab.getDbName(), dest_tab.getTableName());
@@ -7324,6 +7324,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         boolean isReplace = !qb.getParseInfo().isInsertIntoTable(
             dest_tab.getDbName(), dest_tab.getTableName());
         ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, writeId);
+        if (writeId != null) {
+          ltd.setStmtId(txnMgr.getCurrentStmtId());
+        }
         // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old
         // deltas and base and leave them up to the cleaner to clean up
         boolean isInsertInto = qb.getParseInfo().isInsertIntoTable(
@@ -7419,6 +7422,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         throw new SemanticException("Failed to allocate write Id", ex);
       }
       ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, writeId);
+      if (writeId != null) {
+        ltd.setStmtId(txnMgr.getCurrentStmtId());
+      }
       // For the current context for generating File Sink Operator, it is either INSERT INTO or INSERT OVERWRITE.
       // So the next line works.
       boolean isInsertInto = !qb.getParseInfo().isDestToOpTypeInsertOverwrite(dest);

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
index ce7e65a..8df2904 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/UpdateDeleteSemanticAnalyzer.java
@@ -179,9 +179,23 @@ public class UpdateDeleteSemanticAnalyzer extends SemanticAnalyzer {
     String newTableName = getTmptTableNameForExport(exportTable); //this is db.table
     Map<String, String> tblProps = new HashMap<>();
     tblProps.put(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, Boolean.FALSE.toString());
+    String location;
+
+    // for temporary tables we set the location to something in the session's scratch dir
+    // it has the same life cycle as the tmp table
+    try {
+      // Generate a unique ID for temp table path.
+      // This path will be fixed for the life of the temp table.
+      Path path = new Path(SessionState.getTempTableSpace(conf), UUID.randomUUID().toString());
+      path = Warehouse.getDnsPath(path, conf);
+      location = path.toString();
+    } catch (MetaException err) {
+      throw new SemanticException("Error while generating temp table path:", err);
+    }
+
     CreateTableLikeDesc ctlt = new CreateTableLikeDesc(newTableName,
         false, true, null,
-        null, null, null, null,
+        null, location, null, null,
         tblProps,
         true, //important so we get an exception on name collision
         Warehouse.getQualifiedName(exportTable.getTTable()), false);
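
The temp table used for exporting an ACID table is now pinned to a unique directory under the session scratch space, so its data shares the temp table's life cycle. The naming scheme, sketched with java.nio and a hypothetical scratch path instead of the SessionState/Warehouse helpers used in the patch:

    import java.nio.file.Path;
    import java.util.UUID;

    public class TmpTableLocationSketch {
      // scratchDir would come from the session (SessionState.getTempTableSpace in the patch).
      static Path tmpTableLocation(Path scratchDir) {
        // A random UUID keeps concurrent exports from colliding; the path stays fixed
        // for the life of the temp table.
        return scratchDir.resolve(UUID.randomUUID().toString());
      }

      public static void main(String[] args) {
        System.out.println(tmpTableLocation(Path.of("/tmp/hive/_tmp_space")));
      }
    }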

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
index 61bf6b9..75dcaa3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/CopyUtils.java
@@ -400,7 +400,7 @@ public class CopyUtils {
     return result;
   }
 
-  private Path getCopyDestination(ReplChangeManager.FileInfo fileInfo, Path destRoot) {
+  public static Path getCopyDestination(ReplChangeManager.FileInfo fileInfo, Path destRoot) {
     if (fileInfo.getSubDir() == null) {
       return destRoot;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
index c0701c5..62d699f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/Utils.java
@@ -186,10 +186,6 @@ public class Utils {
         return false;
       }
 
-      boolean isAcidTable = AcidUtils.isTransactionalTable(tableHandle);
-      if (isAcidTable) {
-        return hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_INCLUDE_ACID_TABLES);
-      }
       return !tableHandle.isTemporary();
     }
     return true;

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
index db97d7c..f04cd93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/CommitTxnHandler.java
@@ -18,9 +18,27 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.dump.events;
 
+import com.google.common.collect.Lists;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
+import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
 import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+import org.apache.hadoop.fs.FileSystem;
+import java.io.BufferedWriter;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.List;
 
 class CommitTxnHandler extends AbstractEventHandler {
 
@@ -28,11 +46,116 @@ class CommitTxnHandler extends AbstractEventHandler {
     super(event);
   }
 
+  private BufferedWriter writer(Context withinContext, Path dataPath) throws IOException {
+    Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+    FileSystem fs = dataPath.getFileSystem(withinContext.hiveConf);
+    return new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+  }
+
+  private void writeDumpFiles(Context withinContext, Iterable<String> files, Path dataPath) throws IOException {
+    // write the encoded filename/checksum of each file into _files
+    try (BufferedWriter fileListWriter = writer(withinContext, dataPath)) {
+      for (String file : files) {
+        fileListWriter.write(file + "\n");
+      }
+    }
+  }
+
+  private void createDumpFile(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable,
+                  List<Partition> qlPtns, List<List<String>> fileListArray) throws IOException, SemanticException {
+    if (fileListArray == null || fileListArray.isEmpty()) {
+      return;
+    }
+
+    Path metaDataPath = new Path(withinContext.eventRoot, EximUtil.METADATA_NAME);
+    withinContext.replicationSpec.setIsReplace(true);
+    EximUtil.createExportDump(metaDataPath.getFileSystem(withinContext.hiveConf), metaDataPath,
+            qlMdTable, qlPtns,
+            withinContext.replicationSpec,
+            withinContext.hiveConf);
+
+    if ((null == qlPtns) || qlPtns.isEmpty()) {
+      Path dataPath = new Path(withinContext.eventRoot, EximUtil.DATA_PATH_NAME);
+      writeDumpFiles(withinContext, fileListArray.get(0), dataPath);
+    } else {
+      for (int idx = 0; idx < qlPtns.size(); idx++) {
+        Path dataPath = new Path(withinContext.eventRoot, qlPtns.get(idx).getName());
+        writeDumpFiles(withinContext, fileListArray.get(idx), dataPath);
+      }
+    }
+  }
+
+  private void createDumpFileForTable(Context withinContext, org.apache.hadoop.hive.ql.metadata.Table qlMdTable,
+                    List<Partition> qlPtns, List<List<String>> fileListArray) throws IOException, SemanticException {
+    Path newPath = HiveUtils.getDumpPath(withinContext.eventRoot, qlMdTable.getDbName(), qlMdTable.getTableName());
+    Context context = new Context(withinContext);
+    context.setEventRoot(newPath);
+    createDumpFile(context, qlMdTable, qlPtns, fileListArray);
+  }
+
   @Override
   public void handle(Context withinContext) throws Exception {
     LOG.info("Processing#{} COMMIT_TXN message : {}", fromEventId(), event.getMessage());
+    String payload = event.getMessage();
+
+    if (!withinContext.hiveConf.getBoolVar(HiveConf.ConfVars.REPL_DUMP_METADATA_ONLY)) {
+      CommitTxnMessage commitTxnMessage = deserializer.getCommitTxnMessage(event.getMessage());
+
+      String contextDbName =  withinContext.dbName == null ? null :
+              StringUtils.normalizeIdentifier(withinContext.dbName);
+      String contextTableName =  withinContext.tableName == null ? null :
+              StringUtils.normalizeIdentifier(withinContext.tableName);
+      List<WriteEventInfo> writeEventInfoList = HiveMetaStore.HMSHandler.getMSForConf(withinContext.hiveConf).
+              getAllWriteEventInfo(commitTxnMessage.getTxnId(), contextDbName, contextTableName);
+      int numEntry = (writeEventInfoList != null ? writeEventInfoList.size() : 0);
+      if (numEntry != 0) {
+        commitTxnMessage.addWriteEventInfo(writeEventInfoList);
+        payload = commitTxnMessage.toString();
+        LOG.debug("payload for commit txn event : " + payload);
+      }
+
+      org.apache.hadoop.hive.ql.metadata.Table qlMdTablePrev = null;
+      org.apache.hadoop.hive.ql.metadata.Table qlMdTable = null;
+      List<Partition> qlPtns = new ArrayList<>();
+      List<List<String>> filesTobeAdded = new ArrayList<>();
+
+      // The loop below creates a dump directory for each table. It reads through the list of write notification
+      // events, groups the entries per table, and builds the lists of files to be replicated. The event directory
+      // in the dump path will have a subdirectory for each table; that folder holds the table metadata and the list
+      // of files to be replicated. The entries are stored with the (txn id, db name, table name, partition name)
+      // combination as the primary key, so entries for the same table come together. Only basic table metadata is
+      // used during import, so we need not dump the latest table metadata.
+      for (int idx = 0; idx < numEntry; idx++) {
+        qlMdTable = new org.apache.hadoop.hive.ql.metadata.Table(commitTxnMessage.getTableObj(idx));
+        if (qlMdTablePrev == null) {
+          qlMdTablePrev = qlMdTable;
+        }
+
+        // one dump directory per table
+        if (!qlMdTablePrev.getCompleteName().equals(qlMdTable.getCompleteName())) {
+          createDumpFileForTable(withinContext, qlMdTablePrev, qlPtns, filesTobeAdded);
+          qlPtns = new ArrayList<>();
+          filesTobeAdded = new ArrayList<>();
+          qlMdTablePrev = qlMdTable;
+        }
+
+        if (qlMdTable.isPartitioned() && (null != commitTxnMessage.getPartitionObj(idx))) {
+          qlPtns.add(new org.apache.hadoop.hive.ql.metadata.Partition(qlMdTable,
+                  commitTxnMessage.getPartitionObj(idx)));
+        }
+
+        filesTobeAdded.add(Lists.newArrayList(
+                ReplChangeManager.getListFromSeparatedString(commitTxnMessage.getFiles(idx))));
+      }
+
+      //Dump last table in the list
+      if (qlMdTablePrev != null) {
+        createDumpFileForTable(withinContext, qlMdTablePrev, qlPtns, filesTobeAdded);
+      }
+    }
+
     DumpMetaData dmd = withinContext.createDmd(this);
-    dmd.setPayload(event.getMessage());
+    dmd.setPayload(payload);
     dmd.write();
   }
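
The loop in handle() relies on the write-event entries arriving grouped by table (per the primary-key ordering noted in the comment): it buffers partitions and files until the table name changes, flushes one dump directory, and finally flushes the last group. The same "flush on key change" pattern in isolation, with a hypothetical flush() standing in for createDumpFileForTable():

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class GroupByTableSketch {
      // Hypothetical flush; in the patch this role is played by createDumpFileForTable(...).
      static void flush(String table, List<String> files) {
        System.out.println(table + " -> " + files);
      }

      public static void main(String[] args) {
        // Entries ordered by table name, mirroring how the write-notification entries come together.
        List<Map.Entry<String, String>> entries = List.of(
            Map.entry("db.t1", "delta_1/bucket_0"),
            Map.entry("db.t1", "delta_2/bucket_0"),
            Map.entry("db.t2", "delta_5/bucket_0"));

        String prevTable = null;
        List<String> files = new ArrayList<>();
        for (Map.Entry<String, String> e : entries) {
          if (prevTable != null && !prevTable.equals(e.getKey())) {
            flush(prevTable, files);        // table changed: emit the previous group
            files = new ArrayList<>();
          }
          prevTable = e.getKey();
          files.add(e.getValue());
        }
        if (prevTable != null) {
          flush(prevTable, files);          // emit the last group
        }
      }
    }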
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
index c0fa7b2..ec35f4e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandler.java
@@ -35,18 +35,37 @@ public interface EventHandler {
   DumpType dumpType();
 
   class Context {
-    final Path eventRoot, cmRoot;
+    Path eventRoot;
+    final Path  cmRoot;
     final Hive db;
     final HiveConf hiveConf;
     final ReplicationSpec replicationSpec;
+    final String dbName;
+    final String tableName;
 
     public Context(Path eventRoot, Path cmRoot, Hive db, HiveConf hiveConf,
-        ReplicationSpec replicationSpec) {
+        ReplicationSpec replicationSpec, String dbName, String tableName) {
       this.eventRoot = eventRoot;
       this.cmRoot = cmRoot;
       this.db = db;
       this.hiveConf = hiveConf;
       this.replicationSpec = replicationSpec;
+      this.dbName = dbName;
+      this.tableName = tableName;
+    }
+
+    public Context(Context other) {
+      this.eventRoot = other.eventRoot;
+      this.cmRoot = other.cmRoot;
+      this.db = other.db;
+      this.hiveConf = other.hiveConf;
+      this.replicationSpec = other.replicationSpec;
+      this.dbName = other.dbName;
+      this.tableName = other.tableName;
+    }
+
+    public void setEventRoot(Path eventRoot) {
+      this.eventRoot = eventRoot;
     }
 
     DumpMetaData createDmd(EventHandler eventHandler) {
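
    With eventRoot no longer final, a per-table context can be derived from the event-level one
    via the new copy constructor and setEventRoot, so each table's metadata and file list land in
    their own subdirectory under the event root. A minimal sketch of that pattern; the helper
    class, its method name and the table directory name are assumptions, not part of the patch:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.parse.repl.dump.events.EventHandler;

    // Hypothetical helper (not in the patch): build a table-scoped dump context.
    // The caller supplies the event root and the per-table directory name.
    final class PerTableContext {
      static EventHandler.Context forTable(EventHandler.Context eventContext,
                                           Path eventRoot, String tableDirName) {
        EventHandler.Context perTable = new EventHandler.Context(eventContext); // copy shared fields
        perTable.setEventRoot(new Path(eventRoot, tableDirName));               // per-table subdir
        return perTable;
      }
    }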

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
index 5ac3af0..cf3822a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/InsertHandler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -53,6 +54,9 @@ class InsertHandler extends AbstractEventHandler {
       return;
     }
 
+    // For ACID tables, the insert event should not have been fired.
+    assert(!AcidUtils.isTransactionalTable(qlMdTable));
+
     List<Partition> qlPtns = null;
     if (qlMdTable.isPartitioned() && (null != insertMsg.getPtnObj())) {
       qlPtns = Collections.singletonList(partitionObject(qlMdTable, insertMsg));

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
index d76f419..614e071 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/UpdatedMetaDataTracker.java
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.load;
 
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hive.common.util.HiveStringUtils;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.List;
 
@@ -25,52 +28,113 @@ import java.util.List;
  * Utility class to help track and return the metadata which are updated by repl load
  */
 public class UpdatedMetaDataTracker {
-  private String replState;
-  private String dbName;
-  private String tableName;
-  private List<Map <String, String>> partitionsList;
 
-  public UpdatedMetaDataTracker() {
-    this.replState = null;
-    this.dbName = null;
-    this.tableName = null;
-    this.partitionsList = new ArrayList<>();
+  /**
+   * Utility class to store replication state of a table.
+   */
+  public static class UpdateMetaData {
+    private String replState;
+    private String dbName;
+    private String tableName;
+    private List<Map <String, String>> partitionsList;
+
+    UpdateMetaData(String replState, String dbName, String tableName, Map <String, String> partSpec) {
+      this.replState = replState;
+      this.dbName = dbName;
+      this.tableName = tableName;
+      this.partitionsList = new ArrayList<>();
+      if (partSpec != null) {
+        this.partitionsList.add(partSpec);
+      }
+    }
+
+    public String getReplState() {
+      return replState;
+    }
+
+    public String getDbName() {
+      return dbName;
+    }
+
+    public String getTableName() {
+      return tableName;
+    }
+
+    public List<Map <String, String>> getPartitionsList() {
+      return partitionsList;
+    }
+
+    public void addPartition(Map<String, String> partSpec) {
+      this.partitionsList.add(partSpec);
+    }
   }
 
-  public void copyUpdatedMetadata(UpdatedMetaDataTracker other) {
-    this.replState = other.replState;
-    this.dbName = other.dbName;
-    this.tableName = other.tableName;
-    this.partitionsList = other.getPartitions();
+  private List<UpdateMetaData> updateMetaDataList;
+  private Map<String, Integer> updateMetaDataMap;
+
+  public UpdatedMetaDataTracker() {
+    updateMetaDataList = new ArrayList<>();
+    updateMetaDataMap = new HashMap<>();
   }
 
-  public void set(String replState, String dbName, String tableName, Map <String, String> partSpec) {
-    this.replState = replState;
-    this.dbName = dbName;
-    this.tableName = tableName;
-    if (partSpec != null) {
-      addPartition(partSpec);
+  public void copyUpdatedMetadata(UpdatedMetaDataTracker other) {
+    int size = updateMetaDataList.size();
+    for (UpdateMetaData updateMetaDataOther : other.updateMetaDataList) {
+      String key = getKey(normalizeIdentifier(updateMetaDataOther.getDbName()),
+              normalizeIdentifier(updateMetaDataOther.getTableName()));
+      Integer idx = updateMetaDataMap.get(key);
+      if (idx == null) {
+        updateMetaDataList.add(updateMetaDataOther);
+        updateMetaDataMap.put(key, size++);
+      } else if (updateMetaDataOther.partitionsList != null && updateMetaDataOther.partitionsList.size() != 0) {
+        UpdateMetaData updateMetaData = updateMetaDataList.get(idx);
+        for (Map<String, String> partSpec : updateMetaDataOther.partitionsList) {
+          updateMetaData.addPartition(partSpec);
+        }
+      }
     }
   }
 
-  public void addPartition(Map <String, String> partSpec) {
-    partitionsList.add(partSpec);
+  public void set(String replState, String dbName, String tableName, Map <String, String> partSpec)
+          throws SemanticException {
+    if (dbName == null) {
+      throw new SemanticException("db name can not be null");
+    }
+    String key = getKey(normalizeIdentifier(dbName), normalizeIdentifier(tableName));
+    Integer idx = updateMetaDataMap.get(key);
+    if (idx == null) {
+      updateMetaDataList.add(new UpdateMetaData(replState, dbName, tableName, partSpec));
+      updateMetaDataMap.put(key, updateMetaDataList.size() - 1);
+    } else {
+      updateMetaDataList.get(idx).addPartition(partSpec);
+    }
   }
 
-  public String getReplicationState() {
-    return replState;
+  public void addPartition(String dbName, String tableName, Map <String, String> partSpec) throws SemanticException {
+    if (dbName == null) {
+      throw new SemanticException("db name can not be null");
+    }
+    String key = getKey(normalizeIdentifier(dbName), normalizeIdentifier(tableName));
+    Integer idx = updateMetaDataMap.get(key);
+    if (idx == null) {
+      throw new SemanticException("add partition to metadata map failed as list is not yet set for table : " + key);
+    }
+    updateMetaDataList.get(idx).addPartition(partSpec);
   }
 
-  public String getDatabase() {
-    return dbName;
+  public List<UpdateMetaData> getUpdateMetaDataList() {
+    return updateMetaDataList;
   }
 
-  public String getTable() {
-    return tableName;
+  private String getKey(String dbName, String tableName) {
+    if (tableName == null) {
+      return dbName + ".*";
+    }
+    return dbName + "." + tableName;
   }
 
-  public List<Map <String, String>> getPartitions() {
-    return partitionsList;
+  private String normalizeIdentifier(String name) {
+    return name == null ? null : HiveStringUtils.normalizeIdentifier(name);
   }
 
 }
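
    A minimal usage sketch of the reworked tracker (the class/method below and the values are
    illustrative only): repeated set() calls for the same db/table collapse into a single entry
    whose partition list grows, while a different table gets its own entry.

    import java.util.Collections;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.parse.repl.load.UpdatedMetaDataTracker;

    // Illustrative only: the tracker de-duplicates updates by (db, table) key.
    final class TrackerUsageSketch {
      static void example() throws SemanticException {
        UpdatedMetaDataTracker tracker = new UpdatedMetaDataTracker();
        tracker.set("100", "db1", "t1", Collections.singletonMap("p", "1"));
        tracker.set("100", "db1", "t1", Collections.singletonMap("p", "2")); // same key: partition appended
        tracker.set("100", "db1", "t2", null);                               // new key: new entry

        // Two entries: db1.t1 with two partition specs, and db1.t2 with none.
        assert tracker.getUpdateMetaDataList().size() == 2;
      }
    }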

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
index afc7426..d3f3306 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
@@ -48,7 +48,12 @@ public class AbortTxnHandler extends AbstractMessageHandler {
                 msg.getTxnId(), ReplTxnWork.OperationType.REPL_ABORT_TXN, context.eventOnlyReplicationSpec()),
         context.hiveConf
     );
-    updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+
+    // For a warehouse-level dump, don't update the database's repl metadata, as we don't know which database this
+    // txn belongs to. If this event gets replayed, it is handled correctly anyway.
+    if (!context.isDbNameEmpty()) {
+      updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+    }
     context.log.debug("Added Abort txn task : {}", abortTxnTask.getId());
     return Collections.singletonList(abortTxnTask);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
index 9bdbf64..63f2577 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
@@ -52,7 +52,7 @@ public class AllocWriteIdHandler extends AbstractMessageHandler {
             .getTableName());
 
     // Repl policy should be created based on the table name in context.
-    ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(dbName, context.tableName), dbName, tableName,
+    ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), dbName, tableName,
         ReplTxnWork.OperationType.REPL_ALLOC_WRITE_ID, msg.getTxnToWriteIdList(), context.eventOnlyReplicationSpec());
 
     Task<? extends Serializable> allocWriteIdTask = TaskFactory.get(work, context.hiveConf);

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
index d25102e..0619bd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
@@ -17,7 +17,12 @@
  */
 package org.apache.hadoop.hive.ql.parse.repl.load.message;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
+import org.apache.hadoop.hive.ql.exec.repl.util.AddDependencyToLeaves;
+import org.apache.hadoop.hive.ql.exec.util.DAGTraversal;
+import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.ReplTxnWork;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
@@ -25,7 +30,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import java.io.Serializable;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -35,20 +40,75 @@ import java.util.List;
 public class CommitTxnHandler extends AbstractMessageHandler {
   @Override
   public List<Task<? extends Serializable>> handle(Context context)
-      throws SemanticException {
+          throws SemanticException {
     if (!AcidUtils.isAcidEnabled(context.hiveConf)) {
       context.log.error("Cannot load transaction events as acid is not enabled");
       throw new SemanticException("Cannot load transaction events as acid is not enabled");
     }
 
     CommitTxnMessage msg = deserializer.getCommitTxnMessage(context.dmd.getPayload());
-    Task<ReplTxnWork> commitTxnTask = TaskFactory.get(
-        new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName, context.tableName,
-              msg.getTxnId(), ReplTxnWork.OperationType.REPL_COMMIT_TXN, context.eventOnlyReplicationSpec()),
-        context.hiveConf
-    );
-    updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+    int numEntry = (msg.getTables() == null ? 0 : msg.getTables().size());
+    List<Task<? extends Serializable>> tasks = new ArrayList<>();
+    String dbName = context.dbName;
+    String tableNamePrev = null;
+    String tblName = context.tableName;
+
+    ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName,
+      context.tableName, msg.getTxnId(), ReplTxnWork.OperationType.REPL_COMMIT_TXN, context.eventOnlyReplicationSpec());
+
+    if (numEntry > 0) {
+      context.log.debug("Commit txn handler for txnid " + msg.getTxnId() + " databases : " + msg.getDatabases() +
+              " tables : " + msg.getTables() + " partitions : " + msg.getPartitions() + " files : " +
+              msg.getFilesList() + " write ids : " + msg.getWriteIds());
+    }
+
+    for (int idx = 0; idx < numEntry; idx++) {
+      String actualTblName = msg.getTables().get(idx);
+      String actualDBName = msg.getDatabases().get(idx);
+      String completeName = Table.getCompleteName(actualDBName, actualTblName);
+
+      // One import task per table. Events for the same table are kept together in one dump directory during dump
+      // and are grouped together in the commit txn message.
+      if (tableNamePrev == null || !(completeName.equals(tableNamePrev))) {
+        // The data location is created by the source, so it should be derived from the table name in the message.
+        Path location = HiveUtils.getDumpPath(new Path(context.location), actualDBName, actualTblName);
+        tblName = context.isTableNameEmpty() ? actualTblName : context.tableName;
+        // For a warehouse-level dump, use the db name from the write event.
+        dbName = (context.isDbNameEmpty() ? actualDBName : context.dbName);
+        Context currentContext = new Context(context, dbName, tblName);
+        currentContext.setLocation(location.toUri().toString());
+
+        // Piggybacking on the import logic for now.
+        TableHandler tableHandler = new TableHandler();
+        tasks.addAll((tableHandler.handle(currentContext)));
+        readEntitySet.addAll(tableHandler.readEntities());
+        writeEntitySet.addAll(tableHandler.writeEntities());
+        getUpdatedMetadata().copyUpdatedMetadata(tableHandler.getUpdatedMetadata());
+        tableNamePrev = completeName;
+      }
+
+      try {
+        WriteEventInfo writeEventInfo = new WriteEventInfo(msg.getWriteIds().get(idx),
+                dbName, tblName, msg.getFiles(idx));
+        if (msg.getPartitions().get(idx) != null && !msg.getPartitions().get(idx).isEmpty()) {
+          writeEventInfo.setPartition(msg.getPartitions().get(idx));
+        }
+        work.addWriteEventInfo(writeEventInfo);
+      } catch (Exception e) {
+        throw new SemanticException("Failed to extract write event info from commit txn message : " + e.getMessage());
+      }
+    }
+
+    Task<ReplTxnWork> commitTxnTask = TaskFactory.get(work, context.hiveConf);
+
+    // For a warehouse-level dump, don't update the database's repl metadata, as we don't know which database this
+    // txn belongs to. If this event gets replayed, it is handled correctly anyway.
+    if (!context.isDbNameEmpty()) {
+      updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+    }
     context.log.debug("Added Commit txn task : {}", commitTxnTask.getId());
-    return Collections.singletonList(commitTxnTask);
+    DAGTraversal.traverse(tasks, new AddDependencyToLeaves(commitTxnTask));
+    return tasks;
   }
 }
+
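
    The DAGTraversal.traverse / AddDependencyToLeaves step above makes the commit-txn task run
    only after every per-table import task has finished. A standalone sketch of that pattern
    with a simplified Task type (not Hive's Task, DAGTraversal or AddDependencyToLeaves classes):

    import java.util.ArrayList;
    import java.util.List;

    // Simplified sketch of "add a dependency to every leaf of a task DAG":
    // the finishing task (here, the commit-txn task) is chained after all
    // leaves so it runs last. No handling of shared subtrees or cycles.
    final class AddDependencyToLeavesSketch {
      static class Task {
        final String name;
        final List<Task> children = new ArrayList<>();
        Task(String name) { this.name = name; }
        void addDependentTask(Task t) { children.add(t); }
      }

      static void addDependencyToLeaves(List<Task> roots, Task finisher) {
        for (Task t : roots) {
          if (t.children.isEmpty()) {
            t.addDependentTask(finisher);                  // leaf: run finisher after it
          } else {
            addDependencyToLeaves(t.children, finisher);   // descend into the DAG
          }
        }
      }
    }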

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
index ef4a901..cdf51dd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/MessageHandler.java
@@ -46,8 +46,8 @@ public interface MessageHandler {
   UpdatedMetaDataTracker getUpdatedMetadata();
 
   class Context {
-    public String dbName;
-    public final String tableName, location;
+    public String location;
+    public final String tableName, dbName;
     public final Task<? extends Serializable> precursor;
     public DumpMetaData dmd;
     final HiveConf hiveConf;
@@ -101,5 +101,9 @@ public interface MessageHandler {
     public HiveTxnManager getTxnMgr() {
       return nestedContext.getHiveTxnManager();
     }
+
+    public void setLocation(String location) {
+      this.location = location;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
index 190e021..5dcc44e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
@@ -47,7 +47,12 @@ public class OpenTxnHandler extends AbstractMessageHandler {
                 msg.getTxnIds(), ReplTxnWork.OperationType.REPL_OPEN_TXN, context.eventOnlyReplicationSpec()),
         context.hiveConf
     );
-    updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+
+    // For a warehouse-level dump, don't update the database's repl metadata, as we don't know which database this
+    // txn belongs to. If this event gets replayed, it is handled correctly anyway.
+    if (!context.isDbNameEmpty()) {
+      updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
+    }
     context.log.debug("Added Open txn task : {}", openTxnTask.getId());
     return Collections.singletonList(openTxnTask);
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
index 9a1e3a1..47a56d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/MoveWork.java
@@ -40,6 +40,7 @@ public class MoveWork implements Serializable {
   private LoadMultiFilesDesc loadMultiFilesWork;
   private boolean checkFileFormat;
   private boolean srcLocal;
+  private boolean needCleanTarget;
 
   /**
    * ReadEntitites that are passed to the hooks.
@@ -63,6 +64,7 @@ public class MoveWork implements Serializable {
   private MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs) {
     this.inputs = inputs;
     this.outputs = outputs;
+    this.needCleanTarget = true;
   }
 
   public MoveWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs,
@@ -93,6 +95,7 @@ public class MoveWork implements Serializable {
     srcLocal = o.isSrcLocal();
     inputs = o.getInputs();
     outputs = o.getOutputs();
+    needCleanTarget = o.needCleanTarget;
   }
 
   @Explain(displayName = "tables", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
@@ -153,5 +156,12 @@ public class MoveWork implements Serializable {
   public void setSrcLocal(boolean srcLocal) {
     this.srcLocal = srcLocal;
   }
-  
+
+  public boolean isNeedCleanTarget() {
+    return needCleanTarget;
+  }
+
+  public void setNeedCleanTarget(boolean needCleanTarget) {
+    this.needCleanTarget = needCleanTarget;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
index 3c853c9..a6ab836 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/ReplTxnWork.java
@@ -20,8 +20,10 @@ package org.apache.hadoop.hive.ql.plan;
 import java.io.Serializable;
 
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
@@ -40,6 +42,7 @@ public class ReplTxnWork implements Serializable {
   private List<Long> txnIds;
   private List<TxnToWriteId> txnToWriteIdList;
   private ReplicationSpec replicationSpec;
+  private List<WriteEventInfo> writeEventInfos;
 
   /**
    * OperationType.
@@ -60,6 +63,7 @@ public class ReplTxnWork implements Serializable {
     this.replPolicy = replPolicy;
     this.txnToWriteIdList = txnToWriteIdList;
     this.replicationSpec = replicationSpec;
+    this.writeEventInfos = null;
   }
 
   public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type,
@@ -86,6 +90,13 @@ public class ReplTxnWork implements Serializable {
     this.operation = type;
   }
 
+  public void addWriteEventInfo(WriteEventInfo writeEventInfo) {
+    if (this.writeEventInfos == null) {
+      this.writeEventInfos = new ArrayList<>();
+    }
+    this.writeEventInfos.add(writeEventInfo);
+  }
+
   public List<Long> getTxnIds() {
     return txnIds;
   }
@@ -121,4 +132,8 @@ public class ReplTxnWork implements Serializable {
   public ReplicationSpec getReplicationSpec() {
     return replicationSpec;
   }
+
+  public List<WriteEventInfo> getWriteEventInfos() {
+    return writeEventInfos;
+  }
 }