Posted to commits@hive.apache.org by sa...@apache.org on 2018/04/17 07:24:50 UTC

[12/12] hive git commit: HIVE-19089: Create/Replicate Allocate write-id event (Mahesh Kumar Behera, reviewed by Sankar Hariappan)

Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/fa9e743e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/fa9e743e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/fa9e743e

Branch: refs/heads/master
Commit: fa9e743e7afbcd6409a51955e39e4e2bb3c109d2
Parents: 7fb088b
Author: Sankar Hariappan <sa...@apache.org>
Authored: Tue Apr 17 12:54:16 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Tue Apr 17 12:54:16 2018 +0530

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |   46 +-
 .../listener/TestDbNotificationListener.java    |    5 +
 .../TestReplicationScenariosAcidTables.java     |   25 +-
 .../apache/hadoop/hive/ql/exec/ReplTxnTask.java |   40 +-
 .../apache/hadoop/hive/ql/exec/ReplTxnWork.java |   55 +-
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    |    9 +
 .../hadoop/hive/ql/lockmgr/DummyTxnManager.java |    6 +
 .../hadoop/hive/ql/lockmgr/HiveTxnManager.java  |   12 +
 .../hadoop/hive/ql/metadata/HiveUtils.java      |   10 +
 .../ql/parse/ReplicationSemanticAnalyzer.java   |   53 +-
 .../hadoop/hive/ql/parse/repl/DumpType.java     |    7 +
 .../repl/dump/events/AllocWriteIdHandler.java   |   41 +
 .../repl/dump/events/EventHandlerFactory.java   |    1 +
 .../repl/load/message/AbortTxnHandler.java      |    4 +-
 .../repl/load/message/AllocWriteIdHandler.java  |   64 +
 .../repl/load/message/CommitTxnHandler.java     |    7 +-
 .../parse/repl/load/message/OpenTxnHandler.java |    4 +-
 .../metastore/txn/TestCompactionTxnHandler.java |    5 +-
 .../hive/metastore/txn/TestTxnHandler.java      |  115 +-
 .../hive/ql/lockmgr/TestDbTxnManager2.java      |   21 +-
 .../hive/ql/txn/compactor/CompactorTest.java    |    6 +-
 .../gen/thrift/gen-cpp/ThriftHiveMetastore.cpp  | 2258 ++++++-------
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 3016 +++++++++---------
 .../gen/thrift/gen-cpp/hive_metastore_types.h   |   34 +-
 .../metastore/api/AddDynamicPartitions.java     |   32 +-
 .../api/AllocateTableWriteIdsRequest.java       |  519 ++-
 .../api/AllocateTableWriteIdsResponse.java      |   36 +-
 .../metastore/api/ClearFileMetadataRequest.java |   32 +-
 .../hive/metastore/api/ClientCapabilities.java  |   32 +-
 .../hive/metastore/api/CompactionRequest.java   |   44 +-
 .../hive/metastore/api/CreationMetadata.java    |   32 +-
 .../metastore/api/FindSchemasByColsResp.java    |   36 +-
 .../hive/metastore/api/FireEventRequest.java    |   32 +-
 .../metastore/api/GetAllFunctionsResponse.java  |   36 +-
 .../api/GetFileMetadataByExprRequest.java       |   32 +-
 .../api/GetFileMetadataByExprResult.java        |   48 +-
 .../metastore/api/GetFileMetadataRequest.java   |   32 +-
 .../metastore/api/GetFileMetadataResult.java    |   44 +-
 .../hive/metastore/api/GetTablesRequest.java    |   32 +-
 .../hive/metastore/api/GetTablesResult.java     |   36 +-
 .../api/HeartbeatTxnRangeResponse.java          |   64 +-
 .../metastore/api/InsertEventRequestData.java   |   64 +-
 .../hadoop/hive/metastore/api/LockRequest.java  |   36 +-
 .../hive/metastore/api/Materialization.java     |   32 +-
 .../api/NotificationEventResponse.java          |   36 +-
 .../metastore/api/PutFileMetadataRequest.java   |   64 +-
 .../hive/metastore/api/SchemaVersion.java       |   36 +-
 .../hive/metastore/api/ShowCompactResponse.java |   36 +-
 .../hive/metastore/api/ShowLocksResponse.java   |   36 +-
 .../hive/metastore/api/ThriftHiveMetastore.java | 2476 +++++++-------
 .../hive/metastore/api/WMFullResourcePlan.java  |  144 +-
 .../api/WMGetAllResourcePlanResponse.java       |   36 +-
 .../WMGetTriggersForResourePlanResponse.java    |   36 +-
 .../api/WMValidateResourcePlanResponse.java     |   64 +-
 .../gen-php/metastore/ThriftHiveMetastore.php   | 1394 ++++----
 .../src/gen/thrift/gen-php/metastore/Types.php  |  842 ++---
 .../hive_metastore/ThriftHiveMetastore.py       |  940 +++---
 .../gen/thrift/gen-py/hive_metastore/ttypes.py  |  543 ++--
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |   15 +-
 .../hadoop/hive/metastore/HiveMetaStore.java    |   11 +-
 .../hive/metastore/HiveMetaStoreClient.java     |   20 +-
 .../hadoop/hive/metastore/IMetaStoreClient.java |   13 +-
 .../hive/metastore/MetaStoreEventListener.java  |   34 +-
 .../metastore/MetaStoreListenerNotifier.java    |   85 +-
 .../hive/metastore/events/AbortTxnEvent.java    |   32 -
 .../metastore/events/AllocWriteIdEvent.java     |   57 +
 .../hive/metastore/events/CommitTxnEvent.java   |   32 -
 .../hive/metastore/events/ListenerEvent.java    |   16 -
 .../hive/metastore/events/OpenTxnEvent.java     |   32 -
 .../messaging/AllocWriteIdMessage.java          |   36 +
 .../hive/metastore/messaging/EventMessage.java  |    3 +-
 .../messaging/MessageDeserializer.java          |    5 +
 .../metastore/messaging/MessageFactory.java     |   13 +
 .../event/filters/DatabaseAndTableFilter.java   |    3 +-
 .../messaging/json/JSONAllocWriteIdMessage.java |  111 +
 .../messaging/json/JSONMessageDeserializer.java |    9 +
 .../messaging/json/JSONMessageFactory.java      |    7 +
 .../hadoop/hive/metastore/txn/TxnHandler.java   |  166 +-
 .../src/main/thrift/hive_metastore.thrift       |   11 +-
 .../HiveMetaStoreClientPreCatalog.java          |   19 +-
 80 files changed, 7789 insertions(+), 6719 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 7f21573..59d1e3a 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -18,6 +18,7 @@
 package org.apache.hive.hcatalog.listener;
 
 import java.io.IOException;
+import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -72,6 +73,7 @@ import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
 import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
 import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
 import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
+import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
 import org.apache.hadoop.hive.metastore.events.ListenerEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
@@ -449,7 +451,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   }
 
   @Override
-  public void onOpenTxn(OpenTxnEvent openTxnEvent) throws MetaException {
+  public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException {
     int lastTxnIdx = openTxnEvent.getTxnIds().size() - 1;
     OpenTxnMessage msg = msgFactory.buildOpenTxnMessage(openTxnEvent.getTxnIds().get(0),
             openTxnEvent.getTxnIds().get(lastTxnIdx));
@@ -457,35 +459,37 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
             new NotificationEvent(0, now(), EventType.OPEN_TXN.toString(), msg.toString());
 
     try {
-      addNotificationLog(event, openTxnEvent);
+      addNotificationLog(event, openTxnEvent, dbConn, sqlGenerator);
     } catch (SQLException e) {
       throw new MetaException("Unable to execute direct SQL " + StringUtils.stringifyException(e));
     }
   }
 
   @Override
-  public void onCommitTxn(CommitTxnEvent commitTxnEvent) throws MetaException {
+  public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGenerator sqlGenerator)
+          throws MetaException {
     NotificationEvent event =
             new NotificationEvent(0, now(), EventType.COMMIT_TXN.toString(), msgFactory.buildCommitTxnMessage(
                     commitTxnEvent.getTxnId())
                     .toString());
 
     try {
-      addNotificationLog(event, commitTxnEvent);
+      addNotificationLog(event, commitTxnEvent, dbConn, sqlGenerator);
     } catch (SQLException e) {
       throw new MetaException("Unable to execute direct SQL " + StringUtils.stringifyException(e));
     }
   }
 
   @Override
-  public void onAbortTxn(AbortTxnEvent abortTxnEvent) throws MetaException {
+  public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, SQLGenerator sqlGenerator)
+          throws MetaException {
     NotificationEvent event =
         new NotificationEvent(0, now(), EventType.ABORT_TXN.toString(), msgFactory.buildAbortTxnMessage(
             abortTxnEvent.getTxnId())
             .toString());
 
     try {
-      addNotificationLog(event, abortTxnEvent);
+      addNotificationLog(event, abortTxnEvent, dbConn, sqlGenerator);
     } catch (SQLException e) {
       throw new MetaException("Unable to execute direct SQL " + StringUtils.stringifyException(e));
     }
@@ -591,6 +595,27 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     process(event, dropConstraintEvent);
   }
 
+  /**
+   * @param allocWriteIdEvent alloc write id event
+   * @param dbConn JDBC connection to the metastore DB for direct SQL; may be null
+   * @param sqlGenerator SQL generator for the backing DB product; may be null
+   * @throws MetaException
+   */
+  @Override
+  public void onAllocWriteId(AllocWriteIdEvent allocWriteIdEvent, Connection dbConn, SQLGenerator sqlGenerator)
+          throws MetaException {
+    String tableName = allocWriteIdEvent.getTableName();
+    String dbName = allocWriteIdEvent.getDbName();
+    NotificationEvent event =
+            new NotificationEvent(0, now(), EventType.ALLOC_WRITE_ID.toString(), msgFactory
+                    .buildAllocWriteIdMessage(allocWriteIdEvent.getTxnToWriteIdList(), dbName, tableName).toString());
+    event.setTableName(tableName);
+    event.setDbName(dbName);
+    try {
+      addNotificationLog(event, allocWriteIdEvent, dbConn, sqlGenerator);
+    } catch (SQLException e) {
+      throw new MetaException("Unable to execute direct SQL " + StringUtils.stringifyException(e));
+    }
+  }
+
   private int now() {
     long millis = System.currentTimeMillis();
     millis /= 1000;
@@ -606,9 +631,9 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     return "'" + input + "'";
   }
 
-  private void addNotificationLog(NotificationEvent event, ListenerEvent listenerEvent)
-          throws MetaException, SQLException {
-    if ((listenerEvent.getConnection() == null) || (listenerEvent.getSqlGenerator() == null)) {
+  private void addNotificationLog(NotificationEvent event, ListenerEvent listenerEvent, Connection dbConn,
+                                  SQLGenerator sqlGenerator) throws MetaException, SQLException {
+    if ((dbConn == null) || (sqlGenerator == null)) {
       LOG.info("connection or sql generator is not set so executing sql via DN");
       process(event, listenerEvent);
       return;
@@ -616,8 +641,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
     Statement stmt = null;
     ResultSet rs = null;
     try {
-      stmt = listenerEvent.getConnection().createStatement();
-      SQLGenerator sqlGenerator = listenerEvent.getSqlGenerator();
+      stmt = dbConn.createStatement();
       event.setMessageFormat(msgFactory.getMessageFormat());
 
       if (sqlGenerator.getDbProduct() == MYSQL) {
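
For illustration, a minimal sketch of a listener written against the new callback signature, which now receives the metastore's own JDBC connection and SQL generator instead of pulling them off the ListenerEvent. The class AuditTxnListener and its body are hypothetical, and the SQLGenerator import path is assumed from surrounding metastore code:

    import java.sql.Connection;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
    import org.apache.hadoop.hive.metastore.tools.SQLGenerator; // package assumed

    public class AuditTxnListener extends TransactionalMetaStoreEventListener {
      public AuditTxnListener(Configuration conf) {
        super(conf);
      }

      @Override
      public void onAllocWriteId(AllocWriteIdEvent event, Connection dbConn, SQLGenerator sqlGenerator)
          throws MetaException {
        // dbConn/sqlGenerator are the metastore's transaction resources; either
        // may be null, in which case a listener must fall back to a
        // non-direct-SQL path, as addNotificationLog above does.
        System.out.println("write ids allocated for " + event.getDbName() + "." + event.getTableName());
      }
    }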

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 70c6a94..eef917e 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -74,6 +74,7 @@ import org.apache.hadoop.hive.metastore.events.OpenTxnEvent;
 import org.apache.hadoop.hive.metastore.events.CommitTxnEvent;
 import org.apache.hadoop.hive.metastore.events.AbortTxnEvent;
 import org.apache.hadoop.hive.metastore.events.ListenerEvent;
+import org.apache.hadoop.hive.metastore.events.AllocWriteIdEvent;
 import org.apache.hadoop.hive.metastore.messaging.AddPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.AlterTableMessage;
@@ -233,6 +234,10 @@ public class TestDbNotificationListener {
     public void onAbortTxn(AbortTxnEvent abortTxnEvent) throws MetaException {
       pushEventId(EventType.ABORT_TXN, abortTxnEvent);
     }
+
+    public void onAllocWriteId(AllocWriteIdEvent allocWriteIdEvent) throws MetaException {
+      pushEventId(EventType.ALLOC_WRITE_ID, allocWriteIdEvent);
+    }
   }
 
   @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index cac9922..2ad83b6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -100,7 +100,8 @@ public class TestReplicationScenariosAcidTables {
             .verifyResult(bootStrapDump.lastReplicationId);
 
    // create table will start and commit the transaction
-    primary.run("CREATE TABLE " + tableName +
+    primary.run("use " + primaryDbName)
+           .run("CREATE TABLE " + tableName +
             " (key int, value int) PARTITIONED BY (load_date date) " +
             "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')")
             .run("SHOW TABLES LIKE '" + tableName + "'")
@@ -113,16 +114,12 @@ public class TestReplicationScenariosAcidTables {
             primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
     replica.load(replicatedDbName, incrementalDump.dumpLocation)
             .run("REPL STATUS " + replicatedDbName)
-            .verifyResult(incrementalDump.lastReplicationId)
-            .run("SHOW TABLES LIKE '" + tableName + "'")
-            .verifyResult(tableName);
+            .verifyResult(incrementalDump.lastReplicationId);
 
     // Test the idempotent behavior of Open and Commit Txn
     replica.load(replicatedDbName, incrementalDump.dumpLocation)
             .run("REPL STATUS " + replicatedDbName)
-            .verifyResult(incrementalDump.lastReplicationId)
-            .run("SHOW TABLES LIKE '" + tableName + "'")
-            .verifyResult(tableName);
+            .verifyResult(incrementalDump.lastReplicationId);
   }
 
   @Test
@@ -135,7 +132,8 @@ public class TestReplicationScenariosAcidTables {
             .verifyResult(bootStrapDump.lastReplicationId);
 
     // this should fail
-    primary.runFailure("CREATE TABLE " + tableNameFail +
+    primary.run("use " + primaryDbName)
+            .runFailure("CREATE TABLE " + tableNameFail +
             " (key int, value int) PARTITIONED BY (load_date date) " +
             "CLUSTERED BY(key) ('transactional'='true')")
             .run("SHOW TABLES LIKE '" + tableNameFail + "'")
@@ -145,16 +143,12 @@ public class TestReplicationScenariosAcidTables {
             primary.dump(primaryDbName, bootStrapDump.lastReplicationId);
     replica.load(replicatedDbName, incrementalDump.dumpLocation)
             .run("REPL STATUS " + replicatedDbName)
-            .verifyResult(incrementalDump.lastReplicationId)
-            .run("SHOW TABLES LIKE '" + tableNameFail + "'")
-            .verifyFailure(new String[]{tableNameFail});
+            .verifyResult(incrementalDump.lastReplicationId);
 
     // Test the idempotent behavior of Abort Txn
     replica.load(replicatedDbName, incrementalDump.dumpLocation)
             .run("REPL STATUS " + replicatedDbName)
-            .verifyResult(incrementalDump.lastReplicationId)
-            .run("SHOW TABLES LIKE '" + tableNameFail + "'")
-            .verifyFailure(new String[]{tableNameFail});
+            .verifyResult(incrementalDump.lastReplicationId);
   }
 
   @Test
@@ -165,7 +159,8 @@ public class TestReplicationScenariosAcidTables {
             .run("REPL STATUS " + replicatedDbName)
             .verifyResult(bootStrapDump.lastReplicationId);
 
-    primary.run("CREATE TABLE " + tableName +
+    primary.run("use " + primaryDbName)
+            .run("CREATE TABLE " + tableName +
             " (key int, value int) PARTITIONED BY (load_date date) " +
             "CLUSTERED BY(key) INTO 3 BUCKETS STORED AS ORC TBLPROPERTIES ('transactional'='true')")
             .run("SHOW TABLES LIKE '" + tableName + "'")

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
index 1cdeeb6..2615072 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnTask.java
@@ -18,8 +18,14 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -44,11 +50,32 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
       Utilities.FILE_OP_LOGGER.trace("Executing ReplTxnTask " + work.getOperationType().toString() +
               " for txn ids : " + work.getTxnIds().toString() + " replPolicy : " + replPolicy);
     }
+
+    String tableName = work.getTableName() == null || work.getTableName().isEmpty() ? null : work.getTableName();
+    if (tableName != null) {
+      Table tbl;
+      try {
+        tbl = Hive.get().getTable(work.getDbName(), tableName);
+        ReplicationSpec replicationSpec = work.getReplicationSpec();
+        if (replicationSpec != null && !replicationSpec.allowReplacementInto(tbl.getParameters())) {
+          // if the event is already replayed, then no need to replay it again.
+          LOG.debug("ReplTxnTask: Event is skipped as it is already replayed. Event Id: " +
+                  replicationSpec.getReplicationState() + "Event Type: " + work.getOperationType());
+          return 0;
+        }
+      } catch (InvalidTableException e) {
+        LOG.info("Table does not exist so, ignoring the operation as it might be a retry(idempotent) case.");
+        return 0;
+      } catch (HiveException e) {
+        LOG.error("Get table failed with exception " + e.getMessage());
+        return 1;
+      }
+    }
+
     try {
       HiveTxnManager txnManager = driverContext.getCtx().getHiveTxnManager();
       String user = UserGroupInformation.getCurrentUser().getUserName();
-      LOG.debug("Replaying " + work.getOperationType().toString() + " Event for policy " +
-              replPolicy + " with srcTxn " + work.getTxnIds().toString());
+      LOG.debug("Replaying " + work.getOperationType().toString() + " Event for policy " + replPolicy);
       switch(work.getOperationType()) {
       case REPL_OPEN_TXN:
         List<Long> txnIds = txnManager.replOpenTxn(replPolicy, work.getTxnIds(), user);
@@ -68,6 +95,15 @@ public class ReplTxnTask extends Task<ReplTxnWork> {
           LOG.info("Replayed CommitTxn Event for policy " + replPolicy + " with srcTxn " + txnId);
         }
         return 0;
+      case REPL_ALLOC_WRITE_ID:
+        assert work.getTxnToWriteIdList() != null;
+        String dbName = work.getDbName();
+        String tblName = work.getTableName();
+        List<TxnToWriteId> txnToWriteIdList = work.getTxnToWriteIdList();
+        txnManager.replAllocateTableWriteIdsBatch(dbName, tblName, replPolicy, txnToWriteIdList);
+        LOG.info("Replayed AllocWriteId event for repl policy: " + replPolicy + " db name: " + dbName +
+                " txnToWriteIdList: " + txnToWriteIdList + " table name: " + tblName);
+        return 0;
       default:
         LOG.error("Operation Type " + work.getOperationType() + " is not supported ");
         return 1;

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
index 9467415..530e9be 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplTxnWork.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 
-import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -33,38 +35,49 @@ public class ReplTxnWork implements Serializable {
   private static final long serialVersionUID = 1L;
   private String dbName;
   private String tableName;
+  private String replPolicy;
   private List<Long> txnIds;
+  private List<TxnToWriteId> txnToWriteIdList;
+  private ReplicationSpec replicationSpec;
 
   /**
    * OperationType.
    * Different kind of events supported for replaying.
    */
   public enum OperationType {
-    REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN
+    REPL_OPEN_TXN, REPL_ABORT_TXN, REPL_COMMIT_TXN, REPL_ALLOC_WRITE_ID
   }
 
   OperationType operation;
 
-  public ReplTxnWork(String dbName, String tableName, List<Long> txnIds, OperationType type) {
+  public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type,
+                     List<TxnToWriteId> txnToWriteIdList, ReplicationSpec replicationSpec) {
     this.txnIds = txnIds;
     this.dbName = dbName;
     this.tableName = tableName;
     this.operation = type;
+    this.replPolicy = replPolicy;
+    this.txnToWriteIdList = txnToWriteIdList;
+    this.replicationSpec = replicationSpec;
   }
 
-  public ReplTxnWork(String dbName, String tableName, Long txnId, OperationType type) {
-    this.txnIds = Lists.newArrayList(txnId);
-    this.dbName = dbName;
-    this.tableName = tableName;
-    this.operation = type;
+  public ReplTxnWork(String replPolicy, String dbName, String tableName, List<Long> txnIds, OperationType type,
+                     ReplicationSpec replicationSpec) {
+    this(replPolicy, dbName, tableName, txnIds, type, null, replicationSpec);
   }
 
-  public List<Long> getTxnIds() {
-    return txnIds;
+  public ReplTxnWork(String replPolicy, String dbName, String tableName, Long txnId,
+                     OperationType type, ReplicationSpec replicationSpec) {
+    this(replPolicy, dbName, tableName, Collections.singletonList(txnId), type, null, replicationSpec);
   }
 
-  public Long getTxnId(int idx) {
-    return txnIds.get(idx);
+  public ReplTxnWork(String replPolicy, String dbName, String tableName, OperationType type,
+                     List<TxnToWriteId> txnToWriteIdList, ReplicationSpec replicationSpec) {
+    this(replPolicy, dbName, tableName, null, type, txnToWriteIdList, replicationSpec);
+  }
+
+  public List<Long> getTxnIds() {
+    return txnIds;
   }
 
   public String getDbName() {
@@ -75,17 +88,19 @@ public class ReplTxnWork implements Serializable {
     return tableName;
   }
 
-  public String getReplPolicy() {
-    if ((dbName == null) || (dbName.isEmpty())) {
-      return null;
-    } else if ((tableName == null) || (tableName.isEmpty())) {
-      return dbName.toLowerCase() + ".*";
-    } else {
-      return dbName.toLowerCase() + "." + tableName.toLowerCase();
-    }
+  public String getReplPolicy() {
+    return replPolicy;
   }
 
   public OperationType getOperationType() {
     return operation;
   }
+
+  public List<TxnToWriteId> getTxnToWriteIdList() {
+    return txnToWriteIdList;
+  }
+
+  public ReplicationSpec getReplicationSpec() {
+    return replicationSpec;
+  }
 }
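
For illustration, how a load-side handler now assembles this work object for an alloc-write-id replay. This is a sketch with hypothetical names and values, not code from the patch; "replicationSpec" would come from the handler's context.eventOnlyReplicationSpec(), as in the AllocWriteIdHandler added below:

    import java.util.Collections;

    import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
    import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
    import org.apache.hadoop.hive.ql.metadata.HiveUtils;

    // Map source txn 42 to write id 1 on the target. The ReplicationSpec carries
    // the event's state id so ReplTxnTask can skip an already-replayed event.
    ReplTxnWork work = new ReplTxnWork(
        HiveUtils.getReplPolicy("srcdb", null),                // -> "srcdb.*"
        "destdb", "tbl1",
        ReplTxnWork.OperationType.REPL_ALLOC_WRITE_ID,
        Collections.singletonList(new TxnToWriteId(42L, 1L)),  // source txn -> write id
        replicationSpec);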

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index f566842..7b7fd5d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -1013,6 +1013,15 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
     }
   }
+  
+  public void replAllocateTableWriteIdsBatch(String dbName, String tableName, String replPolicy,
+                                             List<TxnToWriteId> srcTxnToWriteIdList) throws LockException {
+    try {
+      getMS().replAllocateTableWriteIdsBatch(dbName, tableName, replPolicy, srcTxnToWriteIdList);
+    } catch (TException e) {
+      throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(), e);
+    }
+  }
 
   private static long getHeartbeatInterval(Configuration conf) throws LockException {
     // Retrieve HIVE_TXN_TIMEOUT in MILLISECONDS (it's defined as SECONDS),

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
index 9057bb9..78eedd3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DummyTxnManager.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.lockmgr;
 
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -77,6 +78,11 @@ class DummyTxnManager extends HiveTxnManagerImpl {
     return 0L;
   }
   @Override
+  public void replAllocateTableWriteIdsBatch(String dbName, String tableName, String replPolicy,
+                                             List<TxnToWriteId> srcTxnToWriteIdList) throws LockException {
+    // no-op: DummyTxnManager does not support transactions
+  }
+  @Override
   public HiveLockManager getLockManager() throws LockException {
     if (lockMgr == null) {
       boolean supportConcurrency =

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
index e0e235b..ec11fec 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/HiveTxnManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.lockmgr;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.Driver.LockedDriverState;
 import org.apache.hadoop.hive.ql.QueryPlan;
@@ -265,6 +266,17 @@ public interface HiveTxnManager {
   long getTableWriteId(String dbName, String tableName) throws LockException;
 
   /**
+   * Allocates a write id for each transaction in the list.
+   * @param dbName database name
+   * @param tableName the name of the table for which write ids are allocated
+   * @param replPolicy used by replication task to identify the source cluster
+   * @param srcTxnToWriteIdList list of source txn id to write id mappings
+   * @throws LockException
+   */
+  void replAllocateTableWriteIdsBatch(String dbName, String tableName, String replPolicy,
+                                      List<TxnToWriteId> srcTxnToWriteIdList) throws LockException;
+
+  /**
   * Should be thought of more as a unique write operation ID in a given txn (at QueryPlan level).
   * Each statement writing data within a multi-statement txn should have a unique WriteId.
   * Even a single statement (e.g. Merge, multi-insert) may generate several writes.

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
index dae18fb..f1c4d98 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
@@ -428,4 +428,14 @@ public final class HiveUtils {
 
     return null;
   }
+
+  public static String getReplPolicy(String dbName, String tableName) {
+    if ((dbName == null) || (dbName.isEmpty())) {
+      return null;
+    } else if ((tableName == null) || (tableName.isEmpty())) {
+      return dbName.toLowerCase() + ".*";
+    } else {
+      return dbName.toLowerCase() + "." + tableName.toLowerCase();
+    }
+  }
 }
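
The extracted helper centralizes the repl-policy naming convention that ReplTxnWork.getReplPolicy() used to compute inline; for illustration:

    HiveUtils.getReplPolicy("SrcDb", "Tbl1");  // -> "srcdb.tbl1"
    HiveUtils.getReplPolicy("SrcDb", null);    // -> "srcdb.*"
    HiveUtils.getReplPolicy(null, "anything"); // -> null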

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index f7c78e2..1fda478 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -244,16 +244,19 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
-  private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType) throws SemanticException {
-    // This functions filters out all the events which are already replayed. This can be done only
-    // for transaction related events as for other kind of events we can not gurantee that the last
-    // repl id stored in the database/table is valid.
-    if ((dumpType != DumpType.EVENT_ABORT_TXN) &&
-            (dumpType != DumpType.EVENT_OPEN_TXN) &&
-            (dumpType != DumpType.EVENT_COMMIT_TXN)) {
-      return true;
+  private boolean isEventNotReplayed(Map<String, String> params, FileStatus dir, DumpType dumpType) {
+    if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
+      String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
+      if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
+        LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
+                + " is already replayed. LastReplId - " +  Long.parseLong(replLastId));
+        return false;
+      }
     }
+    return true;
+  }
 
+  private boolean shouldReplayEvent(FileStatus dir, DumpType dumpType) throws SemanticException {
     // if database itself is null then we can not filter out anything.
     if (dbNameOrPattern == null || dbNameOrPattern.isEmpty()) {
       return true;
@@ -261,41 +264,23 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       Database database;
       try {
         database = Hive.get().getDatabase(dbNameOrPattern);
+        return isEventNotReplayed(database.getParameters(), dir, dumpType);
       } catch (HiveException e) {
-        LOG.error("failed to get the database " + dbNameOrPattern);
-        throw new SemanticException(e);
-      }
-      String replLastId;
-      Map<String, String> params = database.getParameters();
-      if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
-        replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
-        if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
-          LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
-                  + " is already replayed. LastReplId - " +  Long.parseLong(replLastId));
-          return false;
-        }
+        // maybe the db is getting created in this load
+        LOG.debug("failed to get the database " + dbNameOrPattern);
+        return true;
       }
     } else {
       Table tbl;
       try {
         tbl = Hive.get().getTable(dbNameOrPattern, tblNameOrPattern);
+        return isEventNotReplayed(tbl.getParameters(), dir, dumpType);
       } catch (HiveException e) {
-        LOG.error("failed to get the table " + dbNameOrPattern + "." + tblNameOrPattern);
-        throw new SemanticException(e);
-      }
-      if (tbl != null) {
-        Map<String, String> params = tbl.getParameters();
-        if (params != null && (params.containsKey(ReplicationSpec.KEY.CURR_STATE_ID.toString()))) {
-          String replLastId = params.get(ReplicationSpec.KEY.CURR_STATE_ID.toString());
-          if (Long.parseLong(replLastId) >= Long.parseLong(dir.getPath().getName())) {
-            LOG.debug("Event " + dumpType + " with replId " + Long.parseLong(dir.getPath().getName())
-                    + " is already replayed. LastReplId - " +  Long.parseLong(replLastId));
-            return false;
-          }
-        }
+        // maybe the table is getting created in this load
+        LOG.debug("failed to get the table " + dbNameOrPattern + "." + tblNameOrPattern);
+        return true;
       }
     }
-    return true;
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
index 5fab0d2..2e42267 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/DumpType.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.ql.parse.repl.load.message.TruncateTableHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.OpenTxnHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.CommitTxnHandler;
 import org.apache.hadoop.hive.ql.parse.repl.load.message.AbortTxnHandler;
+import org.apache.hadoop.hive.ql.parse.repl.load.message.AllocWriteIdHandler;
 
 public enum DumpType {
 
@@ -204,6 +205,12 @@ public enum DumpType {
     public MessageHandler handler() {
       return new AbortTxnHandler();
     }
+  },
+  EVENT_ALLOC_WRITE_ID("EVENT_ALLOC_WRITE_ID") {
+    @Override
+    public MessageHandler handler() {
+      return new AllocWriteIdHandler();
+    }
   };
 
   String type = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
new file mode 100644
index 0000000..38efbd7
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/AllocWriteIdHandler.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.dump.events;
+
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.ql.parse.repl.DumpType;
+import org.apache.hadoop.hive.ql.parse.repl.load.DumpMetaData;
+
+class AllocWriteIdHandler extends AbstractEventHandler {
+  AllocWriteIdHandler(NotificationEvent event) {
+    super(event);
+  }
+
+  @Override
+  public void handle(Context withinContext) throws Exception {
+    LOG.info("Processing#{} ALLOC_WRITE_ID message : {}", fromEventId(), event.getMessage());
+    DumpMetaData dmd = withinContext.createDmd(this);
+    dmd.setPayload(event.getMessage());
+    dmd.write();
+  }
+
+  @Override
+  public DumpType dumpType() {
+    return DumpType.EVENT_ALLOC_WRITE_ID;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
index 10ff21c..a1d61f9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/dump/events/EventHandlerFactory.java
@@ -53,6 +53,7 @@ public class EventHandlerFactory {
     register(MessageFactory.OPEN_TXN_EVENT, OpenTxnHandler.class);
     register(MessageFactory.COMMIT_TXN_EVENT, CommitTxnHandler.class);
     register(MessageFactory.ABORT_TXN_EVENT, AbortTxnHandler.class);
+    register(MessageFactory.ALLOC_WRITE_ID_EVENT, AllocWriteIdHandler.class);
   }
 
   static void register(String event, Class<? extends EventHandler> handlerClazz) {

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
index 5b2c85b..3c28f84 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AbortTxnHandler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import java.io.Serializable;
 import java.util.Collections;
@@ -43,7 +44,8 @@ public class AbortTxnHandler extends AbstractMessageHandler {
     AbortTxnMessage msg = deserializer.getAbortTxnMessage(context.dmd.getPayload());
 
     Task<ReplTxnWork> abortTxnTask = TaskFactory.get(
-        new ReplTxnWork(context.dbName, context.tableName, msg.getTxnId(), ReplTxnWork.OperationType.REPL_ABORT_TXN),
+        new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName, context.tableName,
+                msg.getTxnId(), ReplTxnWork.OperationType.REPL_ABORT_TXN, context.eventOnlyReplicationSpec()),
         context.hiveConf
     );
     updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
new file mode 100644
index 0000000..ef51396
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/AllocWriteIdHandler.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.parse.repl.load.message;
+
+import org.apache.hadoop.hive.metastore.messaging.AllocWriteIdMessage;
+import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * AllocWriteIdHandler
+ * Target(Load) side handler for alloc write id event.
+ */
+public class AllocWriteIdHandler extends AbstractMessageHandler {
+  @Override
+  public List<Task<? extends Serializable>> handle(Context context)
+      throws SemanticException {
+    if (!AcidUtils.isAcidEnabled(context.hiveConf)) {
+      context.log.error("Cannot load alloc write id event as acid is not enabled");
+      throw new SemanticException("Cannot load alloc write id event as acid is not enabled");
+    }
+
+    AllocWriteIdMessage msg =
+        deserializer.getAllocWriteIdMessage(context.dmd.getPayload());
+
+    String dbName = (context.dbName != null && !context.dbName.isEmpty() ? context.dbName : msg.getDB());
+
+    // The context table name can be null if repl load is done on a full db.
+    // But we need table name for alloc write id and that is received from source.
+    String tableName = (context.tableName != null && !context.tableName.isEmpty() ? context.tableName : msg
+            .getTableName());
+
+    // Repl policy should be created based on the table name in context.
+    ReplTxnWork work = new ReplTxnWork(HiveUtils.getReplPolicy(dbName, context.tableName), dbName, tableName,
+        ReplTxnWork.OperationType.REPL_ALLOC_WRITE_ID, msg.getTxnToWriteIdList(), context.eventOnlyReplicationSpec());
+
+    Task<? extends Serializable> allocWriteIdTask = TaskFactory.get(work, context.hiveConf);
+    context.log.info("Added alloc write id task : {}", allocWriteIdTask.getId());
+    updatedMetadata.set(context.dmd.getEventTo().toString(), dbName, tableName, null);
+    return Collections.singletonList(allocWriteIdTask);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
index 461a0f1..1274213 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/CommitTxnHandler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import java.io.Serializable;
 import java.util.Collections;
@@ -41,10 +42,10 @@ public class CommitTxnHandler extends AbstractMessageHandler {
     }
 
     CommitTxnMessage msg = deserializer.getCommitTxnMessage(context.dmd.getPayload());
-
     Task<ReplTxnWork> commitTxnTask = TaskFactory.get(
-        new ReplTxnWork(context.dbName, context.tableName, msg.getTxnId(),
-                ReplTxnWork.OperationType.REPL_COMMIT_TXN), context.hiveConf
+        new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName, context.tableName,
+              msg.getTxnId(), ReplTxnWork.OperationType.REPL_COMMIT_TXN, context.eventOnlyReplicationSpec()),
+        context.hiveConf
     );
     updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);
     context.log.debug("Added Commit txn task : {}", commitTxnTask.getId());

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
index c6349ea..a502117 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/repl/load/message/OpenTxnHandler.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hive.ql.exec.ReplTxnWork;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import java.io.Serializable;
 import java.util.Collections;
@@ -42,7 +43,8 @@ public class OpenTxnHandler extends AbstractMessageHandler {
     OpenTxnMessage msg = deserializer.getOpenTxnMessage(context.dmd.getPayload());
 
     Task<ReplTxnWork> openTxnTask = TaskFactory.get(
-        new ReplTxnWork(context.dbName, context.tableName, msg.getTxnIds(), ReplTxnWork.OperationType.REPL_OPEN_TXN),
+        new ReplTxnWork(HiveUtils.getReplPolicy(context.dbName, context.tableName), context.dbName, context.tableName,
+                msg.getTxnIds(), ReplTxnWork.OperationType.REPL_OPEN_TXN, context.eventOnlyReplicationSpec()),
         context.hiveConf
     );
     updatedMetadata.set(context.dmd.getEventTo().toString(), context.dbName, context.tableName, null);

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 45890ed..f27a5b0 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -481,8 +481,9 @@ public class TestCompactionTxnHandler {
     OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
     long txnId = openTxns.getTxn_ids().get(0);
 
-    AllocateTableWriteIdsResponse writeIds
-            = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(openTxns.getTxn_ids(), dbName, tableName));
+    AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest(dbName, tableName);
+    rqst.setTxnIds(openTxns.getTxn_ids());
+    AllocateTableWriteIdsResponse writeIds = txnHandler.allocateTableWriteIds(rqst);
     long writeId = writeIds.getTxnToWriteIds().get(0).getWriteId();
     assertEquals(txnId, writeIds.getTxnToWriteIds().get(0).getTxnId());
     assertEquals(1, writeId);
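
Behind this test change: AllocateTableWriteIdsRequest now takes only (dbName, tableName) in its constructor, and txn ids arrive via one of two optional fields. A sketch of both modes, with hypothetical variable names; the TestTxnHandler changes below exercise the second:

    // Normal path: allocate write ids for txns opened on this cluster.
    AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest("default", "t1");
    rqst.setTxnIds(openTxns.getTxn_ids());

    // Replication path: carry the source's txn-id -> write-id mapping so the
    // target allocates the same write ids for the mapped target txns.
    AllocateTableWriteIdsRequest replRqst = new AllocateTableWriteIdsRequest("destdb", "tbl1");
    replRqst.setReplPolicy("destdb.*");
    replRqst.setSrcTxnToWriteIdList(srcTxnToWriteIdList);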

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index f1da15b..372c709 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hive.metastore.api.TxnInfo;
 import org.apache.hadoop.hive.metastore.api.TxnOpenException;
 import org.apache.hadoop.hive.metastore.api.TxnState;
 import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.logging.log4j.Level;
 import org.apache.logging.log4j.LogManager;
@@ -156,8 +157,9 @@ public class TestTxnHandler {
     List<String> parts = new ArrayList<String>();
     parts.add("p=1");
 
-    AllocateTableWriteIdsResponse writeIds
-            = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(3L), "default", "T"));
+    AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest("default", "T");
+    rqst.setTxnIds(Collections.singletonList(3L));
+    AllocateTableWriteIdsResponse writeIds = txnHandler.allocateTableWriteIds(rqst);
     long writeId = writeIds.getTxnToWriteIds().get(0).getWriteId();
     assertEquals(3, writeIds.getTxnToWriteIds().get(0).getTxnId());
     assertEquals(1, writeId);
@@ -1544,24 +1546,117 @@ public class TestTxnHandler {
     Assert.assertTrue("regex should be retryable", result);
   }
 
-  @Test
-  public void testReplOpenTxn() throws Exception {
-    int numTxn = 50000;
+  private List<Long> replOpenTxnForTest(long startId, int numTxn, String replPolicy)
+          throws Exception {
     conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, numTxn);
+    long lastId = startId + numTxn - 1;
     OpenTxnRequest rqst = new OpenTxnRequest(numTxn, "me", "localhost");
-    rqst.setReplPolicy("default.*");
-    rqst.setReplSrcTxnIds(LongStream.rangeClosed(1, numTxn)
+    rqst.setReplPolicy(replPolicy);
+    rqst.setReplSrcTxnIds(LongStream.rangeClosed(startId, lastId)
             .boxed().collect(Collectors.toList()));
+
     OpenTxnsResponse openedTxns = txnHandler.openTxns(rqst);
     List<Long> txnList = openedTxns.getTxn_ids();
     assertEquals(txnList.size(), numTxn);
-    for (long i = 0; i < numTxn; i++) {
-      long txnId = txnList.get((int) i);
-      assertEquals(i+1, txnId);
+    int numTxnPresentNow = TxnDbUtil.countQueryAgent(conf, "select count(*) from TXNS where TXN_ID >= " +
+            txnList.get(0) + " and TXN_ID <= " + txnList.get(numTxn - 1));
+    assertEquals(numTxn, numTxnPresentNow);
+
+    checkReplTxnForTest(startId, lastId, replPolicy, txnList);
+    return txnList;
+  }
+
+  private void replAbortTxnForTest(List<Long> txnList, String replPolicy)
+          throws Exception {
+    for (Long txnId : txnList) {
+      AbortTxnRequest rqst = new AbortTxnRequest(txnId);
+      rqst.setReplPolicy(replPolicy);
+      txnHandler.abortTxn(rqst);
     }
+    checkReplTxnForTest(txnList.get(0), txnList.get(txnList.size() - 1), replPolicy, new ArrayList<>());
+  }
+
+  private void checkReplTxnForTest(Long startTxnId, Long endTxnId, String replPolicy, List<Long> targetTxnId)
+          throws Exception {
+    String[] output = TxnDbUtil.queryToString(conf, "select RTM_TARGET_TXN_ID from REPL_TXN_MAP where " +
+            " RTM_SRC_TXN_ID >= " + startTxnId + " and RTM_SRC_TXN_ID <= " + endTxnId +
+            " and RTM_REPL_POLICY = \'" + replPolicy + "\'").split("\n");
+    assertEquals(output.length - 1, targetTxnId.size());
+    for (int idx = 1; idx < output.length; idx++) {
+      long txnId = Long.parseLong(output[idx].trim());
+      assertEquals(txnId, targetTxnId.get(idx-1).longValue());
+    }
+  }
+
+  @Test
+  public void testReplOpenTxn() throws Exception {
+    int numTxn = 50000;
+    String[] output = TxnDbUtil.queryToString(conf, "select ntxn_next from NEXT_TXN_ID").split("\n");
+    long startTxnId = Long.parseLong(output[1].trim());
+    List<Long> txnList = replOpenTxnForTest(startTxnId, numTxn, "default.*");
+    assert(txnList.size() == numTxn);
     txnHandler.abortTxns(new AbortTxnsRequest(txnList));
   }
 
+  @Test
+  public void testReplAllocWriteId() throws Exception {
+    int numTxn = 2;
+    String[] output = TxnDbUtil.queryToString(conf, "select ntxn_next from NEXT_TXN_ID").split("\n");
+    long startTxnId = Long.parseLong(output[1].trim());
+    List<Long> srcTxnIdList = LongStream.rangeClosed(startTxnId, numTxn+startTxnId-1)
+            .boxed().collect(Collectors.toList());
+    List<Long> targetTxnList = replOpenTxnForTest(startTxnId, numTxn, "destdb.*");
+    assert(targetTxnList.size() == numTxn);
+
+    List<TxnToWriteId> srcTxnToWriteId;
+    List<TxnToWriteId> targetTxnToWriteId;
+    srcTxnToWriteId = new ArrayList<>();
+
+    for (int idx = 0; idx < numTxn; idx++) {
+      srcTxnToWriteId.add(new TxnToWriteId(startTxnId+idx, idx+1));
+    }
+    AllocateTableWriteIdsRequest allocMsg = new AllocateTableWriteIdsRequest("destdb", "tbl1");
+    allocMsg.setReplPolicy("destdb.*");
+    allocMsg.setSrcTxnToWriteIdList(srcTxnToWriteId);
+    targetTxnToWriteId = txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds();
+    for (int idx = 0; idx < targetTxnList.size(); idx++) {
+      assertEquals(targetTxnToWriteId.get(idx).getWriteId(), srcTxnToWriteId.get(idx).getWriteId());
+      assertEquals(Long.valueOf(targetTxnToWriteId.get(idx).getTxnId()), targetTxnList.get(idx));
+    }
+
+    // idempotent case for destdb db
+    targetTxnToWriteId = txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds();
+    for (int idx = 0; idx < targetTxnList.size(); idx++) {
+      assertEquals(targetTxnToWriteId.get(idx).getWriteId(), srcTxnToWriteId.get(idx).getWriteId());
+      assertEquals(Long.valueOf(targetTxnToWriteId.get(idx).getTxnId()), targetTxnList.get(idx));
+    }
+
+    //invalid case
+    boolean failed = false;
+    srcTxnToWriteId = new ArrayList<>();
+    srcTxnToWriteId.add(new TxnToWriteId(startTxnId, 2*numTxn+1));
+    allocMsg = new AllocateTableWriteIdsRequest("destdb", "tbl2");
+    allocMsg.setReplPolicy("destdb.*");
+    allocMsg.setSrcTxnToWriteIdList(srcTxnToWriteId);
+    try {
+      txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds();
+    } catch (IllegalStateException e) {
+      failed = true;
+    }
+    assertTrue(failed);
+
+    replAbortTxnForTest(srcTxnIdList, "destdb.*");
+
+    // Test for aborted transactions
+    failed = false;
+    try {
+      txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds();
+    } catch (RuntimeException e) {
+      failed = true;
+    }
+    assertTrue(failed);
+  }
+
   private void updateTxns(Connection conn) throws SQLException {
     Statement stmt = conn.createStatement();
     stmt.executeUpdate("update TXNS set txn_last_heartbeat = txn_last_heartbeat + 1");

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 8406caa..0926663 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -1031,9 +1031,9 @@ public class TestDbTxnManager2 {
     //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list
     Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET"));
 
-    AllocateTableWriteIdsResponse writeIds
-            = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(txnMgr2.getCurrentTxnId()),
-            "default", "tab2"));
+    AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest("default", "tab2");
+    rqst.setTxnIds(Collections.singletonList(txnMgr2.getCurrentTxnId()));
+    AllocateTableWriteIdsResponse writeIds = txnHandler.allocateTableWriteIds(rqst);
     Assert.assertEquals(txnMgr2.getCurrentTxnId(), writeIds.getTxnToWriteIds().get(0).getTxnId());
 
     AddDynamicPartitions adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), writeIds.getTxnToWriteIds().get(0).getWriteId(),
@@ -1054,8 +1054,9 @@ public class TestDbTxnManager2 {
     //update stmt has p=blah, thus nothing is actually update and we generate empty dyn part list
     Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from WRITE_SET"));
 
-    writeIds = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(txnMgr2.getCurrentTxnId()),
-            "default", "tab2"));
+    rqst = new AllocateTableWriteIdsRequest("default", "tab2");
+    rqst.setTxnIds(Collections.singletonList(txnMgr2.getCurrentTxnId()));
+    writeIds = txnHandler.allocateTableWriteIds(rqst);
     Assert.assertEquals(txnMgr2.getCurrentTxnId(), writeIds.getTxnToWriteIds().get(0).getTxnId());
 
     adp = new AddDynamicPartitions(txnMgr2.getCurrentTxnId(), writeIds.getTxnToWriteIds().get(0).getWriteId(),
@@ -1074,8 +1075,9 @@ public class TestDbTxnManager2 {
     checkCmdOnDriver(driver.compileAndRespond("update TAB2 set b = 17 where a = 1", true));//no rows match
     txnMgr.acquireLocks(driver.getPlan(), ctx, "Long Running");
 
-    writeIds = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(txnMgr.getCurrentTxnId()),
-            "default", "tab2"));
+    rqst = new AllocateTableWriteIdsRequest("default", "tab2");
+    rqst.setTxnIds(Collections.singletonList(txnMgr.getCurrentTxnId()));
+    writeIds = txnHandler.allocateTableWriteIds(rqst);
     Assert.assertEquals(txnMgr.getCurrentTxnId(), writeIds.getTxnToWriteIds().get(0).getTxnId());
 
     //so generate empty Dyn Part call
@@ -1119,8 +1121,9 @@ public class TestDbTxnManager2 {
     checkLock(LockType.SHARED_WRITE, LockState.WAITING, "default", "TAB_PART", "p=blah", locks);
     txnMgr.rollbackTxn();
 
-    AllocateTableWriteIdsResponse writeIds
-            = txnHandler.allocateTableWriteIds(new AllocateTableWriteIdsRequest(Collections.singletonList(txnId), "default", "TAB_PART"));
+    AllocateTableWriteIdsRequest rqst = new AllocateTableWriteIdsRequest("default", "TAB_PART");
+    rqst.setTxnIds(Collections.singletonList(txnId));
+    AllocateTableWriteIdsResponse writeIds = txnHandler.allocateTableWriteIds(rqst);
     Assert.assertEquals(txnId, writeIds.getTxnToWriteIds().get(0).getTxnId());
 
     AddDynamicPartitions adp = new AddDynamicPartitions(txnId, writeIds.getTxnToWriteIds().get(0).getWriteId(),

http://git-wip-us.apache.org/repos/asf/hive/blob/fa9e743e/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 083c671..124c97e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -202,7 +202,8 @@ public abstract class CompactorTest {
   protected long allocateWriteId(String dbName, String tblName, long txnid)
           throws MetaException, TxnAbortedException, NoSuchTxnException {
     AllocateTableWriteIdsRequest awiRqst
-            = new AllocateTableWriteIdsRequest(Collections.singletonList(txnid), dbName, tblName);
+            = new AllocateTableWriteIdsRequest(dbName, tblName);
+    awiRqst.setTxnIds(Collections.singletonList(txnid));
     AllocateTableWriteIdsResponse awiResp = txnHandler.allocateTableWriteIds(awiRqst);
     return awiResp.getTxnToWriteIds().get(0).getWriteId();
   }
@@ -254,7 +255,8 @@ public abstract class CompactorTest {
   protected void burnThroughTransactions(String dbName, String tblName, int num, Set<Long> open, Set<Long> aborted)
       throws MetaException, NoSuchTxnException, TxnAbortedException {
     OpenTxnsResponse rsp = txnHandler.openTxns(new OpenTxnRequest(num, "me", "localhost"));
-    AllocateTableWriteIdsRequest awiRqst = new AllocateTableWriteIdsRequest(rsp.getTxn_ids(), dbName, tblName);
+    AllocateTableWriteIdsRequest awiRqst = new AllocateTableWriteIdsRequest(dbName, tblName);
+    awiRqst.setTxnIds(rsp.getTxn_ids());
     AllocateTableWriteIdsResponse awiResp = txnHandler.allocateTableWriteIds(awiRqst);
     int i = 0;
     for (long tid : rsp.getTxn_ids()) {