Posted to commits@hive.apache.org by sa...@apache.org on 2018/07/27 06:15:28 UTC
hive git commit: HIVE-19927: Last Repl ID set by bootstrap dump is incorrect and may cause data loss if have ACID/MM tables (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)
Repository: hive
Updated Branches:
refs/heads/master 1ad48825c -> 818569f6d
HIVE-19927: Last Repl ID set by bootstrap dump is incorrect and may cause data loss if have ACID/MM tables (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/818569f6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/818569f6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/818569f6
Branch: refs/heads/master
Commit: 818569f6d0bb282eed58c14c9041670ced3905ad
Parents: 1ad4882
Author: Sankar Hariappan <sa...@apache.org>
Authored: Fri Jul 27 11:44:32 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Fri Jul 27 11:44:32 2018 +0530
----------------------------------------------------------------------
.../TestReplicationScenariosAcidTables.java | 94 ++++++++++++++++++--
...TestReplicationScenariosAcrossInstances.java | 48 ++++++----
.../java/org/apache/hadoop/hive/ql/Driver.java | 24 ++++-
.../hadoop/hive/ql/exec/FunctionTask.java | 18 +++-
.../hadoop/hive/ql/exec/repl/ReplDumpTask.java | 17 +++-
.../ql/parse/ReplicationSemanticAnalyzer.java | 1 +
.../hive/metastore/txn/TestTxnHandler.java | 5 +-
.../hive/ql/exec/repl/ReplDumpTaskTest.java | 2 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 10 ++-
.../InjectableBehaviourObjectStore.java | 13 ++-
10 files changed, 193 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
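The core of the change, in brief: the last repl id for a bootstrap REPL DUMP is read from the notification log before the dump's transaction is opened, so writes committed concurrently with the dump are picked up by the following incremental cycle instead of being lost. Below is a minimal sketch of that ordering, assuming only the metastore client calls and the conf key that appear in the Driver/ReplDumpTask hunks further down; the wrapper class and method names here are illustrative, not part of the patch.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
import org.apache.thrift.TException;

// Illustrative wrapper, not part of the patch; only the metastore client calls and
// ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY come from the diff below.
class BootstrapDumpOrderingSketch {
  // Capture the last notification event id BEFORE the dump opens its transaction.
  // Writes committed after this point are missing from the bootstrap snapshot, but
  // their notification events have ids greater than lastReplId, so the next
  // incremental REPL DUMP (starting from lastReplId) replays them.
  static void captureLastReplIdForDump(HiveConf conf) throws HiveException, TException {
    long lastReplId = Hive.get().getMSC().getCurrentNotificationEventId().getEventId();
    conf.setLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, lastReplId);
  }
}

ReplDumpTask then records this captured id, rather than an id read after the snapshot, as the bootstrap dump's last repl id for ACID/MM tables, which is what testAcidTablesBootstrapWithConcurrentWrites in the first hunk verifies.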
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 6949606..6bef6b2 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -18,7 +18,10 @@
package org.apache.hadoop.hive.ql.parse;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hive.cli.CliSessionState;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
@@ -27,20 +30,18 @@ import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
-import org.apache.hadoop.hive.shims.Utils;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.CallerArguments;
import org.apache.hadoop.hive.metastore.InjectableBehaviourObjectStore.BehaviourInjection;
-import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.DriverFactory;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.IDriver;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
-import org.junit.rules.TestName;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.shims.Utils;
+import org.junit.rules.TestName;
import org.junit.rules.TestRule;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -48,16 +49,22 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.BeforeClass;
import org.junit.AfterClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
-import javax.annotation.Nullable;
import java.util.Collections;
import com.google.common.collect.Lists;
import org.junit.Ignore;
+import static org.junit.Assert.assertTrue;
+import static org.apache.hadoop.hive.metastore.ReplChangeManager.SOURCE_OF_REPLICATION;
+
/**
* TestReplicationScenariosAcidTables - test replication for ACID tables
*/
@@ -287,6 +294,77 @@ public class TestReplicationScenariosAcidTables {
}
@Test
+ public void testAcidTablesBootstrapWithConcurrentWrites() throws Throwable {
+ HiveConf primaryConf = primary.getConf();
+ primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t1 values(1)");
+
+ // Perform concurrent write on the acid table t1 when bootstrap dump in progress. Bootstrap
+ // won't see the written data but the subsequent incremental repl should see it.
+ BehaviourInjection<CallerArguments, Boolean> callerInjectedBehavior
+ = new BehaviourInjection<CallerArguments, Boolean>() {
+ @Nullable
+ @Override
+ public Boolean apply(@Nullable CallerArguments args) {
+ if (injectionPathCalled) {
+ nonInjectedPathCalled = true;
+ } else {
+ // Insert another row to t1 from another txn when bootstrap dump in progress.
+ injectionPathCalled = true;
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Entered new thread");
+ IDriver driver = DriverFactory.newDriver(primaryConf);
+ SessionState.start(new CliSessionState(primaryConf));
+ CommandProcessorResponse ret = driver.run("insert into " + primaryDbName + ".t1 values(2)");
+ boolean success = (ret.getException() == null);
+ assertTrue(success);
+ LOG.info("Exit new thread success - {}", success, ret.getException());
+ }
+ });
+ t.start();
+ LOG.info("Created new thread {}", t.getName());
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return true;
+ }
+ };
+
+ InjectableBehaviourObjectStore.setCallerVerifier(callerInjectedBehavior);
+ WarehouseInstance.Tuple bootstrapDump = null;
+ try {
+ bootstrapDump = primary.dump(primaryDbName, null);
+ callerInjectedBehavior.assertInjectionsPerformed(true, true);
+ } finally {
+ InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+ }
+
+ // Bootstrap dump has taken its snapshot before the concurrent thread performed the write. So, it won't see data "2".
+ replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+ .run("use " + replicatedDbName)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(bootstrapDump.lastReplicationId)
+ .run("select id from t1 order by id")
+ .verifyResults(new String[]{"1" });
+
+ // Incremental should include the concurrent write of data "2" from another txn.
+ WarehouseInstance.Tuple incrementalDump = primary.dump(primaryDbName, bootstrapDump.lastReplicationId);
+ replica.load(replicatedDbName, incrementalDump.dumpLocation)
+ .run("use " + replicatedDbName)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(incrementalDump.lastReplicationId)
+ .run("select id from t1 order by id")
+ .verifyResults(new String[]{"1", "2" });
+ }
+
+ @Test
public void testOpenTxnEvent() throws Throwable {
String tableName = testName.getMethodName();
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
index 16c124c..b2e1e25 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcrossInstances.java
@@ -1132,9 +1132,12 @@ public class TestReplicationScenariosAcrossInstances {
// Trigger bootstrap dump which just creates table t1 and other tables (t2, t3) and constraints not loaded.
List<String> withConfigs = Arrays.asList("'hive.repl.approx.max.load.tasks'='1'");
- replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
- InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
- callerVerifier.assertInjectionsPerformed(true, false);
+ try {
+ replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+ callerVerifier.assertInjectionsPerformed(true, false);
+ } finally {
+ InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+ }
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
@@ -1169,11 +1172,14 @@ public class TestReplicationScenariosAcrossInstances {
};
InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
- // Retry with same dump with which it was already loaded should resume the bootstrap load.
- // This time, it fails when try to load the foreign key constraints. All other constraints are loaded.
- replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
- InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
- callerVerifier.assertInjectionsPerformed(true, false);
+ try {
+ // Retry with same dump with which it was already loaded should resume the bootstrap load.
+ // This time, it fails when try to load the foreign key constraints. All other constraints are loaded.
+ replica.loadFailure(replicatedDbName, tuple.dumpLocation, withConfigs);
+ callerVerifier.assertInjectionsPerformed(true, false);
+ } finally {
+ InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+ }
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
@@ -1205,11 +1211,14 @@ public class TestReplicationScenariosAcrossInstances {
};
InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
- // Retry with same dump with which it was already loaded should resume the bootstrap load.
- // This time, it completes by adding just foreign key constraints for table t2.
- replica.load(replicatedDbName, tuple.dumpLocation);
- InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
- callerVerifier.assertInjectionsPerformed(true, false);
+ try {
+ // Retry with same dump with which it was already loaded should resume the bootstrap load.
+ // This time, it completes by adding just foreign key constraints for table t2.
+ replica.load(replicatedDbName, tuple.dumpLocation);
+ callerVerifier.assertInjectionsPerformed(true, false);
+ } finally {
+ InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+ }
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
@@ -1292,11 +1301,14 @@ public class TestReplicationScenariosAcrossInstances {
};
InjectableBehaviourObjectStore.setCallerVerifier(callerVerifier);
- // Retry with same dump with which it was already loaded should resume the bootstrap load.
- // This time, it completes by adding remaining partitions.
- replica.load(replicatedDbName, tuple.dumpLocation);
- InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
- callerVerifier.assertInjectionsPerformed(false, false);
+ try {
+ // Retry with same dump with which it was already loaded should resume the bootstrap load.
+ // This time, it completes by adding remaining partitions.
+ replica.load(replicatedDbName, tuple.dumpLocation);
+ callerVerifier.assertInjectionsPerformed(false, false);
+ } finally {
+ InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+ }
replica.run("use " + replicatedDbName)
.run("repl status " + replicatedDbName)
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 96d7300..039f991 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -118,6 +118,7 @@ import org.apache.hadoop.hive.ql.parse.ParseContext;
import org.apache.hadoop.hive.ql.parse.ParseException;
import org.apache.hadoop.hive.ql.parse.ParseUtils;
import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
+import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.SemanticAnalyzerFactory;
import org.apache.hadoop.hive.ql.plan.DDLDesc.DDLDescWithWriteId;
@@ -145,6 +146,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hive.common.util.ShutdownHookManager;
import org.apache.hive.common.util.TxnIdUtils;
+import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -650,6 +652,10 @@ public class Driver implements IDriver {
BaseSemanticAnalyzer sem = SemanticAnalyzerFactory.get(queryState, tree);
if (!retrial) {
+ if ((queryState.getHiveOperation() != null)
+ && queryState.getHiveOperation().equals(HiveOperation.REPLDUMP)) {
+ setLastReplIdForDump(queryState.getConf());
+ }
openTransaction();
generateValidTxnList();
}
@@ -896,6 +902,22 @@ public class Driver implements IDriver {
ctx.setWmContext(wmContext);
}
+ /**
+ * Last repl id should be captured before opening txn by current REPL DUMP operation.
+ * This is needed to avoid losing data added/modified by concurrent txns while the bootstrap
+ * dump is in progress.
+ * @param conf Query configurations
+ * @throws HiveException
+ * @throws TException
+ */
+ private void setLastReplIdForDump(HiveConf conf) throws HiveException, TException {
+ // Last logged notification event id would be the last repl Id for the current REPL DUMP.
+ Hive hiveDb = Hive.get();
+ Long lastReplId = hiveDb.getMSC().getCurrentNotificationEventId().getEventId();
+ conf.setLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, lastReplId);
+ LOG.debug("Setting " + ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY + " = " + lastReplId);
+ }
+
private void openTransaction() throws LockException, CommandProcessorResponse {
if (checkConcurrency() && startImplicitTxn(queryTxnMgr)) {
String userFromUGI = getUserFromUGI();
@@ -903,7 +925,7 @@ public class Driver implements IDriver {
if (userFromUGI == null) {
throw createProcessorResponse(10);
}
- long txnid = queryTxnMgr.openTxn(ctx, userFromUGI);
+ queryTxnMgr.openTxn(ctx, userFromUGI);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
index b9d6f58..cbd503f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionTask.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hive.ql.exec;
-import static org.apache.hadoop.util.StringUtils.stringifyException;
-
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -31,7 +29,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.PrincipalType;
import org.apache.hadoop.hive.metastore.api.ResourceType;
import org.apache.hadoop.hive.metastore.api.ResourceUri;
@@ -93,6 +93,13 @@ public class FunctionTask extends Task<FunctionWork> {
}
return createPermanentFunction(Hive.get(conf), createFunctionDesc);
} catch (Exception e) {
+ // For repl load flow, function may exist for first incremental phase. So, just return success.
+ if (createFunctionDesc.getReplicationSpec().isInReplicationScope()
+ && (e.getCause() instanceof AlreadyExistsException)) {
+ LOG.info("Create function is idempotent as function: "
+ + createFunctionDesc.getFunctionName() + " already exists.");
+ return 0;
+ }
setException(e);
LOG.error("Failed to create function", e);
return 1;
@@ -262,6 +269,13 @@ public class FunctionTask extends Task<FunctionWork> {
return 0;
} catch (Exception e) {
+ // For repl load flow, function may not exist for first incremental phase. So, just return success.
+ if (dropFunctionDesc.getReplicationSpec().isInReplicationScope()
+ && (e.getCause() instanceof NoSuchObjectException)) {
+ LOG.info("Drop function is idempotent as function: "
+ + dropFunctionDesc.getFunctionName() + " doesn't exist.");
+ return 0;
+ }
LOG.info("drop function: ", e);
console.printError("FAILED: error during drop function: " + StringUtils.stringifyException(e));
return 1;
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
index 79ee80a..9a5e6df 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTask.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.metadata.InvalidTableException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.TableSpec;
import org.apache.hadoop.hive.ql.parse.EximUtil;
+import org.apache.hadoop.hive.ql.parse.ReplicationSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.repl.DumpType;
@@ -219,8 +220,13 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
Long bootStrapDump(Path dumpRoot, DumpMetaData dmd, Path cmRoot) throws Exception {
// bootstrap case
+ // Last repl id would've been captured during compile phase in queryState configs before opening txn.
+ // This is needed as we dump data on ACID/MM tables based on read snapshot or else we may lose data from
+ // concurrent txns when bootstrap dump in progress. If it is not available, then get it from metastore.
Hive hiveDb = getHive();
- Long bootDumpBeginReplId = currentNotificationId(hiveDb);
+ Long bootDumpBeginReplId = queryState.getConf().getLong(ReplicationSemanticAnalyzer.LAST_REPL_ID_KEY, -1L);
+ assert (bootDumpBeginReplId >= 0L);
+
String validTxnList = getValidTxnListForReplDump(hiveDb);
for (String dbName : Utils.matchesDb(hiveDb, work.dbNameOrPattern)) {
LOG.debug("ReplicationSemanticAnalyzer: analyzeReplDump dumping db: " + dbName);
@@ -237,7 +243,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
for (String tblName : Utils.matchesTbl(hiveDb, dbName, work.tableNameOrPattern)) {
LOG.debug(
"analyzeReplDump dumping table: " + tblName + " to db root " + dbRoot.toUri());
- dumpTable(dbName, tblName, validTxnList, dbRoot);
+ dumpTable(dbName, tblName, validTxnList, dbRoot, bootDumpBeginReplId);
dumpConstraintMetadata(dbName, tblName, dbRoot);
}
} catch (Exception e) {
@@ -308,7 +314,7 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
return dbRoot;
}
- void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot) throws Exception {
+ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId) throws Exception {
try {
Hive db = getHive();
HiveWrapper.Tuple<Table> tuple = new HiveWrapper(db, dbName).table(tblName);
@@ -319,6 +325,11 @@ public class ReplDumpTask extends Task<ReplDumpWork> implements Serializable {
tuple.replicationSpec.setIsReplace(true); // by default for all other objects this is false
if (AcidUtils.isTransactionalTable(tableSpec.tableHandle)) {
tuple.replicationSpec.setValidWriteIdList(getValidWriteIdList(dbName, tblName, validTxnList));
+
+ // For transactional table, data would be valid snapshot for current txn and doesn't include data
+ // added/modified by concurrent txns which are later than current txn. So, need to set last repl Id of this table
+ // as bootstrap dump's last repl Id.
+ tuple.replicationSpec.setCurrentReplicationState(String.valueOf(lastReplId));
}
MmContext mmCtx = MmContext.createIfNeeded(tableSpec.tableHandle);
new TableExport(
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 6ed792c..adaa3d3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -74,6 +74,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
private static String testInjectDumpDir = null; // unit tests can overwrite this to affect default dump behaviour
private static final String dumpSchema = "dump_dir,last_repl_id#string,string";
+ public static final String LAST_REPL_ID_KEY = "hive.repl.last.repl.id";
public static final String FUNCTIONS_ROOT_DIR_NAME = "_functions";
public static final String CONSTRAINTS_ROOT_DIR_NAME = "_constraints";
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 9294c2b..c0725ad 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -1676,14 +1676,15 @@ public class TestTxnHandler {
replAbortTxnForTest(srcTxnIdList, "destdb.*");
- // Test for aborted transactions
+ // Test for aborted transactions. Idempotent case: allocating a write id when the txn is already
+ // aborted should do nothing.
failed = false;
try {
txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds();
} catch (RuntimeException e) {
failed = true;
}
- assertTrue(failed);
+ assertFalse(failed);
}
private void updateTxns(Connection conn) throws SQLException {
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java
index 7bd035e..e719a08 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/repl/ReplDumpTaskTest.java
@@ -101,7 +101,7 @@ public class ReplDumpTaskTest {
private int tableDumpCount = 0;
@Override
- void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot)
+ void dumpTable(String dbName, String tblName, String validTxnList, Path dbRoot, long lastReplId)
throws Exception {
tableDumpCount++;
if (tableDumpCount > 1) {
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 7fd0642..f5e4905 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -747,6 +747,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
Collections.singletonList(sourceTxnId), stmt);
if (targetTxnIds.isEmpty()) {
+ // Idempotent case where txn was already closed or abort txn event received without
+ // corresponding open txn event.
LOG.info("Target txn id is missing for source txn id : " + sourceTxnId +
" and repl policy " + rqst.getReplPolicy());
return;
@@ -888,6 +890,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
Collections.singletonList(sourceTxnId), stmt);
if (targetTxnIds.isEmpty()) {
+ // Idempotent case where txn was already closed or commit txn event received without
+ // corresponding open txn event.
LOG.info("Target txn id is missing for source txn id : " + sourceTxnId +
" and repl policy " + rqst.getReplPolicy());
return;
@@ -1398,9 +1402,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, stmt);
if (srcTxnIds.size() != txnIds.size()) {
- LOG.warn("Target txn id is missing for source txn id : " + srcTxnIds.toString() +
+ // Idempotent case where txn was already closed but gets allocate write id event.
+ // So, just ignore it and return empty list.
+ LOG.info("Target txn id is missing for source txn id : " + srcTxnIds.toString() +
" and repl policy " + rqst.getReplPolicy());
- throw new RuntimeException("This should never happen for txnIds: " + txnIds);
+ return new AllocateTableWriteIdsResponse(txnToWriteIds);
}
} else {
assert (!rqst.isSetSrcTxnToWriteIdList());
http://git-wip-us.apache.org/repos/asf/hive/blob/818569f6/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
index 481d1d2..6ca3e5d 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/InjectableBehaviourObjectStore.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.hive.metastore;
import java.util.List;
-
+import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Function;
import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
import org.apache.hadoop.hive.metastore.api.MetaException;
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.metastore.api.Table;
import static org.junit.Assert.assertEquals;
-
/**
* A wrapper around {@link ObjectStore} that allows us to inject custom behaviour
* on to some of the methods for testing.
@@ -214,4 +213,14 @@ public class InjectableBehaviourObjectStore extends ObjectStore {
}
return super.addForeignKeys(fks);
}
+
+ @Override
+ public boolean alterDatabase(String catalogName, String dbname, Database db)
+ throws NoSuchObjectException, MetaException {
+ if (callerVerifier != null) {
+ CallerArguments args = new CallerArguments(dbname);
+ callerVerifier.apply(args);
+ }
+ return super.alterDatabase(catalogName, dbname, db);
+ }
}