You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2018/07/03 17:47:47 UTC

[19/46] hive git commit: HIVE-19267: Replicate ACID/MM tables write operations (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index f7018c2..ac1d3c8 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -59,6 +59,7 @@ public class ReplChangeManager {
   static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash";
   private static final String URI_FRAGMENT_SEPARATOR = "#";
   public static final String SOURCE_OF_REPLICATION = "repl.source.for";
+  private static final String TXN_WRITE_EVENT_FILE_SEPARATOR = "]"; // delimiter used by joinWithSeparator()
 
   public enum RecycleType {
     MOVE,
@@ -472,7 +473,6 @@ public class ReplChangeManager {
   }
 
   public static boolean isSourceOfReplication(Database db) {
-    // Can not judge, so assuming replication is not enabled.
     assert (db != null);
     String replPolicyIds = getReplPolicyIdString(db);
     return  !StringUtils.isEmpty(replPolicyIds);
@@ -490,4 +490,29 @@ public class ReplChangeManager {
     }
     return null;
   }
+
+  /**
+   * Joins the given elements into a single string, placing
+   * {@code TXN_WRITE_EVENT_FILE_SEPARATOR} between consecutive elements.
+   *
+   * @param strings elements to join
+   * @return the joined string
+   */
+  public static String joinWithSeparator(Iterable<?> strings) {
+    return org.apache.hadoop.util.StringUtils.join(TXN_WRITE_EVENT_FILE_SEPARATOR, strings);
+  }
+
+  /**
+   * Splits a string on {@code TXN_WRITE_EVENT_FILE_SEPARATOR}; whitespace directly
+   * around each separator is discarded.
+   *
+   * @param separatedString string whose elements are delimited by the separator
+   * @return the elements found between separators
+   */
+  public static String[] getListFromSeparatedString(String separatedString) {
+    // Quote the separator so it is matched literally even if the constant is ever
+    // changed to a regex metacharacter.
+    return separatedString.split(
+        "\\s*" + java.util.regex.Pattern.quote(TXN_WRITE_EVENT_FILE_SEPARATOR) + "\\s*");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index a526019..8ff056f 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownTableException;
 import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
 import org.apache.hadoop.hive.metastore.api.WMMapping;
 import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
@@ -2414,6 +2415,17 @@ public class CachedStore implements RawStore, Configurable {
     return sharedCache.getUpdateCount();
   }
 
+  @Override
+  public void cleanWriteNotificationEvents(int olderThan) {
+    rawStore.cleanWriteNotificationEvents(olderThan); // forwarded unchanged to rawStore
+  }
+
+  // Does not consult sharedCache: the lookup is forwarded to rawStore as-is.
+  @Override
+  public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException {
+    return rawStore.getAllWriteEventInfo(txnId, dbName, tableName);
+  }
+
   static boolean isNotInBlackList(String catName, String dbName, String tblName) {
     String str = TableName.getQualified(catName, dbName, tblName);
     for (Pattern pattern : blacklistPatterns) {

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java
new file mode 100644
index 0000000..001179a
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.events;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+
+import java.util.List;
+
+/**
+ * AcidWriteEvent
+ * Event generated for acid write operations; most getters read from the wrapped WriteNotificationLogRequest.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class AcidWriteEvent extends ListenerEvent {
+  private final WriteNotificationLogRequest writeNotificationLogRequest;
+  private final String partition;
+  private final Table tableObj;
+  private final Partition partitionObj;
+
+  public AcidWriteEvent(String partition, Table tableObj, Partition partitionObj,
+                        WriteNotificationLogRequest writeNotificationLogRequest) {
+    super(true, null); // NOTE(review): presumably status=true and no handler -- confirm against ListenerEvent
+    this.writeNotificationLogRequest = writeNotificationLogRequest;
+    this.partition = partition;
+    this.tableObj = tableObj;
+    this.partitionObj = partitionObj;
+  }
+
+  public Long getTxnId() {
+    return writeNotificationLogRequest.getTxnId();
+  }
+
+  public List<String> getFiles() {
+    return writeNotificationLogRequest.getFileInfo().getFilesAdded();
+  }
+
+  public List<String> getChecksums() {
+    return writeNotificationLogRequest.getFileInfo().getFilesAddedChecksum();
+  }
+
+  public String getDatabase() {
+    return StringUtils.normalizeIdentifier(writeNotificationLogRequest.getDb());
+  }
+
+  public String getTable() {
+    return StringUtils.normalizeIdentifier(writeNotificationLogRequest.getTable());
+  }
+
+  public String getPartition() {
+    return partition; // Not normalized: the partition value is case-sensitive.
+  }
+
+  public Long getWriteId() {
+    return writeNotificationLogRequest.getWriteId();
+  }
+
+  public Table getTableObj() {
+    return tableObj;
+  }
+
+  public Partition getPartitionObj() {
+    return partitionObj;
+  }
+
+  public List<String> getSubDirs() {
+    return writeNotificationLogRequest.getFileInfo().getSubDirectoryList();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java
new file mode 100644
index 0000000..e2c9ccf
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java
@@ -0,0 +1,50 @@
+/* * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import java.util.List;
+
+/**
+ * HCat message sent when an ACID write is done; its event type is always EventType.ACID_WRITE.
+ */
+public abstract class AcidWriteMessage extends EventMessage {
+
+  protected AcidWriteMessage() {
+    super(EventType.ACID_WRITE);
+  }
+
+  public abstract Long getTxnId();
+
+  public abstract String getTable();
+
+  public abstract Long getWriteId();
+
+  public abstract String getPartition();
+
+  public abstract List<String> getFiles();
+
+  public abstract Table getTableObj() throws Exception;
+
+  public abstract Partition getPartitionObj() throws Exception;
+
+  public abstract String getTableObjStr(); // serialized table object (JSON in JSONAcidWriteMessage)
+
+  public abstract String getPartitionObjStr(); // serialized partition object (JSON in JSONAcidWriteMessage)
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java
index 49004f2..9733039 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java
@@ -17,6 +17,12 @@
 
 package org.apache.hadoop.hive.metastore.messaging;
 
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
+
+import java.util.List;
+
 /**
  * HCat message sent when an commit transaction is done.
  */
@@ -33,4 +39,21 @@ public abstract class CommitTxnMessage extends EventMessage {
    */
   public abstract Long getTxnId();
 
+  public abstract List<Long> getWriteIds();
+
+  public abstract List<String> getDatabases();
+
+  public abstract List<String> getTables();
+
+  public abstract List<String> getPartitions();
+
+  // NOTE(review): idx presumably addresses the same position as in getTables() -- confirm
+  public abstract Table getTableObj(int idx) throws Exception;
+
+  public abstract Partition getPartitionObj(int idx) throws Exception;
+
+  public abstract String getFiles(int idx); // a single String per index, not a list
+
+  public abstract List<String> getFilesList();
+
+  public abstract void addWriteEventInfo(List<WriteEventInfo> writeEventInfoList);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
index 969dd7b..f24b419 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
@@ -60,7 +60,8 @@ public abstract class EventMessage {
     COMMIT_TXN(MessageFactory.COMMIT_TXN_EVENT),
     ABORT_TXN(MessageFactory.ABORT_TXN_EVENT),
     ALLOC_WRITE_ID(MessageFactory.ALLOC_WRITE_ID_EVENT),
-    ALTER_CATALOG(MessageFactory.ALTER_CATALOG_EVENT);
+    ALTER_CATALOG(MessageFactory.ALTER_CATALOG_EVENT),
+    ACID_WRITE(MessageFactory.ACID_WRITE_EVENT);
 
     private String typeString;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
index ca33579..b701d84 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
@@ -70,6 +70,10 @@ public abstract class MessageDeserializer {
       return getCommitTxnMessage(messageBody);
     case ABORT_TXN:
       return getAbortTxnMessage(messageBody);
+    case ALLOC_WRITE_ID:
+      return getAllocWriteIdMessage(messageBody);
+    case ACID_WRITE:
+      return getAcidWriteMessage(messageBody);
     default:
       throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString);
     }
@@ -186,6 +190,11 @@ public abstract class MessageDeserializer {
    */
   public abstract AllocWriteIdMessage getAllocWriteIdMessage(String messageBody);
 
+  /**
+   * Method to de-serialize AcidWriteMessage instance.
+   */
+  public abstract AcidWriteMessage getAcidWriteMessage(String messageBody);
+
   // Protection against construction.
   protected MessageDeserializer() {}
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index e0629ea..d529147 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
 import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 
 import java.util.Iterator;
@@ -74,6 +75,7 @@ public abstract class MessageFactory {
   public static final String ABORT_TXN_EVENT = "ABORT_TXN";
   public static final String ALLOC_WRITE_ID_EVENT = "ALLOC_WRITE_ID_EVENT";
   public static final String ALTER_CATALOG_EVENT = "ALTER_CATALOG";
+  public static final String ACID_WRITE_EVENT = "ACID_WRITE_EVENT";
 
   private static MessageFactory instance = null;
 
@@ -326,4 +328,14 @@ public abstract class MessageFactory {
   public abstract DropCatalogMessage buildDropCatalogMessage(Catalog catalog);
 
   public abstract AlterCatalogMessage buildAlterCatalogMessage(Catalog oldCat, Catalog newCat);
+
+  /**
+   * Factory method for building the message that describes one acid write operation.
+   * The JSON factory copies {@code files} into a list, so the iterator is consumed by the call.
+   *
+   * @param acidWriteEvent information related to the acid write operation
+   * @param files files added by this write operation
+   * @return instance of AcidWriteMessage
+   */
+  public abstract AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, Iterator<String> files);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
new file mode 100644
index 0000000..515a2cb
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging.json;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
+import org.apache.thrift.TException;
+import org.codehaus.jackson.annotate.JsonProperty;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * JSON implementation of AcidWriteMessage; the table and partition objects are held as JSON strings.
+ */
+public class JSONAcidWriteMessage extends AcidWriteMessage {
+
+  @JsonProperty
+  private Long txnid, writeId, timestamp;
+
+  @JsonProperty
+  private String server, servicePrincipal, database, table, partition, tableObjJson, partitionObjJson;
+
+  @JsonProperty
+  private List<String> files;
+
+  /**
+   * Default constructor, needed for Jackson.
+   */
+  public JSONAcidWriteMessage() {
+  }
+
+  public JSONAcidWriteMessage(String server, String servicePrincipal, Long timestamp, AcidWriteEvent acidWriteEvent,
+                              Iterator<String> files) {
+    this.timestamp = timestamp;
+    this.txnid = acidWriteEvent.getTxnId();
+    this.server = server;
+    this.servicePrincipal = servicePrincipal;
+    this.database = acidWriteEvent.getDatabase();
+    this.table = acidWriteEvent.getTable();
+    this.writeId = acidWriteEvent.getWriteId();
+    this.partition = acidWriteEvent.getPartition();
+    try {
+      this.tableObjJson = JSONMessageFactory.createTableObjJson(acidWriteEvent.getTableObj());
+      if (acidWriteEvent.getPartitionObj() != null) {
+        this.partitionObjJson = JSONMessageFactory.createPartitionObjJson(acidWriteEvent.getPartitionObj());
+      } else {
+        this.partitionObjJson = null; // the event carries no partition object
+      }
+    } catch (TException e) {
+      throw new IllegalArgumentException("Could not serialize JSONAcidWriteMessage : ", e);
+    }
+    this.files = Lists.newArrayList(files); // copies the iterator's remaining elements into a list
+  }
+
+  @Override
+  public Long getTxnId() {
+    return txnid;
+  }
+
+  @Override
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public String getDB() {
+    return database;
+  }
+
+  @Override
+  public String getServicePrincipal() {
+    return servicePrincipal;
+  }
+
+  @Override
+  public String getServer() {
+    return server;
+  }
+
+  @Override
+  public String getTable() {
+    return table;
+  }
+
+  @Override
+  public Long getWriteId() {
+    return writeId;
+  }
+
+  @Override
+  public String getPartition() {
+    return partition;
+  }
+
+  @Override
+  public List<String> getFiles() {
+    return files;
+  }
+
+  @Override
+  public Table getTableObj() throws Exception { // rebuilt from tableObjJson on every call
+    return (tableObjJson == null) ? null : (Table) JSONMessageFactory.getTObj(tableObjJson, Table.class);
+  }
+
+  @Override
+  public Partition getPartitionObj() throws Exception { // rebuilt from partitionObjJson on every call
+    return ((partitionObjJson == null) ? null :
+            (Partition) JSONMessageFactory.getTObj(partitionObjJson, Partition.class));
+  }
+
+  @Override
+  public String getTableObjStr() {
+    return tableObjJson;
+  }
+
+  @Override
+  public String getPartitionObjStr() {
+    return partitionObjJson;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return JSONMessageDeserializer.mapper.writeValueAsString(this);
+    } catch (Exception exception) {
+      throw new IllegalArgumentException("Could not serialize: ", exception);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
index 595a3d1..6082b8e 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
@@ -18,9 +18,15 @@
  */
 
 package org.apache.hadoop.hive.metastore.messaging.json;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
 import org.codehaus.jackson.annotate.JsonProperty;
 
+import java.util.List;
+
 /**
  * JSON implementation of CommitTxnMessage
  */
@@ -38,6 +44,12 @@ public class JSONCommitTxnMessage extends CommitTxnMessage {
   @JsonProperty
   private String servicePrincipal;
 
+  @JsonProperty
+  private List<Long> writeIds;
+  // The lists below grow together with writeIds: addWriteEventInfo appends one element to each per write event.
+  @JsonProperty
+  private List<String> databases, tables, partitions, tableObjs, partitionObjs, files;
+
   /**
    * Default constructor, needed for Jackson.
    */
@@ -49,6 +61,13 @@ public class JSONCommitTxnMessage extends CommitTxnMessage {
     this.txnid = txnid;
     this.server = server;
     this.servicePrincipal = servicePrincipal;
+    this.databases = null;
+    this.tables = null;
+    this.writeIds = null;
+    this.partitions = null;
+    this.tableObjs = null;
+    this.partitionObjs = null;
+    this.files = null;
   }
 
   @Override
@@ -77,6 +96,82 @@ public class JSONCommitTxnMessage extends CommitTxnMessage {
   }
 
   @Override
+  public List<Long> getWriteIds() {
+    return writeIds; // may be null: the constructor leaves it unset
+  }
+
+  @Override
+  public List<String> getDatabases() {
+    return databases; // may be null: the constructor leaves it unset
+  }
+
+  @Override
+  public List<String> getTables() {
+    return tables; // may be null: the constructor leaves it unset
+  }
+
+  @Override
+  public List<String> getPartitions() {
+    return partitions; // may be null: the constructor leaves it unset
+  }
+
+  @Override
+  public Table getTableObj(int idx) throws Exception { // null when the tableObjs list itself is null
+    return tableObjs == null ? null :  (Table) JSONMessageFactory.getTObj(tableObjs.get(idx), Table.class);
+  }
+
+  @Override
+  public Partition getPartitionObj(int idx) throws Exception { // null if the list or the entry at idx is null
+    return (partitionObjs == null ? null : (partitionObjs.get(idx) == null ? null :
+            (Partition)JSONMessageFactory.getTObj(partitionObjs.get(idx), Partition.class)));
+  }
+
+  @Override
+  public String getFiles(int idx) { // the files entry stored for one write event, as a single String
+    return files == null ? null : files.get(idx);
+  }
+
+  @Override
+  public List<String> getFilesList() {
+    return files; // may be null: the constructor leaves it unset
+  }
+
+  @Override
+  public void addWriteEventInfo(List<WriteEventInfo> writeEventInfoList) { // lists are created on first use
+    if (this.databases == null) {
+      this.databases = Lists.newArrayList();
+    }
+    if (this.tables == null) {
+      this.tables = Lists.newArrayList();
+    }
+    if (this.writeIds == null) {
+      this.writeIds = Lists.newArrayList();
+    }
+    if (this.tableObjs == null) {
+      this.tableObjs = Lists.newArrayList();
+    }
+    if (this.partitions == null) {
+      this.partitions = Lists.newArrayList();
+    }
+    if (this.partitionObjs == null) {
+      this.partitionObjs = Lists.newArrayList();
+    }
+    if (this.files == null) {
+      this.files = Lists.newArrayList();
+    }
+    // Append to all seven lists in lockstep: each write event adds exactly one element to every list.
+    for (WriteEventInfo writeEventInfo : writeEventInfoList) {
+      this.databases.add(writeEventInfo.getDatabase());
+      this.tables.add(writeEventInfo.getTable());
+      this.writeIds.add(writeEventInfo.getWriteId());
+      this.partitions.add(writeEventInfo.getPartition());
+      this.tableObjs.add(writeEventInfo.getTableObj());
+      this.partitionObjs.add(writeEventInfo.getPartitionObj());
+      this.files.add(writeEventInfo.getFiles());
+    }
+  }
+
+  @Override
   public String toString() {
     try {
       return JSONMessageDeserializer.mapper.writeValueAsString(this);

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
index f54e24d..be6b751 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
@@ -259,4 +260,20 @@ public class JSONMessageDeserializer extends MessageDeserializer {
       throw new IllegalArgumentException("Could not construct AllocWriteIdMessage", e);
     }
   }
+
+  /**
+   * De-serializes an AcidWriteMessage from its JSON form.
+   *
+   * @param messageBody JSON text of the message
+   * @return the message, mapped onto JSONAcidWriteMessage
+   * @throws IllegalArgumentException if the text cannot be mapped
+   */
+  @Override
+  public AcidWriteMessage getAcidWriteMessage(String messageBody) {
+    try {
+      return mapper.readValue(messageBody, JSONAcidWriteMessage.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Could not construct AcidWriteMessage", e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index d64c3ff..07f51f0 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
 import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
 import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
 import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
@@ -230,11 +232,17 @@ public class JSONMessageFactory extends MessageFactory {
     return new JSONAbortTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
   }
 
+  @Override
   public AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList,
                                                       String dbName, String tableName) {
     return new JSONAllocWriteIdMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnToWriteIdList, dbName, tableName, now());
   }
 
+  @Override
+  public AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, Iterator<String> files) {
+    // The timestamp passed below comes from now(), which is currentTimeMillis() / 1000.
+    return new JSONAcidWriteMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), acidWriteEvent, files);
+  }
+
   private long now() {
     return System.currentTimeMillis() / 1000;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java
new file mode 100644
index 0000000..f5ca386
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.model;
+
+/**
+ * Model object for one ACID write event; package.jdo persists it as a row of the
+ * TXN_WRITE_NOTIFICATION_LOG table. Plain bean: every accessor just reads or assigns its field.
+ */
+public class MTxnWriteNotificationLog {
+  private long txnId; // WNL_TXNID column
+  private long writeId; // WNL_WRITEID column
+  private int eventTime; // WNL_EVENT_TIME column
+  private String database; // WNL_DATABASE column
+  private String table; // WNL_TABLE column
+  private String partition; // WNL_PARTITION column
+  private String tableObject; // WNL_TABLE_OBJ column
+  private String partObject; // WNL_PARTITION_OBJ column
+  private String files; // WNL_FILES column
+
+  public MTxnWriteNotificationLog() { // leaves every field at its Java default
+  }
+
+  public MTxnWriteNotificationLog(long txnId, long writeId, int eventTime, String database, String table,
+                               String partition, String tableObject, String partObject, String files) {
+    this.txnId = txnId;
+    this.writeId = writeId;
+    this.eventTime = eventTime;
+    this.database = database;
+    this.table = table;
+    this.partition = partition;
+    this.tableObject = tableObject;
+    this.partObject = partObject;
+    this.files = files;
+  }
+
+  public long getTxnId() {
+    return txnId;
+  }
+
+  public void setTxnId(long txnId) {
+    this.txnId = txnId;
+  }
+
+  public long getWriteId() {
+    return writeId;
+  }
+
+  public void setWriteId(long writeId) {
+    this.writeId = writeId;
+  }
+
+  public int getEventTime() {
+    return eventTime;
+  }
+
+  public void setEventTime(int eventTime) {
+    this.eventTime = eventTime;
+  }
+
+  public String getDatabase() {
+    return database;
+  }
+
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  public String getPartition() {
+    return partition;
+  }
+
+  public void setPartition(String partition) {
+    this.partition = partition;
+  }
+
+  public String getTableObject() {
+    return tableObject;
+  }
+
+  public void setTableObject(String tableObject) {
+    this.tableObject = tableObject;
+  }
+
+  public String getPartObject() {
+    return partObject;
+  }
+
+  public void setPartObject(String partObject) {
+    this.partObject = partObject;
+  }
+
+  public String getFiles() {
+    return files;
+  }
+
+  public void setFiles(String files) {
+    this.files = files;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
index b23a6d7..d0ac7db 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
@@ -175,4 +175,13 @@ public final class SQLGenerator {
     return dbProduct;
   }
 
+  // Needed for SQL that is executed directly: every backslash in the value is doubled for MySQL so
+  // that escape sequences (e.g. an escaped double quote) are not stripped when the value is stored.
+  public String addEscapeCharacters(String s) {
+    if (dbProduct != DatabaseProduct.MYSQL) {
+      return s;
+    }
+    return s.replaceAll("\\\\", "\\\\\\\\");
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 50bfca3..f8c2ca2 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -245,6 +245,34 @@ public final class TxnDbUtil {
       stmt.execute("INSERT INTO \"APP\".\"NOTIFICATION_SEQUENCE\" (\"NNI_ID\", \"NEXT_EVENT_ID\")" +
               " SELECT * FROM (VALUES (1,1)) tmp_table WHERE NOT EXISTS ( SELECT " +
               "\"NEXT_EVENT_ID\" FROM \"APP\".\"NOTIFICATION_SEQUENCE\")");
+
+      try {
+        stmt.execute("CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (" +
+                "WNL_ID bigint NOT NULL," +
+                "WNL_TXNID bigint NOT NULL," +
+                "WNL_WRITEID bigint NOT NULL," +
+                "WNL_DATABASE varchar(128) NOT NULL," +
+                "WNL_TABLE varchar(128) NOT NULL," +
+                "WNL_PARTITION varchar(1024) NOT NULL," +
+                "WNL_TABLE_OBJ clob NOT NULL," +
+                "WNL_PARTITION_OBJ clob," +
+                "WNL_FILES clob," +
+                "WNL_EVENT_TIME integer NOT NULL," +
+                "PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION))"
+        );
+      } catch (SQLException e) {
+        if (e.getMessage() != null && e.getMessage().contains("already exists")) {
+          LOG.info("TXN_WRITE_NOTIFICATION_LOG table already exist, ignoring");
+        } else {
+          throw e;
+        }
+      }
+
+      stmt.execute("INSERT INTO \"APP\".\"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", \"NEXT_VAL\") " +
+              "SELECT * FROM (VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', " +
+              "1)) tmp_table WHERE NOT EXISTS ( SELECT \"NEXT_VAL\" FROM \"APP\"" +
+              ".\"SEQUENCE_TABLE\" WHERE \"SEQUENCE_NAME\" = 'org.apache.hadoop.hive.metastore" +
+              ".model.MTxnWriteNotificationLog')");
     } catch (SQLException e) {
       try {
         conn.rollback();

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 361ede5..3785f89 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
 import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
 import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
@@ -698,6 +699,38 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   @Override
   @RetrySemantics.Idempotent
+  public long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException {
+    // Returns the target txn id mapped to sourceTxnId under replPolicy, or -1 when none is found.
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      try {
+        lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        List<Long> targetTxnIds = getTargetTxnIdList(replPolicy, Collections.singletonList(sourceTxnId), stmt);
+        if (targetTxnIds.isEmpty()) {
+          LOG.info("Txn {} not present for repl policy {}", sourceTxnId, replPolicy);
+          return -1;
+        }
+        assert (targetTxnIds.size() == 1);
+        return targetTxnIds.get(0);
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "getTargetTxnId(" + replPolicy + ", " + sourceTxnId + ")");
+        throw new MetaException("Unable to get target transaction id " + StringUtils.stringifyException(e));
+      } finally {
+        close(null, stmt, dbConn);
+        unlockInternal();
+      }
+    } catch (RetryException e) { // presumably raised by checkRetryable; run the lookup again
+      return getTargetTxnId(replPolicy, sourceTxnId);
+    }
+  }
+
+  @Override
+  @RetrySemantics.Idempotent
   public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException {
     long txnid = rqst.getTxnid();
     long sourceTxnId = -1;
@@ -892,10 +925,17 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           shouldNeverHappen(txnid);
           //dbConn is rolled back in finally{}
         }
-        String conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" +
-          quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")";
-        rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix));
-        if (rs.next()) {
+
+        String conflictSQLSuffix = null;
+        if (rqst.isSetReplPolicy()) {
+          rs = null;
+        } else {
+          conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" +
+                  quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")";
+          rs = stmt.executeQuery(sqlGenerator.addLimitClause(1,
+                  "tc_operation_type " + conflictSQLSuffix));
+        }
+        if (rs != null && rs.next()) {
           isUpdateDelete = true;
           close(rs);
           //if here it means currently committing txn performed update/delete and we should check WW conflict
@@ -984,23 +1024,52 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            * Consider: if RO txn is after a W txn, then RO's openTxns() will be mutexed with W's
            * commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see result of W txn.
            * If RO < W, then there is no reads-from relationship.
+           * In the replication flow we don't expect any write-write conflict, as it should have been handled at the source.
            */
         }
-        // Move the record from txn_components into completed_txn_components so that the compactor
-        // knows where to look to compact.
-        String s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " +
-            "ctc_table, ctc_partition, ctc_writeid) select tc_txnid, tc_database, tc_table, " +
-            "tc_partition, tc_writeid from TXN_COMPONENTS where tc_txnid = " + txnid;
-        LOG.debug("Going to execute insert <" + s + ">");
-        int modCount = 0;
-        if ((modCount = stmt.executeUpdate(s)) < 1) {
-          //this can be reasonable for an empty txn START/COMMIT or read-only txn
-          //also an IUD with DP that didn't match any rows.
-          LOG.info("Expected to move at least one record from txn_components to " +
-            "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
+
+        String s;
+        if (!rqst.isSetReplPolicy()) {
+          // Move the record from txn_components into completed_txn_components so that the compactor
+          // knows where to look to compact.
+          s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " +
+                  "ctc_table, ctc_partition, ctc_writeid) select tc_txnid, tc_database, tc_table, " +
+                  "tc_partition, tc_writeid from TXN_COMPONENTS where tc_txnid = " + txnid;
+          LOG.debug("Going to execute insert <" + s + ">");
+
+          if ((stmt.executeUpdate(s)) < 1) {
+            //this can be reasonable for an empty txn START/COMMIT or read-only txn
+            //also an IUD with DP that didn't match any rows.
+            LOG.info("Expected to move at least one record from txn_components to " +
+                    "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
+          }
+        } else {
+          if (rqst.isSetWriteEventInfos()) {
+            List<String> rows = new ArrayList<>();
+            for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
+              rows.add(txnid + "," + quoteString(writeEventInfo.getDatabase()) + "," +
+                      quoteString(writeEventInfo.getTable()) + "," +
+                      quoteString(writeEventInfo.getPartition()) + "," +
+                      writeEventInfo.getWriteId());
+            }
+            List<String> queries = sqlGenerator.createInsertValuesStmt("COMPLETED_TXN_COMPONENTS " +
+                    "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, ctc_writeid)", rows);
+            for (String q : queries) {
+              LOG.debug("Going to execute insert  <" + q + "> ");
+              stmt.execute(q);
+            }
+          }
+
+          s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
+                  " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
+          LOG.info("Repl going to execute  <" + s + ">");
+          stmt.executeUpdate(s);
         }
+
         // Obtain information that we need to update registry
-        s = "select ctc_database, ctc_table, ctc_writeid, ctc_timestamp from COMPLETED_TXN_COMPONENTS where ctc_txnid = " + txnid;
+        s = "select ctc_database, ctc_table, ctc_writeid, ctc_timestamp from COMPLETED_TXN_COMPONENTS" +
+                " where ctc_txnid = " + txnid;
+
         LOG.debug("Going to extract table modification information for invalidation cache <" + s + ">");
         rs = stmt.executeQuery(s);
         while (rs.next()) {
@@ -1008,27 +1077,21 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           txnComponents.add(new TransactionRegistryInfo(rs.getString(1), rs.getString(2),
               rs.getLong(3), rs.getTimestamp(4, Calendar.getInstance(TimeZone.getTimeZone("UTC"))).getTime()));
         }
+
+        // cleanup all txn related metadata
         s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
-        modCount = stmt.executeUpdate(s);
+        stmt.executeUpdate(s);
         s = "delete from HIVE_LOCKS where hl_txnid = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
-        modCount = stmt.executeUpdate(s);
+        stmt.executeUpdate(s);
         s = "delete from TXNS where txn_id = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
-        modCount = stmt.executeUpdate(s);
+        stmt.executeUpdate(s);
         s = "delete from MIN_HISTORY_LEVEL where mhl_txnid = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
-        modCount = stmt.executeUpdate(s);
+        stmt.executeUpdate(s);
         LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL");
-
-        if (rqst.isSetReplPolicy()) {
-          s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
-                  " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
-          LOG.info("Repl going to execute  <" + s + ">");
-          stmt.executeUpdate(s);
-        }
-
         if (transactionalListeners != null) {
           MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
                   EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, null), dbConn, sqlGenerator);
@@ -1548,6 +1611,43 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   }
   @Override
+  @RetrySemantics.Idempotent
+  public void addWriteNotificationLog(AcidWriteEvent acidWriteEvent)
+          throws MetaException {
+    Connection dbConn = null;
+    try {
+      try {
+        // Record the ACID write event via direct SQL; idempotent re-delivery is handled by the notify call.
+        lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                EventMessage.EventType.ACID_WRITE, acidWriteEvent, dbConn, sqlGenerator);
+        LOG.debug("Going to commit");
+        dbConn.commit();
+        return;
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        if (isDuplicateKeyError(e)) {
+          // A duplicate-key failure may only be a race with another writer, so wait and try again.
+          if (waitForRetry("addWriteNotificationLog(" + acidWriteEvent + ")", e.getMessage())) {
+            throw new RetryException(); // caught by the outer try, which calls this method again
+          }
+          retryNum = 0; // retries exhausted: reset the counter before surfacing the error
+          throw new MetaException(e.getMessage());
+        }
+        checkRetryable(dbConn, e, "addWriteNotificationLog(" + acidWriteEvent + ")");
+        throw new MetaException("Unable to add write notification event " + StringUtils.stringifyException(e));
+      } finally{
+        closeDbConn(dbConn);
+        unlockInternal();
+      }
+    } catch (RetryException e) {
+      addWriteNotificationLog(acidWriteEvent); // retried only after the finally block above has run
+    }
+  }
+
+  @Override
   @RetrySemantics.SafeToRetry
   public void performWriteSetGC() {
     Connection dbConn = null;
@@ -3046,6 +3146,22 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     closeStmt(stmt);
     closeDbConn(dbConn);
   }
+
+  /** Sleeps for retryInterval and returns true while retries remain; otherwise logs and returns false. */
+  private boolean waitForRetry(String caller, String errMsg) {
+    if (retryNum++ >= retryLimit) {
+      LOG.error("Fatal error in " + caller + ". Retry limit (" + retryLimit + ") reached. Last error: " + errMsg);
+      return false;
+    }
+    LOG.warn("Retryable error detected in " + caller + ".  Will wait " + retryInterval +
+            "ms and retry up to " + (retryLimit - retryNum + 1) + " times.  Error: " + errMsg);
+    try {
+      Thread.sleep(retryInterval);
+    } catch (InterruptedException ex) {
+      // NOTE(review): the interrupt is swallowed and the flag is not restored - confirm this is intended.
+    }
+    return true;
+  }
   /**
    * Determine if an exception was such that it makes sense to retry.  Unfortunately there is no standard way to do
    * this, so we have to inspect the error messages and catch the telltale signs for each
@@ -3089,18 +3205,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         }
       } else if (isRetryable(conf, e)) {
         //in MSSQL this means Communication Link Failure
-        if (retryNum++ < retryLimit) {
-          LOG.warn("Retryable error detected in " + caller + ".  Will wait " + retryInterval +
-            "ms and retry up to " + (retryLimit - retryNum + 1) + " times.  Error: " + getMessage(e));
-          try {
-            Thread.sleep(retryInterval);
-          } catch (InterruptedException ex) {
-            //
-          }
-          sendRetrySignal = true;
-        } else {
-          LOG.error("Fatal error in " + caller + ". Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
-        }
+        sendRetrySignal = waitForRetry(caller, e.getMessage());
       }
       else {
         //make sure we know we saw an error that we don't recognize

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index ef447e1..d972d10 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
 
 import java.sql.SQLException;
 import java.util.Iterator;
@@ -86,6 +87,9 @@ public interface TxnStore extends Configurable {
   @RetrySemantics.Idempotent
   OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException;
 
+  @RetrySemantics.Idempotent
+  long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException;
+
   /**
    * Abort (rollback) a transaction.
    * @param rqst info on transaction to abort
@@ -476,4 +480,11 @@ public interface TxnStore extends Configurable {
    */
   @RetrySemantics.Idempotent
   void setHadoopJobId(String hadoopJobId, long id);
+
+  /**
+   * Add the ACID write event information to writeNotificationLog table.
+   * @param acidWriteEvent
+   */
+  @RetrySemantics.Idempotent
+  void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) throws MetaException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
index 963e12f..154db4b 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
@@ -511,7 +511,6 @@ public class FileUtils {
     return new Path(scheme, authority, pathUri.getPath());
   }
 
-
   /**
    * Returns a BEST GUESS as to whether or not other is a subdirectory of parent. It does not
    * take into account any intricacies of the underlying file system, which is assumed to be
@@ -524,4 +523,15 @@ public class FileUtils {
   public static boolean isSubdirectory(String parent, String other) {
     return other.startsWith(parent.endsWith(Path.SEPARATOR) ? parent : parent + Path.SEPARATOR);
   }
+
+  /** Resolves {@code name} under {@code root}, or under {@code root/subDir} when subDir is given. */
+  public static Path getTransformedPath(String name, String subDir, String root) {
+    if (root == null) {
+      // No root to resolve against, so there is no path to build.
+      return null;
+    }
+    Path rootPath = new Path(root);
+    Path parent = (subDir == null) ? rootPath : new Path(rootPath, subDir);
+    return new Path(parent, name);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/resources/package.jdo
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/resources/package.jdo b/standalone-metastore/src/main/resources/package.jdo
index 1be3e98..5fb548c 100644
--- a/standalone-metastore/src/main/resources/package.jdo
+++ b/standalone-metastore/src/main/resources/package.jdo
@@ -1182,6 +1182,41 @@
       </field>
     </class>
 
+    <class name="MTxnWriteNotificationLog" table="TXN_WRITE_NOTIFICATION_LOG" identity-type="datastore" detachable="true">
+      <datastore-identity strategy="increment"/>
+      <datastore-identity key-cache-size="1"/>
+      <datastore-identity>
+        <column name="WNL_ID"/>
+      </datastore-identity>
+      <field name="txnId">
+        <column name="WNL_TXNID" jdbc-type="BIGINT" allows-null="false"/>
+      </field>
+      <field name="writeId">
+        <column name="WNL_WRITEID" jdbc-type="BIGINT" allows-null="false"/>
+      </field>
+      <field name="database">
+        <column name="WNL_DATABASE" length="128" jdbc-type="VARCHAR" allows-null="false"/>
+      </field>
+      <field name="table">
+        <column name="WNL_TABLE" length="128" jdbc-type="VARCHAR" allows-null="false"/>
+      </field>
+      <field name="partition">
+        <column name="WNL_PARTITION" length="1024" jdbc-type="VARCHAR" allows-null="false"/>
+      </field>
+      <field name="tableObject">
+        <column name="WNL_TABLE_OBJ" jdbc-type="LONGVARCHAR"/>
+      </field>
+      <field name="partObject">
+        <column name="WNL_PARTITION_OBJ" jdbc-type="LONGVARCHAR"/>
+      </field>
+      <field name="files">
+        <column name="WNL_FILES" jdbc-type="LONGVARCHAR"/>
+      </field>
+      <field name="eventTime">
+        <column name="WNL_EVENT_TIME" jdbc-type="INTEGER" allows-null="false"/>
+      </field>
+    </class>
+
     <class name="MWMResourcePlan" identity-type="datastore" table="WM_RESOURCEPLAN" detachable="true">
       <datastore-identity>
         <column name="RP_ID"/>

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql b/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql
index 352b43e..a696d06 100644
--- a/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql
+++ b/standalone-metastore/src/main/sql/derby/hive-schema-3.1.0.derby.sql
@@ -689,6 +689,21 @@ CREATE TABLE "APP"."RUNTIME_STATS" (
 
 CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
 
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+  WNL_ID bigint NOT NULL,
+  WNL_TXNID bigint NOT NULL,
+  WNL_WRITEID bigint NOT NULL,
+  WNL_DATABASE varchar(128) NOT NULL,
+  WNL_TABLE varchar(128) NOT NULL,
+  WNL_PARTITION varchar(1024) NOT NULL,
+  WNL_TABLE_OBJ clob NOT NULL,
+  WNL_PARTITION_OBJ clob,
+  WNL_FILES clob,
+  WNL_EVENT_TIME integer NOT NULL,
+  PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+);
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index bb69105..7cab4fb 100644
--- a/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ b/standalone-metastore/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -689,6 +689,21 @@ CREATE TABLE "APP"."RUNTIME_STATS" (
 
 CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
 
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+  WNL_ID bigint NOT NULL,
+  WNL_TXNID bigint NOT NULL,
+  WNL_WRITEID bigint NOT NULL,
+  WNL_DATABASE varchar(128) NOT NULL,
+  WNL_TABLE varchar(128) NOT NULL,
+  WNL_PARTITION varchar(1024) NOT NULL,
+  WNL_TABLE_OBJ clob NOT NULL,
+  WNL_PARTITION_OBJ clob,
+  WNL_FILES clob,
+  WNL_EVENT_TIME integer NOT NULL,
+  PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+);
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
index 7b7a8a2..10f1373 100644
--- a/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
+++ b/standalone-metastore/src/main/sql/derby/upgrade-2.3.0-to-3.0.0.derby.sql
@@ -244,7 +244,6 @@ CREATE TABLE MIN_HISTORY_LEVEL (
 
 CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID);
 
-
 CREATE TABLE "APP"."RUNTIME_STATS" (
   "RS_ID" bigint primary key,
   "CREATE_TIME" integer not null,

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql b/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql
index 6621ef7..7058ab0 100644
--- a/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql
+++ b/standalone-metastore/src/main/sql/derby/upgrade-3.0.0-to-3.1.0.derby.sql
@@ -29,5 +29,21 @@ ALTER TABLE TXNS ADD COLUMN TXN_TYPE integer;
 
 CREATE INDEX "APP"."TAB_COL_STATS_IDX" ON "APP"."TAB_COL_STATS" ("CAT_NAME", "DB_NAME", "TABLE_NAME", "COLUMN_NAME");
 
+-- HIVE-19267
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+  WNL_ID bigint NOT NULL,
+  WNL_TXNID bigint NOT NULL,
+  WNL_WRITEID bigint NOT NULL,
+  WNL_DATABASE varchar(128) NOT NULL,
+  WNL_TABLE varchar(128) NOT NULL,
+  WNL_PARTITION varchar(1024) NOT NULL,
+  WNL_TABLE_OBJ clob NOT NULL,
+  WNL_PARTITION_OBJ clob,
+  WNL_FILES clob,
+  WNL_EVENT_TIME integer NOT NULL,
+  PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+);
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
 -- This needs to be the last thing done.  Insert any changes above this line.
 UPDATE "APP".VERSION SET SCHEMA_VERSION='3.1.0', VERSION_COMMENT='Hive release version 3.1.0' where VER_ID=1;

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql
index bc11b40..d7722dc 100644
--- a/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql
+++ b/standalone-metastore/src/main/sql/mssql/hive-schema-3.1.0.mssql.sql
@@ -1248,6 +1248,23 @@ CREATE TABLE RUNTIME_STATS (
 
 CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
 
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+  WNL_ID bigint NOT NULL,
+  WNL_TXNID bigint NOT NULL,
+  WNL_WRITEID bigint NOT NULL,
+  WNL_DATABASE nvarchar(128) NOT NULL,
+  WNL_TABLE nvarchar(128) NOT NULL,
+  WNL_PARTITION nvarchar(1024) NOT NULL,
+  WNL_TABLE_OBJ text NOT NULL,
+  WNL_PARTITION_OBJ text,
+  WNL_FILES text,
+  WNL_EVENT_TIME int NOT NULL
+);
+
+ALTER TABLE TXN_WRITE_NOTIFICATION_LOG ADD CONSTRAINT TXN_WRITE_NOTIFICATION_LOG_PK PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION);
+
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 922e8fe..a81fc40 100644
--- a/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ b/standalone-metastore/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1249,6 +1249,23 @@ CREATE TABLE RUNTIME_STATS (
 
 CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
 
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+  WNL_ID bigint NOT NULL,
+  WNL_TXNID bigint NOT NULL,
+  WNL_WRITEID bigint NOT NULL,
+  WNL_DATABASE nvarchar(128) NOT NULL,
+  WNL_TABLE nvarchar(128) NOT NULL,
+  WNL_PARTITION nvarchar(1024) NOT NULL,
+  WNL_TABLE_OBJ text NOT NULL,
+  WNL_PARTITION_OBJ text,
+  WNL_FILES text,
+  WNL_EVENT_TIME int NOT NULL
+);
+
+ALTER TABLE TXN_WRITE_NOTIFICATION_LOG ADD CONSTRAINT TXN_WRITE_NOTIFICATION_LOG_PK PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION);
+
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql b/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql
index abb80d6..41f23f7 100644
--- a/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql
+++ b/standalone-metastore/src/main/sql/mssql/upgrade-3.0.0-to-3.1.0.mssql.sql
@@ -30,6 +30,22 @@ ALTER TABLE TXNS ADD TXN_TYPE int NULL;
 
 CREATE INDEX TAB_COL_STATS_IDX ON TAB_COL_STATS (CAT_NAME, DB_NAME, TABLE_NAME, COLUMN_NAME);
 
+-- HIVE-19267: write notification log used to replicate ACID/MM table write operations
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+  WNL_ID bigint NOT NULL,
+  WNL_TXNID bigint NOT NULL,
+  WNL_WRITEID bigint NOT NULL,
+  WNL_DATABASE nvarchar(128) NOT NULL,
+  WNL_TABLE nvarchar(128) NOT NULL,
+  WNL_PARTITION nvarchar(1024) NOT NULL,
+  WNL_TABLE_OBJ text NOT NULL,
+  WNL_PARTITION_OBJ text,
+  WNL_FILES text,
+  WNL_EVENT_TIME int NOT NULL
+);
+ALTER TABLE TXN_WRITE_NOTIFICATION_LOG ADD CONSTRAINT TXN_WRITE_NOTIFICATION_LOG_PK PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION);
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='3.1.0', VERSION_COMMENT='Hive release version 3.1.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.0.0 to 3.1.0' AS MESSAGE;

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
index c54df55..c65af1e 100644
--- a/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/hive-schema-3.0.0.mysql.sql
@@ -1155,7 +1155,6 @@ CREATE TABLE REPL_TXN_MAP (
   PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID)
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
-
 CREATE TABLE RUNTIME_STATS (
   RS_ID bigint primary key,
   CREATE_TIME bigint NOT NULL,

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql
index af955dc..29d4a43 100644
--- a/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/hive-schema-3.1.0.mysql.sql
@@ -1173,6 +1173,22 @@ CREATE TABLE RUNTIME_STATS (
 
 CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
 
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+  WNL_ID bigint NOT NULL,
+  WNL_TXNID bigint NOT NULL,
+  WNL_WRITEID bigint NOT NULL,
+  WNL_DATABASE varchar(128) NOT NULL,
+  WNL_TABLE varchar(128) NOT NULL,
+  WNL_PARTITION varchar(1024) NOT NULL,
+  WNL_TABLE_OBJ longtext NOT NULL,
+  WNL_PARTITION_OBJ longtext,
+  WNL_FILES longtext,
+  WNL_EVENT_TIME INT(11) NOT NULL,
+  PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+INSERT INTO `SEQUENCE_TABLE` (`SEQUENCE_NAME`, `NEXT_VAL`) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 6c40e6e..968f4a4 100644
--- a/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -1173,6 +1173,22 @@ CREATE TABLE RUNTIME_STATS (
 
 CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
 
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+  WNL_ID bigint NOT NULL,
+  WNL_TXNID bigint NOT NULL,
+  WNL_WRITEID bigint NOT NULL,
+  WNL_DATABASE varchar(128) NOT NULL,
+  WNL_TABLE varchar(128) NOT NULL,
+  WNL_PARTITION varchar(1024) NOT NULL,
+  WNL_TABLE_OBJ longtext NOT NULL,
+  WNL_PARTITION_OBJ longtext,
+  WNL_FILES longtext,
+  WNL_EVENT_TIME INT(11) NOT NULL,
+  PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+INSERT INTO `SEQUENCE_TABLE` (`SEQUENCE_NAME`, `NEXT_VAL`) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
index 9b87563..786e38a 100644
--- a/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/upgrade-2.3.0-to-3.0.0.mysql.sql
@@ -319,8 +319,8 @@ UPDATE COMPLETED_TXN_COMPONENTS SET CTC_WRITEID = CTC_TXNID;
 
 ALTER TABLE TXN_COMPONENTS MODIFY COLUMN TC_TABLE varchar(128) NULL;
 
+ALTER TABLE `TBLS` ADD COLUMN `OWNER_TYPE` VARCHAR(10) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL;
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 2.3.0 to 3.0.0' AS ' ';
-
-ALTER TABLE `TBLS` ADD COLUMN `OWNER_TYPE` VARCHAR(10) CHARACTER SET latin1 COLLATE latin1_bin DEFAULT NULL;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql b/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql
index 305fa1d..e103bef 100644
--- a/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql
+++ b/standalone-metastore/src/main/sql/mysql/upgrade-3.0.0-to-3.1.0.mysql.sql
@@ -30,6 +30,22 @@ ALTER TABLE TXNS ADD COLUMN TXN_TYPE int DEFAULT NULL;
 
 CREATE INDEX TAB_COL_STATS_IDX ON TAB_COL_STATS (CAT_NAME, DB_NAME, TABLE_NAME, COLUMN_NAME) USING BTREE;
 
+-- HIVE-19267: write notification log used to replicate ACID/MM table write operations
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+  WNL_ID bigint NOT NULL,
+  WNL_TXNID bigint NOT NULL,
+  WNL_WRITEID bigint NOT NULL,
+  WNL_DATABASE varchar(128) NOT NULL,
+  WNL_TABLE varchar(128) NOT NULL,
+  WNL_PARTITION varchar(1024) NOT NULL,
+  WNL_TABLE_OBJ longtext NOT NULL,
+  WNL_PARTITION_OBJ longtext,
+  WNL_FILES longtext,
+  WNL_EVENT_TIME INT(11) NOT NULL,
+  PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+INSERT INTO `SEQUENCE_TABLE` (`SEQUENCE_NAME`, `NEXT_VAL`) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='3.1.0', VERSION_COMMENT='Hive release version 3.1.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.0.0 to 3.1.0' AS ' ';

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
index 63cc1f7..3e2e282 100644
--- a/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/hive-schema-3.0.0.oracle.sql
@@ -1134,7 +1134,6 @@ CREATE TABLE RUNTIME_STATS (
 
 CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
 
-
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql
index bc13703..9adea31 100644
--- a/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/hive-schema-3.1.0.oracle.sql
@@ -1143,6 +1143,21 @@ CREATE TABLE RUNTIME_STATS (
 
 CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
 
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+  WNL_ID number(19) NOT NULL,
+  WNL_TXNID number(19) NOT NULL,
+  WNL_WRITEID number(19) NOT NULL,
+  WNL_DATABASE varchar(128) NOT NULL,
+  WNL_TABLE varchar(128) NOT NULL,
+  WNL_PARTITION varchar(1024) NOT NULL,
+  WNL_TABLE_OBJ clob NOT NULL,
+  WNL_PARTITION_OBJ clob,
+  WNL_FILES clob,
+  WNL_EVENT_TIME number(10) NOT NULL,
+  PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+);
+
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
 
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index e12150a..faca669 100644
--- a/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -1143,6 +1143,21 @@ CREATE TABLE RUNTIME_STATS (
 
 CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
 
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+  WNL_ID number(19) NOT NULL,
+  WNL_TXNID number(19) NOT NULL,
+  WNL_WRITEID number(19) NOT NULL,
+  WNL_DATABASE varchar(128) NOT NULL,
+  WNL_TABLE varchar(128) NOT NULL,
+  WNL_PARTITION varchar(1024) NOT NULL,
+  WNL_TABLE_OBJ clob NOT NULL,
+  WNL_PARTITION_OBJ clob,
+  WNL_FILES clob,
+  WNL_EVENT_TIME number(10) NOT NULL,
+  PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+);
+
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
 
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
index ce3437f..71f5034 100644
--- a/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/upgrade-2.3.0-to-3.0.0.oracle.sql
@@ -335,8 +335,8 @@ INSERT INTO TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID, T2W_WRITEID)
 UPDATE TXN_COMPONENTS SET TC_WRITEID = TC_TXNID;
 UPDATE COMPLETED_TXN_COMPONENTS SET CTC_WRITEID = CTC_TXNID;
 
+ALTER TABLE TBLS ADD OWNER_TYPE VARCHAR2(10) NULL;
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 2.3.0 to 3.0.0' AS Status from dual;
-
-ALTER TABLE TBLS ADD OWNER_TYPE VARCHAR2(10) NULL;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql b/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql
index ccdea54..cf8699b 100644
--- a/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql
+++ b/standalone-metastore/src/main/sql/oracle/upgrade-3.0.0-to-3.1.0.oracle.sql
@@ -30,6 +30,22 @@ ALTER TABLE TXNS ADD TXN_TYPE number(10) NULL;
 
 CREATE INDEX TAB_COL_STATS_IDX ON TAB_COL_STATS (CAT_NAME, DB_NAME, TABLE_NAME, COLUMN_NAME);
 
+-- HIVE-19267: write notification log used to replicate ACID/MM table write operations
+CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (
+  WNL_ID number(19) NOT NULL,
+  WNL_TXNID number(19) NOT NULL,
+  WNL_WRITEID number(19) NOT NULL,
+  WNL_DATABASE varchar(128) NOT NULL,
+  WNL_TABLE varchar(128) NOT NULL,
+  WNL_PARTITION varchar(1024) NOT NULL,
+  WNL_TABLE_OBJ clob NOT NULL,
+  WNL_PARTITION_OBJ clob,
+  WNL_FILES clob,
+  WNL_EVENT_TIME number(10) NOT NULL,
+  PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION)
+);
+INSERT INTO SEQUENCE_TABLE (SEQUENCE_NAME, NEXT_VAL) VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='3.1.0', VERSION_COMMENT='Hive release version 3.1.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.0.0 to 3.1.0' AS Status from dual;

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
index 97697f8..b89c87f 100644
--- a/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/hive-schema-3.0.0.postgres.sql
@@ -1812,7 +1812,6 @@ CREATE TABLE REPL_TXN_MAP (
   PRIMARY KEY (RTM_REPL_POLICY, RTM_SRC_TXN_ID)
 );
 
-
 CREATE TABLE RUNTIME_STATS (
  RS_ID bigint primary key,
  CREATE_TIME bigint NOT NULL,
@@ -1822,7 +1821,6 @@ CREATE TABLE RUNTIME_STATS (
 
 CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
 
-
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script
 -- -----------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql
index 36bab70..7a8a419 100644
--- a/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/hive-schema-3.1.0.postgres.sql
@@ -1834,6 +1834,21 @@ CREATE TABLE RUNTIME_STATS (
 
 CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
 
+CREATE TABLE "TXN_WRITE_NOTIFICATION_LOG" (
+  "WNL_ID" bigint NOT NULL,
+  "WNL_TXNID" bigint NOT NULL,
+  "WNL_WRITEID" bigint NOT NULL,
+  "WNL_DATABASE" varchar(128) NOT NULL,
+  "WNL_TABLE" varchar(128) NOT NULL,
+  "WNL_PARTITION" varchar(1024) NOT NULL,
+  "WNL_TABLE_OBJ" text NOT NULL,
+  "WNL_PARTITION_OBJ" text,
+  "WNL_FILES" text,
+  "WNL_EVENT_TIME" integer NOT NULL,
+  PRIMARY KEY ("WNL_TXNID", "WNL_DATABASE", "WNL_TABLE", "WNL_PARTITION")
+);
+
+INSERT INTO "SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
 
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index b73e1d1..2e7ac5a 100644
--- a/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1836,6 +1836,21 @@ CREATE TABLE RUNTIME_STATS (
 
 CREATE INDEX IDX_RUNTIME_STATS_CREATE_TIME ON RUNTIME_STATS(CREATE_TIME);
 
+CREATE TABLE "TXN_WRITE_NOTIFICATION_LOG" (
+  "WNL_ID" bigint NOT NULL,
+  "WNL_TXNID" bigint NOT NULL,
+  "WNL_WRITEID" bigint NOT NULL,
+  "WNL_DATABASE" varchar(128) NOT NULL,
+  "WNL_TABLE" varchar(128) NOT NULL,
+  "WNL_PARTITION" varchar(1024) NOT NULL,
+  "WNL_TABLE_OBJ" text NOT NULL,
+  "WNL_PARTITION_OBJ" text,
+  "WNL_FILES" text,
+  "WNL_EVENT_TIME" integer NOT NULL,
+  PRIMARY KEY ("WNL_TXNID", "WNL_DATABASE", "WNL_TABLE", "WNL_PARTITION")
+);
+
+INSERT INTO "SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
 
 -- -----------------------------------------------------------------
 -- Record schema version. Should be the last step in the init script

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql b/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql
index 2c0eb31..445c3a2 100644
--- a/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql
+++ b/standalone-metastore/src/main/sql/postgres/upgrade-3.0.0-to-3.1.0.postgres.sql
@@ -32,6 +32,22 @@ ALTER TABLE TXNS ADD COLUMN TXN_TYPE integer DEFAULT NULL;
 
 CREATE INDEX "TAB_COL_STATS_IDX" ON "TAB_COL_STATS" USING btree ("CAT_NAME", "DB_NAME","TABLE_NAME","COLUMN_NAME");
 
+-- HIVE-19267: write notification log used to replicate ACID/MM table write operations
+CREATE TABLE "TXN_WRITE_NOTIFICATION_LOG" (
+  "WNL_ID" bigint NOT NULL,
+  "WNL_TXNID" bigint NOT NULL,
+  "WNL_WRITEID" bigint NOT NULL,
+  "WNL_DATABASE" varchar(128) NOT NULL,
+  "WNL_TABLE" varchar(128) NOT NULL,
+  "WNL_PARTITION" varchar(1024) NOT NULL,
+  "WNL_TABLE_OBJ" text NOT NULL,
+  "WNL_PARTITION_OBJ" text,
+  "WNL_FILES" text,
+  "WNL_EVENT_TIME" integer NOT NULL,
+  PRIMARY KEY ("WNL_TXNID", "WNL_DATABASE", "WNL_TABLE", "WNL_PARTITION")
+);
+INSERT INTO "SEQUENCE_TABLE" ("SEQUENCE_NAME", "NEXT_VAL") VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', 1);
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE "VERSION" SET "SCHEMA_VERSION"='3.1.0', "VERSION_COMMENT"='Hive release version 3.1.0' where "VER_ID"=1;
 SELECT 'Finished upgrading MetaStore schema from 3.0.0 to 3.1.0';

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/main/thrift/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/thrift/hive_metastore.thrift b/standalone-metastore/src/main/thrift/hive_metastore.thrift
index 6e503eb..1ca6454 100644
--- a/standalone-metastore/src/main/thrift/hive_metastore.thrift
+++ b/standalone-metastore/src/main/thrift/hive_metastore.thrift
@@ -867,6 +867,18 @@ struct AbortTxnsRequest {
 struct CommitTxnRequest {
     1: required i64 txnid,
     2: optional string replPolicy,
+    // Information related to the write operations done in this transaction; optional, so it may be left unset.
+    3: optional list<WriteEventInfo> writeEventInfos,
+}
+
+struct WriteEventInfo { // element type of CommitTxnRequest.writeEventInfos
+    1: required i64    writeId,
+    2: required string database,
+    3: required string table,
+    4: required string files, // single string; presumably several entries joined via ReplChangeManager.joinWithSeparator - TODO confirm
+    5: optional string partition,
+    6: optional string tableObj, // repl txn task does not need table object for commit
+    7: optional string partitionObj, // presumably a serialized partition object, like tableObj - TODO confirm encoding
 }
 
 struct ReplTblWriteIdStateRequest {
@@ -1102,6 +1114,8 @@ struct InsertEventRequestData {
     2: required list<string> filesAdded,
     // Checksum of files (hex string of checksum byte payload)
     3: optional list<string> filesAddedChecksum,
+    // Used by ACID operations to create the sub-directory.
+    4: optional list<string> subDirectoryList,
 }
 
 union FireEventRequestData {
@@ -1122,7 +1136,20 @@ struct FireEventRequest {
 struct FireEventResponse {
     // NOP for now, this is just a place holder for future responses
 }
-    
+
+struct WriteNotificationLogRequest { // argument of ThriftHiveMetastore.add_write_notification_log
+    1: required i64 txnId,
+    2: required i64 writeId,
+    3: required string db,
+    4: required string table,
+    5: required InsertEventRequestData fileInfo, // files added, plus optional checksums and sub-directory list
+    6: optional list<string> partitionVals, // optional; presumably unset for unpartitioned tables - TODO confirm
+}
+
+struct WriteNotificationLogResponse { // return type of ThriftHiveMetastore.add_write_notification_log
+    // Empty for now; this is just a placeholder so that fields can be added to the response later.
+}
+
 struct MetadataPpdResult {
   1: optional binary metadata,
   2: optional binary includeBitset
@@ -2104,6 +2131,7 @@ service ThriftHiveMetastore extends fb303.FacebookService
   NotificationEventsCountResponse get_notification_events_count(1:NotificationEventsCountRequest rqst)
   FireEventResponse fire_listener_event(1:FireEventRequest rqst)
   void flushCache()
+  WriteNotificationLogResponse add_write_notification_log(WriteNotificationLogRequest rqst)
 
   // Repl Change Management api
   CmRecycleResponse cm_recycle(1:CmRecycleRequest request) throws(1:MetaException o1)

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
index c482d28..2454479 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreControlledCommit.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
 import org.apache.hadoop.hive.metastore.api.WMMapping;
 import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.ColStatsObjWithSourceInfo;
 import org.apache.thrift.TException;
@@ -1195,6 +1196,16 @@ public class DummyRawStoreControlledCommit implements RawStore, Configurable {
   }
 
   @Override
+  public void cleanWriteNotificationEvents(int olderThan) {
+    objectStore.cleanWriteNotificationEvents(olderThan); // forwarded unchanged to objectStore; nothing is added here
+  }
+
+  @Override
+  public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException {
+    return objectStore.getAllWriteEventInfo(txnId, dbName, tableName); // forwarded unchanged to objectStore
+  }
+
+  @Override
   public List<TableName> getTableNamesWithStats() throws MetaException,
       NoSuchObjectException {
     return null;

http://git-wip-us.apache.org/repos/asf/hive/blob/f519db7e/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
index d253005..9b79446 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/DummyRawStoreForJdoConnection.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownPartitionException;
 import org.apache.hadoop.hive.metastore.api.UnknownTableException;
 import org.apache.hadoop.hive.metastore.api.WMMapping;
 import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
@@ -1199,4 +1200,13 @@ public class DummyRawStoreForJdoConnection implements RawStore {
       NoSuchObjectException {
     return null;
   }
+
+  @Override
+  public void cleanWriteNotificationEvents(int olderThan) { // no-op in this dummy store; olderThan is ignored
+  }
+
+  @Override
+  public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException {
+    return null; // dummy store: always returns null (never a list), whatever the arguments
+  }
 }