You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2020/04/30 11:27:45 UTC
[hive] branch master updated: HIVE-23048: Use sequences for TXN_ID
generation (Peter Varga reviewed by Peter Vary, Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 252fcea HIVE-23048: Use sequences for TXN_ID generation (Peter Varga reviewed by Peter Vary, Denys Kuzmenko)
252fcea is described below
commit 252fceaa45fa5d987cbae4c3d594566198160ce1
Author: Peter Varga <pv...@cloudera.com>
AuthorDate: Thu Apr 30 13:24:14 2020 +0200
HIVE-23048: Use sequences for TXN_ID generation (Peter Varga reviewed by Peter Vary, Denys Kuzmenko)
---
.../hadoop/hive/ql/lockmgr/DbTxnManager.java | 4 +-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 2 +-
.../metastore/txn/TestCompactionTxnHandler.java | 15 +-
.../hadoop/hive/metastore/txn/TestTxnHandler.java | 32 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 26 +-
.../apache/hadoop/hive/ql/TestTxnCommands3.java | 19 +-
.../hadoop/hive/ql/TestTxnCommandsForMmTable.java | 10 +-
.../hadoop/hive/ql/TxnCommandsBaseForTests.java | 9 +
.../ql/lockmgr/DbTxnManagerEndToEndTestBase.java | 78 +++
.../hadoop/hive/ql/lockmgr/ITestDbTxnManager.java | 3 +-
.../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 64 +--
.../TestDbTxnManagerIsolationProperties.java | 237 ++++++++
.../hive/ql/txn/compactor/TestInitiator.java | 7 +-
.../hadoop/hive/metastore/conf/MetastoreConf.java | 2 +
.../hadoop/hive/metastore/tools/SQLGenerator.java | 38 +-
.../hive/metastore/txn/CompactionTxnHandler.java | 44 +-
.../hadoop/hive/metastore/txn/TxnDbUtil.java | 158 ++++--
.../hadoop/hive/metastore/txn/TxnHandler.java | 597 +++++++++++++++------
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 23 +-
.../src/main/sql/derby/hive-schema-4.0.0.derby.sql | 16 +-
.../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql | 14 +
.../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql | 23 +-
.../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql | 49 +-
.../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql | 11 +-
.../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql | 17 +
.../main/sql/oracle/hive-schema-4.0.0.oracle.sql | 11 +-
.../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql | 15 +
.../sql/postgres/hive-schema-4.0.0.postgres.sql | 10 +-
.../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql | 9 +
.../hive/metastore/dbinstall/DbInstallBase.java | 1 +
.../metastore/dbinstall/rules/DatabaseRule.java | 24 +-
.../hive/metastore/dbinstall/rules/Oracle.java | 13 +-
.../hadoop/hive/metastore/txn/TestOpenTxn.java | 128 +++++
33 files changed, 1338 insertions(+), 371 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index a08af7c..deaab89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -99,8 +99,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
private volatile DbLockManager lockMgr = null;
/**
- * The Metastore NEXT_TXN_ID.NTXN_NEXT is initialized to 1; it contains the next available
- * transaction id. Thus is 1 is first transaction id.
+ * The Metastore TXNS sequence is initialized to 1.
+ * Thus 1 is the first transaction id.
*/
private volatile long txnId = 0;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 37a5862..23512e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -151,7 +151,7 @@ public class Initiator extends MetaStoreCompactorThread {
recoverFailedCompactions(true);
// Clean anything from the txns table that has no components left in txn_components.
- txnHandler.cleanEmptyAbortedTxns();
+ txnHandler.cleanEmptyAbortedAndCommittedTxns();
// Clean TXN_TO_WRITE_ID table for entries under min_uncommitted_txn referred by any open txns.
txnHandler.cleanTxnToWriteIdTable();
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 7c8903f..7069dae 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -512,9 +512,14 @@ public class TestCompactionTxnHandler {
// Check that we are cleaning up the empty aborted transactions
GetOpenTxnsResponse txnList = txnHandler.getOpenTxns();
assertEquals(3, txnList.getOpen_txnsSize());
- txnHandler.cleanEmptyAbortedTxns();
+ // Create one aborted for low water mark
+ txnid = openTxn();
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+ txnHandler.setOpenTxnTimeOutMillis(1);
+ txnHandler.cleanEmptyAbortedAndCommittedTxns();
txnList = txnHandler.getOpenTxns();
- assertEquals(2, txnList.getOpen_txnsSize());
+ assertEquals(3, txnList.getOpen_txnsSize());
+ txnHandler.setOpenTxnTimeOutMillis(1000);
rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR);
rqst.setPartitionname("bar");
@@ -529,9 +534,13 @@ public class TestCompactionTxnHandler {
txnHandler.markCleaned(ci);
txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
- txnHandler.cleanEmptyAbortedTxns();
+ // The open txn will become the low water mark
+ Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
+ txnHandler.setOpenTxnTimeOutMillis(1);
+ txnHandler.cleanEmptyAbortedAndCommittedTxns();
txnList = txnHandler.getOpenTxns();
assertEquals(3, txnList.getOpen_txnsSize());
+ txnHandler.setOpenTxnTimeOutMillis(1000);
}
@Test
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 3916e88..868da0c 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -193,30 +193,40 @@ public class TestTxnHandler {
boolean gotException = false;
try {
txnHandler.abortTxn(new AbortTxnRequest(2));
- }
- catch(NoSuchTxnException ex) {
+ } catch(NoSuchTxnException ex) {
gotException = true;
- //if this wasn't an empty txn, we'd get a better msg
- Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), ex.getMessage());
+ // this is the last committed, so it is still in the txns table
+ Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(2) + " is already committed.", ex.getMessage());
}
Assert.assertTrue(gotException);
gotException = false;
txnHandler.commitTxn(new CommitTxnRequest(3));
try {
txnHandler.abortTxn(new AbortTxnRequest(3));
- }
- catch(NoSuchTxnException ex) {
+ } catch(NoSuchTxnException ex) {
gotException = true;
//txn 3 is not empty txn, so we get a better msg
Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(3) + " is already committed.", ex.getMessage());
}
Assert.assertTrue(gotException);
+ txnHandler.setOpenTxnTimeOutMillis(1);
+ txnHandler.cleanEmptyAbortedAndCommittedTxns();
+ txnHandler.setOpenTxnTimeOutMillis(1000);
gotException = false;
try {
- txnHandler.abortTxn(new AbortTxnRequest(4));
+ txnHandler.abortTxn(new AbortTxnRequest(2));
+ } catch(NoSuchTxnException ex) {
+ gotException = true;
+ // now the second transaction is cleared and since it was empty, we do not recognize it anymore
+ Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), ex.getMessage());
}
- catch(NoSuchTxnException ex) {
+ Assert.assertTrue(gotException);
+
+ gotException = false;
+ try {
+ txnHandler.abortTxn(new AbortTxnRequest(4));
+ } catch(NoSuchTxnException ex) {
gotException = true;
Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(4), ex.getMessage());
}
@@ -1672,9 +1682,11 @@ public class TestTxnHandler {
@Test
public void testReplOpenTxn() throws Exception {
int numTxn = 50000;
- String[] output = TxnDbUtil.queryToString(conf, "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"").split("\n");
+ String[] output = TxnDbUtil.queryToString(conf, "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\"").split("\n");
long startTxnId = Long.parseLong(output[1].trim());
+ txnHandler.setOpenTxnTimeOutMillis(30000);
List<Long> txnList = replOpenTxnForTest(startTxnId, numTxn, "default.*");
+ txnHandler.setOpenTxnTimeOutMillis(1000);
assert(txnList.size() == numTxn);
txnHandler.abortTxns(new AbortTxnsRequest(txnList));
}
@@ -1682,7 +1694,7 @@ public class TestTxnHandler {
@Test
public void testReplAllocWriteId() throws Exception {
int numTxn = 2;
- String[] output = TxnDbUtil.queryToString(conf, "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"").split("\n");
+ String[] output = TxnDbUtil.queryToString(conf, "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\"").split("\n");
long startTxnId = Long.parseLong(output[1].trim());
List<Long> srcTxnIdList = LongStream.rangeClosed(startTxnId, numTxn+startTxnId-1)
.boxed().collect(Collectors.toList());
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 2c13e8d..48bf852 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -89,6 +89,7 @@ public class TestTxnCommands2 {
protected HiveConf hiveConf;
protected Driver d;
+ private TxnStore txnHandler;
protected enum Table {
ACIDTBL("acidTbl"),
ACIDTBLPART("acidTblPart", "p"),
@@ -151,6 +152,7 @@ public class TestTxnCommands2 {
TxnDbUtil.setConfValues(hiveConf);
TxnDbUtil.prepDb(hiveConf);
+ txnHandler = TxnUtils.getTxnStore(hiveConf);
File f = new File(TEST_WAREHOUSE_DIR);
if (f.exists()) {
FileUtil.fullyDelete(f);
@@ -322,7 +324,6 @@ public class TestTxnCommands2 {
// 3. Perform a major compaction.
runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
runWorker(hiveConf);
- TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
@@ -930,7 +931,6 @@ public class TestTxnCommands2 {
int[][] tableData = {{1,2},{3,4},{5,6}};
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData));
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=2) (a,b) " + makeValuesClause(tableData));
- TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.name(), CompactionType.MAJOR));
runWorker(hiveConf);
runCleaner(hiveConf);
@@ -953,7 +953,6 @@ public class TestTxnCommands2 {
runStatementOnDriver("update t1" + " set b = -2 where a = 1");
runStatementOnDriver("alter table t1 " + " compact 'MAJOR'");
runWorker(hiveConf);
- TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
@@ -1029,7 +1028,6 @@ public class TestTxnCommands2 {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
- TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
AtomicBoolean stop = new AtomicBoolean(true);
//create failed compactions
for(int i = 0; i < numFailedCompactions; i++) {
@@ -1217,7 +1215,6 @@ public class TestTxnCommands2 {
runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
//run Worker to execute compaction
- TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
runWorker(hiveConf);
@@ -1277,7 +1274,6 @@ public class TestTxnCommands2 {
public void testOpenTxnsCounter() throws Exception {
hiveConf.setIntVar(HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS, 3);
hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_COUNT_OPEN_TXNS_INTERVAL, 10, TimeUnit.MILLISECONDS);
- TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
OpenTxnsResponse openTxnsResponse = txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost"));
AcidOpenTxnsCounterService openTxnsCounterService = new AcidOpenTxnsCounterService();
@@ -1319,7 +1315,6 @@ public class TestTxnCommands2 {
runStatementOnDriver("update " + Table.ACIDTBL + " set b = -2 where b = 2");
runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MINOR'");
runWorker(hiveConf);
- TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize());
Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
@@ -1380,6 +1375,8 @@ public class TestTxnCommands2 {
// now compact and see if compaction still preserves the data correctness
runStatementOnDriver("alter table "+ tblName + " compact 'MAJOR'");
runWorker(hiveConf);
+ // create a low water mark aborted transaction and clean the older ones
+ createAbortLowWaterMark();
runCleaner(hiveConf); // Cleaner would remove the obsolete files.
// Verify that there is now only 1 new directory: base_xxxxxxx and the rest have have been cleaned.
@@ -1403,6 +1400,14 @@ public class TestTxnCommands2 {
Assert.assertEquals(Arrays.asList(expectedResult), rs);
}
+ protected void createAbortLowWaterMark() throws Exception{
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+ runStatementOnDriver("select * from " + Table.ACIDTBL);
+ // wait for metastore.txn.opentxn.timeout
+ Thread.sleep(1000);
+ runInitiator(hiveConf);
+ }
+
@Test
public void testETLSplitStrategyForACID() throws Exception {
hiveConf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, "ETL");
@@ -2007,7 +2012,6 @@ public class TestTxnCommands2 {
Assert.assertEquals("2\t2000\taa", res.get(1));
// Compact
- TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
CompactionRequest compactionRequest =
new CompactionRequest("default", tblName, CompactionType.MAJOR);
compactionRequest.setPartitionname("part=aa");
@@ -2054,7 +2058,6 @@ public class TestTxnCommands2 {
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause),
2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblPartWhereClause));
- TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR));
runWorker(hiveConf);
runCleaner(hiveConf);
@@ -2096,7 +2099,7 @@ public class TestTxnCommands2 {
// The entry relevant to aborted txns shouldn't be removed from TXN_TO_WRITE_ID as
// aborted txn would be removed from TXNS only after the compaction. Also, committed txn > open txn is retained.
// As open txn doesn't allocate writeid, the 2 entries for aborted and committed should be retained.
- txnHandler.cleanEmptyAbortedTxns();
+ txnHandler.cleanEmptyAbortedAndCommittedTxns();
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause),
3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause));
@@ -2109,7 +2112,7 @@ public class TestTxnCommands2 {
txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR));
runWorker(hiveConf);
runCleaner(hiveConf);
- txnHandler.cleanEmptyAbortedTxns();
+ txnHandler.cleanEmptyAbortedAndCommittedTxns();
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
@@ -2234,7 +2237,6 @@ public class TestTxnCommands2 {
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData3));
- TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MINOR));
runWorker(hiveConf);
runCleaner(hiveConf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index 51b0fa3..5b8c670 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -479,11 +479,15 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'");
runWorker(hiveConf);
- assertTableIsEmpty("TXNS");
+ // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain
+ hiveConf.set("metastore.txn.opentxn.timeout", "1");
+ runInitiator(hiveConf);
+ hiveConf.set("metastore.txn.opentxn.timeout", "1000");
+ assertOneTxn();
assertTableIsEmpty("TXN_COMPONENTS");
runCleaner(hiveConf);
- assertTableIsEmpty("TXNS");
+ assertOneTxn();
assertTableIsEmpty("TXN_COMPONENTS");
}
@@ -500,11 +504,14 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'");
runWorker(hiveConf);
- assertTableIsEmpty("TXNS");
+ // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain
+ hiveConf.set("metastore.txn.opentxn.timeout", "1");
+ runInitiator(hiveConf);
+ hiveConf.set("metastore.txn.opentxn.timeout", "1000");
assertTableIsEmpty("TXN_COMPONENTS");
runCleaner(hiveConf);
- assertTableIsEmpty("TXNS");
+ assertOneTxn();
assertTableIsEmpty("TXN_COMPONENTS");
}
@@ -512,4 +519,8 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from " + table), 0,
TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from " + table));
}
+ private void assertOneTxn() throws Exception {
+ Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), 1,
+ TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
+ }
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
index 6525ffc..eac2c63 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
@@ -465,8 +465,11 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"),
2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS"));
+ // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain
+ Thread.sleep(1000);
+ runInitiator(hiveConf);
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
- 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
+ 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
// Initiate a major compaction request on the table.
runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MAJOR'");
@@ -475,13 +478,16 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
runWorker(hiveConf);
verifyDirAndResult(2, true);
+ // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain
+ Thread.sleep(1000);
+ runInitiator(hiveConf);
// Run Cleaner.
runCleaner(hiveConf);
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"),
0,
TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS"));
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
- 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
+ 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
verifyDirAndResult(0, true);
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 1435269..3ff68a3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
@@ -57,6 +59,8 @@ public abstract class TxnCommandsBaseForTests {
public TestName testName = new TestName();
protected HiveConf hiveConf;
Driver d;
+ private TxnStore txnHandler;
+
public enum Table {
ACIDTBL("acidTbl"),
ACIDTBLPART("acidTblPart"),
@@ -75,6 +79,10 @@ public abstract class TxnCommandsBaseForTests {
}
}
+ public TxnStore getTxnStore() {
+ return txnHandler;
+ }
+
@Before
public void setUp() throws Exception {
setUpInternal();
@@ -106,6 +114,7 @@ public abstract class TxnCommandsBaseForTests {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
hiveConf.setBoolean("mapred.input.dir.recursive", true);
TxnDbUtil.setConfValues(hiveConf);
+ txnHandler = TxnUtils.getTxnStore(hiveConf);
TxnDbUtil.prepDb(hiveConf);
File f = new File(getWarehouseDir());
if (f.exists()) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
new file mode 100644
index 0000000..b435e79
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+/**
+ * Base class for "end-to-end" tests for DbTxnManager that simulate concurrent queries.
+ */
+public abstract class DbTxnManagerEndToEndTestBase {
+
+ protected static HiveConf conf = new HiveConf(Driver.class);
+ protected HiveTxnManager txnMgr;
+ protected Context ctx;
+ protected Driver driver, driver2;
+ protected TxnStore txnHandler;
+
+ public DbTxnManagerEndToEndTestBase() {
+ conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+ TxnDbUtil.setConfValues(conf);
+ }
+ @BeforeClass
+ public static void setUpDB() throws Exception{
+ TxnDbUtil.prepDb(conf);
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
+ SessionState.start(conf);
+ ctx = new Context(conf);
+ driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
+ driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build());
+ TxnDbUtil.cleanDb(conf);
+ SessionState ss = SessionState.get();
+ ss.initTxnMgr(conf);
+ txnMgr = ss.getTxnMgr();
+ Assert.assertTrue(txnMgr instanceof DbTxnManager);
+ txnHandler = TxnUtils.getTxnStore(conf);
+
+ }
+ @After
+ public void tearDown() throws Exception {
+ driver.close();
+ driver2.close();
+ if (txnMgr != null) {
+ txnMgr.closeTxnManager();
+ }
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/ITestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/ITestDbTxnManager.java
index a085e9f..12cbc2a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/ITestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/ITestDbTxnManager.java
@@ -62,8 +62,7 @@ public class ITestDbTxnManager extends TestDbTxnManager2 {
MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CONNECT_URL_KEY));
// Start the docker container and create the hive user
rule.before();
- rule.createUser();
- // We do not run the install script, it will be called anyway before every test in prepDb
+ rule.install();
}
@AfterClass
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index f0ab8af..1687425 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -32,22 +32,14 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.AcidWriteSetService;
-import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.ql.TestTxnCommands2;
-import org.junit.After;
import org.junit.Assert;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.QueryState;
import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.junit.Before;
-import org.junit.BeforeClass;
import org.junit.ComparisonFailure;
import org.junit.Rule;
import org.junit.Test;
@@ -82,47 +74,7 @@ import java.util.Map;
* using {@link #swapTxnManager(HiveTxnManager)} since in the SessionState the TM is associated with
* each thread.
*/
-public class TestDbTxnManager2 {
- protected static HiveConf conf = new HiveConf(Driver.class);
-
- private HiveTxnManager txnMgr;
- private Context ctx;
- private Driver driver;
- private TxnStore txnHandler;
-
- public TestDbTxnManager2() {
- conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
- "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
- conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
- TxnDbUtil.setConfValues(conf);
- }
-
- @BeforeClass
- public static void setUpDB() throws Exception{
- TxnDbUtil.prepDb(conf);
- }
-
- @Before
- public void setUp() throws Exception {
- conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
- SessionState.start(conf);
- ctx = new Context(conf);
- driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
- TxnDbUtil.cleanDb(conf);
- SessionState ss = SessionState.get();
- ss.initTxnMgr(conf);
- txnMgr = ss.getTxnMgr();
- Assert.assertTrue(txnMgr instanceof DbTxnManager);
- txnHandler = TxnUtils.getTxnStore(conf);
- }
-
- @After
- public void tearDown() throws Exception {
- driver.close();
- if (txnMgr != null) {
- txnMgr.closeTxnManager();
- }
- }
+public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
/**
* HIVE-16688
@@ -1237,6 +1189,13 @@ public class TestDbTxnManager2 {
locks = getLocks(txnMgr);
Assert.assertEquals("Unexpected lock count", 0, locks.size());
+ /**
+ * The last transaction will always remain in the transaction table, so we will open another one,
+ * wait for the timeout period to elapse, then start the initiator that will clean
+ */
+ txnMgr.openTxn(ctx, "Long Running");
+ Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
+ // Now we can clean the write_set
houseKeeper.run();
Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from \"WRITE_SET\""));
}
@@ -1311,6 +1270,13 @@ public class TestDbTxnManager2 {
Assert.assertEquals("Unexpected lock count", 1, locks.size());
checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks);
txnMgr.commitTxn();
+ /*
+ * The last transaction will always remain in the transaction table, so we will open another one,
+ * wait for the timeout period to elapse, then start the initiator that will clean
+ */
+ txnMgr.openTxn(ctx, "Long Running");
+ Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
+ // Now we can clean the write_set
MetastoreTaskThread writeSetService = new AcidWriteSetService();
writeSetService.setConf(conf);
writeSetService.run();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManagerIsolationProperties.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManagerIsolationProperties.java
new file mode 100644
index 0000000..6c30069
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManagerIsolationProperties.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to check the ACID properties and isolation level requirements.
+ */
+public class TestDbTxnManagerIsolationProperties extends DbTxnManagerEndToEndTestBase {
+
+ @Test
+ public void basicOpenTxnsNoDirtyRead() throws Exception {
+ driver.run(("drop table if exists gap"));
+ driver.run("create table gap (a int, b int) " + "stored as orc TBLPROPERTIES ('transactional'='true')");
+ // Create one TXN to read and do not run it
+ driver.compileAndRespond("select * from gap");
+ long first = txnMgr.getCurrentTxnId();
+
+ DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
+ driver2.compileAndRespond("insert into gap values(1,2)");
+ long second = txnMgr2.getCurrentTxnId();
+ Assert.assertTrue("Sequence number goes onward", second > first);
+ driver2.run();
+
+ // Now we run our read query it should not see the write results of the insert
+ swapTxnManager(txnMgr);
+ driver.run();
+
+ FetchTask fetchTask = driver.getFetchTask();
+ List res = new ArrayList();
+ fetchTask.fetch(res);
+ Assert.assertEquals("No dirty read", 0, res.size());
+
+ }
+ @Test
+ public void gapOpenTxnsNoDirtyRead() throws Exception {
+ driver.run(("drop table if exists gap"));
+ driver.run("create table gap (a int, b int) " + "stored as orc TBLPROPERTIES ('transactional'='true')");
+ // Create one TXN to delete later
+ driver.compileAndRespond("select * from gap");
+ long first = txnMgr.getCurrentTxnId();
+ driver.run();
+ // The second one we use for Low water mark
+ driver.run("select * from gap");
+ DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
+ // Make sure that the time window is large enough to consider the gap open
+ txnHandler.setOpenTxnTimeOutMillis(30000);
+ // Create a gap
+ deleteTransactionId(first);
+ CommandProcessorResponse resp = driver2.compileAndRespond("select * from gap");
+ long third = txnMgr2.getCurrentTxnId();
+ Assert.assertTrue("Sequence number goes onward", third > first);
+ ValidTxnList validTxns = txnMgr2.getValidTxns();
+ Assert.assertEquals("Expect to see the gap as open", first, (long) validTxns.getMinOpenTxn());
+ txnHandler.setOpenTxnTimeOutMillis(1000);
+
+ // Now we cheat and create a transaction with the first sequenceId again imitating a very slow openTxns call
+ setBackSequence(first);
+ swapTxnManager(txnMgr);
+ driver.compileAndRespond("insert into gap values(1,2)");
+ long forth = txnMgr.getCurrentTxnId();
+ Assert.assertEquals(first, forth);
+ driver.run();
+
+ // Now we run our read query it should not see the write results of the insert
+ swapTxnManager(txnMgr2);
+ driver2.run();
+
+ FetchTask fetchTask = driver2.getFetchTask();
+ List res = new ArrayList();
+ fetchTask.fetch(res);
+ Assert.assertEquals("No dirty read", 0, res.size());
+
+ }
+
+
+
+ @Test
+ public void multipleGapOpenTxnsNoDirtyRead() throws Exception {
+ driver.run(("drop table if exists gap"));
+ driver.run("create table gap (a int, b int) " + "stored as orc TBLPROPERTIES ('transactional'='true')");
+ // Create some TXN to delete later
+ OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(10, "user", "local"));
+ openTxns.getTxn_ids().stream().forEach(txnId -> {
+ silentCommitTxn(new CommitTxnRequest(txnId));
+ });
+
+ long first = openTxns.getTxn_ids().get(0);
+ long last = openTxns.getTxn_ids().get(9);
+ // The next one we use for Low water mark
+ driver.run("select * from gap");
+ DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
+ // Make sure that the time window is large enough to consider the gap open
+ txnHandler.setOpenTxnTimeOutMillis(30000);
+ // Create a gap
+ deleteTransactionId(first, last);
+ CommandProcessorResponse resp = driver2.compileAndRespond("select * from gap");
+ long next = txnMgr2.getCurrentTxnId();
+ Assert.assertTrue("Sequence number goes onward", next > last);
+ ValidTxnList validTxns = txnMgr2.getValidTxns();
+ Assert.assertEquals("Expect to see the gap as open", first, (long) validTxns.getMinOpenTxn());
+ txnHandler.setOpenTxnTimeOutMillis(1000);
+
+ // Now we cheat and create a transaction with the first sequenceId again imitating a very slow openTxns call
+ setBackSequence(first);
+ swapTxnManager(txnMgr);
+ driver.compileAndRespond("insert into gap values(1,2)");
+ next = txnMgr.getCurrentTxnId();
+ Assert.assertEquals(first, next);
+ driver.run();
+
+ // Now we run our read query it should not see the write results of the insert
+ swapTxnManager(txnMgr2);
+ driver2.run();
+
+ FetchTask fetchTask = driver2.getFetchTask();
+ List res = new ArrayList();
+ fetchTask.fetch(res);
+ Assert.assertEquals("No dirty read", 0, res.size());
+
+ }
+
+ @Test
+ public void gapOpenTxnsDirtyRead() throws Exception {
+ driver.run(("drop table if exists gap"));
+ driver.run("create table gap (a int, b int) " + "stored as orc TBLPROPERTIES ('transactional'='true')");
+ // Create one TXN to delete later
+ driver.compileAndRespond("select * from gap");
+ long first = txnMgr.getCurrentTxnId();
+ driver.run();
+ //The second one we use for Low water mark
+ driver.run("select * from gap");
+ DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+ swapTxnManager(txnMgr2);
+ // Now we wait for the time window to move forward
+ Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
+ // Create a gap
+ deleteTransactionId(first);
+ CommandProcessorResponse resp = driver2.compileAndRespond("select * from gap");
+ long third = txnMgr2.getCurrentTxnId();
+ Assert.assertTrue("Sequence number goes onward", third > first);
+ ValidTxnList validTxns = txnMgr2.getValidTxns();
+ Assert.assertNull("Expect to see no gap", validTxns.getMinOpenTxn());
+
+ // Now we cheat and create a transaction with the first sequenceId again imitating a very slow openTxns call
+ // This should never happen
+ setBackSequence(first);
+ swapTxnManager(txnMgr);
+ driver.compileAndRespond("insert into gap values(1,2)");
+ long forth = txnMgr.getCurrentTxnId();
+ Assert.assertEquals(first, forth);
+ driver.run();
+
+ // Now we run our read query it should unfortunately see the results of the insert
+ swapTxnManager(txnMgr2);
+ driver2.run();
+
+ FetchTask fetchTask = driver2.getFetchTask();
+ List res = new ArrayList();
+ fetchTask.fetch(res);
+ Assert.assertEquals("Dirty read!", 1, res.size());
+
+ }
+ private void silentCommitTxn(CommitTxnRequest commitTxnRequest) {
+ try {
+ txnHandler.commitTxn(commitTxnRequest);
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ private void deleteTransactionId(long txnId) throws SQLException {
+ deleteTransactionId(txnId, txnId);
+ }
+
+ private void deleteTransactionId(long minTxnId, long maxTxnId) throws SQLException {
+ DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
+ DataSource ds = dsp.create(conf);
+ Connection dbConn = ds.getConnection();
+ Statement stmt = dbConn.createStatement();
+ stmt.executeUpdate("DELETE FROM TXNS WHERE TXN_ID >=" + minTxnId + " AND TXN_ID <=" + maxTxnId);
+ dbConn.commit();
+ stmt.close();
+ dbConn.close();
+ }
+
+ private void setBackSequence(long txnId) throws SQLException {
+ DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
+ DataSource ds = dsp.create(conf);
+ Connection dbConn = ds.getConnection();
+ Statement stmt = dbConn.createStatement();
+ stmt.executeUpdate("ALTER TABLE TXNS ALTER TXN_ID RESTART WITH " + txnId);
+ dbConn.commit();
+ stmt.close();
+ dbConn.close();
+ }
+
+ public static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) {
+ return SessionState.get().setTxnMgr(txnMgr);
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 1151466..e4ff14a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -218,18 +218,19 @@ public class TestInitiator extends CompactorTest {
req.setTxnid(txnid);
LockResponse res = txnHandler.lock(req);
txnHandler.abortTxn(new AbortTxnRequest(txnid));
-
+ txnHandler.setOpenTxnTimeOutMillis(30000);
conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50);
OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(
TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50, "user", "hostname"));
txnHandler.abortTxns(new AbortTxnsRequest(resp.getTxn_ids()));
GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
-
+ txnHandler.setOpenTxnTimeOutMillis(1);
startInitiator();
openTxns = txnHandler.getOpenTxns();
- Assert.assertEquals(1, openTxns.getOpen_txnsSize());
+ // txnid:1 has txn_components, txnid:TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1 is the last
+ Assert.assertEquals(2, openTxns.getOpen_txnsSize());
}
@Test
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 9ce0085..842b7fe 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1200,6 +1200,8 @@ public class MetastoreConf {
"class is used to store and retrieve transactions and locks"),
TXN_TIMEOUT("metastore.txn.timeout", "hive.txn.timeout", 300, TimeUnit.SECONDS,
"time after which transactions are declared aborted if the client has not sent a heartbeat."),
+ TXN_OPENTXN_TIMEOUT("metastore.txn.opentxn.timeout", "hive.txn.opentxn.timeout", 1000, TimeUnit.MILLISECONDS,
+ "Time before an open transaction operation should persist, otherwise it is considered invalid and rolled back"),
URI_RESOLVER("metastore.uri.resolver", "hive.metastore.uri.resolver", "",
"If set, fully qualified class name of resolver for hive metastore uri's"),
USERS_IN_ADMIN_ROLE("metastore.users.in.admin.role", "hive.users.in.admin.role", "", false,
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
index 49b737e..08ef5e9 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
@@ -99,7 +99,7 @@ public final class SQLGenerator {
}
/**
- * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB
+ * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB.
*
* @param tblColumns e.g. "T(a,b,c)"
* @param rows e.g. list of Strings like 3,4,'d'
@@ -110,7 +110,7 @@ public final class SQLGenerator {
}
/**
- * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB
+ * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB.
*
* @param tblColumns e.g. "T(a,b,c)"
* @param rows e.g. list of Strings like 3,4,'d'
@@ -263,7 +263,7 @@ public final class SQLGenerator {
* @throws SQLException
*/
public PreparedStatement prepareStmtWithParameters(Connection dbConn, String sql, List<String> parameters)
- throws SQLException {
+ throws SQLException {
PreparedStatement pst = dbConn.prepareStatement(addEscapeCharacters(sql));
if ((parameters == null) || parameters.isEmpty()) {
return pst;
@@ -292,4 +292,36 @@ public final class SQLGenerator {
return s;
}
+
+ /**
+ * Creates a lock statement for open/commit transaction based on the dbProduct in shared read / exclusive mode.
+ * @param shared shared or exclusive lock
+ * @return sql statement to execute
+ * @throws MetaException if the dbProduct is unknown
+ */
+ public String createTxnLockStatement(boolean shared) throws MetaException{
+ String txnLockTable = "TXN_LOCK_TBL";
+ switch (dbProduct) {
+ case MYSQL:
+ // For Mysql we do not use lock table statement for two reasons
+ // It is not released automatically on commit/rollback
+ // It requires to lock every table that will be used by the statement
+ // https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html
+ return "SELECT \"TXN_LOCK\" FROM \"" + txnLockTable + "\" " + (shared ? "LOCK IN SHARE MODE" : "FOR UPDATE");
+ case POSTGRES:
+ // https://www.postgresql.org/docs/9.4/sql-lock.html
+ case DERBY:
+ // https://db.apache.org/derby/docs/10.4/ref/rrefsqlj40506.html
+ case ORACLE:
+ // https://docs.oracle.com/cd/B19306_01/server.102/b14200/statements_9015.htm
+ return "LOCK TABLE \"" + txnLockTable + "\" IN " + (shared ? "SHARE" : "EXCLUSIVE") + " MODE";
+ case SQLSERVER:
+ // https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-table?view=sql-server-ver15
+ return "SELECT * FROM \"" + txnLockTable + "\" WITH (" + (shared ? "TABLOCK" : "TABLOCKX") + ", HOLDLOCK)";
+ default:
+ String msg = "Unrecognized database product name <" + dbProduct + ">";
+ LOG.error(msg);
+ throw new MetaException(msg);
+ }
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 2344c2d..a1bc109 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -223,7 +223,7 @@ class CompactionTxnHandler extends TxnHandler {
stmt = dbConn.createStatement();
String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', "
+ "\"CQ_WORKER_ID\" = NULL, \"CQ_NEXT_TXN_ID\" = "
- + "(SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\")"
+ + "(SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\")"
+ " WHERE \"CQ_ID\" = " + info.id;
LOG.debug("Going to execute update <" + s + ">");
int updCnt = stmt.executeUpdate(s);
@@ -474,7 +474,7 @@ class CompactionTxnHandler extends TxnHandler {
}
/**
* Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by
- * min(NEXT_TXN_ID.ntxn_next, min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)).
+ * min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)).
*/
@Override
@RetrySemantics.SafeToRetry
@@ -492,9 +492,9 @@ class CompactionTxnHandler extends TxnHandler {
// First need to find the min_uncommitted_txnid which is currently seen by any open transactions.
// If there are no txns which are currently open or aborted in the system, then current value of
- // NEXT_TXN_ID.ntxn_next could be min_uncommitted_txnid.
+ // max(TXNS.txn_id) could be min_uncommitted_txnid.
String s = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" +
- "SELECT MIN(\"NTXN_NEXT\") AS \"ID\" FROM \"NEXT_TXN_ID\" " +
+ "SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\" " +
"UNION " +
"SELECT MIN(\"WS_COMMIT_ID\") AS \"ID\" FROM \"WRITE_SET\" " +
"UNION " +
@@ -504,7 +504,7 @@ class CompactionTxnHandler extends TxnHandler {
LOG.debug("Going to execute query <" + s + ">");
rs = stmt.executeQuery(s);
if (!rs.next()) {
- throw new MetaException("Transaction tables not properly initialized, no record found in NEXT_TXN_ID");
+ throw new MetaException("Transaction tables not properly initialized, no record found in TXNS");
}
long minUncommitedTxnid = rs.getLong(1);
@@ -533,25 +533,36 @@ class CompactionTxnHandler extends TxnHandler {
}
/**
- * Clean up aborted transactions from txns that have no components in txn_components. The reason such
- * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and
- * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
+ * Clean up aborted / committed transactions from txns that have no components in txn_components.
+ * The committed txns are left there for TXN_OPENTXN_TIMEOUT window period intentionally.
+ * The reason such aborted txns exist can be that no work was done in this txn
+ * (e.g. Streaming opened TransactionBatch and abandoned it w/o doing any work)
+ * or due to {@link #markCleaned(CompactionInfo)} being called.
*/
@Override
@RetrySemantics.SafeToRetry
- public void cleanEmptyAbortedTxns() throws MetaException {
+ public void cleanEmptyAbortedAndCommittedTxns() throws MetaException {
+ LOG.info("Start to clean empty aborted or committed TXNS");
try {
Connection dbConn = null;
Statement stmt = null;
ResultSet rs = null;
try {
- //Aborted is a terminal state, so nothing about the txn can change
+ //Aborted and committed are terminal states, so nothing about the txn can change
//after that, so READ COMMITTED is sufficient.
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
+ /**
+ * Only delete aborted / committed transactions in a way that guarantees two things:
+ * 1. never deletes anything that is inside the TXN_OPENTXN_TIMEOUT window
+ * 2. never deletes the maximum txnId even if it is before the TXN_OPENTXN_TIMEOUT window
+ */
+ long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn);
+
String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " +
"\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " +
- "\"TXN_STATE\" = '" + TXN_ABORTED + "'";
+ " (\"TXN_STATE\" = '" + TXN_ABORTED + "' OR \"TXN_STATE\" = '" + TXN_COMMITTED + "') AND "
+ + " \"TXN_ID\" < " + lowWaterMark;
LOG.debug("Going to execute query <" + s + ">");
rs = stmt.executeQuery(s);
List<Long> txnids = new ArrayList<>();
@@ -568,16 +579,15 @@ class CompactionTxnHandler extends TxnHandler {
// Delete from TXNS.
prefix.append("DELETE FROM \"TXNS\" WHERE ");
- suffix.append("");
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", false, false);
for (String query : queries) {
LOG.debug("Going to execute update <" + query + ">");
int rc = stmt.executeUpdate(query);
- LOG.info("Removed " + rc + " empty Aborted transactions from TXNS");
+ LOG.debug("Removed " + rc + " empty Aborted and Committed transactions from TXNS");
}
- LOG.info("Aborted transactions removed from TXNS: " + txnids);
+ LOG.info("Aborted and committed transactions removed from TXNS: " + txnids);
LOG.debug("Going to commit");
dbConn.commit();
} catch (SQLException e) {
@@ -591,7 +601,7 @@ class CompactionTxnHandler extends TxnHandler {
close(rs, stmt, dbConn);
}
} catch (RetryException e) {
- cleanEmptyAbortedTxns();
+ cleanEmptyAbortedAndCommittedTxns();
}
}
@@ -1147,12 +1157,12 @@ class CompactionTxnHandler extends TxnHandler {
+ quoteChar(READY_FOR_CLEANING) +
") \"RES\"";
} else {
- query = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"";
+ query = "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\"";
}
LOG.debug("Going to execute query <" + query + ">");
rs = stmt.executeQuery(query);
if (!rs.next()) {
- throw new MetaException("Transaction tables not properly initialized, no record found in NEXT_TXN_ID");
+ throw new MetaException("Transaction tables not properly initialized, no record found in TXNS");
}
return rs.getLong(1);
} catch (SQLException e) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 97a0833..7a90316 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -218,63 +218,107 @@ public final class TxnDbUtil {
public static void cleanDb(Configuration conf) throws Exception {
LOG.info("Cleaning transactional tables");
- int retryCount = 0;
- while(++retryCount <= 3) {
- boolean success = true;
- Connection conn = null;
- Statement stmt = null;
+
+ boolean success = true;
+ Connection conn = null;
+ Statement stmt = null;
+ try {
+ conn = getConnection(conf);
+ stmt = conn.createStatement();
+ if (!checkDbPrepared(stmt)){
+ // Nothing to clean
+ return;
+ }
+
+ // We want to try these, whether they succeed or fail.
+ success &= truncateTable(conn, stmt, "TXN_COMPONENTS");
+ success &= truncateTable(conn, stmt, "COMPLETED_TXN_COMPONENTS");
+ success &= truncateTable(conn, stmt, "TXNS");
+ success &= truncateTable(conn, stmt, "TXN_TO_WRITE_ID");
+ success &= truncateTable(conn, stmt, "NEXT_WRITE_ID");
+ success &= truncateTable(conn, stmt, "HIVE_LOCKS");
+ success &= truncateTable(conn, stmt, "NEXT_LOCK_ID");
+ success &= truncateTable(conn, stmt, "COMPACTION_QUEUE");
+ success &= truncateTable(conn, stmt, "NEXT_COMPACTION_QUEUE_ID");
+ success &= truncateTable(conn, stmt, "COMPLETED_COMPACTIONS");
+ success &= truncateTable(conn, stmt, "AUX_TABLE");
+ success &= truncateTable(conn, stmt, "WRITE_SET");
+ success &= truncateTable(conn, stmt, "REPL_TXN_MAP");
+ success &= truncateTable(conn, stmt, "MATERIALIZATION_REBUILD_LOCKS");
try {
- conn = getConnection(conf);
- stmt = conn.createStatement();
-
- // We want to try these, whether they succeed or fail.
- success &= truncateTable(conn, stmt, "TXN_COMPONENTS");
- success &= truncateTable(conn, stmt, "COMPLETED_TXN_COMPONENTS");
- success &= truncateTable(conn, stmt, "TXNS");
- success &= truncateTable(conn, stmt, "NEXT_TXN_ID");
- success &= truncateTable(conn, stmt, "TXN_TO_WRITE_ID");
- success &= truncateTable(conn, stmt, "NEXT_WRITE_ID");
- success &= truncateTable(conn, stmt, "HIVE_LOCKS");
- success &= truncateTable(conn, stmt, "NEXT_LOCK_ID");
- success &= truncateTable(conn, stmt, "COMPACTION_QUEUE");
- success &= truncateTable(conn, stmt, "NEXT_COMPACTION_QUEUE_ID");
- success &= truncateTable(conn, stmt, "COMPLETED_COMPACTIONS");
- success &= truncateTable(conn, stmt, "AUX_TABLE");
- success &= truncateTable(conn, stmt, "WRITE_SET");
- success &= truncateTable(conn, stmt, "REPL_TXN_MAP");
- success &= truncateTable(conn, stmt, "MATERIALIZATION_REBUILD_LOCKS");
- try {
- stmt.executeUpdate("INSERT INTO \"NEXT_TXN_ID\" VALUES(1)");
- stmt.executeUpdate("INSERT INTO \"NEXT_LOCK_ID\" VALUES(1)");
- stmt.executeUpdate("INSERT INTO \"NEXT_COMPACTION_QUEUE_ID\" VALUES(1)");
- } catch (SQLException e) {
- if (!getTableNotExistsErrorCodes().contains(e.getSQLState())) {
- LOG.error("Error initializing NEXT_TXN_ID");
- success = false;
- }
+ resetTxnSequence(conn, stmt);
+ stmt.executeUpdate("INSERT INTO \"NEXT_LOCK_ID\" VALUES(1)");
+ stmt.executeUpdate("INSERT INTO \"NEXT_COMPACTION_QUEUE_ID\" VALUES(1)");
+ } catch (SQLException e) {
+ if (!getTableNotExistsErrorCodes().contains(e.getSQLState())) {
+ LOG.error("Error initializing sequence values", e);
+ success = false;
}
- /*
- * Don't drop NOTIFICATION_LOG, SEQUENCE_TABLE and NOTIFICATION_SEQUENCE as its used by other
- * table which are not txn related to generate primary key. So if these tables are dropped
- * and other tables are not dropped, then it will create key duplicate error while inserting
- * to other table.
- */
- } finally {
- closeResources(conn, stmt, null);
- }
- if(success) {
- return;
}
+ /*
+ * Don't drop NOTIFICATION_LOG, SEQUENCE_TABLE and NOTIFICATION_SEQUENCE as its used by other
+ * table which are not txn related to generate primary key. So if these tables are dropped
+ * and other tables are not dropped, then it will create key duplicate error while inserting
+ * to other table.
+ */
+ } finally {
+ closeResources(conn, stmt, null);
+ }
+ if(success) {
+ return;
}
throw new RuntimeException("Failed to clean up txn tables");
}
+ private static void resetTxnSequence(Connection conn, Statement stmt) throws SQLException, MetaException{
+ String dbProduct = conn.getMetaData().getDatabaseProductName();
+ DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct);
+ switch (databaseProduct) {
+
+ case DERBY:
+ stmt.execute("ALTER TABLE \"TXNS\" ALTER \"TXN_ID\" RESTART WITH 1");
+ stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\","
+ + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")"
+ + " VALUES(0, 'c', 0, 0, '', '')");
+ break;
+ case MYSQL:
+ stmt.execute("ALTER TABLE \"TXNS\" AUTO_INCREMENT=1");
+ stmt.execute("SET SQL_MODE='NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES'");
+ stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\","
+ + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")"
+ + " VALUES(0, 'c', 0, 0, '', '')");
+ break;
+ case POSTGRES:
+ stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\","
+ + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")"
+ + " VALUES(0, 'c', 0, 0, '', '')");
+ stmt.execute("ALTER SEQUENCE \"TXNS_TXN_ID_seq\" RESTART");
+ break;
+ case ORACLE:
+ stmt.execute("ALTER TABLE \"TXNS\" MODIFY \"TXN_ID\" GENERATED BY DEFAULT AS IDENTITY (START WITH 1)");
+ stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\","
+ + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")"
+ + " VALUES(0, 'c', 0, 0, '_', '_')");
+ break;
+ case SQLSERVER:
+ stmt.execute("DBCC CHECKIDENT ('txns', RESEED, 0)");
+ stmt.execute("SET IDENTITY_INSERT TXNS ON");
+ stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\","
+ + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")"
+ + " VALUES(0, 'c', 0, 0, '', '')");
+ break;
+ case OTHER:
+ default:
+ break;
+ }
+ }
+
private static boolean truncateTable(Connection conn, Statement stmt, String name) {
try {
// We can not use actual truncate due to some foreign keys, but we don't expect much data during tests
String dbProduct = conn.getMetaData().getDatabaseProductName();
DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct);
- if (databaseProduct == POSTGRES) {
+ if (databaseProduct == POSTGRES || databaseProduct == MYSQL) {
stmt.execute("DELETE FROM \"" + name + "\"");
} else {
stmt.execute("DELETE FROM " + name);
@@ -503,4 +547,28 @@ public final class TxnDbUtil {
}
return affectedRowsByQuery;
}
+
+ /**
+ * Checks if the DBMS supports getGeneratedKeys for multi-line insert statements.
+ * @param dbProduct DBMS type
+ * @return true if supported
+ * @throws MetaException if the database product is unknown
+ */
+ public static boolean supportsGetGeneratedKeys(DatabaseProduct dbProduct) throws MetaException {
+ switch (dbProduct) {
+ case DERBY:
+ case SQLSERVER:
+ // The getGeneratedKeys is not supported for multi line insert
+ return false;
+ case ORACLE:
+ case MYSQL:
+ case POSTGRES:
+ return true;
+ case OTHER:
+ default:
+ String msg = "Unknown database product: " + dbProduct.toString();
+ LOG.error(msg);
+ throw new MetaException(msg);
+ }
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 10a02b1..8fded60 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -55,6 +55,7 @@ import java.util.stream.Collectors;
import javax.sql.DataSource;
+import org.apache.commons.lang3.time.StopWatch;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.commons.lang3.tuple.Pair;
@@ -73,7 +74,59 @@ import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener;
import org.apache.hadoop.hive.metastore.LockTypeComparator;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.CreationMetadata;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.InitializeTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.Materialization;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ReplLastIdInfo;
+import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
@@ -174,8 +227,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
static final protected char MINOR_TYPE = 'i';
// Transaction states
- static final protected char TXN_ABORTED = 'a';
- static final protected char TXN_OPEN = 'o';
+ protected static final char TXN_ABORTED = 'a';
+ protected static final char TXN_OPEN = 'o';
+ protected static final char TXN_COMMITTED = 'c';
+
+ private static final char TXN_TMP = '_';
+
//todo: make these like OperationType and remove above char constants
enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN}
@@ -189,6 +246,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
private static DataSource connPool;
private static DataSource connPoolMutex;
+ private static final String MANUAL_RETRY = "ManualRetry";
+
// Query definitions
private static final String HIVE_LOCKS_INSERT_QRY = "INSERT INTO \"HIVE_LOCKS\" ( " +
"\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", " +
@@ -204,11 +263,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
private static final String COMPL_TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" " +
"(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")" +
" VALUES (%s, ?, ?, ?, ?, %s)";
+ private static final String TXNS_INSERT_QRY = "INSERT INTO \"TXNS\" " +
+ "(\"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\") "
+ + "VALUES(?,%s,%s,?,?,?)";
private static final String SELECT_LOCKS_FOR_LOCK_ID_QUERY = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", " +
- "\"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" " +
- "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?";
+ "\"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" " +
+ "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?";
private static final String SELECT_TIMED_OUT_LOCKS_QUERY = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" " +
- "WHERE \"HL_LAST_HEARTBEAT\" < %s - ? AND \"HL_TXNID\" = 0";
+ "WHERE \"HL_LAST_HEARTBEAT\" < %s - ? AND \"HL_TXNID\" = 0";
private List<TransactionalMetaStoreEventListener> transactionalListeners;
@@ -273,9 +335,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
protected Configuration conf;
private static DatabaseProduct dbProduct;
private static SQLGenerator sqlGenerator;
+ private static long openTxnTimeOutMillis;
// (End user) Transaction timeout, in milliseconds.
private long timeout;
+ // Timeout for opening a transaction
private int maxBatchSize;
private String identifierQuoteString; // quotes to use for quoting tables, where necessary
@@ -357,6 +421,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS);
maxBatchSize = MetastoreConf.getIntVar(conf, ConfVars.JDBC_MAX_BATCH_SIZE);
+ openTxnTimeOutMillis = MetastoreConf.getTimeVar(conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS);
+
try {
transactionalListeners = MetaStoreServerUtils.getMetaStoreListeners(
TransactionalMetaStoreEventListener.class,
@@ -377,59 +443,74 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
@RetrySemantics.ReadOnly
public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
try {
- // We need to figure out the current transaction number and the list of
- // open transactions. To avoid needing a transaction on the underlying
- // database we'll look at the current transaction number first. If it
- // subsequently shows up in the open list that's ok.
+ // We need to figure out the HighWaterMark and the list of open transactions.
Connection dbConn = null;
Statement stmt = null;
ResultSet rs = null;
try {
- /**
- * This method can run at READ_COMMITTED as long as long as
- * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic.
- * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with
- * adding corresponding entries into TXNS. The reason is that any txnid below HWM
- * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed.
+ /*
+ * This method needs guarantees from
+ * {@link #openTxns(OpenTxnRequest)} and {@link #commitTxn(CommitTxnRequest)}.
+ * It will look at the TXNS table and find each transaction between the max(txn_id) as HighWaterMark
+ * and the max(txn_id) before the TXN_OPENTXN_TIMEOUT period as LowWaterMark.
+ * Every transaction that is not found between these will be considered as open, since it may appear later.
+ * openTxns must ensure, that no new transaction will be opened with txn_id below LWM and
+ * commitTxn must ensure, that no committed transaction will be removed before the time period expires.
*/
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- String s = "SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\"";
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- if (!rs.next()) {
- throw new MetaException("Transaction tables not properly " +
- "initialized, no record found in next_txn_id");
- }
- long hwm = rs.getLong(1);
- if (rs.wasNull()) {
- throw new MetaException("Transaction tables not properly " +
- "initialized, null record found in next_txn_id");
- }
- close(rs);
List<TxnInfo> txnInfos = new ArrayList<>();
- //need the WHERE clause below to ensure consistent results with READ_COMMITTED
- s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\" FROM " +
- "\"TXNS\" WHERE \"TXN_ID\" <= " + hwm;
+
+ String s =
+ "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", "
+ + "(" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")"
+ + "FROM \"TXNS\" ORDER BY \"TXN_ID\"";
LOG.debug("Going to execute query<" + s + ">");
rs = stmt.executeQuery(s);
+ /*
+ * We can use the maximum txn_id from the TXNS table as high water mark, since the commitTxn and the Initiator
+ * guarantee, that the transaction with the highest txn_id will never be removed from the TXNS table.
+ * If there is a pending openTxns call that has already acquired its sequenceId but not yet committed the insert
+ * into the TXNS table, it will have either a lower txn_id than HWM and will be listed in the openTxn list,
+ * or it will have a higher txn_id and won't affect this getOpenTxns() call.
+ */
+ long hwm = 0;
+ long openTxnLowBoundary = 0;
+
while (rs.next()) {
+ long txnId = rs.getLong(1);
+ long age = rs.getLong(7);
+ hwm = txnId;
+ if (age < getOpenTxnTimeOutMillis()) {
+ // We will consider every gap as an open transaction from the previous txnId
+ openTxnLowBoundary++;
+ while (txnId > openTxnLowBoundary) {
+ // Add an empty open transaction for every missing value
+ txnInfos.add(new TxnInfo(openTxnLowBoundary, TxnState.OPEN, null, null));
+ openTxnLowBoundary++;
+ }
+ } else {
+ openTxnLowBoundary = txnId;
+ }
char c = rs.getString(2).charAt(0);
TxnState state;
switch (c) {
- case TXN_ABORTED:
- state = TxnState.ABORTED;
- break;
+ case TXN_COMMITTED:
+ // This is only here to avoid adding this txnId as a possible gap
+ continue;
- case TXN_OPEN:
- state = TxnState.OPEN;
- break;
+ case TXN_ABORTED:
+ state = TxnState.ABORTED;
+ break;
- default:
- throw new MetaException("Unexpected transaction state " + c +
- " found in txns table");
+ case TXN_OPEN:
+ state = TxnState.OPEN;
+ break;
+
+ default:
+ throw new MetaException("Unexpected transaction state " + c + " found in txns table");
}
- TxnInfo txnInfo = new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4));
+ TxnInfo txnInfo = new TxnInfo(txnId, state, rs.getString(3), rs.getString(4));
txnInfo.setStartedTime(rs.getLong(5));
txnInfo.setLastHeartbeatTime(rs.getLong(6));
txnInfos.add(txnInfo);
@@ -441,8 +522,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(dbConn, e, "getOpenTxnsInfo");
- throw new MetaException("Unable to select from transaction database: " + getMessage(e)
- + StringUtils.stringifyException(e));
+ throw new MetaException(
+ "Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e));
} finally {
close(rs, stmt, dbConn);
}
@@ -455,42 +536,47 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
@RetrySemantics.ReadOnly
public GetOpenTxnsResponse getOpenTxns() throws MetaException {
try {
- // We need to figure out the current transaction number and the list of
- // open transactions. To avoid needing a transaction on the underlying
- // database we'll look at the current transaction number first. If it
- // subsequently shows up in the open list that's ok.
+ // We need to figure out the current transaction number and the list of open transactions.
Connection dbConn = null;
Statement stmt = null;
ResultSet rs = null;
try {
- /**
+ /*
* This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
*/
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- String s = "SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\"";
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- if (!rs.next()) {
- throw new MetaException("Transaction tables not properly " +
- "initialized, no record found in next_txn_id");
- }
- long hwm = rs.getLong(1);
- if (rs.wasNull()) {
- throw new MetaException("Transaction tables not properly " +
- "initialized, null record found in next_txn_id");
- }
- close(rs);
+
List<Long> openList = new ArrayList<>();
- //need the WHERE clause below to ensure consistent results with READ_COMMITTED
- s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" <= " + hwm + " ORDER BY \"TXN_ID\"";
+ String s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\", "
+ + "(" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")"
+ + " FROM \"TXNS\" ORDER BY \"TXN_ID\"";
LOG.debug("Going to execute query<" + s + ">");
rs = stmt.executeQuery(s);
+ long hwm = 0;
+ long openTxnLowBoundary = 0;
long minOpenTxn = Long.MAX_VALUE;
BitSet abortedBits = new BitSet();
while (rs.next()) {
long txnId = rs.getLong(1);
+ long age = rs.getLong(4);
+ hwm = txnId;
+ if (age < getOpenTxnTimeOutMillis()) {
+ // We will consider every gap as an open transaction from the previous txnId
+ openTxnLowBoundary++;
+ while (txnId > openTxnLowBoundary) {
+ // Add an empty open transaction for every missing value
+ openList.add(openTxnLowBoundary);
+ minOpenTxn = Math.min(minOpenTxn, openTxnLowBoundary);
+ openTxnLowBoundary++;
+ }
+ } else {
+ openTxnLowBoundary = txnId;
+ }
char txnState = rs.getString(2).charAt(0);
+ if (txnState == TXN_COMMITTED) {
+ continue;
+ }
if (txnState == TXN_OPEN) {
minOpenTxn = Math.min(minOpenTxn, txnId);
}
@@ -506,7 +592,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
dbConn.rollback();
ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray());
GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer);
- if(minOpenTxn < Long.MAX_VALUE) {
+ if (minOpenTxn < Long.MAX_VALUE) {
otr.setMin_open_txn(minOpenTxn);
}
return otr;
@@ -554,13 +640,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
Connection dbConn = null;
Statement stmt = null;
try {
- lockInternal();
- /**
+ /*
* To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
- * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic.
- * Also, advancing the counter must work when multiple metastores are running.
- * SELECT ... FOR UPDATE is used to prevent
- * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID.
+ * that looking at the TXNS table every open transaction could be identified below a given High Water Mark.
+ * One way to do it would be to serialize the openTxns call with an S4U lock, but that would cause
+ * performance degradation with high transaction load.
+ * To enable parallel openTxn calls, we define a time period (TXN_OPENTXN_TIMEOUT) and consider every
+ * transaction missing from the TXNS table in that period open, and prevent opening transactions outside
+ * the period.
+ * Example: At t[0] there is one open transaction in the TXNS table, T[1].
+ * T[2] acquires the next sequence at t[1] but only commits into the TXNS table at t[10].
+ * T[3] acquires its sequence at t[2], and commits into the TXNS table at t[3].
+ * Then T[3] calculates its snapshot at t[4] and puts T[1] and also T[2] in the snapshot’s
+ * open transaction list. T[1] because it is presented as open in TXNS,
+ * T[2] because it is a missing sequence.
*
* In the current design, there can be several metastore instances running in a given Warehouse.
* This makes ideas like reserving a range of IDs to save trips to DB impossible. For example,
@@ -573,35 +666,67 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* set could support a write-through cache for added performance.
*/
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- // Make sure the user has not requested an insane amount of txns.
- int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH);
- if (numTxns > maxTxns) numTxns = maxTxns;
-
stmt = dbConn.createStatement();
- List<Long> txnIds = openTxns(dbConn, stmt, rqst);
+ /*
+ * The openTxn and commitTxn must be mutexed when committing a non-read-only transaction.
+ * This is achieved by requesting a shared table lock here, and an exclusive one at commit.
+ * Since table locks are working in Derby, we don't need the lockInternal call here.
+ * Example: Suppose we have two transactions with update like x = x+1.
+ * We have T[3,3] that was using a value from a snapshot with T[2,2]. If we allow committing T[3,3]
+ * and opening T[4] parallel it is possible, that T[4] will be using the value from a snapshot with T[2,2],
+ * and we will have a lost update problem
+ */
+ acquireTxnLock(stmt, true);
+ // Measure the time from acquiring the sequence value, till committing in the TXNS table
+ StopWatch generateTransactionWatch = new StopWatch();
+ generateTransactionWatch.start();
+
+ List<Long> txnIds = openTxns(dbConn, rqst);
LOG.debug("Going to commit");
dbConn.commit();
+ generateTransactionWatch.stop();
+ long elapsedMillis = generateTransactionWatch.getTime(TimeUnit.MILLISECONDS);
+ TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT;
+ if (txnType != TxnType.READ_ONLY && elapsedMillis >= openTxnTimeOutMillis) {
+ /*
+ * The commit was too slow, we can not allow this to continue (except if it is read only,
+ * since that can not cause dirty reads).
+ * When calculating the snapshot for a given transaction, we look back for possible open transactions
+ * (that are not yet committed in the TXNS table), for TXN_OPENTXN_TIMEOUT period.
+ * We cannot allow a write transaction that was slower than TXN_OPENTXN_TIMEOUT to continue,
+ * because there can be other running transactions that didn't consider this transactionId open;
+ * this could cause dirty reads.
+ */
+ LOG.error("OpenTxnTimeOut exceeded commit duration {}, deleting transactionIds: {}", elapsedMillis, txnIds);
+ deleteInvalidOpenTransactions(dbConn, txnIds);
+ /*
+ * We do not throw RetryException directly, to not circumvent the max retry limit
+ */
+ throw new SQLException("OpenTxnTimeOut exceeded", MANUAL_RETRY);
+ }
return new OpenTxnsResponse(txnIds);
} catch (SQLException e) {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(dbConn, e, "openTxns(" + rqst + ")");
- throw new MetaException("Unable to select from transaction database "
- + StringUtils.stringifyException(e));
+ throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e));
} finally {
close(null, stmt, dbConn);
- unlockInternal();
}
} catch (RetryException e) {
return openTxns(rqst);
}
}
- private List<Long> openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst)
+ private List<Long> openTxns(Connection dbConn, OpenTxnRequest rqst)
throws SQLException, MetaException {
int numTxns = rqst.getNum_txns();
- ResultSet rs = null;
+ // Make sure the user has not requested an insane amount of txns.
+ int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH);
+ if (numTxns > maxTxns) {
+ numTxns = maxTxns;
+ }
List<PreparedStatement> insertPreparedStmts = null;
TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT;
try {
@@ -620,51 +745,54 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
txnType = TxnType.REPL_CREATED;
}
- String s = sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"");
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
- if (!rs.next()) {
- throw new MetaException("Transaction database not properly " +
- "configured, can't find next transaction id.");
- }
- long first = rs.getLong(1);
- s = "UPDATE \"NEXT_TXN_ID\" SET \"NTXN_NEXT\" = " + (first + numTxns);
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
-
List<Long> txnIds = new ArrayList<>(numTxns);
-
- List<String> rows = new ArrayList<>();
- List<String> params = new ArrayList<>();
- params.add(rqst.getUser());
- params.add(rqst.getHostname());
- List<List<String>> paramsList = new ArrayList<>(numTxns);
-
- for (long i = first; i < first + numTxns; i++) {
- txnIds.add(i);
- rows.add(i + "," + quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + ","
- + TxnDbUtil.getEpochFn(dbProduct) + ",?,?," + txnType.getValue());
- paramsList.add(params);
- }
- insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
- "\"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", "
- + "\"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\")",
- rows, paramsList);
- for (PreparedStatement pst : insertPreparedStmts) {
- pst.execute();
+ /*
+ * getGeneratedKeys is not supported by every DBMS after executing a multi-line insert.
+ * But it is supported in every used DBMS for a single-line insert, even if the metadata says otherwise.
+ * If getGeneratedKeys is not supported, we first insert the transactions with a temporary state (TXN_TMP),
+ * then select the generated keys back by that state and flip the state to TXN_OPEN.
+ */
+ boolean genKeySupport = TxnDbUtil.supportsGetGeneratedKeys(dbProduct);
+ genKeySupport = genKeySupport || (numTxns == 1);
+
+ String insertQuery = String.format(TXNS_INSERT_QRY, TxnDbUtil.getEpochFn(dbProduct),
+ TxnDbUtil.getEpochFn(dbProduct));
+ LOG.debug("Going to execute insert <" + insertQuery + ">");
+ try (PreparedStatement ps = dbConn.prepareStatement(insertQuery, new String[] {"TXN_ID"})) {
+ String state = genKeySupport ? Character.toString(TXN_OPEN) : Character.toString(TXN_TMP);
+ if (numTxns == 1) {
+ ps.setString(1, state);
+ ps.setString(2, rqst.getUser());
+ ps.setString(3, rqst.getHostname());
+ ps.setInt(4, txnType.getValue());
+ txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, false));
+ } else {
+ for (int i = 0; i < numTxns; ++i) {
+ ps.setString(1, state);
+ ps.setString(2, rqst.getUser());
+ ps.setString(3, rqst.getHostname());
+ ps.setInt(4, txnType.getValue());
+ ps.addBatch();
+
+ if ((i + 1) % maxBatchSize == 0) {
+ txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, true));
+ }
+ }
+ if (numTxns % maxBatchSize != 0) {
+ txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, true));
+ }
+ }
}
+ assert txnIds.size() == numTxns;
+
if (rqst.isSetReplPolicy()) {
- List<String> rowsRepl = new ArrayList<>();
- for (PreparedStatement pst : insertPreparedStmts) {
- pst.close();
- }
- insertPreparedStmts.clear();
- params.clear();
- paramsList.clear();
+ List<String> rowsRepl = new ArrayList<>(numTxns);
+ List<String> params = new ArrayList<>(1);
+ List<List<String>> paramsList = new ArrayList<>(numTxns);
params.add(rqst.getReplPolicy());
for (int i = 0; i < numTxns; i++) {
- rowsRepl.add( "?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
+ rowsRepl.add("?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
paramsList.add(params);
}
@@ -687,10 +815,125 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
pst.close();
}
}
- close(rs);
}
}
+ private List<Long> executeTxnInsertBatchAndExtractGeneratedKeys(Connection dbConn, boolean genKeySupport,
+ PreparedStatement ps, boolean batch) throws SQLException {
+ List<Long> txnIds = new ArrayList<>();
+ if (batch) {
+ ps.executeBatch();
+ } else {
+ // For slight performance advantage we do not use the executeBatch, when we only have one row
+ ps.execute();
+ }
+ if (genKeySupport) {
+ try (ResultSet generatedKeys = ps.getGeneratedKeys()) {
+ while (generatedKeys.next()) {
+ txnIds.add(generatedKeys.getLong(1));
+ }
+ }
+ } else {
+ try (PreparedStatement pstmt =
+ dbConn.prepareStatement("SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = ?")) {
+ pstmt.setString(1, Character.toString(TXN_TMP));
+ try (ResultSet rs = pstmt.executeQuery()) {
+ while (rs.next()) {
+ txnIds.add(rs.getLong(1));
+ }
+ }
+ }
+ try (PreparedStatement pstmt = dbConn
+ .prepareStatement("UPDATE \"TXNS\" SET \"TXN_STATE\" = ? WHERE \"TXN_STATE\" = ?")) {
+ pstmt.setString(1, Character.toString(TXN_OPEN));
+ pstmt.setString(2, Character.toString(TXN_TMP));
+ pstmt.executeUpdate();
+ }
+ }
+ return txnIds;
+ }
+
+ private void deleteInvalidOpenTransactions(Connection dbConn, List<Long> txnIds) throws MetaException {
+ if (txnIds.size() == 0) {
+ return;
+ }
+ try {
+ Statement stmt = null;
+ try {
+ stmt = dbConn.createStatement();
+
+ List<String> queries = new ArrayList<>();
+ StringBuilder prefix = new StringBuilder();
+ StringBuilder suffix = new StringBuilder();
+ prefix.append("DELETE FROM \"TXNS\" WHERE ");
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds, "\"TXN_ID\"", false, false);
+ for (String s : queries) {
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
+ }
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(dbConn, e, "deleteInvalidOpenTransactions(" + txnIds + ")");
+ throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e));
+ } finally {
+ closeStmt(stmt);
+ }
+ } catch (RetryException ex) {
+ deleteInvalidOpenTransactions(dbConn, txnIds);
+ }
+ }
+
+ @Override
+ public long getOpenTxnTimeOutMillis() {
+ return openTxnTimeOutMillis;
+ }
+
+ @Override
+ public void setOpenTxnTimeOutMillis(long openTxnTimeOutMillis) {
+ this.openTxnTimeOutMillis = openTxnTimeOutMillis;
+ }
+
+ protected long getOpenTxnTimeoutLowBoundaryTxnId(Connection dbConn) throws MetaException, SQLException {
+ long maxTxnId;
+ String s =
+ "SELECT MAX(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STARTED\" < (" + TxnDbUtil.getEpochFn(dbProduct) + " - "
+ + openTxnTimeOutMillis + ")";
+ try (Statement stmt = dbConn.createStatement()) {
+ LOG.debug("Going to execute query <" + s + ">");
+ try (ResultSet maxTxnIdRs = stmt.executeQuery(s)) {
+ maxTxnIdRs.next();
+ maxTxnId = maxTxnIdRs.getLong(1);
+ if (maxTxnIdRs.wasNull()) {
+ /*
+ * TXNS always contains at least one transaction,
+ * the row where txnid = (select max(txnid) where txn_started < epoch - TXN_OPENTXN_TIMEOUT) is never deleted
+ */
+ throw new MetaException("Transaction tables not properly " + "initialized, null record found in MAX(TXN_ID)");
+ }
+ }
+ }
+ return maxTxnId;
+ }
+
+ private long getHighWaterMark(Statement stmt) throws SQLException, MetaException {
+ String s = "SELECT MAX(\"TXN_ID\") FROM \"TXNS\"";
+ LOG.debug("Going to execute query <" + s + ">");
+ long maxOpenTxnId;
+ try (ResultSet maxOpenTxnIdRs = stmt.executeQuery(s)) {
+ maxOpenTxnIdRs.next();
+ maxOpenTxnId = maxOpenTxnIdRs.getLong(1);
+ if (maxOpenTxnIdRs.wasNull()) {
+ /*
+ * TXNS always contains at least one transaction,
+ * the row where txnid = (select max(txnid) where txn_started < epoch - TXN_OPENTXN_TIMEOUT) is never deleted
+ */
+ throw new MetaException("Transaction tables not properly " + "initialized, null record found in MAX(TXN_ID)");
+ }
+ }
+ return maxOpenTxnId;
+ }
+
private List<Long> getTargetTxnIdList(String replPolicy, List<Long> sourceTxnIdList, Connection dbConn)
throws SQLException {
PreparedStatement pst = null;
@@ -716,7 +959,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
LOG.debug("targetTxnid for srcTxnId " + sourceTxnIdList.toString() + " is " + targetTxnIdList.toString());
return targetTxnIdList;
- } catch (SQLException e) {
+ } catch (SQLException e) {
LOG.warn("failed to get target txn ids " + e.getMessage());
throw e;
} finally {
@@ -1159,7 +1402,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* even if it includes all of its columns
*
* First insert into write_set using a temporary commitID, which will be updated in a separate call,
- * see: {@link #updateCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long, long)}}.
+ * see: {@link #updateWSCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long, long)}}.
* This should decrease the scope of the S4U lock on the next_txn_id table.
*/
Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint();
@@ -1173,13 +1416,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* at the same time and no new txns start until all 3 commit.
* We could've incremented the sequence for commitId as well but it doesn't add anything functionally.
*/
- try (ResultSet commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""))) {
- if (!commitIdRs.next()) {
- throw new IllegalStateException("No rows found in NEXT_TXN_ID");
- }
- commitId = commitIdRs.getLong(1);
- }
-
+ acquireTxnLock(stmt, false);
+ commitId = getHighWaterMark(stmt);
/**
* see if there are any overlapping txns that wrote the same element, i.e. have a conflict
* Since entire commit operation is mutexed wrt other start/commit ops,
@@ -1211,9 +1449,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new TxnAbortedException(msg);
}
}
- }
- else {
- /**
+ } else {
+ /*
* current txn didn't update/delete anything (may have inserted), so just proceed with commit
*
* We only care about commit id for write txns, so for RO (when supported) txns we don't
@@ -1223,6 +1460,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* If RO < W, then there is no reads-from relationship.
* In replication flow we don't expect any write write conflict as it should have been handled at source.
*/
+ assert true;
}
if (txnRecord.type != TxnType.READ_ONLY && !rqst.isSetReplPolicy()) {
@@ -1253,7 +1491,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
}
- updateCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId, tempCommitId);
+ updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId, tempCommitId);
if (rqst.isSetKeyValue()) {
updateKeyValueAssociatedWithTxn(rqst, stmt);
}
@@ -1272,8 +1510,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
- closeStmt(stmt);
- closeDbConn(dbConn);
+ close(null, stmt, dbConn);
unlockInternal();
}
} catch (RetryException e) {
@@ -1338,7 +1575,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
- private void updateCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId, long tempId) throws SQLException {
+ private void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType,
+ Long commitId, long tempId) throws SQLException {
List<String> queryBatch = new ArrayList<>(5);
// update write_set with real commitId
if (commitId != null) {
@@ -1350,7 +1588,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
queryBatch.add("DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid);
}
queryBatch.add("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid);
- queryBatch.add("DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid);
+ // DO NOT remove the transaction from the TXN table, the cleaner will remove it when appropriate
+ queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_COMMITTED) + " WHERE \"TXN_ID\" = " + txnid);
queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
// execute all in one batch
@@ -1431,7 +1670,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (numAbortedWrites > 0) {
// Allocate/Map one txn per aborted writeId and abort the txn to mark writeid as aborted.
- List<Long> txnIds = openTxns(dbConn, stmt,
+ // We don't use the txnLock; all of these transactions will be aborted in this one RDBMS transaction,
+ // so they will not affect the commitTxn in any way
+ List<Long> txnIds = openTxns(dbConn,
new OpenTxnRequest(numAbortedWrites, rqst.getUser(), rqst.getHostName()));
assert(numAbortedWrites == txnIds.size());
@@ -1491,7 +1732,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
closeStmt(pStmt);
close(rs, stmt, dbConn);
- if(handle != null) {
+ if (handle != null) {
handle.releaseLocks();
}
unlockInternal();
@@ -1533,7 +1774,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
String[] names = TxnUtils.getDbTableName(fullTableName);
assert names.length == 2;
List<String> params = Arrays.asList(names[0], names[1]);
- String s = "SELECT \"T2W_TXNID\" FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_WRITEID\" = " + writeId;
+ String s =
+ "SELECT \"T2W_TXNID\" FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\" = ? AND "
+ + "\"T2W_TABLE\" = ? AND \"T2W_WRITEID\" = "+ writeId;
pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", quoteString(names[0]),
quoteString(names[1]));
@@ -2001,46 +2244,37 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
@Override
@RetrySemantics.SafeToRetry
- public void performWriteSetGC() {
+ public void performWriteSetGC() throws MetaException {
Connection dbConn = null;
Statement stmt = null;
ResultSet rs = null;
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- rs = stmt.executeQuery("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\"");
- if(!rs.next()) {
- throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted");
- }
- long highestAllocatedTxnId = rs.getLong(1);
- close(rs);
+
+ long minOpenTxn;
rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + quoteChar(TXN_OPEN));
- if(!rs.next()) {
+ if (!rs.next()) {
throw new IllegalStateException("Scalar query returned no rows?!?!!");
}
- long commitHighWaterMark;//all currently open txns (if any) have txnid >= than commitHighWaterMark
- long lowestOpenTxnId = rs.getLong(1);
- if(rs.wasNull()) {
- //if here then there are no Open txns and highestAllocatedTxnId must be
- //resolved (i.e. committed or aborted), either way
- //there are no open txns with id <= highestAllocatedTxnId
- //the +1 is there because "delete ..." below has < (which is correct for the case when
- //there is an open txn
- //Concurrency: even if new txn starts (or starts + commits) it is still true that
- //there are no currently open txns that overlap with any committed txn with
- //commitId <= commitHighWaterMark (as set on next line). So plain READ_COMMITTED is enough.
- commitHighWaterMark = highestAllocatedTxnId + 1;
- }
- else {
- commitHighWaterMark = lowestOpenTxnId;
+ minOpenTxn = rs.getLong(1);
+ if (rs.wasNull()) {
+ minOpenTxn = Long.MAX_VALUE;
}
+ long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn);
+ /**
+ * We try to find the highest transactionId below which everything was committed or aborted.
+ * For that we look for the lowest open transaction in the TXNS and the TxnMinTimeout boundary,
+ * because it is guaranteed there won't be open transactions below that.
+ */
+ long commitHighWaterMark = Long.min(minOpenTxn, lowWaterMark + 1);
+ LOG.debug("Perform WriteSet GC with minOpenTxn {}, lowWaterMark {}", minOpenTxn, lowWaterMark);
int delCnt = stmt.executeUpdate("DELETE FROM \"WRITE_SET\" WHERE \"WS_COMMIT_ID\" < " + commitHighWaterMark);
LOG.info("Deleted {} obsolete rows from WRITE_SET", delCnt);
dbConn.commit();
} catch (SQLException ex) {
LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex);
- }
- finally {
+ } finally {
close(rs, stmt, dbConn);
}
}
@@ -4154,7 +4388,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
private void checkQFileTestHack(){
boolean hackOn = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) ||
- MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST);
+ MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST);
if (hackOn) {
LOG.info("Hacking in canned values for transaction manager");
// Set up the transaction/locking db in the derby metastore
@@ -4525,7 +4759,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
/**
- * Returns the state of the transaction iff it's able to determine it. Some cases where it cannot:
+ * Returns the state of the transaction if it's able to determine it. Some cases where it cannot:
* 1. txnid was Aborted/Committed and then GC'd (compacted)
* 2. txnid was committed but it didn't modify anything (nothing in COMPLETED_TXN_COMPONENTS)
*/
@@ -4550,6 +4784,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (txnState == TXN_ABORTED) {
return TxnStatus.ABORTED;
}
+ if (txnState == TXN_COMMITTED) {
+ return TxnStatus.COMMITTED;
+ }
assert txnState == TXN_OPEN : "we found it in TXNS but it's not ABORTED, so must be OPEN";
}
return TxnStatus.OPEN;
@@ -4929,11 +5166,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
static boolean isRetryable(Configuration conf, Exception ex) {
if(ex instanceof SQLException) {
SQLException sqlException = (SQLException)ex;
- if("08S01".equalsIgnoreCase(sqlException.getSQLState())) {
+ if (MANUAL_RETRY.equalsIgnoreCase(sqlException.getSQLState())) {
+ // Manual retry exception was thrown
+ return true;
+ }
+ if ("08S01".equalsIgnoreCase(sqlException.getSQLState())) {
//in MSSQL this means Communication Link Failure
return true;
}
- if("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) ||
+ if ("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) ||
sqlException.getMessage().contains("consistent read failure; rollback data not available")) {
return true;
}
@@ -5112,10 +5353,25 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
return acquireLock(key);
}
}
+
+ @Override
public void acquireLock(String key, LockHandle handle) {
//the idea is that this will use LockHandle.dbConn
throw new NotImplementedException("acquireLock(String, LockHandle) is not implemented");
}
+
+ /**
+ * Acquire the global txn lock, used to mutex the openTxn and commitTxn.
+ * @param stmt Statement to execute the lock on
+ * @param shared true to acquire the lock in shared mode, false to acquire it exclusively
+ * @throws SQLException if executing the lock statement fails
+ */
+ private void acquireTxnLock(Statement stmt, boolean shared) throws SQLException, MetaException {
+ String sqlStmt = sqlGenerator.createTxnLockStatement(shared);
+ stmt.execute(sqlStmt);
+ LOG.debug("TXN lock locked by {} in mode {}", quoteString(TxnHandler.hostname), shared);
+ }
+
private static final class LockHandleImpl implements LockHandle {
private final Connection dbConn;
private final Statement stmt;
@@ -5152,6 +5408,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
+
private static class NoPoolConnectionPool implements DataSource {
// Note that this depends on the fact that no-one in this class calls anything but
// getConnection. If you want to use any of the Logger or wrap calls you'll have to
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 87130a5..e8ac71a 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -44,7 +44,7 @@ public interface TxnStore extends Configurable {
/**
* Prefix for key when committing with a key/value.
*/
- public static final String TXN_KEY_START = "_meta";
+ String TXN_KEY_START = "_meta";
enum MUTEX_KEY {
Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock,
@@ -380,18 +380,19 @@ public interface TxnStore extends Configurable {
/**
* Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by
- * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)).
+ * min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)).
*/
@RetrySemantics.SafeToRetry
void cleanTxnToWriteIdTable() throws MetaException;
/**
- * Clean up aborted transactions from txns that have no components in txn_components. The reson such
- * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and
- * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
+ * Clean up aborted or committed transactions from txns that have no components in txn_components. The reason such
+ * txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and
+ * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called,
+ * or because the delete from TXNS was delayed due to the TXN_OPENTXN_TIMEOUT window.
*/
@RetrySemantics.SafeToRetry
- void cleanEmptyAbortedTxns() throws MetaException;
+ void cleanEmptyAbortedAndCommittedTxns() throws MetaException;
/**
* This will take all entries assigned to workers
@@ -442,7 +443,7 @@ public interface TxnStore extends Configurable {
* transaction metadata once it becomes unnecessary.
*/
@RetrySemantics.SafeToRetry
- void performWriteSetGC();
+ void performWriteSetGC() throws MetaException;
/**
* Determine if there are enough consecutive failures compacting a table or partition that no
@@ -461,6 +462,12 @@ public interface TxnStore extends Configurable {
@VisibleForTesting
long setTimeout(long milliseconds);
+ @VisibleForTesting
+ long getOpenTxnTimeOutMillis();
+
+ @VisibleForTesting
+ void setOpenTxnTimeOutMillis(long openTxnTimeOutMillis);
+
@RetrySemantics.Idempotent
MutexAPI getMutexAPI();
@@ -473,7 +480,7 @@ public interface TxnStore extends Configurable {
*/
interface MutexAPI {
/**
- * The {@code key} is name of the lock. Will acquire and exclusive lock or block. It retuns
+ * The {@code key} is name of the lock. Will acquire an exclusive lock or block. It returns
* a handle which must be used to release the lock. Each invocation returns a new handle.
*/
LockHandle acquireLock(String key) throws MetaException;
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index 366b6f0..727abce 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -234,9 +234,8 @@ CREATE TABLE "APP"."CTLGS" (
"CREATE_TIME" INTEGER);
-- Insert a default value. The location is TBD. Hive will fix this when it starts
-INSERT INTO "APP"."CTLGS"
- ("CTLG_ID", "NAME", "DESC", "LOCATION_URI", "CREATE_TIME")
- VALUES (1, 'hive', 'Default catalog for Hive', 'TBD', NULL);
+INSERT INTO "APP"."CTLGS" ("CTLG_ID", "NAME", "DESC", "LOCATION_URI", "CREATE_TIME")
+VALUES (1, 'hive', 'Default catalog for Hive', 'TBD', NULL);
-- ----------------------------------------------
-- DML Statements
@@ -524,7 +523,7 @@ ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SQL110318025505550" CHECK (IS_COMPRESSED
-- Transaction and Lock Tables
-- ----------------------------
CREATE TABLE TXNS (
- TXN_ID bigint PRIMARY KEY,
+ TXN_ID bigint PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
TXN_STATE char(1) NOT NULL,
TXN_STARTED bigint NOT NULL,
TXN_LAST_HEARTBEAT bigint NOT NULL,
@@ -536,6 +535,9 @@ CREATE TABLE TXNS (
TXN_TYPE integer
);
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ VALUES(0, 'c', 0, 0, '', '');
+
CREATE TABLE TXN_COMPONENTS (
TC_TXNID bigint NOT NULL REFERENCES TXNS (TXN_ID),
TC_DATABASE varchar(128) NOT NULL,
@@ -559,10 +561,10 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
-CREATE TABLE NEXT_TXN_ID (
- NTXN_NEXT bigint NOT NULL
+CREATE TABLE TXN_LOCK_TBL (
+ TXN_LOCK bigint NOT NULL
);
-INSERT INTO NEXT_TXN_ID VALUES(1);
+INSERT INTO TXN_LOCK_TBL VALUES(1);
CREATE TABLE HIVE_LOCKS (
HL_LOCK_EXT_ID bigint NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
index 8a3cd56..db2f43d 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
@@ -68,5 +68,19 @@ ALTER TABLE "APP"."DBS" ADD COLUMN "DB_MANAGED_LOCATION_URI" VARCHAR(4000);
ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint;
DROP TABLE MIN_HISTORY_LEVEL;
+-- HIVE-23048
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '_', '_' FROM COMPLETED_TXN_COMPONENTS;
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ VALUES (1000000000, 'c', 0, 0, '_', '_');
+ALTER TABLE TXNS ADD COLUMN TXN_ID_TMP bigint;
+UPDATE TXNS SET TXN_ID_TMP=TXN_ID;
+ALTER TABLE TXNS DROP COLUMN TXN_ID;
+ALTER TABLE TXNS ADD COLUMN TXN_ID BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY (START WITH 1000000001, INCREMENT BY 1);
+UPDATE TXNS SET TXN_ID=TXN_ID_TMP;
+ALTER TABLE TXNS DROP COLUMN TXN_ID_TMP;
+
+RENAME TABLE NEXT_TXN_ID TO TXN_LOCK_TBL;
+
-- This needs to be the last thing done. Insert any changes above this line.
UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 2e01777..6906bdf 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1096,28 +1096,31 @@ CREATE TABLE NEXT_LOCK_ID(
INSERT INTO NEXT_LOCK_ID VALUES(1);
-CREATE TABLE NEXT_TXN_ID(
- NTXN_NEXT bigint NOT NULL
+CREATE TABLE TXN_LOCK_TBL(
+ TXN_LOCK bigint NOT NULL
);
-INSERT INTO NEXT_TXN_ID VALUES(1);
+INSERT INTO TXN_LOCK_TBL VALUES(1);
CREATE TABLE TXNS(
- TXN_ID bigint NOT NULL,
- TXN_STATE char(1) NOT NULL,
- TXN_STARTED bigint NOT NULL,
- TXN_LAST_HEARTBEAT bigint NOT NULL,
- TXN_USER nvarchar(128) NOT NULL,
- TXN_HOST nvarchar(128) NOT NULL,
+ TXN_ID bigint NOT NULL IDENTITY(1,1),
+ TXN_STATE char(1) NOT NULL,
+ TXN_STARTED bigint NOT NULL,
+ TXN_LAST_HEARTBEAT bigint NOT NULL,
+ TXN_USER nvarchar(128) NOT NULL,
+ TXN_HOST nvarchar(128) NOT NULL,
TXN_AGENT_INFO nvarchar(128) NULL,
TXN_META_INFO nvarchar(128) NULL,
TXN_HEARTBEAT_COUNT int NULL,
TXN_TYPE int NULL,
PRIMARY KEY CLUSTERED
(
- TXN_ID ASC
+ TXN_ID ASC
)
);
+SET IDENTITY_INSERT TXNS ON;
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ VALUES(0, 'c', 0, 0, '', '');
CREATE TABLE TXN_COMPONENTS(
TC_TXNID bigint NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index 9f39515..7583c5c 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -68,9 +68,56 @@ INSERT INTO NOTIFICATION_SEQUENCE (NNI_ID, NEXT_EVENT_ID) SELECT 1,1 WHERE NOT E
ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI nvarchar(4000);
-- HIVE-23107
-ALTER TABLE COMPACTION_QUEUE bigint CQ_NEXT_TXN_ID NOT NULL;
+ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint NOT NULL;
DROP TABLE MIN_HISTORY_LEVEL;
+-- HIVE-23048
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS;
+
+CREATE TABLE TMP_TXNS(
+ TXN_ID bigint NOT NULL IDENTITY(1,1),
+ TXN_STATE char(1) NOT NULL,
+ TXN_STARTED bigint NOT NULL,
+ TXN_LAST_HEARTBEAT bigint NOT NULL,
+ TXN_USER nvarchar(128) NOT NULL,
+ TXN_HOST nvarchar(128) NOT NULL,
+ TXN_AGENT_INFO nvarchar(128) NULL,
+ TXN_META_INFO nvarchar(128) NULL,
+ TXN_HEARTBEAT_COUNT int NULL,
+ TXN_TYPE int NULL,
+PRIMARY KEY CLUSTERED
+(
+ TXN_ID ASC
+)
+);
+
+SET IDENTITY_INSERT TMP_TXNS ON;
+INSERT INTO TMP_TXNS (TXN_ID,TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST, TXN_AGENT_INFO, TXN_META_INFO, TXN_HEARTBEAT_COUNT, TXN_TYPE)
+SELECT TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST, TXN_AGENT_INFO, TXN_META_INFO, TXN_HEARTBEAT_COUNT, TXN_TYPE FROM TXNS TABLOCKX;
+
+SET IDENTITY_INSERT TMP_TXNS OFF;
+
+CREATE TABLE TMP_TXN_COMPONENTS(
+ TC_TXNID bigint NOT NULL,
+ TC_DATABASE nvarchar(128) NOT NULL,
+ TC_TABLE nvarchar(128) NULL,
+ TC_PARTITION nvarchar(767) NULL,
+ TC_OPERATION_TYPE char(1) NOT NULL,
+ TC_WRITEID bigint
+);
+INSERT INTO TMP_TXN_COMPONENTS SELECT * FROM TXN_COMPONENTS;
+
+DROP TABLE TXN_COMPONENTS;
+DROP TABLE TXNS;
+
+Exec sp_rename 'TMP_TXNS', 'TXNS';
+Exec sp_rename 'TMP_TXN_COMPONENTS', 'TXN_COMPONENTS';
+Exec sp_rename 'NEXT_TXN_ID', 'TXN_LOCK_TBL';
+
+ALTER TABLE TXN_COMPONENTS WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID);
+CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID);
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 0512a45..b7f423c 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -989,7 +989,7 @@ CREATE TABLE IF NOT EXISTS WM_MAPPING
-- Transaction and Lock Tables
-- ----------------------------
CREATE TABLE TXNS (
- TXN_ID bigint PRIMARY KEY,
+ TXN_ID bigint PRIMARY KEY AUTO_INCREMENT,
TXN_STATE char(1) NOT NULL,
TXN_STARTED bigint NOT NULL,
TXN_LAST_HEARTBEAT bigint NOT NULL,
@@ -1001,6 +1001,9 @@ CREATE TABLE TXNS (
TXN_TYPE int
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ VALUES(0, 'c', 0, 0, '', '');
+
CREATE TABLE TXN_COMPONENTS (
TC_TXNID bigint NOT NULL,
TC_DATABASE varchar(128) NOT NULL,
@@ -1025,10 +1028,10 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION) USING BTREE;
-CREATE TABLE NEXT_TXN_ID (
- NTXN_NEXT bigint NOT NULL
+CREATE TABLE TXN_LOCK_TBL (
+ TXN_LOCK bigint NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
-INSERT INTO NEXT_TXN_ID VALUES(1);
+INSERT INTO TXN_LOCK_TBL VALUES(1);
CREATE TABLE HIVE_LOCKS (
HL_LOCK_EXT_ID bigint NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index 4b82e36..78a841a 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -72,6 +72,23 @@ ALTER TABLE DBS ADD COLUMN DB_MANAGED_LOCATION_URI VARCHAR(4000) CHARACTER SET l
ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint;
DROP TABLE MIN_HISTORY_LEVEL;
+-- HIVE-23048
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ SELECT IFNULL(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS;
+ALTER TABLE TXNS ADD COLUMN TXN_ID_TMP BIGINT;
+UPDATE TXNS SET TXN_ID_TMP=TXN_ID;
+SET FOREIGN_KEY_CHECKS = 0;
+ALTER TABLE TXNS MODIFY TXN_ID BIGINT AUTO_INCREMENT;
+SET FOREIGN_KEY_CHECKS = 1;
+UPDATE TXNS SET TXN_ID=TXN_ID_TMP;
+ALTER TABLE TXNS DROP COLUMN TXN_ID_TMP;
+SELECT MAX(TXN_ID) + 1 INTO @AutoInc FROM TXNS;
+SET @s:=CONCAT('ALTER TABLE TXNS AUTO_INCREMENT=', @AutoInc);
+PREPARE stmt FROM @s;
+EXECUTE stmt;
+DEALLOCATE PREPARE stmt;
+RENAME TABLE NEXT_TXN_ID TO TXN_LOCK_TBL;
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index db398e5..0082dcd 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -972,7 +972,7 @@ ALTER TABLE MV_TABLES_USED ADD CONSTRAINT MV_TABLES_USED_FK2 FOREIGN KEY (TBL_ID
-- Transaction and lock tables
------------------------------
CREATE TABLE TXNS (
- TXN_ID NUMBER(19) PRIMARY KEY,
+ TXN_ID NUMBER(19) GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
TXN_STATE char(1) NOT NULL,
TXN_STARTED NUMBER(19) NOT NULL,
TXN_LAST_HEARTBEAT NUMBER(19) NOT NULL,
@@ -984,6 +984,9 @@ CREATE TABLE TXNS (
TXN_TYPE number(10)
) ROWDEPENDENCIES;
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ VALUES(0, 'c', 0, 0, '_', '_');
+
CREATE TABLE TXN_COMPONENTS (
TC_TXNID NUMBER(19) NOT NULL REFERENCES TXNS (TXN_ID),
TC_DATABASE VARCHAR2(128) NOT NULL,
@@ -1007,10 +1010,10 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
CREATE INDEX COMPLETED_TXN_COMPONENTS_INDEX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
-CREATE TABLE NEXT_TXN_ID (
- NTXN_NEXT NUMBER(19) NOT NULL
+CREATE TABLE TXN_LOCK_TBL (
+ TXN_LOCK NUMBER(19) NOT NULL
);
-INSERT INTO NEXT_TXN_ID VALUES(1);
+INSERT INTO TXN_LOCK_TBL VALUES(1);
CREATE TABLE HIVE_LOCKS (
HL_LOCK_EXT_ID NUMBER(19) NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
index 1be83fc..51f52a4 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
@@ -72,6 +72,21 @@ ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI VARCHAR2(4000) NULL;
ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID NUMBER(19);
DROP TABLE MIN_HISTORY_LEVEL;
+-- HIVE-23048
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '_', '_' FROM COMPLETED_TXN_COMPONENTS;
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+ VALUES (1000000000, 'c', 0, 0, '_', '_');
+-- DECLARE max_txn NUMBER;
+-- BEGIN
+-- SELECT MAX(TXN_ID) + 1 INTO max_txn FROM TXNS;
+-- EXECUTE IMMEDIATE 'CREATE SEQUENCE TXNS_TXN_ID_SEQ INCREMENT BY 1 START WITH ' || max_txn || ' CACHE 20';
+-- END;
+CREATE SEQUENCE TXNS_TXN_ID_SEQ INCREMENT BY 1 START WITH 1000000001 CACHE 20;
+ALTER TABLE TXNS MODIFY TXN_ID default TXNS_TXN_ID_SEQ.nextval;
+
+RENAME TABLE NEXT_TXN_ID TO TXN_LOCK_TBL;
+
-- These lines need to be last. Insert any changes above.
UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual;
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index e6e3016..717e707 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1658,7 +1658,7 @@ GRANT ALL ON SCHEMA public TO PUBLIC;
-- Transaction and lock tables
------------------------------
CREATE TABLE "TXNS" (
- "TXN_ID" bigint PRIMARY KEY,
+ "TXN_ID" bigserial PRIMARY KEY,
"TXN_STATE" char(1) NOT NULL,
"TXN_STARTED" bigint NOT NULL,
"TXN_LAST_HEARTBEAT" bigint NOT NULL,
@@ -1669,6 +1669,8 @@ CREATE TABLE "TXNS" (
"TXN_HEARTBEAT_COUNT" integer,
"TXN_TYPE" integer
);
+INSERT INTO "TXNS" ("TXN_ID", "TXN_STATE", "TXN_STARTED", "TXN_LAST_HEARTBEAT", "TXN_USER", "TXN_HOST")
+ VALUES(0, 'c', 0, 0, '', '');
CREATE TABLE "TXN_COMPONENTS" (
"TC_TXNID" bigint NOT NULL REFERENCES "TXNS" ("TXN_ID"),
@@ -1693,10 +1695,10 @@ CREATE TABLE "COMPLETED_TXN_COMPONENTS" (
CREATE INDEX COMPLETED_TXN_COMPONENTS_INDEX ON "COMPLETED_TXN_COMPONENTS" USING btree ("CTC_DATABASE", "CTC_TABLE", "CTC_PARTITION");
-CREATE TABLE "NEXT_TXN_ID" (
- "NTXN_NEXT" bigint NOT NULL
+CREATE TABLE "TXN_LOCK_TBL" (
+ "TXN_LOCK" bigint NOT NULL
);
-INSERT INTO "NEXT_TXN_ID" VALUES(1);
+INSERT INTO "TXN_LOCK_TBL" VALUES(1);
CREATE TABLE "HIVE_LOCKS" (
"HL_LOCK_EXT_ID" bigint NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index b90cecb..63a5c44 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -203,6 +203,15 @@ ALTER TABLE "DBS" ADD "DB_MANAGED_LOCATION_URI" character varying(4000);
ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_NEXT_TXN_ID" bigint;
DROP TABLE "MIN_HISTORY_LEVEL";
+-- HIVE-23048
+INSERT INTO "TXNS" ("TXN_ID", "TXN_STATE", "TXN_STARTED", "TXN_LAST_HEARTBEAT", "TXN_USER", "TXN_HOST")
+ SELECT COALESCE(MAX("CTC_TXNID"),0), 'c', 0, 0, '', '' FROM "COMPLETED_TXN_COMPONENTS";
+CREATE SEQUENCE "TXNS_TXN_ID_SEQ" MINVALUE 0 OWNED BY "TXNS"."TXN_ID";
+select setval('"TXNS_TXN_ID_SEQ"', (SELECT MAX("TXN_ID") FROM "TXNS"));
+ALTER TABLE "TXNS" ALTER "TXN_ID" SET DEFAULT nextval('"TXNS_TXN_ID_SEQ"');
+
+ALTER TABLE "NEXT_TXN_ID" RENAME TO "TXN_LOCK_TBL";
+
-- These lines need to be last. Insert any changes above.
UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1;
SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0';
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java
index c1a1629..2ebba39 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java
@@ -36,6 +36,7 @@ public abstract class DbInstallBase {
Assert.assertEquals(0, getRule().createUser());
Assert.assertEquals(0, getRule().installAVersion(FIRST_VERSION));
Assert.assertEquals(0, getRule().upgradeToLatest());
+ Assert.assertEquals(0, getRule().validateSchema());
}
protected abstract DatabaseRule getRule();
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java
index a6d22d1..e06011f 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java
@@ -68,11 +68,11 @@ public abstract class DatabaseRule extends ExternalResource {
public DatabaseRule setVerbose(boolean verbose) {
this.verbose = verbose;
return this;
- };
+ }
public String getDb() {
return HIVE_DB;
- };
+ }
/**
* URL to use when connecting as root rather than Hive
@@ -147,7 +147,7 @@ public abstract class DatabaseRule extends ExternalResource {
protected String getDockerContainerName(){
return String.format("metastore-test-%s-install", getDbType());
- };
+ }
private ProcessResults runCmd(String[] cmd, long secondsToWait)
throws IOException, InterruptedException {
@@ -275,7 +275,7 @@ public abstract class DatabaseRule extends ExternalResource {
"-dbType",
getDbType(),
"-userName",
- HIVE_USER,
+ getHiveUser(),
"-passWord",
getHivePassword(),
"-url",
@@ -289,4 +289,20 @@ public abstract class DatabaseRule extends ExternalResource {
createUser();
installLatest();
}
+
+ public int validateSchema() {
+ return new MetastoreSchemaTool().setVerbose(verbose).run(buildArray(
+ "-validate",
+ "-dbType",
+ getDbType(),
+ "-userName",
+ getHiveUser(),
+ "-passWord",
+ getHivePassword(),
+ "-url",
+ getJdbcUrl(),
+ "-driver",
+ getJdbcDriver()
+ ));
+ }
}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java
index 0b070e1..21c5de1 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java
@@ -24,7 +24,7 @@ public class Oracle extends DatabaseRule {
@Override
public String getDockerImageName() {
- return "orangehrm/oracle-xe-11g";
+ return "pvargacl/oracle-xe-18.4.0";
}
@Override
@@ -32,10 +32,6 @@ public class Oracle extends DatabaseRule {
return buildArray(
"-p",
"1521:1521",
- "-e",
- "DEFAULT_SYS_PASS=" + getDbRootPassword(),
- "-e",
- "ORACLE_ALLOW_REMOTE=true",
"-d"
);
}
@@ -72,11 +68,16 @@ public class Oracle extends DatabaseRule {
@Override
public boolean isContainerReady(String logOutput) {
- return logOutput.contains("Oracle started successfully!");
+ return logOutput.contains("DATABASE IS READY TO USE!");
}
@Override
public String getHivePassword() {
return HIVE_PASSWORD;
}
+
+ @Override
+ public String getHiveUser() {
+ return "c##"+ super.getHiveUser();
+ }
}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java
new file mode 100644
index 0000000..8e1794a
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * Test openTxn and getOpenTxnList calls on TxnStore.
+ */
+public class TestOpenTxn {
+
+ private Configuration conf = MetastoreConf.newMetastoreConf();
+ private TxnStore txnHandler;
+
+ @Before
+ public void setUp() throws Exception {
+ // This will init the metastore db
+ txnHandler = TxnUtils.getTxnStore(conf);
+ TxnDbUtil.prepDb(conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TxnDbUtil.cleanDb(conf);
+ }
+
+ @Test
+ public void testSingleOpen() throws MetaException {
+ OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+ long txnId = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+ Assert.assertEquals(1, txnId);
+ }
+
+ @Test
+ public void testGap() throws Exception {
+ OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+ txnHandler.openTxns(openTxnRequest);
+ long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+ deleteTransaction(second);
+ txnHandler.openTxns(openTxnRequest);
+ GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+ Assert.assertEquals(3, openTxns.getOpen_txnsSize());
+
+ }
+
+ @Test
+ public void testGapWithOldOpen() throws Exception {
+ OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+ txnHandler.openTxns(openTxnRequest);
+ Thread.sleep(1000);
+ long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+ deleteTransaction(second);
+ txnHandler.openTxns(openTxnRequest);
+ GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+ Assert.assertEquals(3, openTxns.getOpen_txnsSize());
+ }
+
+ @Test
+ public void testGapWithOldCommit() throws Exception {
+ OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+ long first = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+ txnHandler.commitTxn(new CommitTxnRequest(first));
+ long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+ deleteTransaction(second);
+ txnHandler.openTxns(openTxnRequest);
+ GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+ Assert.assertEquals(2, openTxns.getOpen_txnsSize());
+ }
+
+ @Test
+ public void testMultiGapWithOldCommit() throws Exception {
+ OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+ long first = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+ txnHandler.commitTxn(new CommitTxnRequest(first));
+ long second = txnHandler.openTxns(new OpenTxnRequest(10, "me", "localhost")).getTxn_ids().get(0);
+ deleteTransaction(second, second + 9);
+ txnHandler.openTxns(openTxnRequest);
+ GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+ Assert.assertEquals(11, openTxns.getOpen_txnsSize());
+ }
+
+ private void deleteTransaction(long txnId) throws SQLException {
+ deleteTransaction(txnId, txnId);
+ }
+
+ private void deleteTransaction(long minTxnId, long maxTxnId) throws SQLException {
+ DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
+ DataSource ds = dsp.create(conf);
+ Connection dbConn = ds.getConnection();
+ Statement stmt = dbConn.createStatement();
+ stmt.executeUpdate("DELETE FROM TXNS WHERE TXN_ID >=" + minTxnId + " AND TXN_ID <=" + maxTxnId);
+ dbConn.commit();
+ stmt.close();
+ dbConn.close();
+ }
+
+}