Posted to commits@hive.apache.org by sa...@apache.org on 2018/07/27 06:15:28 UTC

hive git commit: HIVE-19927: Last Repl ID set by bootstrap dump is incorrect and may cause data loss if there are ACID/MM tables (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)

Repository: hive
Updated Branches:
  refs/heads/master 1ad48825c -> 818569f6d


HIVE-19927: Last Repl ID set by bootstrap dump is incorrect and may cause data loss if there are ACID/MM tables (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/818569f6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/818569f6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/818569f6

Branch: refs/heads/master
Commit: 818569f6d0bb282eed58c14c9041670ced3905ad
Parents: 1ad4882
Author: Sankar Hariappan <sa...@apache.org>
Authored: Fri Jul 27 11:44:32 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Fri Jul 27 11:44:32 2018 +0530

----------------------------------------------------------------------
 .../TestReplicationScenariosAcidTables.java     | 94 ++++++++++++++++++--
 ...TestReplicationScenariosAcrossInstances.java | 48 ++++++----
 .../java/org/apache/hadoop/hive/ql/Driver.java  | 24 ++++-
 .../hadoop/hive/ql/exec/FunctionTask.java       | 18 +++-
 .../hadoop/hive/ql/exec/repl/ReplDumpTask.java  | 17 +++-
 .../ql/parse/ReplicationSemanticAnalyzer.java   |  1 +
 .../hive/metastore/txn/TestTxnHandler.java      |  5 +-
 .../hive/ql/exec/repl/ReplDumpTaskTest.java     |  2 +-
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 10 ++-
 .../InjectableBehaviourObjectStore.java         | 13 ++-
 10 files changed, 193 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
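
For context on the bug itself: the bootstrap dump's last repl ID must be read from the notification log before the dump's transaction pins its read snapshot. Otherwise a txn that commits in between is excluded from the dumped data (the snapshot predates it), yet its events are also skipped by the next incremental dump (their event IDs are <= the recorded last repl ID), i.e. the write is silently lost. A minimal sketch of the ordering the patch enforces, using the same metastore client call the patch relies on (the IMetaStoreClient instance and its setup are assumed):

  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.metastore.IMetaStoreClient;

  class LastReplIdOrderingSketch {
    // Capture the last repl id BEFORE opening the REPL DUMP txn.
    static void captureBeforeOpenTxn(IMetaStoreClient msc, HiveConf conf) throws Exception {
      // Step 1: read the last logged notification event id first ...
      long lastReplId = msc.getCurrentNotificationEventId().getEventId();
      conf.setLong("hive.repl.last.repl.id", lastReplId); // LAST_REPL_ID_KEY, added below
      // Step 2: ... and only then open the txn that pins the read snapshot.
      // Any txn committing after step 1 logs event ids > lastReplId, so the next
      // incremental dump (which replays events > lastReplId) picks its data up.
    }
  }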


http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 6949606..6bef6b2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -18,7 +18,10 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
@@ -27,20 +30,18 @@ import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.shims.Utils;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
 import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.DriverFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.IDriver;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
-import org.junit.rules.TestName;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.Utils;
 
+import org.junit.rules.TestName;
 import org.junit.rules.TestRule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -48,16 +49,22 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.BeforeClass;
 import org.junit.AfterClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
-import javax.annotation.Nullable;
 import java.util.Collections;
 import com.google.common.collect.Lists;
 import org.junit.Ignore;
 
+import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+
 /**
  * TestReplicationScenariosAcidTables - test replication for ACID tables
  */
@@ -287,6 +294,77 @@ public class TestReplicationScenariosAcidTables {
   }
 
   @Test
+  public void testAcidTablesBootstrapWithConcurrentWrites() throws Throwable {
+    HiveConf primaryConf = primary.getConf();
+    primary.run("use " + primaryDbName)
+            .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+                    "tblproperties (\"transactional\"=\"true\")")
+            .run("insert into t1 values(1)");
+
+    // Perform a concurrent write on the ACID table t1 while the bootstrap dump is in progress.
+    // Bootstrap won't see the written data, but the subsequent incremental repl should.
+    BehaviourInjection<CallerArguments, Boolean> callerInjectedBehavior
+            = new BehaviourInjection<CallerArguments, Boolean>() {
+      @Nullable
+      @Override
+      public Boolean apply(@Nullable CallerArguments args) {
+        if (injectionPathCalled) {
+          nonInjectedPathCalled = true;
+        } else {
+          // Insert another row into t1 from another txn while the bootstrap dump is in progress.
+          injectionPathCalled = true;
+          Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+              LOG.info("Entered new thread");
+              IDriver driver = DriverFactory.newDriver(primaryConf);
+              SessionState.start(new CliSessionState(primaryConf));
+              CommandProcessorResponse ret = driver.run("insert into " + primaryDbName + ".t1 values(2)");
+              boolean success = (ret.getException() == null);
+              assertTrue(success);
+              LOG.info("Exit new thread success - {}", success, ret.getException());
+            }
+          });
+          t.start();
+          LOG.info("Created new thread {}", t.getName());
+          try {
+            t.join();
+          } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+          }
+        }
+        return true;
+      }
+    };
+
+    InjectableBehaviourObjectStore.setCallerVerifier(callerInjectedBehavior);
+    WarehouseInstance.Tuple bootstrapDump = null;
+    try {
+      bootstrapDump = primary.dump(primaryDbName, null);
+      callerInjectedBehavior.assertInjectionsPerformed(true, true);
+    } finally {
+      InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+    }
+
+    // Bootstrap dump took its snapshot before the concurrent thread performed the write. So, it won't see data "2".
+    replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+            .run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(bootstrapDump.lastReplicationId)
+            .run("select id from t1 order by id")
+            .verifyResults(new String[]{"1" });
+
+    // Incremental dump should include the concurrent write of data "2" from another txn.
+    WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootstrapDump.lastReplicationId);
+    replica.load(replicatedDbName, incrementalDump.dumpLocation)
+            .run("use " + replicatedDbName)
+            .run("repl status " + replicatedDbName)
+            .verifyResult(incrementalDump.lastReplicationId)
+            .run("select id from t1 order by id")
+            .verifyResults(new String[]{"1", "2" });
+  }
+
+  @Test
   public void testOpenTxnEvent() throws Throwable {
     String tableName = testName.getMethodName();
     WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);

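The injected callback above is the reusable pattern for racing a statement against an in-flight dump. Trimmed to its core (same classes as in the test; the HiveConf is assumed to point at the primary warehouse):

  import org.apache.hadoop.hive.cli.CliSessionState;
  import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.DriverFactory;
  import org.apache.hadoop.hive.ql.IDriver;
  import org.apache.hadoop.hive.ql.session.SessionState;

  static void runConcurrently(final HiveConf conf, final String sql) throws InterruptedException {
    Thread t = new Thread(new Runnable() {
      @Override
      public void run() {
        IDriver driver = DriverFactory.newDriver(conf); // separate driver ...
        SessionState.start(new CliSessionState(conf));  // ... in a fresh session, so its own txn
        driver.run(sql);
      }
    });
    t.start();
    t.join(); // block the injected callback until the concurrent txn has committed
  }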
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 16c124c..b2e1e25 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -1132,9 +1132,12 @@ public class TestReplicationScenariosAcrossInstances {
 
     // Trigger bootstrap dump which just creates table t1 and other tables (t2, t3) and constraints not loaded.
     List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'");
-    replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
-    InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
-    callerVerifier.assertInjectionsPerformed(true, false);
+    try {
+      replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+      callerVerifier.assertInjectionsPerformed(true, false);
+    } finally {
+      InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+    }
 
     replica.run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
@@ -1169,11 +1172,14 @@ public class TestReplicationScenariosAcrossInstances {
     };
     InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
 
-    // Retry with same dump with which it was already loaded should resume the bootstrap load.
-    // This time, it fails when try to load the foreign key constraints. All other constraints are loaded.
-    replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
-    InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
-    callerVerifier.assertInjectionsPerformed(true, false);
+    try {
+      // Retrying with the same dump that was already loaded should resume the bootstrap load.
+      // This time, it fails when trying to load the foreign key constraints. All other constraints are loaded.
+      replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+      callerVerifier.assertInjectionsPerformed(true, false);
+    } finally {
+      InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+    }
 
     replica.run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
@@ -1205,11 +1211,14 @@ public class TestReplicationScenariosAcrossInstances {
     };
     InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
 
-    // Retry with same dump with which it was already loaded should resume the bootstrap load.
-    // This time, it completes by adding just foreign key constraints for table t2.
-    replica.load(replicatedDbName, tuple.dumpLocation);
-    InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
-    callerVerifier.assertInjectionsPerformed(true, false);
+    try {
+      // Retrying with the same dump that was already loaded should resume the bootstrap load.
+      // This time, it completes by adding just the foreign key constraints for table t2.
+      replica.load(replicatedDbName, tuple.dumpLocation);
+      callerVerifier.assertInjectionsPerformed(true, false);
+    } finally {
+      InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+    }
 
     replica.run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)
@@ -1292,11 +1301,14 @@ public class TestReplicationScenariosAcrossInstances {
     };
     InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
 
-    // Retry with same dump with which it was already loaded should resume the bootstrap load.
-    // This time, it completes by adding remaining partitions.
-    replica.load(replicatedDbName, tuple.dumpLocation);
-    InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
-    callerVerifier.assertInjectionsPerformed(false, false);
+    try {
+      // Retrying with the same dump that was already loaded should resume the bootstrap load.
+      // This time, it completes by adding the remaining partitions.
+      replica.load(replicatedDbName, tuple.dumpLocation);
+      callerVerifier.assertInjectionsPerformed(false, false);
+    } finally {
+      InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+    }
 
     replica.run("use " + replicatedDbName)
             .run("repl status " + replicatedDbName)

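All four call sites now follow the same discipline, worth copying in any new verifier-based test: assert inside the try, reset in the finally, so that neither a load failure nor a failed assertion leaks the injected behaviour into later tests. The shape, as used throughout this file:

  InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
  try {
    replica.load(replicatedDbName, tuple.dumpLocation); // the operation under test
    callerVerifier.assertInjectionsPerformed(true, false);
  } finally {
    InjectableBehaviourObjectStore.resetCallerVerifier(); // always reset the behaviour
  }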
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 96d7300..039f991 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -118,6 +118,7 @@ import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.ParseException;
 import org.apache.hadoop.hive.ql.parse.ParseUtils;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
 import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
@@ -145,6 +146,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.apache.hive.common.util.TxnIdUtils;
+import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -650,6 +652,10 @@ public class Driver implements IDriver {
       BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
 
       if (!retrial) {
+        if ((queryState.getHiveOperation() != null)
+                && queryState.getHiveOperation().equals(HiveOperation.REPLDUMP)) {
+          setLastReplIdForDump(queryState.getConf());
+        }
         openTransaction();
         generateValidTxnList();
       }
@@ -896,6 +902,22 @@ public class Driver implements IDriver {
     ctx.setWmContext(wmContext);
   }
 
+  /**
+   * The last repl id should be captured before the current REPL DUMP operation opens its txn.
+   * This is needed to avoid losing data added/modified by concurrent txns while a bootstrap
+   * dump is in progress.
+   * @param conf Query configurations
+   * @throws HiveException
+   * @throws TException
+   */
+  private void setLastReplIdForDump(HiveConf conf) throws HiveException, TException {
+    // The last logged notification event id becomes the last repl id for the current REPL DUMP.
+    Hive hiveDb = Hive.get();
+    Long lastReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+    conf.setLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, lastReplId);
+    LOG.debug("Setting " + ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY + " = " + lastReplId);
+  }
+
   private void openTransaction() throws LockException, CommandProcessorResponse {
     if (checkConcurrency() && startImplicitTxn(queryTxnMgr)) {
       String userFromUGI = getUserFromUGI();
@@ -903,7 +925,7 @@ public class Driver implements IDriver {
         if (userFromUGI == null) {
           throw createProcessorResponse(10);
         }
-        long txnid = queryTxnMgr.openTxn(ctx, userFromUGI);
+        queryTxnMgr.openTxn(ctx, userFromUGI);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
index b9d6f58..cbd503f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import static org.apache.hadoop.util.StringUtils.stringifyException;
-
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -31,7 +29,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.PrincipalType;
 import org.apache.hadoop.hive.metastore.api.ResourceType;
 import org.apache.hadoop.hive.metastore.api.ResourceUri;
@@ -93,6 +93,13 @@ public class FunctionTask extends Task<FunctionWork> {
           }
           return createPermanentFunction(Hive.get(conf), createFunctionDesc);
         } catch (Exception e) {
+          // In the repl load flow, the function may already exist when the first incremental phase replays the create event. So, just return success.
+          if (createFunctionDesc.getReplicationSpec().isInReplicationScope()
+                  && (e.getCause() instanceof AlreadyExistsException)) {
+            LOG.info("Create function is idempotent as function: "
+                    + createFunctionDesc.getFunctionName() + " already exists.");
+            return 0;
+          }
           setException(e);
           LOG.error("Failed to create function", e);
           return 1;
@@ -262,6 +269,13 @@ public class FunctionTask extends Task<FunctionWork> {
 
       return 0;
     } catch (Exception e) {
+      // In the repl load flow, the function may already be gone when the first incremental phase replays the drop event. So, just return success.
+      if (dropFunctionDesc.getReplicationSpec().isInReplicationScope()
+              && (e.getCause() instanceof NoSuchObjectException)) {
+        LOG.info("Drop function is idempotent as function: "
+                + dropFunctionDesc.getFunctionName() + " doesn't exist.");
+        return 0;
+      }
       LOG.info("drop function: ", e);
       console.printError("FAILED: error during drop function: " + StringUtils.stringifyException(e));
       return 1;

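Both branches follow the same shape: only replication-scoped statements get the lenient treatment (a user's duplicate CREATE FUNCTION should still fail), and it is e.getCause() that is inspected, presumably because the metastore's Thrift exception arrives wrapped. Condensed, with the create-side names from the patch:

  } catch (Exception e) {
    // Replayed event during repl load: the function already exists, so the
    // statement is idempotent and counts as success.
    if (createFunctionDesc.getReplicationSpec().isInReplicationScope()
        && (e.getCause() instanceof AlreadyExistsException)) {
      return 0;
    }
    setException(e);
    LOG.error("Failed to create function", e);
    return 1;
  }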
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 79ee80a..9a5e6df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -219,8 +220,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
 
   Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
     // bootstrap case
+    // The last repl id would've been captured in queryState configs during the compile phase, before opening the txn.
+    // This is needed because we dump ACID/MM table data based on the read snapshot, or else we may lose data from
+    // concurrent txns while the bootstrap dump is in progress. The compile phase must have set it (asserted below).
     Hive hiveDb = getHive();
-    Long bootDumpBeginReplId = currentNotificationId(hiveDb);
+    Long bootDumpBeginReplId = queryState.getConf().getLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, -1L);
+    assert (bootDumpBeginReplId >= 0L);
+
     String validTxnList = getValidTxnListForReplDump(hiveDb);
     for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
       LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
@@ -237,7 +243,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
         for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
           LOG.debug(
               "analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
-          dumpTable(dbName, tblName, validTxnList, dbRoot);
+          dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId);
           dumpConstraintMetadata(dbName, tblName, dbRoot);
         }
       } catch (Exception e) {
@@ -308,7 +314,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
     return dbRoot;
   }
 
-  void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot) throws Exception {
+  void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId) throws Exception {
     try {
       Hive db = getHive();
       HiveWrapper.Tuple<Table> tuple = new HiveWrapper(db, dbName).table(tblName);
@@ -319,6 +325,11 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
       tuple.replicationSpec.setIsReplace(true);  // by default for all other objects this is false
       if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
         tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList));
+
+        // For a transactional table, the dumped data is a valid snapshot for the current txn and doesn't include
+        // data added/modified by concurrent txns that are later than the current txn. So, this table's last repl id
+        // must be set to the bootstrap dump's last repl id.
+        tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId));
       }
       MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
       new TableExport(

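End to end, LAST_REPL_ID_KEY is a plain producer/consumer handoff between the two halves of the fix (both sides are from this patch; the assert documents that the compile phase must have run first):

  // Producer -- Driver.compile(), before openTransaction():
  conf.setLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, lastReplId);

  // Consumer -- ReplDumpTask.bootStrapDump():
  Long bootDumpBeginReplId =
      queryState.getConf().getLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, -1L);
  assert (bootDumpBeginReplId >= 0L);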
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 6ed792c..adaa3d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -74,6 +74,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
   private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
   private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
 
+  public static final String LAST_REPL_ID_KEY = "hive.repl.last.repl.id";
   public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
   public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
 

http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 9294c2b..c0725ad 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -1676,14 +1676,15 @@ public class TestTxnHandler {
 
     replAbortTxnForTest(srcTxnIdList, "destdb.*");
 
-    // Test for aborted transactions
+    // Test for aborted transactions. Idempotent case: allocating a write id when the txn is
+    // already aborted should do nothing.
     failed = false;
     try {
       txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds();
     } catch (RuntimeException e) {
       failed = true;
     }
-    assertTrue(failed);
+    assertFalse(failed);
   }
 
   private void updateTxns(Connection conn) throws SQLException {

http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java
index 7bd035e..e719a08 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java
@@ -101,7 +101,7 @@ public class ReplDumpTaskTest {
       private int tableDumpCount = 0;
 
       @Override
-      void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot)
+      void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId)
           throws Exception {
         tableDumpCount++;
         if (tableDumpCount > 1) {

http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 7fd0642..f5e4905 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -747,6 +747,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
                   Collections.singletonList(sourceTxnId), stmt);
           if (targetTxnIds.isEmpty()) {
+            // Idempotent case: the txn was already closed, or the abort txn event was received
+            // without a corresponding open txn event.
             LOG.info("Target txn id is missing for source txn id : " + sourceTxnId +
                     " and repl policy " + rqst.getReplPolicy());
             return;
@@ -888,6 +890,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
                   Collections.singletonList(sourceTxnId), stmt);
           if (targetTxnIds.isEmpty()) {
+            // Idempotent case: the txn was already closed, or the commit txn event was received
+            // without a corresponding open txn event.
             LOG.info("Target txn id is missing for source txn id : " + sourceTxnId +
                     " and repl policy " + rqst.getReplPolicy());
             return;
@@ -1398,9 +1402,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           }
           txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, stmt);
           if (srcTxnIds.size() != txnIds.size()) {
-            LOG.warn("Target txn id is missing for source txn id : " + srcTxnIds.toString() +
+            // Idempotent case: the txn was already closed but an allocate write id event arrived.
+            // So, just ignore it and return an empty list.
+            LOG.info("Target txn id is missing for source txn id : " + srcTxnIds.toString() +
                     " and repl policy " + rqst.getReplPolicy());
-            throw new RuntimeException("This should never happen for txnIds: " + txnIds);
+            return new AllocateTableWriteIdsResponse(txnToWriteIds);
           }
         } else {
           assert (!rqst.isSetSrcTxnToWriteIdList());

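The caller-visible contract thus changes from "missing target txn is fatal" to "missing target txn is a no-op", which is exactly what the TestTxnHandler change above asserts:

  boolean failed = false;
  try {
    // Replayed allocate-write-id event for a txn already aborted on the target.
    txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds();
  } catch (RuntimeException e) {
    failed = true;
  }
  assertFalse(failed); // pre-patch, this path asserted that the call threw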
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
index 481d1d2..6ca3e5d 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
@@ -19,7 +19,7 @@
 package org.apache.hadoop.hive.metastore;
 
 import java.util.List;
-
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Function;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.api.Table;
 
 import static org.junit.Assert.assertEquals;
 
-
 /**
  * A wrapper around {@link ObjectStore} that allows us to inject custom behaviour
  * on to some of the methods for testing.
@@ -214,4 +213,14 @@ public class InjectableBehaviourObjectStore extends ObjectStore {
     }
     return super.addForeignKeys(fks);
   }
+
+  @Override
+  public boolean alterDatabase(String catalogName, String dbname, Database db)
+          throws NoSuchObjectException, MetaException {
+    if (callerVerifier != null) {
+      CallerArguments args = new CallerArguments(dbname);
+      callerVerifier.apply(args);
+    }
+    return super.alterDatabase(catalogName, dbname, db);
+  }
 }
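
With this override, alterDatabase calls (such as the repl ID bump a bootstrap load performs on the target database) hit the same verifier as the table/partition/function hooks. A sketch of wiring a verifier to it, following the existing tests (the db name here is hypothetical):

  BehaviourInjection<CallerArguments, Boolean> verifier =
      new BehaviourInjection<CallerArguments, Boolean>() {
    @Nullable
    @Override
    public Boolean apply(@Nullable CallerArguments args) {
      injectionPathCalled = true;
      // Only the dbName constructor argument is populated on this path.
      return args.dbName.equalsIgnoreCase("some_db"); // hypothetical db name
    }
  };
  InjectableBehaviourObjectStore.setCallerVerifier(verifier);
  try {
    // ... run the load/dump expected to call alterDatabase ...
    verifier.assertInjectionsPerformed(true, false);
  } finally {
    InjectableBehaviourObjectStore.resetCallerVerifier();
  }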