You are viewing a plain text version of this content. The canonical link for it (originally attached to the word "here") was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/08/10 16:56:44 UTC
hive git commit: HIVE-20264: Bootstrap repl dump with concurrent write and drop of ACID table makes target inconsistent (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)
Repository: hive
Updated Branches:
refs/heads/master 8cd9d3f7f -> 81dff07cb
HIVE-20264: Bootstrap repl dump with concurrent write and drop of ACID table makes target inconsistent (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/81dff07c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/81dff07c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/81dff07c
Branch: refs/heads/master
Commit: 81dff07cba2106c5d19f57701c15b62a5b773e63
Parents: 8cd9d3f
Author: Sankar Hariappan <sa...@apache.org>
Authored: Fri Aug 10 22:26:24 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Fri Aug 10 22:26:24 2018 +0530
----------------------------------------------------------------------
.../TestReplicationScenariosAcidTables.java | 79 ++++++++++++++++++++
.../hive/metastore/txn/TestTxnHandler.java | 5 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 49 +++++++-----
3 files changed, 113 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/81dff07c/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
index 3040f6c..f074428 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/parse/TestReplicationScenariosAcidTables.java
@@ -364,6 +364,85 @@ public class TestReplicationScenariosAcidTables {
}
@Test
+ public void testAcidTablesBootstrapWithConcurrentDropTable() throws Throwable {
+ HiveConf primaryConf = primary.getConf();
+ primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t1 values(1)");
+
+ // Perform concurrent write + drop on the acid table t1 when bootstrap dump in progress. Bootstrap
+ // won't dump the table but the subsequent incremental repl with new table with same name should be seen.
+ BehaviourInjection<CallerArguments, Boolean> callerInjectedBehavior
+ = new BehaviourInjection<CallerArguments, Boolean>() {
+ @Nullable
+ @Override
+ public Boolean apply(@Nullable CallerArguments args) {
+ if (injectionPathCalled) {
+ nonInjectedPathCalled = true;
+ } else {
+ // Insert another row to t1 and drop the table from another txn when bootstrap dump in progress.
+ injectionPathCalled = true;
+ Thread t = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ LOG.info("Entered new thread");
+ IDriver driver = DriverFactory.newDriver(primaryConf);
+ SessionState.start(new CliSessionState(primaryConf));
+ CommandProcessorResponse ret = driver.run("insert into " + primaryDbName + ".t1 values(2)");
+ boolean success = (ret.getException() == null);
+ assertTrue(success);
+ ret = driver.run("drop table " + primaryDbName + ".t1");
+ success = (ret.getException() == null);
+ assertTrue(success);
+ LOG.info("Exit new thread success - {}", success, ret.getException());
+ }
+ });
+ t.start();
+ LOG.info("Created new thread {}", t.getName());
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return true;
+ }
+ };
+
+ InjectableBehaviourObjectStore.setCallerVerifier(callerInjectedBehavior);
+ WarehouseInstance.Tuple bootstrapDump = null;
+ try {
+ bootstrapDump = primary.dump(primaryDbName, null);
+ callerInjectedBehavior.assertInjectionsPerformed(true, true);
+ } finally {
+ InjectableBehaviourObjectStore.resetCallerVerifier(); // reset the behaviour
+ }
+
+ // Bootstrap dump has taken latest list of tables and hence won't see table t1 as it is dropped.
+ replica.load(replicatedDbName, bootstrapDump.dumpLocation)
+ .run("use " + replicatedDbName)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(bootstrapDump.lastReplicationId)
+ .run("show tables")
+ .verifyResult(null);
+
+ // Create another ACID table with same name and insert a row. It should be properly replicated.
+ WarehouseInstance.Tuple incrementalDump = primary.run("use " + primaryDbName)
+ .run("create table t1 (id int) clustered by(id) into 3 buckets stored as orc " +
+ "tblproperties (\"transactional\"=\"true\")")
+ .run("insert into t1 values(100)")
+ .dump(primaryDbName, bootstrapDump.lastReplicationId);
+
+ replica.load(replicatedDbName, incrementalDump.dumpLocation)
+ .run("use " + replicatedDbName)
+ .run("repl status " + replicatedDbName)
+ .verifyResult(incrementalDump.lastReplicationId)
+ .run("select id from t1 order by id")
+ .verifyResult("100");
+ }
+
+ @Test
public void testOpenTxnEvent() throws Throwable {
String tableName = testName.getMethodName();
WarehouseInstance.Tuple bootStrapDump = primary.dump(primaryDbName, null);
http://git-wip-us.apache.org/repos/asf/hive/blob/81dff07c/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index c0725ad..be37b2a 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -1667,12 +1667,15 @@ public class TestTxnHandler {
allocMsg = new AllocateTableWriteIdsRequest("destdb", "tbl2");
allocMsg.setReplPolicy("destdb.*");
allocMsg.setSrcTxnToWriteIdList(srcTxnToWriteId);
+
+ // This is an idempotent case when repl flow forcefully allocate write id if it doesn't match
+ // the next write id.
try {
txnHandler.allocateTableWriteIds(allocMsg).getTxnToWriteIds();
} catch (IllegalStateException e) {
failed = true;
}
- assertTrue(failed);
+ assertFalse(failed);
replAbortTxnForTest(srcTxnIdList, "destdb.*");
http://git-wip-us.apache.org/repos/asf/hive/blob/81dff07c/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index f5e4905..8edc387 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -1404,7 +1404,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (srcTxnIds.size() != txnIds.size()) {
// Idempotent case where txn was already closed but gets allocate write id event.
// So, just ignore it and return empty list.
- LOG.info("Target txn id is missing for source txn id : " + srcTxnIds.toString() +
+ LOG.info("Idempotent case: Target txn id is missing for source txn id : " + srcTxnIds.toString() +
" and repl policy " + rqst.getReplPolicy());
return new AllocateTableWriteIdsResponse(txnToWriteIds);
}
@@ -1422,10 +1422,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new RuntimeException("This should never happen for txnIds: " + txnIds);
}
- long writeId;
- String s;
- long allocatedTxnsCount = 0;
- long txnId;
List<String> queries = new ArrayList<>();
StringBuilder prefix = new StringBuilder();
StringBuilder suffix = new StringBuilder();
@@ -1440,6 +1436,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
suffix.append("");
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
txnIds, "t2w_txnid", false, false);
+
+ long allocatedTxnsCount = 0;
+ long txnId;
+ long writeId = 0;
for (String query : queries) {
LOG.debug("Going to execute query <" + query + ">");
rs = stmt.executeQuery(query);
@@ -1462,12 +1462,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
return new AllocateTableWriteIdsResponse(txnToWriteIds);
}
+ long srcWriteId = 0;
+ if (rqst.isSetReplPolicy()) {
+ // In replication flow, we always need to allocate write ID equal to that of source.
+ assert(srcTxnToWriteIds != null);
+ srcWriteId = srcTxnToWriteIds.get(0).getWriteId();
+ }
+
handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
// There are some txns in the list which does not have write id allocated and hence go ahead and do it.
// Get the next write id for the given table and update it with new next write id.
// This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID
- s = sqlGenerator.addForUpdateClause(
+ String s = sqlGenerator.addForUpdateClause(
"select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
+ " and nwi_table = " + quoteString(tblName));
LOG.debug("Going to execute query <" + s + ">");
@@ -1475,19 +1482,33 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (!rs.next()) {
// First allocation of write id should add the table to the next_write_id meta table
// The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here
- writeId = 1;
+ // For repl flow, we need to force set the incoming write id.
+ writeId = (srcWriteId > 0) ? srcWriteId : 1;
s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
- + quoteString(dbName) + "," + quoteString(tblName) + "," + Long.toString(numOfWriteIds + 1) + ")";
+ + quoteString(dbName) + "," + quoteString(tblName) + "," + (writeId + numOfWriteIds) + ")";
LOG.debug("Going to execute insert <" + s + ">");
stmt.execute(s);
} else {
- writeId = rs.getLong(1);
+ long nextWriteId = rs.getLong(1);
+ writeId = (srcWriteId > 0) ? srcWriteId : nextWriteId;
+
// Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated
s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + numOfWriteIds)
+ " where nwi_database = " + quoteString(dbName)
+ " and nwi_table = " + quoteString(tblName);
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
+
+ // For repl flow, if the source write id is mismatching with target next write id, then current
+ // metadata in TXN_TO_WRITE_ID is stale for this table and hence need to clean-up TXN_TO_WRITE_ID.
+ // This is possible in case of first incremental repl after bootstrap where concurrent write
+ // and drop table was performed at source during bootstrap dump.
+ if ((srcWriteId > 0) && (srcWriteId != nextWriteId)) {
+ s = "delete from TXN_TO_WRITE_ID where t2w_database = " + quoteString(dbName)
+ + " and t2w_table = " + quoteString(tblName);
+ LOG.debug("Going to execute delete <" + s + ">");
+ stmt.executeUpdate(s);
+ }
}
// Map the newly allocated write ids against the list of txns which doesn't have pre-allocated
@@ -1500,16 +1521,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
writeId++;
}
- if (rqst.isSetReplPolicy()) {
- int lastIdx = txnToWriteIds.size()-1;
- if ((txnToWriteIds.get(0).getWriteId() != srcTxnToWriteIds.get(0).getWriteId()) ||
- (txnToWriteIds.get(lastIdx).getWriteId() != srcTxnToWriteIds.get(lastIdx).getWriteId())) {
- LOG.error("Allocated write id range {} is not matching with the input write id range {}.",
- txnToWriteIds, srcTxnToWriteIds);
- throw new IllegalStateException("Write id allocation failed for: " + srcTxnToWriteIds);
- }
- }
-
// Insert entries to TXN_TO_WRITE_ID for newly allocated write ids
List<String> inserts = sqlGenerator.createInsertValuesStmt(
"TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);