You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/07/24 11:43:01 UTC

[05/19] hive git commit: HIVE-19267: Replicate ACID/MM tables write operations (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index f7018c2..ac1d3c8 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -59,6 +59,7 @@ public class ReplChangeManager {
   static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash";
   private static final String URI_FRAGMENT_SEPARATOR = "#";
   public static final String SOURCE_OF_REPLICATION = "repl.source.for";
+  private static final String TXN_WRITE_EVENT_FILE_SEPARATOR = "]";
 
   public enum RecycleType {
     MOVE,
@@ -472,7 +473,6 @@ public class ReplChangeManager {
   }
 
   public static boolean isSourceOfReplication(Database db) {
-    // Can not judge, so assuming replication is not enabled.
     assert (db != null);
     String replPolicyIds = getReplPolicyIdString(db);
     return  !StringUtils.isEmpty(replPolicyIds);
@@ -490,4 +490,29 @@ public class ReplChangeManager {
     }
     return null;
   }
+
+  /**
+   * Joins the given elements into a single string, placing the write event file separator
+   * between consecutive elements.
+   *
+   * @param strings elements to join
+   * @return the joined string
+   */
+  public static String joinWithSeparator(Iterable<?> strings) {
+    return org.apache.hadoop.util.StringUtils.join(TXN_WRITE_EVENT_FILE_SEPARATOR, strings);
+  }
+
+  /**
+   * Splits a string of the form produced by {@link #joinWithSeparator(Iterable)} on the write
+   * event file separator. Whitespace adjacent to a separator is not part of the returned elements.
+   *
+   * @param commaSeparatedString string to split; despite the parameter name, elements are
+   *                             delimited by the write event file separator, not by commas
+   * @return the elements found between separators
+   */
+  public static String[] getListFromSeparatedString(String commaSeparatedString) {
+    // Quote the separator so it is always matched literally, whatever character it is set to.
+    String separatorRegex = "\\s*" + java.util.regex.Pattern.quote(TXN_WRITE_EVENT_FILE_SEPARATOR) + "\\s*";
+    return commaSeparatedString.split(separatorRegex);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
index f6286ea..54c833d 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/cache/CachedStore.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.hive.metastore.api.UnknownTableException;
 import org.apache.hadoop.hive.metastore.api.WMFullResourcePlan;
 import org.apache.hadoop.hive.metastore.api.WMMapping;
 import org.apache.hadoop.hive.metastore.api.WMPool;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.partition.spec.PartitionSpecProxy;
@@ -2407,6 +2408,17 @@ public class CachedStore implements RawStore, Configurable {
     return sharedCache.getUpdateCount();
   }
 
+  @Override
+  public void cleanWriteNotificationEvents(int olderThan) {
+    rawStore.cleanWriteNotificationEvents(olderThan);
+  }
+
+
+  @Override
+  public List<WriteEventInfo> getAllWriteEventInfo(long txnId, String dbName, String tableName) throws MetaException {
+    return rawStore.getAllWriteEventInfo(txnId, dbName, tableName);
+  }
+
   static boolean isNotInBlackList(String catName, String dbName, String tblName) {
     String str = TableName.getQualified(catName, dbName, tblName);
     for (Pattern pattern : blacklistPatterns) {

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java
new file mode 100644
index 0000000..001179a
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/events/AcidWriteEvent.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.events;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.WriteNotificationLogRequest;
+import org.apache.hadoop.hive.metastore.utils.StringUtils;
+
+import java.util.List;
+
+/**
+ * AcidWriteEvent
+ * Event generated for acid write operations
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class AcidWriteEvent extends ListenerEvent {
+  private final WriteNotificationLogRequest writeNotificationLogRequest;
+  private final String partition;
+  private final Table tableObj;
+  private final Partition partitionObj;
+
+  public AcidWriteEvent(String partition, Table tableObj, Partition partitionObj,
+                        WriteNotificationLogRequest writeNotificationLogRequest) {
+    super(true, null);
+    this.writeNotificationLogRequest = writeNotificationLogRequest;
+    this.partition = partition;
+    this.tableObj = tableObj;
+    this.partitionObj = partitionObj;
+  }
+
+  public Long getTxnId() {
+    return writeNotificationLogRequest.getTxnId();
+  }
+
+  public List<String> getFiles() {
+    return writeNotificationLogRequest.getFileInfo().getFilesAdded();
+  }
+
+  public List<String> getChecksums() {
+    return writeNotificationLogRequest.getFileInfo().getFilesAddedChecksum();
+  }
+
+  public String getDatabase() {
+    return StringUtils.normalizeIdentifier(writeNotificationLogRequest.getDb());
+  }
+
+  public String getTable() {
+    return StringUtils.normalizeIdentifier(writeNotificationLogRequest.getTable());
+  }
+
+  public String getPartition() {
+    return partition; // Don't normalize the partition value, as it is case sensitive.
+  }
+
+  public Long getWriteId() {
+    return writeNotificationLogRequest.getWriteId();
+  }
+
+  public Table getTableObj() {
+    return tableObj;
+  }
+
+  public Partition getPartitionObj() {
+    return partitionObj;
+  }
+
+  public List<String> getSubDirs() {
+    return writeNotificationLogRequest.getFileInfo().getSubDirectoryList();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java
new file mode 100644
index 0000000..e2c9ccf
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/AcidWriteMessage.java
@@ -0,0 +1,50 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging;
+
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import java.util.List;
+
+/**
+ * HCat message sent when an ACID write is done.
+ */
+public abstract class AcidWriteMessage extends EventMessage {
+
+  protected AcidWriteMessage() {
+    super(EventType.ACID_WRITE);
+  }
+
+  public abstract Long getTxnId();
+
+  public abstract String getTable();
+
+  public abstract Long getWriteId();
+
+  public abstract String getPartition();
+
+  public abstract List<String> getFiles();
+
+  public abstract Table getTableObj() throws Exception;
+
+  public abstract Partition getPartitionObj() throws Exception;
+
+  public abstract String getTableObjStr();
+
+  public abstract String getPartitionObjStr();
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java
index 49004f2..9733039 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/CommitTxnMessage.java
@@ -17,6 +17,12 @@
 
 package org.apache.hadoop.hive.metastore.messaging;
 
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
+
+import java.util.List;
+
 /**
  * HCat message sent when an commit transaction is done.
  */
@@ -33,4 +39,21 @@ public abstract class CommitTxnMessage extends EventMessage {
    */
   public abstract Long getTxnId();
 
+  public abstract List<Long> getWriteIds();
+
+  public abstract List<String> getDatabases();
+
+  public abstract List<String> getTables();
+
+  public abstract List<String> getPartitions();
+
+  public abstract Table getTableObj(int idx) throws Exception;
+
+  public abstract Partition getPartitionObj(int idx) throws Exception;
+
+  public abstract String getFiles(int idx);
+
+  public abstract List<String> getFilesList();
+
+  public abstract void addWriteEventInfo(List<WriteEventInfo> writeEventInfoList);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
index 969dd7b..f24b419 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/EventMessage.java
@@ -60,7 +60,8 @@ public abstract class EventMessage {
     COMMIT_TXN(MessageFactory.COMMIT_TXN_EVENT),
     ABORT_TXN(MessageFactory.ABORT_TXN_EVENT),
     ALLOC_WRITE_ID(MessageFactory.ALLOC_WRITE_ID_EVENT),
-    ALTER_CATALOG(MessageFactory.ALTER_CATALOG_EVENT);
+    ALTER_CATALOG(MessageFactory.ALTER_CATALOG_EVENT),
+    ACID_WRITE(MessageFactory.ACID_WRITE_EVENT);
 
     private String typeString;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
index ca33579..b701d84 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageDeserializer.java
@@ -70,6 +70,10 @@ public abstract class MessageDeserializer {
       return getCommitTxnMessage(messageBody);
     case ABORT_TXN:
       return getAbortTxnMessage(messageBody);
+    case ALLOC_WRITE_ID:
+      return getAllocWriteIdMessage(messageBody);
+    case ACID_WRITE:
+      return getAcidWriteMessage(messageBody);
     default:
       throw new IllegalArgumentException("Unsupported event-type: " + eventTypeString);
     }
@@ -186,6 +190,11 @@ public abstract class MessageDeserializer {
    */
   public abstract AllocWriteIdMessage getAllocWriteIdMessage(String messageBody);
 
+  /**
+   * Method to deserialize an AcidWriteMessage instance from the given message body.
+   */
+  public abstract AcidWriteMessage getAcidWriteMessage(String messageBody);
+
   // Protection against construction.
   protected MessageDeserializer() {}
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index e0629ea..d529147 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
 import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 
 import java.util.Iterator;
@@ -74,6 +75,7 @@ public abstract class MessageFactory {
   public static final String ABORT_TXN_EVENT = "ABORT_TXN";
   public static final String ALLOC_WRITE_ID_EVENT = "ALLOC_WRITE_ID_EVENT";
   public static final String ALTER_CATALOG_EVENT = "ALTER_CATALOG";
+  public static final String ACID_WRITE_EVENT = "ACID_WRITE_EVENT";
 
   private static MessageFactory instance = null;
 
@@ -326,4 +328,14 @@ public abstract class MessageFactory {
   public abstract DropCatalogMessage buildDropCatalogMessage(Catalog catalog);
 
   public abstract AlterCatalogMessage buildAlterCatalogMessage(Catalog oldCat, Catalog newCat);
+
+  /**
+   * Factory method for building acid write message
+   *
+   *
+   * @param acidWriteEvent information related to the acid write operation
+   * @param files files added by this write operation
+   * @return instance of AcidWriteMessage
+   */
+  public abstract AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, Iterator<String> files);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
new file mode 100644
index 0000000..515a2cb
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONAcidWriteMessage.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.messaging.json;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
+import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
+import org.apache.thrift.TException;
+import org.codehaus.jackson.annotate.JsonProperty;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * JSON implementation of AcidWriteMessage
+ */
+public class JSONAcidWriteMessage extends AcidWriteMessage {
+
+  @JsonProperty
+  private Long txnid, writeId, timestamp;
+
+  @JsonProperty
+  private String server, servicePrincipal, database, table, partition, tableObjJson, partitionObjJson;
+
+  @JsonProperty
+  private List<String> files;
+
+  /**
+   * Default constructor, needed for Jackson.
+   */
+  public JSONAcidWriteMessage() {
+  }
+
+  public JSONAcidWriteMessage(String server, String servicePrincipal, Long timestamp, AcidWriteEvent acidWriteEvent,
+                              Iterator<String> files) {
+    this.timestamp = timestamp;
+    this.txnid = acidWriteEvent.getTxnId();
+    this.server = server;
+    this.servicePrincipal = servicePrincipal;
+    this.database = acidWriteEvent.getDatabase();
+    this.table = acidWriteEvent.getTable();
+    this.writeId = acidWriteEvent.getWriteId();
+    this.partition = acidWriteEvent.getPartition();
+    try {
+      this.tableObjJson = JSONMessageFactory.createTableObjJson(acidWriteEvent.getTableObj());
+      if (acidWriteEvent.getPartitionObj() != null) {
+        this.partitionObjJson = JSONMessageFactory.createPartitionObjJson(acidWriteEvent.getPartitionObj());
+      } else {
+        this.partitionObjJson = null;
+      }
+    } catch (TException e) {
+      throw new IllegalArgumentException("Could not serialize JSONAcidWriteMessage : ", e);
+    }
+    this.files = Lists.newArrayList(files);
+  }
+
+  @Override
+  public Long getTxnId() {
+    return txnid;
+  }
+
+  @Override
+  public Long getTimestamp() {
+    return timestamp;
+  }
+
+  @Override
+  public String getDB() {
+    return database;
+  }
+
+  @Override
+  public String getServicePrincipal() {
+    return servicePrincipal;
+  }
+
+  @Override
+  public String getServer() {
+    return server;
+  }
+
+  @Override
+  public String getTable() {
+    return table;
+  }
+
+  @Override
+  public Long getWriteId() {
+    return writeId;
+  }
+
+  @Override
+  public String getPartition() {
+    return partition;
+  }
+
+  @Override
+  public List<String> getFiles() {
+    return files;
+  }
+
+  @Override
+  public Table getTableObj() throws Exception {
+    return (tableObjJson == null) ? null : (Table) JSONMessageFactory.getTObj(tableObjJson, Table.class);
+  }
+
+  @Override
+  public Partition getPartitionObj() throws Exception {
+    return ((partitionObjJson == null) ? null :
+            (Partition) JSONMessageFactory.getTObj(partitionObjJson, Partition.class));
+  }
+
+  @Override
+  public String getTableObjStr() {
+    return tableObjJson;
+  }
+
+  @Override
+  public String getPartitionObjStr() {
+    return partitionObjJson;
+  }
+
+  @Override
+  public String toString() {
+    try {
+      return JSONMessageDeserializer.mapper.writeValueAsString(this);
+    } catch (Exception exception) {
+      throw new IllegalArgumentException("Could not serialize: ", exception);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
index 595a3d1..6082b8e 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONCommitTxnMessage.java
@@ -18,9 +18,15 @@
  */
 
 package org.apache.hadoop.hive.metastore.messaging.json;
+import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
 import org.codehaus.jackson.annotate.JsonProperty;
 
+import java.util.List;
+
 /**
  * JSON implementation of CommitTxnMessage
  */
@@ -38,6 +44,12 @@ public class JSONCommitTxnMessage extends CommitTxnMessage {
   @JsonProperty
   private String servicePrincipal;
 
+  @JsonProperty
+  private List<Long> writeIds;
+
+  @JsonProperty
+  private List<String> databases, tables, partitions, tableObjs, partitionObjs, files;
+
   /**
    * Default constructor, needed for Jackson.
    */
@@ -49,6 +61,13 @@ public class JSONCommitTxnMessage extends CommitTxnMessage {
     this.txnid = txnid;
     this.server = server;
     this.servicePrincipal = servicePrincipal;
+    this.databases = null;
+    this.tables = null;
+    this.writeIds = null;
+    this.partitions = null;
+    this.tableObjs = null;
+    this.partitionObjs = null;
+    this.files = null;
   }
 
   @Override
@@ -77,6 +96,82 @@ public class JSONCommitTxnMessage extends CommitTxnMessage {
   }
 
   @Override
+  public List<Long> getWriteIds() {
+    return writeIds;
+  }
+
+  @Override
+  public List<String> getDatabases() {
+    return databases;
+  }
+
+  @Override
+  public List<String> getTables() {
+    return tables;
+  }
+
+  @Override
+  public List<String> getPartitions() {
+    return partitions;
+  }
+
+  @Override
+  public Table getTableObj(int idx) throws Exception {
+    return tableObjs == null ? null :  (Table) JSONMessageFactory.getTObj(tableObjs.get(idx), Table.class);
+  }
+
+  @Override
+  public Partition getPartitionObj(int idx) throws Exception {
+    String partitionJson = (partitionObjs == null) ? null : partitionObjs.get(idx);
+    return (partitionJson == null) ? null : (Partition) JSONMessageFactory.getTObj(partitionJson, Partition.class);
+  }
+
+  @Override
+  public String getFiles(int idx) {
+    return files == null ? null : files.get(idx);
+  }
+
+  @Override
+  public List<String> getFilesList() {
+    return files;
+  }
+
+  @Override
+  public void addWriteEventInfo(List<WriteEventInfo> writeEventInfoList) {
+    if (this.databases == null) {
+      this.databases = Lists.newArrayList();
+    }
+    if (this.tables == null) {
+      this.tables = Lists.newArrayList();
+    }
+    if (this.writeIds == null) {
+      this.writeIds = Lists.newArrayList();
+    }
+    if (this.tableObjs == null) {
+      this.tableObjs = Lists.newArrayList();
+    }
+    if (this.partitions == null) {
+      this.partitions = Lists.newArrayList();
+    }
+    if (this.partitionObjs == null) {
+      this.partitionObjs = Lists.newArrayList();
+    }
+    if (this.files == null) {
+      this.files = Lists.newArrayList();
+    }
+
+    for (WriteEventInfo writeEventInfo : writeEventInfoList) {
+      this.databases.add(writeEventInfo.getDatabase());
+      this.tables.add(writeEventInfo.getTable());
+      this.writeIds.add(writeEventInfo.getWriteId());
+      this.partitions.add(writeEventInfo.getPartition());
+      this.tableObjs.add(writeEventInfo.getTableObj());
+      this.partitionObjs.add(writeEventInfo.getPartitionObj());
+      this.files.add(writeEventInfo.getFiles());
+    }
+  }
+
+  @Override
   public String toString() {
     try {
       return JSONMessageDeserializer.mapper.writeValueAsString(this);

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
index f54e24d..be6b751 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageDeserializer.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
 import org.codehaus.jackson.map.DeserializationConfig;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.SerializationConfig;
@@ -259,4 +260,20 @@ public class JSONMessageDeserializer extends MessageDeserializer {
       throw new IllegalArgumentException("Could not construct AllocWriteIdMessage", e);
     }
   }
+
+  /**
+   * Deserializes a JSON message body into an {@link AcidWriteMessage}.
+   *
+   * @param messageBody JSON text of the message
+   * @return the message read as a {@link JSONAcidWriteMessage}
+   * @throws IllegalArgumentException if the body cannot be read as a JSONAcidWriteMessage
+   */
+  @Override
+  public AcidWriteMessage getAcidWriteMessage(String messageBody) {
+    try {
+      return mapper.readValue(messageBody, JSONAcidWriteMessage.class);
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Could not construct AcidWriteMessage", e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index d64c3ff..07f51f0 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
 import org.apache.hadoop.hive.metastore.messaging.AddForeignKeyMessage;
 import org.apache.hadoop.hive.metastore.messaging.AddNotNullConstraintMessage;
 import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
@@ -66,6 +67,7 @@ import org.apache.hadoop.hive.metastore.messaging.OpenTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.CommitTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.AbortTxnMessage;
 import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.metastore.messaging.AcidWriteMessage;
 import org.apache.thrift.TBase;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.TException;
@@ -230,11 +232,17 @@ public class JSONMessageFactory extends MessageFactory {
     return new JSONAbortTxnMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnId, now());
   }
 
+  @Override
   public AllocWriteIdMessage buildAllocWriteIdMessage(List<TxnToWriteId> txnToWriteIdList,
                                                       String dbName, String tableName) {
     return new JSONAllocWriteIdMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, txnToWriteIdList, dbName, tableName, now());
   }
 
+  @Override
+  public AcidWriteMessage buildAcidWriteMessage(AcidWriteEvent acidWriteEvent, Iterator<String> files) {
+    return new JSONAcidWriteMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, now(), acidWriteEvent, files);
+  }
+
   private long now() {
     return System.currentTimeMillis() / 1000;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java
new file mode 100644
index 0000000..f5ca386
--- /dev/null
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/model/MTxnWriteNotificationLog.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.model;
+
+/**
+ * JDO model object for a row of TXN_WRITE_NOTIFICATION_LOG, which stores ACID write events.
+ * The field-to-column mapping (noted on each field below) is declared in package.jdo.
+ */
+public class MTxnWriteNotificationLog {
+  private long txnId; // WNL_TXNID
+  private long writeId; // WNL_WRITEID
+  private int eventTime; // WNL_EVENT_TIME
+  private String database; // WNL_DATABASE, VARCHAR(128)
+  private String table; // WNL_TABLE, VARCHAR(128)
+  private String partition; // WNL_PARTITION, VARCHAR(1024)
+  private String tableObject; // WNL_TABLE_OBJ, LONGVARCHAR
+  private String partObject; // WNL_PARTITION_OBJ, LONGVARCHAR
+  private String files; // WNL_FILES, LONGVARCHAR
+
+  public MTxnWriteNotificationLog() {
+  }
+
+  public MTxnWriteNotificationLog(long txnId, long writeId, int eventTime, String database, String table,
+                               String partition, String tableObject, String partObject, String files) {
+    this.txnId = txnId;
+    this.writeId = writeId;
+    this.eventTime = eventTime;
+    this.database = database;
+    this.table = table;
+    this.partition = partition;
+    this.tableObject = tableObject;
+    this.partObject = partObject;
+    this.files = files;
+  }
+
+  public long getTxnId() {
+    return txnId;
+  }
+
+  public void setTxnId(long txnId) {
+    this.txnId = txnId;
+  }
+
+  public long getWriteId() {
+    return writeId;
+  }
+
+  public void setWriteId(long writeId) {
+    this.writeId = writeId;
+  }
+
+  public int getEventTime() {
+    return eventTime;
+  }
+
+  public void setEventTime(int eventTime) {
+    this.eventTime = eventTime;
+  }
+
+  public String getDatabase() {
+    return database;
+  }
+
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+
+  public String getTable() {
+    return table;
+  }
+
+  public void setTable(String table) {
+    this.table = table;
+  }
+
+  public String getPartition() {
+    return partition;
+  }
+
+  public void setPartition(String partition) {
+    this.partition = partition;
+  }
+
+  public String getTableObject() {
+    return tableObject;
+  }
+
+  public void setTableObject(String tableObject) {
+    this.tableObject = tableObject;
+  }
+
+  public String getPartObject() {
+    return partObject;
+  }
+
+  public void setPartObject(String partObject) {
+    this.partObject = partObject;
+  }
+
+  public String getFiles() {
+    return files;
+  }
+
+  public void setFiles(String files) {
+    this.files = files;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
index b23a6d7..d0ac7db 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
@@ -175,4 +175,13 @@ public final class SQLGenerator {
     return dbProduct;
   }
 
+  /**
+   * Doubles every backslash in {@code s} when the metastore database is MySQL; for any other
+   * database {@code s} is returned untouched. Meant for values embedded in SQL that is executed
+   * directly, where escape characters (e.g. in front of double quotes) would otherwise be lost.
+   */
+  public String addEscapeCharacters(String s) {
+    return dbProduct == DatabaseProduct.MYSQL ? s.replace("\\", "\\\\") : s;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 8764c21..38cef62 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -253,6 +253,34 @@ public final class TxnDbUtil {
       stmt.execute("INSERT INTO \"APP\".\"NOTIFICATION_SEQUENCE\" (\"NNI_ID\", \"NEXT_EVENT_ID\")" +
               " SELECT * FROM (VALUES (1,1)) tmp_table WHERE NOT EXISTS ( SELECT " +
               "\"NEXT_EVENT_ID\" FROM \"APP\".\"NOTIFICATION_SEQUENCE\")");
+
+      try {
+        stmt.execute("CREATE TABLE TXN_WRITE_NOTIFICATION_LOG (" +
+                "WNL_ID bigint NOT NULL," +
+                "WNL_TXNID bigint NOT NULL," +
+                "WNL_WRITEID bigint NOT NULL," +
+                "WNL_DATABASE varchar(128) NOT NULL," +
+                "WNL_TABLE varchar(128) NOT NULL," +
+                "WNL_PARTITION varchar(1024) NOT NULL," +
+                "WNL_TABLE_OBJ clob NOT NULL," +
+                "WNL_PARTITION_OBJ clob," +
+                "WNL_FILES clob," +
+                "WNL_EVENT_TIME integer NOT NULL," +
+                "PRIMARY KEY (WNL_TXNID, WNL_DATABASE, WNL_TABLE, WNL_PARTITION))"
+        );
+      } catch (SQLException e) {
+        if (e.getMessage() != null && e.getMessage().contains("already exists")) {
+          LOG.info("TXN_WRITE_NOTIFICATION_LOG table already exist, ignoring");
+        } else {
+          throw e;
+        }
+      }
+
+      stmt.execute("INSERT INTO \"APP\".\"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", \"NEXT_VAL\") " +
+              "SELECT * FROM (VALUES ('org.apache.hadoop.hive.metastore.model.MTxnWriteNotificationLog', " +
+              "1)) tmp_table WHERE NOT EXISTS ( SELECT \"NEXT_VAL\" FROM \"APP\"" +
+              ".\"SEQUENCE_TABLE\" WHERE \"SEQUENCE_NAME\" = 'org.apache.hadoop.hive.metastore" +
+              ".model.MTxnWriteNotificationLog')");
     } catch (SQLException e) {
       try {
         conn.rollback();

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index b2a22f1..4f56eba 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -80,6 +80,7 @@ import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
 import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
 import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage;
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
@@ -702,6 +703,38 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   @Override
   @RetrySemantics.Idempotent
+  public long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException {
+    // Looks up the target txn id recorded for sourceTxnId under replPolicy; -1 means none was found.
+    try {
+      Connection dbConn = null;
+      Statement stmt = null;
+      try {
+        lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        stmt = dbConn.createStatement();
+        List<Long> targetTxnIds = getTargetTxnIdList(replPolicy, Collections.singletonList(sourceTxnId), stmt);
+        if (targetTxnIds.isEmpty()) {
+          LOG.info("Txn {} not present for repl policy {}", sourceTxnId, replPolicy);
+          return -1;
+        }
+        assert (targetTxnIds.size() == 1);
+        return targetTxnIds.get(0);
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "getTargetTxnId(" + replPolicy + ", " + sourceTxnId + ")");
+        throw new MetaException("Unable to get target transaction id " + StringUtils.stringifyException(e));
+      } finally {
+        close(null, stmt, dbConn);
+        unlockInternal();
+      }
+    } catch (RetryException e) {
+      return getTargetTxnId(replPolicy, sourceTxnId); // retry requested via RetryException: start over
+    }
+  }
+
+  @Override
+  @RetrySemantics.Idempotent
   public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException {
     long txnid = rqst.getTxnid();
     long sourceTxnId = -1;
@@ -893,10 +926,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           shouldNeverHappen(txnid);
           //dbConn is rolled back in finally{}
         }
-        String conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" +
-          quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")";
-        rs = stmt.executeQuery(sqlGenerator.addLimitClause(1, "tc_operation_type " + conflictSQLSuffix));
-        if (rs.next()) {
+
+        String conflictSQLSuffix = null;
+        if (rqst.isSetReplPolicy()) {
+          rs = null;
+        } else {
+          conflictSQLSuffix = "from TXN_COMPONENTS where tc_txnid=" + txnid + " and tc_operation_type IN(" +
+                  quoteChar(OpertaionType.UPDATE.sqlConst) + "," + quoteChar(OpertaionType.DELETE.sqlConst) + ")";
+          rs = stmt.executeQuery(sqlGenerator.addLimitClause(1,
+                   "tc_operation_type " + conflictSQLSuffix));
+        }
+
+        if (rs != null && rs.next()) {
           isUpdateDelete = 'Y';
           close(rs);
           //if here it means currently committing txn performed update/delete and we should check WW conflict
@@ -985,45 +1026,66 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            * Consider: if RO txn is after a W txn, then RO's openTxns() will be mutexed with W's
            * commitTxn() because both do S4U on NEXT_TXN_ID and thus RO will see result of W txn.
            * If RO < W, then there is no reads-from relationship.
+           * In the replication flow no write-write conflict is expected, as it should have been handled at the source.
            */
         }
-        // Move the record from txn_components into completed_txn_components so that the compactor
-        // knows where to look to compact.
-        String s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " +
-            "ctc_table, ctc_partition, ctc_writeid, ctc_update_delete) select tc_txnid, tc_database, tc_table, " +
-            "tc_partition, tc_writeid, '" + isUpdateDelete + "' from TXN_COMPONENTS where tc_txnid = " + txnid;
-        LOG.debug("Going to execute insert <" + s + ">");
-        int modCount = 0;
-        if ((modCount = stmt.executeUpdate(s)) < 1) {
-          //this can be reasonable for an empty txn START/COMMIT or read-only txn
-          //also an IUD with DP that didn't match any rows.
-          LOG.info("Expected to move at least one record from txn_components to " +
-            "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
+
+        String s;
+        if (!rqst.isSetReplPolicy()) {
+          // Move the record from txn_components into completed_txn_components so that the compactor
+          // knows where to look to compact.
+          s = "insert into COMPLETED_TXN_COMPONENTS (ctc_txnid, ctc_database, " +
+                  "ctc_table, ctc_partition, ctc_writeid, ctc_update_delete) select tc_txnid, tc_database, tc_table, " +
+                  "tc_partition, tc_writeid, '" + isUpdateDelete + "' from TXN_COMPONENTS where tc_txnid = " + txnid;
+          LOG.debug("Going to execute insert <" + s + ">");
+          int modCount = 0;
+          if ((modCount = stmt.executeUpdate(s)) < 1) {
+            //this can be reasonable for an empty txn START/COMMIT or read-only txn
+            //also an IUD with DP that didn't match any rows.
+            LOG.info("Expected to move at least one record from txn_components to " +
+                    "completed_txn_components when committing txn! " + JavaUtils.txnIdToString(txnid));
+          }
+        } else {
+          if (rqst.isSetWriteEventInfos()) {
+            List<String> rows = new ArrayList<>();
+            for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
+              rows.add(txnid + "," + quoteString(writeEventInfo.getDatabase()) + "," +
+                      quoteString(writeEventInfo.getTable()) + "," +
+                      quoteString(writeEventInfo.getPartition()) + "," +
+                      writeEventInfo.getWriteId() + "," +
+                      quoteChar(isUpdateDelete));
+            }
+            List<String> queries = sqlGenerator.createInsertValuesStmt("COMPLETED_TXN_COMPONENTS " +
+                   "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, ctc_writeid, ctc_update_delete)", rows);
+            for (String q : queries) {
+              LOG.debug("Going to execute insert  <" + q + "> ");
+              stmt.execute(q);
+            }
+          }
+
+          s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
+                  " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
+          LOG.info("Repl going to execute  <" + s + ">");
+          stmt.executeUpdate(s);
         }
+
         s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
-        modCount = stmt.executeUpdate(s);
+        stmt.executeUpdate(s);
         s = "delete from HIVE_LOCKS where hl_txnid = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
-        modCount = stmt.executeUpdate(s);
+        stmt.executeUpdate(s);
         s = "delete from TXNS where txn_id = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
-        modCount = stmt.executeUpdate(s);
+        stmt.executeUpdate(s);
         s = "delete from MIN_HISTORY_LEVEL where mhl_txnid = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
-        modCount = stmt.executeUpdate(s);
+        stmt.executeUpdate(s);
         LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL");
 
         s = "delete from MATERIALIZATION_REBUILD_LOCKS where mrl_txn_id = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
-        modCount = stmt.executeUpdate(s);
-
-        if (rqst.isSetReplPolicy()) {
-          s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
-                  " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
-          LOG.info("Repl going to execute  <" + s + ">");
-          stmt.executeUpdate(s);
-        }
+        stmt.executeUpdate(s);
 
         if (transactionalListeners != null) {
           MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
@@ -1526,6 +1588,43 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   }
   @Override
+  @RetrySemantics.Idempotent
+  public void addWriteNotificationLog(AcidWriteEvent acidWriteEvent)
+          throws MetaException { // hands the event to transactionalListeners as ACID_WRITE, then commits
+    Connection dbConn = null;
+    try {
+      try {
+        // Duplicate deliveries are expected to be handled by the notified listeners (idempotent case).
+        lockInternal();
+        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+        MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
+                EventMessage.EventType.ACID_WRITE, acidWriteEvent, dbConn, sqlGenerator);
+        LOG.debug("Going to commit");
+        dbConn.commit();
+        return;
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        if (isDuplicateKeyError(e)) {
+          // A duplicate-key error may stem from a race with a concurrent writer, so wait and retry.
+          if (waitForRetry("addWriteNotificationLog(" + acidWriteEvent + ")", e.getMessage())) {
+            throw new RetryException();
+          }
+          retryNum = 0; // retry budget used up: reset the counter and report the failure
+          throw new MetaException(e.getMessage());
+        }
+        checkRetryable(dbConn, e, "addWriteNotificationLog(" + acidWriteEvent + ")");
+        throw new MetaException("Unable to add write notification event " + StringUtils.stringifyException(e));
+      } finally{
+        closeDbConn(dbConn);
+        unlockInternal();
+      }
+    } catch (RetryException e) {
+      addWriteNotificationLog(acidWriteEvent); // a retry was requested: run the whole operation again
+    }
+  }
+
+  @Override
   @RetrySemantics.SafeToRetry
   public void performWriteSetGC() {
     Connection dbConn = null;
@@ -3214,6 +3313,22 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     closeStmt(stmt);
     closeDbConn(dbConn);
   }
+
+  // Waits retryInterval ms and returns true while retries remain; else logs an error, returns false.
+  private boolean waitForRetry(String caller, String errMsg) {
+    if (retryNum++ >= retryLimit) {
+      LOG.error("Fatal error in " + caller + ". Retry limit (" + retryLimit + ") reached. Last error: " + errMsg);
+      return false;
+    }
+    LOG.warn("Retryable error detected in " + caller + ".  Will wait " + retryInterval +
+            "ms and retry up to " + (retryLimit - retryNum + 1) + " times.  Error: " + errMsg);
+    try {
+      Thread.sleep(retryInterval);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt(); // restore the flag so callers can still see the interrupt
+    }
+    return true;
+  }
   /**
    * Determine if an exception was such that it makes sense to retry.  Unfortunately there is no standard way to do
    * this, so we have to inspect the error messages and catch the telltale signs for each
@@ -3257,18 +3372,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         }
       } else if (isRetryable(conf, e)) {
         //in MSSQL this means Communication Link Failure
-        if (retryNum++ < retryLimit) {
-          LOG.warn("Retryable error detected in " + caller + ".  Will wait " + retryInterval +
-            "ms and retry up to " + (retryLimit - retryNum + 1) + " times.  Error: " + getMessage(e));
-          try {
-            Thread.sleep(retryInterval);
-          } catch (InterruptedException ex) {
-            //
-          }
-          sendRetrySignal = true;
-        } else {
-          LOG.error("Fatal error in " + caller + ". Retry limit (" + retryLimit + ") reached. Last error: " + getMessage(e));
-        }
+        sendRetrySignal = waitForRetry(caller, e.getMessage());
       }
       else {
         //make sure we know we saw an error that we don't recognize

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 2c47ee4..33f24fb 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.common.classification.RetrySemantics;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.events.AcidWriteEvent;
 
 import java.sql.SQLException;
 import java.util.Iterator;
@@ -88,6 +89,9 @@ public interface TxnStore extends Configurable {
   @RetrySemantics.Idempotent
   OpenTxnsResponse openTxns(OpenTxnRequest rqst) throws MetaException;
 
+  @RetrySemantics.Idempotent
+  long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException; // -1 in TxnHandler if not found
+
   /**
    * Abort (rollback) a transaction.
    * @param rqst info on transaction to abort
@@ -490,4 +494,11 @@ public interface TxnStore extends Configurable {
    */
   @RetrySemantics.Idempotent
   void setHadoopJobId(String hadoopJobId, long id);
+
+  /**
+   * Adds the ACID write event information to the write notification log table.
+   * @param acidWriteEvent the ACID write event to record
+   */
+  @RetrySemantics.Idempotent
+  void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) throws MetaException;
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
index ec9e9e2..565c72b 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/utils/FileUtils.java
@@ -510,4 +510,15 @@ public class FileUtils {
 
     return new Path(scheme, authority, pathUri.getPath());
   }
+
+  // Returns root/subDir/name as a Path, leaving out the subDir level when subDir is null;
+  // returns null when root is null.
+  public static Path getTransformedPath(String name, String subDir, String root) {
+    if (root == null) {
+      return null;
+    }
+    Path rootDir = new Path(root);
+    Path parent = (subDir == null) ? rootDir : new Path(rootDir, subDir);
+    return new Path(parent, name);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/bf5791cd/standalone-metastore/src/main/resources/package.jdo
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/resources/package.jdo b/standalone-metastore/src/main/resources/package.jdo
index 1be3e98..5fb548c 100644
--- a/standalone-metastore/src/main/resources/package.jdo
+++ b/standalone-metastore/src/main/resources/package.jdo
@@ -1182,6 +1182,41 @@
       </field>
     </class>
 
+    <class name="MTxnWriteNotificationLog" table="TXN_WRITE_NOTIFICATION_LOG" identity-type="datastore" detachable="true">
+      <datastore-identity strategy="increment"/>
+      <datastore-identity key-cache-size="1"/>
+      <datastore-identity>
+        <column name="WNL_ID"/>
+      </datastore-identity>
+      <field name="txnId">
+        <column name="WNL_TXNID" jdbc-type="BIGINT" allows-null="false"/>
+      </field>
+      <field name="writeId">
+        <column name="WNL_WRITEID" jdbc-type="BIGINT" allows-null="false"/>
+      </field>
+      <field name="database">
+        <column name="WNL_DATABASE" length="128" jdbc-type="VARCHAR" allows-null="false"/>
+      </field>
+      <field name="table">
+        <column name="WNL_TABLE" length="128" jdbc-type="VARCHAR" allows-null="false"/>
+      </field>
+      <field name="partition">
+        <column name="WNL_PARTITION" length="1024" jdbc-type="VARCHAR" allows-null="false"/>
+      </field>
+      <field name="tableObject">
+        <column name="WNL_TABLE_OBJ" jdbc-type="LONGVARCHAR"/> <!-- NOTE(review): TxnDbUtil's CREATE TABLE declares this column NOT NULL; confirm allows-null here -->
+      </field>
+      <field name="partObject">
+        <column name="WNL_PARTITION_OBJ" jdbc-type="LONGVARCHAR"/>
+      </field>
+      <field name="files">
+        <column name="WNL_FILES" jdbc-type="LONGVARCHAR"/>
+      </field>
+      <field name="eventTime">
+        <column name="WNL_EVENT_TIME" jdbc-type="INTEGER" allows-null="false"/>
+      </field>
+    </class>
+
     <class name="MWMResourcePlan" identity-type="datastore" table="WM_RESOURCEPLAN" detachable="true">
       <datastore-identity>
         <column name="RP_ID"/>