You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2020/04/30 11:27:45 UTC

[hive] branch master updated: HIVE-23048: Use sequences for TXN_ID generation (Peter Varga reviewed by Peter Vary, Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 252fcea  HIVE-23048: Use sequences for TXN_ID generation (Peter Varga reviewed by Peter Vary, Denys Kuzmenko)
252fcea is described below

commit 252fceaa45fa5d987cbae4c3d594566198160ce1
Author: Peter Varga <pv...@cloudera.com>
AuthorDate: Thu Apr 30 13:24:14 2020 +0200

    HIVE-23048: Use sequences for TXN_ID generation (Peter Varga reviewed by Peter Vary, Denys Kuzmenko)
---
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java       |   4 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |   2 +-
 .../metastore/txn/TestCompactionTxnHandler.java    |  15 +-
 .../hadoop/hive/metastore/txn/TestTxnHandler.java  |  32 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    |  26 +-
 .../apache/hadoop/hive/ql/TestTxnCommands3.java    |  19 +-
 .../hadoop/hive/ql/TestTxnCommandsForMmTable.java  |  10 +-
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java    |   9 +
 .../ql/lockmgr/DbTxnManagerEndToEndTestBase.java   |  78 +++
 .../hadoop/hive/ql/lockmgr/ITestDbTxnManager.java  |   3 +-
 .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java  |  64 +--
 .../TestDbTxnManagerIsolationProperties.java       | 237 ++++++++
 .../hive/ql/txn/compactor/TestInitiator.java       |   7 +-
 .../hadoop/hive/metastore/conf/MetastoreConf.java  |   2 +
 .../hadoop/hive/metastore/tools/SQLGenerator.java  |  38 +-
 .../hive/metastore/txn/CompactionTxnHandler.java   |  44 +-
 .../hadoop/hive/metastore/txn/TxnDbUtil.java       | 158 ++++--
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 597 +++++++++++++++------
 .../apache/hadoop/hive/metastore/txn/TxnStore.java |  23 +-
 .../src/main/sql/derby/hive-schema-4.0.0.derby.sql |  16 +-
 .../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql     |  14 +
 .../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql |  23 +-
 .../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql     |  49 +-
 .../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql |  11 +-
 .../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql     |  17 +
 .../main/sql/oracle/hive-schema-4.0.0.oracle.sql   |  11 +-
 .../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql   |  15 +
 .../sql/postgres/hive-schema-4.0.0.postgres.sql    |  10 +-
 .../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql   |   9 +
 .../hive/metastore/dbinstall/DbInstallBase.java    |   1 +
 .../metastore/dbinstall/rules/DatabaseRule.java    |  24 +-
 .../hive/metastore/dbinstall/rules/Oracle.java     |  13 +-
 .../hadoop/hive/metastore/txn/TestOpenTxn.java     | 128 +++++
 33 files changed, 1338 insertions(+), 371 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index a08af7c..deaab89 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -99,8 +99,8 @@ public final class DbTxnManager extends HiveTxnManagerImpl {
 
   private volatile DbLockManager lockMgr = null;
   /**
-   * The Metastore NEXT_TXN_ID.NTXN_NEXT is initialized to 1; it contains the next available
-   * transaction id.  Thus is 1 is first transaction id.
+   * The Metastore TXNS sequence is initialized to 1.
+   * Thus 1 is the first transaction id.
    */
   private volatile long txnId = 0;
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 37a5862..23512e2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -151,7 +151,7 @@ public class Initiator extends MetaStoreCompactorThread {
           recoverFailedCompactions(true);
 
           // Clean anything from the txns table that has no components left in txn_components.
-          txnHandler.cleanEmptyAbortedTxns();
+          txnHandler.cleanEmptyAbortedAndCommittedTxns();
 
           // Clean TXN_TO_WRITE_ID table for entries under min_uncommitted_txn referred by any open txns.
           txnHandler.cleanTxnToWriteIdTable();
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 7c8903f..7069dae 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -512,9 +512,14 @@ public class TestCompactionTxnHandler {
     // Check that we are cleaning up the empty aborted transactions
     GetOpenTxnsResponse txnList = txnHandler.getOpenTxns();
     assertEquals(3, txnList.getOpen_txnsSize());
-    txnHandler.cleanEmptyAbortedTxns();
+    // Create one aborted for low water mark
+    txnid = openTxn();
+    txnHandler.abortTxn(new AbortTxnRequest(txnid));
+    txnHandler.setOpenTxnTimeOutMillis(1);
+    txnHandler.cleanEmptyAbortedAndCommittedTxns();
     txnList = txnHandler.getOpenTxns();
-    assertEquals(2, txnList.getOpen_txnsSize());
+    assertEquals(3, txnList.getOpen_txnsSize());
+    txnHandler.setOpenTxnTimeOutMillis(1000);
 
     rqst = new CompactionRequest("mydb", "foo", CompactionType.MAJOR);
     rqst.setPartitionname("bar");
@@ -529,9 +534,13 @@ public class TestCompactionTxnHandler {
     txnHandler.markCleaned(ci);
 
     txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
-    txnHandler.cleanEmptyAbortedTxns();
+    // The open txn will become the low water mark
+    Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
+    txnHandler.setOpenTxnTimeOutMillis(1);
+    txnHandler.cleanEmptyAbortedAndCommittedTxns();
     txnList = txnHandler.getOpenTxns();
     assertEquals(3, txnList.getOpen_txnsSize());
+    txnHandler.setOpenTxnTimeOutMillis(1000);
   }
 
   @Test
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index 3916e88..868da0c 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -193,30 +193,40 @@ public class TestTxnHandler {
     boolean gotException = false;
     try {
       txnHandler.abortTxn(new AbortTxnRequest(2));
-    }
-    catch(NoSuchTxnException ex) {
+    } catch(NoSuchTxnException ex) {
       gotException = true;
-      //if this wasn't an empty txn, we'd get a better msg
-      Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), ex.getMessage());
+      // this is the last committed, so it is still in the txns table
+      Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(2) + " is already committed.", ex.getMessage());
     }
     Assert.assertTrue(gotException);
     gotException = false;
     txnHandler.commitTxn(new CommitTxnRequest(3));
     try {
       txnHandler.abortTxn(new AbortTxnRequest(3));
-    }
-    catch(NoSuchTxnException ex) {
+    } catch(NoSuchTxnException ex) {
       gotException = true;
       //txn 3 is not empty txn, so we get a better msg
       Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(3) + " is already committed.", ex.getMessage());
     }
     Assert.assertTrue(gotException);
 
+    txnHandler.setOpenTxnTimeOutMillis(1);
+    txnHandler.cleanEmptyAbortedAndCommittedTxns();
+    txnHandler.setOpenTxnTimeOutMillis(1000);
     gotException = false;
     try {
-      txnHandler.abortTxn(new AbortTxnRequest(4));
+      txnHandler.abortTxn(new AbortTxnRequest(2));
+    } catch(NoSuchTxnException ex) {
+      gotException = true;
+      // now the second transaction is cleared and since it was empty, we do not recognize it anymore
+      Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(2), ex.getMessage());
     }
-    catch(NoSuchTxnException ex) {
+    Assert.assertTrue(gotException);
+
+    gotException = false;
+    try {
+      txnHandler.abortTxn(new AbortTxnRequest(4));
+    } catch(NoSuchTxnException ex) {
       gotException = true;
       Assert.assertEquals("No such transaction " + JavaUtils.txnIdToString(4), ex.getMessage());
     }
@@ -1672,9 +1682,11 @@ public class TestTxnHandler {
   @Test
   public void testReplOpenTxn() throws Exception {
     int numTxn = 50000;
-    String[] output = TxnDbUtil.queryToString(conf, "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"").split("\n");
+    String[] output = TxnDbUtil.queryToString(conf, "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\"").split("\n");
     long startTxnId = Long.parseLong(output[1].trim());
+    txnHandler.setOpenTxnTimeOutMillis(30000);
     List<Long> txnList = replOpenTxnForTest(startTxnId, numTxn, "default.*");
+    txnHandler.setOpenTxnTimeOutMillis(1000);
     assert(txnList.size() == numTxn);
     txnHandler.abortTxns(new AbortTxnsRequest(txnList));
   }
@@ -1682,7 +1694,7 @@ public class TestTxnHandler {
   @Test
   public void testReplAllocWriteId() throws Exception {
     int numTxn = 2;
-    String[] output = TxnDbUtil.queryToString(conf, "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"").split("\n");
+    String[] output = TxnDbUtil.queryToString(conf, "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\"").split("\n");
     long startTxnId = Long.parseLong(output[1].trim());
     List<Long> srcTxnIdList = LongStream.rangeClosed(startTxnId, numTxn+startTxnId-1)
             .boxed().collect(Collectors.toList());
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 2c13e8d..48bf852 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -89,6 +89,7 @@ public class TestTxnCommands2 {
 
   protected HiveConf hiveConf;
   protected Driver d;
+  private TxnStore txnHandler;
   protected enum Table {
     ACIDTBL("acidTbl"),
     ACIDTBLPART("acidTblPart", "p"),
@@ -151,6 +152,7 @@ public class TestTxnCommands2 {
 
     TxnDbUtil.setConfValues(hiveConf);
     TxnDbUtil.prepDb(hiveConf);
+    txnHandler = TxnUtils.getTxnStore(hiveConf);
     File f = new File(TEST_WAREHOUSE_DIR);
     if (f.exists()) {
       FileUtil.fullyDelete(f);
@@ -322,7 +324,6 @@ public class TestTxnCommands2 {
     // 3. Perform a major compaction.
     runStatementOnDriver("alter table "+ Table.NONACIDORCTBL + " compact 'MAJOR'");
     runWorker(hiveConf);
-    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
     ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
     Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
@@ -930,7 +931,6 @@ public class TestTxnCommands2 {
     int[][] tableData = {{1,2},{3,4},{5,6}};
     runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData));
     runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=2) (a,b) " + makeValuesClause(tableData));
-    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
     txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.name(), CompactionType.MAJOR));
     runWorker(hiveConf);
     runCleaner(hiveConf);
@@ -953,7 +953,6 @@ public class TestTxnCommands2 {
     runStatementOnDriver("update t1" + " set b = -2 where a = 1");
     runStatementOnDriver("alter table t1 " + " compact 'MAJOR'");
     runWorker(hiveConf);
-    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
     ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals("Unexpected number of compactions in history", 1, resp.getCompactsSize());
     Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
@@ -1029,7 +1028,6 @@ public class TestTxnCommands2 {
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
 
     int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
-    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
     AtomicBoolean stop = new AtomicBoolean(true);
     //create failed compactions
     for(int i = 0; i < numFailedCompactions; i++) {
@@ -1217,7 +1215,6 @@ public class TestTxnCommands2 {
     runStatementOnDriver("update " + tblName + " set b = 'blah' where a = 3");
 
     //run Worker to execute compaction
-    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
     txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
     runWorker(hiveConf);
 
@@ -1277,7 +1274,6 @@ public class TestTxnCommands2 {
   public void testOpenTxnsCounter() throws Exception {
     hiveConf.setIntVar(HiveConf.ConfVars.HIVE_MAX_OPEN_TXNS, 3);
     hiveConf.setTimeVar(HiveConf.ConfVars.HIVE_COUNT_OPEN_TXNS_INTERVAL, 10, TimeUnit.MILLISECONDS);
-    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
     OpenTxnsResponse openTxnsResponse = txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost"));
 
     AcidOpenTxnsCounterService openTxnsCounterService = new AcidOpenTxnsCounterService();
@@ -1319,7 +1315,6 @@ public class TestTxnCommands2 {
     runStatementOnDriver("update " + Table.ACIDTBL + " set b = -2 where b = 2");
     runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MINOR'");
     runWorker(hiveConf);
-    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
     ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize());
     Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
@@ -1380,6 +1375,8 @@ public class TestTxnCommands2 {
     // now compact and see if compaction still preserves the data correctness
     runStatementOnDriver("alter table "+ tblName + " compact 'MAJOR'");
     runWorker(hiveConf);
+    // create a low water mark aborted transaction and clean the older ones
+    createAbortLowWaterMark();
     runCleaner(hiveConf); // Cleaner would remove the obsolete files.
 
     // Verify that there is now only 1 new directory: base_xxxxxxx and the rest have have been cleaned.
@@ -1403,6 +1400,14 @@ public class TestTxnCommands2 {
     Assert.assertEquals(Arrays.asList(expectedResult), rs);
   }
 
+  protected void createAbortLowWaterMark() throws Exception{
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
+    runStatementOnDriver("select * from " + Table.ACIDTBL);
+    // wait for metastore.txn.opentxn.timeout
+    Thread.sleep(1000);
+    runInitiator(hiveConf);
+  }
+
   @Test
   public void testETLSplitStrategyForACID() throws Exception {
     hiveConf.setVar(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY, "ETL");
@@ -2007,7 +2012,6 @@ public class TestTxnCommands2 {
     Assert.assertEquals("2\t2000\taa", res.get(1));
 
     // Compact
-    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
     CompactionRequest compactionRequest =
         new CompactionRequest("default", tblName, CompactionType.MAJOR);
     compactionRequest.setPartitionname("part=aa");
@@ -2054,7 +2058,6 @@ public class TestTxnCommands2 {
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause),
             2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblPartWhereClause));
 
-    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
     txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR));
     runWorker(hiveConf);
     runCleaner(hiveConf);
@@ -2096,7 +2099,7 @@ public class TestTxnCommands2 {
     // The entry relevant to aborted txns shouldn't be removed from TXN_TO_WRITE_ID as
     // aborted txn would be removed from TXNS only after the compaction. Also, committed txn > open txn is retained.
     // As open txn doesn't allocate writeid, the 2 entries for aborted and committed should be retained.
-    txnHandler.cleanEmptyAbortedTxns();
+    txnHandler.cleanEmptyAbortedAndCommittedTxns();
     txnHandler.cleanTxnToWriteIdTable();
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause),
             3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause));
@@ -2109,7 +2112,7 @@ public class TestTxnCommands2 {
     txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR));
     runWorker(hiveConf);
     runCleaner(hiveConf);
-    txnHandler.cleanEmptyAbortedTxns();
+    txnHandler.cleanEmptyAbortedAndCommittedTxns();
     txnHandler.cleanTxnToWriteIdTable();
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
             2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
@@ -2234,7 +2237,6 @@ public class TestTxnCommands2 {
     runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
     runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData3));
 
-    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
     txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MINOR));
     runWorker(hiveConf);
     runCleaner(hiveConf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index 51b0fa3..5b8c670 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -479,11 +479,15 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
     runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'");
 
     runWorker(hiveConf);
-    assertTableIsEmpty("TXNS");
+    // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain
+    hiveConf.set("metastore.txn.opentxn.timeout", "1");
+    runInitiator(hiveConf);
+    hiveConf.set("metastore.txn.opentxn.timeout", "1000");
+    assertOneTxn();
     assertTableIsEmpty("TXN_COMPONENTS");
 
     runCleaner(hiveConf);
-    assertTableIsEmpty("TXNS");
+    assertOneTxn();
     assertTableIsEmpty("TXN_COMPONENTS");
   }
 
@@ -500,11 +504,14 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
     runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'");
 
     runWorker(hiveConf);
-    assertTableIsEmpty("TXNS");
+    // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain
+    hiveConf.set("metastore.txn.opentxn.timeout", "1");
+    runInitiator(hiveConf);
+    hiveConf.set("metastore.txn.opentxn.timeout", "1000");
     assertTableIsEmpty("TXN_COMPONENTS");
 
     runCleaner(hiveConf);
-    assertTableIsEmpty("TXNS");
+    assertOneTxn();
     assertTableIsEmpty("TXN_COMPONENTS");
   }
 
@@ -512,4 +519,8 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from " + table), 0,
         TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from " + table));
   }
+  private void assertOneTxn() throws Exception {
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"), 1,
+        TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
+  }
 }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
index 6525ffc..eac2c63 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
@@ -465,8 +465,11 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
 
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"),
             2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS"));
+    // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain
+    Thread.sleep(1000);
+    runInitiator(hiveConf);
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
-            0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
+            1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
 
     // Initiate a major compaction request on the table.
     runStatementOnDriver("alter table " + TableExtended.MMTBL  + " compact 'MAJOR'");
@@ -475,13 +478,16 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
     runWorker(hiveConf);
     verifyDirAndResult(2, true);
 
+    // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain
+    Thread.sleep(1000);
+    runInitiator(hiveConf);
     // Run Cleaner.
     runCleaner(hiveConf);
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"),
             0,
             TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS"));
     Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
-            0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
+            1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
     verifyDirAndResult(0, true);
   }
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index 1435269..3ff68a3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -57,6 +59,8 @@ public abstract class TxnCommandsBaseForTests {
   public TestName testName = new TestName();
   protected HiveConf hiveConf;
   Driver d;
+  private TxnStore txnHandler;
+
   public enum Table {
     ACIDTBL("acidTbl"),
     ACIDTBLPART("acidTblPart"),
@@ -75,6 +79,10 @@ public abstract class TxnCommandsBaseForTests {
     }
   }
 
+  public TxnStore getTxnStore() {
+    return txnHandler;
+  }
+
   @Before
   public void setUp() throws Exception {
     setUpInternal();
@@ -106,6 +114,7 @@ public abstract class TxnCommandsBaseForTests {
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVESTATSCOLAUTOGATHER, false);
     hiveConf.setBoolean("mapred.input.dir.recursive", true);
     TxnDbUtil.setConfValues(hiveConf);
+    txnHandler = TxnUtils.getTxnStore(hiveConf);
     TxnDbUtil.prepDb(hiveConf);
     File f = new File(getWarehouseDir());
     if (f.exists()) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
new file mode 100644
index 0000000..b435e79
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.Driver;
+import org.apache.hadoop.hive.ql.QueryState;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+/**
+ * Base class for "end-to-end" tests of DbTxnManager that simulate concurrent queries.
+ */
+public abstract class DbTxnManagerEndToEndTestBase {
+
+  protected static HiveConf conf = new HiveConf(Driver.class);
+  protected HiveTxnManager txnMgr;
+  protected Context ctx;
+  protected Driver driver, driver2;
+  protected TxnStore txnHandler;
+
+  public DbTxnManagerEndToEndTestBase() {
+    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+            "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+    TxnDbUtil.setConfValues(conf);
+  }
+  @BeforeClass
+  public static void setUpDB() throws Exception{
+    TxnDbUtil.prepDb(conf);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
+    SessionState.start(conf);
+    ctx = new Context(conf);
+    driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
+    driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build());
+    TxnDbUtil.cleanDb(conf);
+    SessionState ss = SessionState.get();
+    ss.initTxnMgr(conf);
+    txnMgr = ss.getTxnMgr();
+    Assert.assertTrue(txnMgr instanceof DbTxnManager);
+    txnHandler = TxnUtils.getTxnStore(conf);
+
+  }
+  @After
+  public void tearDown() throws Exception {
+    driver.close();
+    driver2.close();
+    if (txnMgr != null) {
+      txnMgr.closeTxnManager();
+    }
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/ITestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/ITestDbTxnManager.java
index a085e9f..12cbc2a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/ITestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/ITestDbTxnManager.java
@@ -62,8 +62,7 @@ public class ITestDbTxnManager extends TestDbTxnManager2 {
         MetastoreConf.getVar(conf, MetastoreConf.ConfVars.CONNECT_URL_KEY));
     // Start the docker container and create the hive user
     rule.before();
-    rule.createUser();
-    // We do not run the install script, it will be called anyway before every test in prepDb
+    rule.install();
   }
 
   @AfterClass
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index f0ab8af..1687425 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -32,22 +32,14 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.txn.AcidWriteSetService;
-import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.ql.TestTxnCommands2;
-import org.junit.After;
 import org.junit.Assert;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorException;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.ComparisonFailure;
 import org.junit.Rule;
 import org.junit.Test;
@@ -82,47 +74,7 @@ import java.util.Map;
  * using {@link #swapTxnManager(HiveTxnManager)} since in the SessionState the TM is associated with
  * each thread.
  */
-public class TestDbTxnManager2 {
-  protected static HiveConf conf = new HiveConf(Driver.class);
-
-  private HiveTxnManager txnMgr;
-  private Context ctx;
-  private Driver driver;
-  private TxnStore txnHandler;
-
-  public TestDbTxnManager2() {
-    conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
-        "org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
-    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
-    TxnDbUtil.setConfValues(conf);
-  }
-
-  @BeforeClass
-  public static void setUpDB() throws Exception{
-    TxnDbUtil.prepDb(conf);
-  }
-
-  @Before
-  public void setUp() throws Exception {
-    conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
-    SessionState.start(conf);
-    ctx = new Context(conf);
-    driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
-    TxnDbUtil.cleanDb(conf);
-    SessionState ss = SessionState.get();
-    ss.initTxnMgr(conf);
-    txnMgr = ss.getTxnMgr();
-    Assert.assertTrue(txnMgr instanceof DbTxnManager);
-    txnHandler = TxnUtils.getTxnStore(conf);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    driver.close();
-    if (txnMgr != null) {
-      txnMgr.closeTxnManager();
-    }
-  }
+public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
 
   /**
    * HIVE-16688
@@ -1237,6 +1189,13 @@ public class TestDbTxnManager2 {
 
     locks = getLocks(txnMgr);
     Assert.assertEquals("Unexpected lock count", 0, locks.size());
+    /**
+     * The last transaction will always remain in the transaction table, so we will open another one,
+     * wait for the timeout period to elapse, then run the housekeeper that will clean
+     */
+    txnMgr.openTxn(ctx, "Long Running");
+    Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
+    // Now we can clean the write_set
     houseKeeper.run();
     Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from \"WRITE_SET\""));
   }
@@ -1311,6 +1270,13 @@ public class TestDbTxnManager2 {
     Assert.assertEquals("Unexpected lock count", 1, locks.size());
     checkLock(LockType.SHARED_READ, LockState.ACQUIRED, "default", "TAB2", null, locks);
     txnMgr.commitTxn();
+    /*
+     * The last transaction will always remain in the transaction table, so we will open an other one,
+     * wait for the timeout period to exceed, then start the initiator that will clean
+     */
+    txnMgr.openTxn(ctx, "Long Running");
+    Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
+    // Now we can clean the write_set
     MetastoreTaskThread writeSetService = new AcidWriteSetService();
     writeSetService.setConf(conf);
     writeSetService.run();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManagerIsolationProperties.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManagerIsolationProperties.java
new file mode 100644
index 0000000..6c30069
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManagerIsolationProperties.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.lockmgr;
+
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
+import org.apache.hadoop.hive.ql.exec.FetchTask;
+import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Tests to check the ACID properties and isolation level requirements.
+ *
+ * <p>The gap tests delete rows from TXNS and restart the TXN_ID identity so that a later transaction
+ * reuses an old id, imitating a slow openTxns call, then check what an earlier snapshot can read.
+ */
+public class TestDbTxnManagerIsolationProperties extends DbTxnManagerEndToEndTestBase {
+
+  @Test
+  public void basicOpenTxnsNoDirtyRead() throws Exception {
+    createGapTable();
+    // Create one TXN to read and do not run it
+    driver.compileAndRespond("select * from gap");
+    long first = txnMgr.getCurrentTxnId();
+
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+    driver2.compileAndRespond("insert into gap values(1,2)");
+    long second = txnMgr2.getCurrentTxnId();
+    Assert.assertTrue("Sequence number goes onward", second > first);
+    driver2.run();
+
+    // Now we run our read query, it should not see the write results of the insert
+    swapTxnManager(txnMgr);
+    driver.run();
+
+    Assert.assertEquals("No dirty read", 0, fetchAll(driver.getFetchTask()).size());
+  }
+
+  @Test
+  public void gapOpenTxnsNoDirtyRead() throws Exception {
+    createGapTable();
+    // Create one TXN to delete later
+    driver.compileAndRespond("select * from gap");
+    long first = txnMgr.getCurrentTxnId();
+    driver.run();
+    // The second one we use for Low water mark
+    driver.run("select * from gap");
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+    long originalTimeout = txnHandler.getOpenTxnTimeOutMillis();
+    // Make sure, that the time window is great enough to consider the gap open
+    txnHandler.setOpenTxnTimeOutMillis(30000);
+    // Create a gap
+    deleteTransactionId(first);
+    driver2.compileAndRespond("select * from gap");
+    long third = txnMgr2.getCurrentTxnId();
+    Assert.assertTrue("Sequence number goes onward", third > first);
+    ValidTxnList validTxns = txnMgr2.getValidTxns();
+    Assert.assertEquals("Expect to see the gap as open", first, (long) validTxns.getMinOpenTxn());
+    txnHandler.setOpenTxnTimeOutMillis(originalTimeout);
+
+    // Now we cheat and create a transaction with the first sequenceId again imitating a very slow openTxns call
+    setBackSequence(first);
+    swapTxnManager(txnMgr);
+    driver.compileAndRespond("insert into gap values(1,2)");
+    long fourth = txnMgr.getCurrentTxnId();
+    Assert.assertEquals(first, fourth);
+    driver.run();
+
+    // Now we run our read query, it should not see the write results of the insert
+    swapTxnManager(txnMgr2);
+    driver2.run();
+
+    Assert.assertEquals("No dirty read", 0, fetchAll(driver2.getFetchTask()).size());
+  }
+
+  @Test
+  public void multipleGapOpenTxnsNoDirtyRead() throws Exception {
+    createGapTable();
+    // Create some TXN to delete later
+    OpenTxnsResponse openTxns = txnHandler.openTxns(new OpenTxnRequest(10, "user", "local"));
+    List<Long> txnIds = openTxns.getTxn_ids();
+    txnIds.forEach(txnId -> silentCommitTxn(new CommitTxnRequest(txnId)));
+
+    long first = txnIds.get(0);
+    long last = txnIds.get(txnIds.size() - 1);
+    // The next one we use for Low water mark
+    driver.run("select * from gap");
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+    long originalTimeout = txnHandler.getOpenTxnTimeOutMillis();
+    // Make sure, that the time window is great enough to consider the gap open
+    txnHandler.setOpenTxnTimeOutMillis(30000);
+    // Create a gap
+    deleteTransactionId(first, last);
+    driver2.compileAndRespond("select * from gap");
+    long next = txnMgr2.getCurrentTxnId();
+    Assert.assertTrue("Sequence number goes onward", next > last);
+    ValidTxnList validTxns = txnMgr2.getValidTxns();
+    Assert.assertEquals("Expect to see the gap as open", first, (long) validTxns.getMinOpenTxn());
+    txnHandler.setOpenTxnTimeOutMillis(originalTimeout);
+
+    // Now we cheat and create a transaction with the first sequenceId again imitating a very slow openTxns call
+    setBackSequence(first);
+    swapTxnManager(txnMgr);
+    driver.compileAndRespond("insert into gap values(1,2)");
+    next = txnMgr.getCurrentTxnId();
+    Assert.assertEquals(first, next);
+    driver.run();
+
+    // Now we run our read query, it should not see the write results of the insert
+    swapTxnManager(txnMgr2);
+    driver2.run();
+
+    Assert.assertEquals("No dirty read", 0, fetchAll(driver2.getFetchTask()).size());
+  }
+
+  @Test
+  public void gapOpenTxnsDirtyRead() throws Exception {
+    createGapTable();
+    // Create one TXN to delete later
+    driver.compileAndRespond("select * from gap");
+    long first = txnMgr.getCurrentTxnId();
+    driver.run();
+    // The second one we use for Low water mark
+    driver.run("select * from gap");
+    DbTxnManager txnMgr2 = (DbTxnManager) TxnManagerFactory.getTxnManagerFactory().getTxnManager(conf);
+    swapTxnManager(txnMgr2);
+    // Now we wait for the time window to move forward
+    Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
+    // Create a gap
+    deleteTransactionId(first);
+    driver2.compileAndRespond("select * from gap");
+    long third = txnMgr2.getCurrentTxnId();
+    Assert.assertTrue("Sequence number goes onward", third > first);
+    ValidTxnList validTxns = txnMgr2.getValidTxns();
+    Assert.assertNull("Expect to see no gap", validTxns.getMinOpenTxn());
+
+    // Now we cheat and create a transaction with the first sequenceId again imitating a very slow openTxns call
+    // This should never happen
+    setBackSequence(first);
+    swapTxnManager(txnMgr);
+    driver.compileAndRespond("insert into gap values(1,2)");
+    long fourth = txnMgr.getCurrentTxnId();
+    Assert.assertEquals(first, fourth);
+    driver.run();
+
+    // Now we run our read query, it should unfortunately see the results of the insert
+    swapTxnManager(txnMgr2);
+    driver2.run();
+
+    Assert.assertEquals("Dirty read!", 1, fetchAll(driver2.getFetchTask()).size());
+  }
+
+  /** (Re)creates the transactional ORC table "gap" that every test works on. */
+  private void createGapTable() throws Exception {
+    driver.run("drop table if exists gap");
+    driver.run("create table gap (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+  }
+
+  /** Drains the given fetch task into a new list and returns it. */
+  private static List<Object> fetchAll(FetchTask fetchTask) throws Exception {
+    List<Object> res = new ArrayList<>();
+    fetchTask.fetch(res);
+    return res;
+  }
+
+  /** Commits through TxnHandler, rethrowing any exception unchecked so it can be used in a lambda. */
+  private void silentCommitTxn(CommitTxnRequest commitTxnRequest) {
+    try {
+      txnHandler.commitTxn(commitTxnRequest);
+    } catch (Exception ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /** Deletes a single TXNS row, leaving a gap in the transaction id sequence. */
+  private void deleteTransactionId(long txnId) throws SQLException {
+    deleteTransactionId(txnId, txnId);
+  }
+
+  /** Deletes the TXNS rows whose TXN_ID lies in [minTxnId, maxTxnId], both ends inclusive. */
+  private void deleteTransactionId(long minTxnId, long maxTxnId) throws SQLException {
+    executeOnMetastoreDb("DELETE FROM TXNS WHERE TXN_ID >=" + minTxnId + " AND TXN_ID <=" + maxTxnId);
+  }
+
+  /** Restarts the TXNS.TXN_ID identity (Derby syntax) at {@code txnId}; the next opened txn reuses it. */
+  private void setBackSequence(long txnId) throws SQLException {
+    executeOnMetastoreDb("ALTER TABLE TXNS ALTER TXN_ID RESTART WITH " + txnId);
+  }
+
+  /**
+   * Runs one update statement directly on the metastore DB and commits it, always closing JDBC resources.
+   */
+  private void executeOnMetastoreDb(String sql) throws SQLException {
+    DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
+    DataSource ds = dsp.create(conf);
+    try (Connection dbConn = ds.getConnection(); Statement stmt = dbConn.createStatement()) {
+      stmt.executeUpdate(sql);
+      dbConn.commit();
+    }
+  }
+
+  /** Installs {@code txnMgr} in the current session; returns what SessionState#setTxnMgr returns. */
+  public static HiveTxnManager swapTxnManager(HiveTxnManager txnMgr) {
+    return SessionState.get().setTxnMgr(txnMgr);
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 1151466..e4ff14a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -218,18 +218,19 @@ public class TestInitiator extends CompactorTest {
     req.setTxnid(txnid);
     LockResponse res = txnHandler.lock(req);
     txnHandler.abortTxn(new AbortTxnRequest(txnid));
-
+    txnHandler.setOpenTxnTimeOutMillis(30000);
     conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50);
     OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(
       TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50, "user", "hostname"));
     txnHandler.abortTxns(new AbortTxnsRequest(resp.getTxn_ids()));
     GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
     Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
-
+    txnHandler.setOpenTxnTimeOutMillis(1);
     startInitiator();
 
     openTxns = txnHandler.getOpenTxns();
-    Assert.assertEquals(1, openTxns.getOpen_txnsSize());
+    // txnid:1 has txn_components, txnid:TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1 is the last
+    Assert.assertEquals(2, openTxns.getOpen_txnsSize());
   }
 
   @Test
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 9ce0085..842b7fe 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1200,6 +1200,8 @@ public class MetastoreConf {
             "class is used to store and retrieve transactions and locks"),
     TXN_TIMEOUT("metastore.txn.timeout", "hive.txn.timeout", 300, TimeUnit.SECONDS,
         "time after which transactions are declared aborted if the client has not sent a heartbeat."),
+    TXN_OPENTXN_TIMEOUT("metastore.txn.opentxn.timeout", "hive.txn.opentxn.timeout", 1000, TimeUnit.MILLISECONDS,
+        "Time before an open transaction operation should persist, otherwise it is considered invalid and rolled back"),
     URI_RESOLVER("metastore.uri.resolver", "hive.metastore.uri.resolver", "",
             "If set, fully qualified class name of resolver for hive metastore uri's"),
     USERS_IN_ADMIN_ROLE("metastore.users.in.admin.role", "hive.users.in.admin.role", "", false,
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
index 49b737e..08ef5e9 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
@@ -99,7 +99,7 @@ public final class SQLGenerator {
   }
 
   /**
-   * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB
+   * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB.
    *
    * @param tblColumns e.g. "T(a,b,c)"
    * @param rows       e.g. list of Strings like 3,4,'d'
@@ -110,7 +110,7 @@ public final class SQLGenerator {
   }
 
   /**
-   * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB
+   * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB.
    *
    * @param tblColumns e.g. "T(a,b,c)"
    * @param rows       e.g. list of Strings like 3,4,'d'
@@ -263,7 +263,7 @@ public final class SQLGenerator {
    * @throws SQLException
    */
   public PreparedStatement prepareStmtWithParameters(Connection dbConn, String sql, List<String> parameters)
-          throws SQLException {
+      throws SQLException {
     PreparedStatement pst = dbConn.prepareStatement(addEscapeCharacters(sql));
     if ((parameters == null) || parameters.isEmpty()) {
       return pst;
@@ -292,4 +292,36 @@ public final class SQLGenerator {
     return s;
   }
 
+
+  /**
+   * Builds the statement that locks TXN_LOCK_TBL, in shared or exclusive mode, for the configured dbProduct.
+   * @param shared {@code true} for a shared (read) lock, {@code false} for an exclusive lock
+   * @return the database-specific SQL statement to execute
+   * @throws MetaException if the dbProduct is not one of the supported databases
+   */
+  public String createTxnLockStatement(boolean shared) throws MetaException {
+    final String lockTable = "\"TXN_LOCK_TBL\"";
+    switch (dbProduct) {
+    case MYSQL:
+      // MySQL's LOCK TABLES is deliberately not used: it is not released automatically on
+      // commit/rollback, and it requires locking every table the statement will touch.
+      // A locking read on the lock table is issued instead.
+      // https://dev.mysql.com/doc/refman/8.0/en/lock-tables.html
+      String rowLock = shared ? "LOCK IN SHARE MODE" : "FOR UPDATE";
+      return "SELECT  \"TXN_LOCK\" FROM " + lockTable + " " + rowLock;
+    case POSTGRES: // https://www.postgresql.org/docs/9.4/sql-lock.html
+    case DERBY: // https://db.apache.org/derby/docs/10.4/ref/rrefsqlj40506.html
+    case ORACLE: // https://docs.oracle.com/cd/B19306_01/server.102/b14200/statements_9015.htm
+      String tableLockMode = shared ? "SHARE" : "EXCLUSIVE";
+      return "LOCK TABLE " + lockTable + " IN " + tableLockMode + " MODE";
+    case SQLSERVER:
+      // https://docs.microsoft.com/en-us/sql/t-sql/queries/hints-transact-sql-table?view=sql-server-ver15
+      String tableHint = shared ? "TABLOCK" : "TABLOCKX";
+      return "SELECT * FROM " + lockTable + " WITH (" + tableHint + ", HOLDLOCK)";
+    default:
+      String msg = "Unrecognized database product name <" + dbProduct + ">";
+      LOG.error(msg);
+      throw new MetaException(msg);
+    }
+  }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 2344c2d..a1bc109 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -223,7 +223,7 @@ class CompactionTxnHandler extends TxnHandler {
         stmt = dbConn.createStatement();
         String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', "
             + "\"CQ_WORKER_ID\" = NULL, \"CQ_NEXT_TXN_ID\" = "
-            + "(SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\")"
+            + "(SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\")"
             + " WHERE \"CQ_ID\" = " + info.id;
         LOG.debug("Going to execute update <" + s + ">");
         int updCnt = stmt.executeUpdate(s);
@@ -474,7 +474,7 @@ class CompactionTxnHandler extends TxnHandler {
   }
   /**
    * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by
-   * min(NEXT_TXN_ID.ntxn_next, min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)).
+   * min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)).
    */
   @Override
   @RetrySemantics.SafeToRetry
@@ -492,9 +492,9 @@ class CompactionTxnHandler extends TxnHandler {
 
         // First need to find the min_uncommitted_txnid which is currently seen by any open transactions.
         // If there are no txns which are currently open or aborted in the system, then current value of
-        // NEXT_TXN_ID.ntxn_next could be min_uncommitted_txnid.
+        // max(TXNS.txn_id) could be min_uncommitted_txnid.
         String s = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" +
-            "SELECT MIN(\"NTXN_NEXT\") AS \"ID\" FROM \"NEXT_TXN_ID\" " +
+            "SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\" " +
             "UNION " +
             "SELECT MIN(\"WS_COMMIT_ID\") AS \"ID\" FROM \"WRITE_SET\" " +
             "UNION " +
@@ -504,7 +504,7 @@ class CompactionTxnHandler extends TxnHandler {
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
         if (!rs.next()) {
-          throw new MetaException("Transaction tables not properly initialized, no record found in NEXT_TXN_ID");
+          throw new MetaException("Transaction tables not properly initialized, no record found in TXNS");
         }
         long minUncommitedTxnid = rs.getLong(1);
 
@@ -533,25 +533,36 @@ class CompactionTxnHandler extends TxnHandler {
   }
 
   /**
-   * Clean up aborted transactions from txns that have no components in txn_components. The reason such
-   * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and
-   * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
+   * Clean up aborted / committed transactions from txns that have no components in txn_components.
+   * Committed txns are intentionally kept in TXNS for the TXN_OPENTXN_TIMEOUT window period.
+   * Such aborted txns can exist because no work was done in the txn
+   * (e.g. Streaming opened a TransactionBatch and abandoned it without doing any work)
+   * or because {@link #markCleaned(CompactionInfo)} was called.
    */
   @Override
   @RetrySemantics.SafeToRetry
-  public void cleanEmptyAbortedTxns() throws MetaException {
+  public void cleanEmptyAbortedAndCommittedTxns() throws MetaException {
+    LOG.info("Start to clean empty aborted or committed TXNS");
     try {
       Connection dbConn = null;
       Statement stmt = null;
       ResultSet rs = null;
       try {
-        //Aborted is a terminal state, so nothing about the txn can change
+        //Aborted and committed are terminal states, so nothing about the txn can change
         //after that, so READ COMMITTED is sufficient.
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
+        /**
+         * Only delete aborted / committed transaction in a way that guarantees two things:
+         * 1. never deletes anything that is inside the TXN_OPENTXN_TIMEOUT window
+         * 2. never deletes the maximum txnId even if it is before the TXN_OPENTXN_TIMEOUT window
+          */
+        long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn);
+
         String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " +
             "\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " +
-            "\"TXN_STATE\" = '" + TXN_ABORTED + "'";
+            " (\"TXN_STATE\" = '" + TXN_ABORTED + "' OR \"TXN_STATE\" = '" + TXN_COMMITTED + "')  AND "
+            + " \"TXN_ID\" < " + lowWaterMark;
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
         List<Long> txnids = new ArrayList<>();
@@ -568,16 +579,15 @@ class CompactionTxnHandler extends TxnHandler {
 
         // Delete from TXNS.
         prefix.append("DELETE FROM \"TXNS\" WHERE ");
-        suffix.append("");
 
         TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", false, false);
 
         for (String query : queries) {
           LOG.debug("Going to execute update <" + query + ">");
           int rc = stmt.executeUpdate(query);
-          LOG.info("Removed " + rc + "  empty Aborted transactions from TXNS");
+          LOG.debug("Removed " + rc + "  empty Aborted and Committed transactions from TXNS");
         }
-        LOG.info("Aborted transactions removed from TXNS: " + txnids);
+        LOG.info("Aborted and committed transactions removed from TXNS: " + txnids);
         LOG.debug("Going to commit");
         dbConn.commit();
       } catch (SQLException e) {
@@ -591,7 +601,7 @@ class CompactionTxnHandler extends TxnHandler {
         close(rs, stmt, dbConn);
       }
     } catch (RetryException e) {
-      cleanEmptyAbortedTxns();
+      cleanEmptyAbortedAndCommittedTxns();
     }
   }
 
@@ -1147,12 +1157,12 @@ class CompactionTxnHandler extends TxnHandler {
               + quoteChar(READY_FOR_CLEANING) +
               ") \"RES\"";
         } else {
-          query = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"";
+          query = "SELECT MAX(\"TXN_ID\") + 1 FROM \"TXNS\"";
         }
         LOG.debug("Going to execute query <" + query + ">");
         rs = stmt.executeQuery(query);
         if (!rs.next()) {
-          throw new MetaException("Transaction tables not properly initialized, no record found in NEXT_TXN_ID");
+          throw new MetaException("Transaction tables not properly initialized, no record found in TXNS");
         }
         return rs.getLong(1);
       } catch (SQLException e) {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
index 97a0833..7a90316 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
@@ -218,63 +218,107 @@ public final class TxnDbUtil {
 
   public static void cleanDb(Configuration conf) throws Exception {
     LOG.info("Cleaning transactional tables");
-    int retryCount = 0;
-    while(++retryCount <= 3) {
-      boolean success = true;
-      Connection conn = null;
-      Statement stmt = null;
+
+    boolean success = true;
+    Connection conn = null;
+    Statement stmt = null;
+    try {
+      conn = getConnection(conf);
+      stmt = conn.createStatement();
+      if (!checkDbPrepared(stmt)){
+        // Nothing to clean (presumably the transactional tables have not been created yet)
+        return;
+      }
+
+      // Attempt every truncate even if an earlier one fails; '&=' does not short-circuit.
+      success &= truncateTable(conn, stmt, "TXN_COMPONENTS");
+      success &= truncateTable(conn, stmt, "COMPLETED_TXN_COMPONENTS");
+      success &= truncateTable(conn, stmt, "TXNS");
+      success &= truncateTable(conn, stmt, "TXN_TO_WRITE_ID");
+      success &= truncateTable(conn, stmt, "NEXT_WRITE_ID");
+      success &= truncateTable(conn, stmt, "HIVE_LOCKS");
+      success &= truncateTable(conn, stmt, "NEXT_LOCK_ID");
+      success &= truncateTable(conn, stmt, "COMPACTION_QUEUE");
+      success &= truncateTable(conn, stmt, "NEXT_COMPACTION_QUEUE_ID");
+      success &= truncateTable(conn, stmt, "COMPLETED_COMPACTIONS");
+      success &= truncateTable(conn, stmt, "AUX_TABLE");
+      success &= truncateTable(conn, stmt, "WRITE_SET");
+      success &= truncateTable(conn, stmt, "REPL_TXN_MAP");
+      success &= truncateTable(conn, stmt, "MATERIALIZATION_REBUILD_LOCKS");
       try {
-        conn = getConnection(conf);
-        stmt = conn.createStatement();
-
-        // We want to try these, whether they succeed or fail.
-        success &= truncateTable(conn, stmt, "TXN_COMPONENTS");
-        success &= truncateTable(conn, stmt, "COMPLETED_TXN_COMPONENTS");
-        success &= truncateTable(conn, stmt, "TXNS");
-        success &= truncateTable(conn, stmt, "NEXT_TXN_ID");
-        success &= truncateTable(conn, stmt, "TXN_TO_WRITE_ID");
-        success &= truncateTable(conn, stmt, "NEXT_WRITE_ID");
-        success &= truncateTable(conn, stmt, "HIVE_LOCKS");
-        success &= truncateTable(conn, stmt, "NEXT_LOCK_ID");
-        success &= truncateTable(conn, stmt, "COMPACTION_QUEUE");
-        success &= truncateTable(conn, stmt, "NEXT_COMPACTION_QUEUE_ID");
-        success &= truncateTable(conn, stmt, "COMPLETED_COMPACTIONS");
-        success &= truncateTable(conn, stmt, "AUX_TABLE");
-        success &= truncateTable(conn, stmt, "WRITE_SET");
-        success &= truncateTable(conn, stmt, "REPL_TXN_MAP");
-        success &= truncateTable(conn, stmt, "MATERIALIZATION_REBUILD_LOCKS");
-        try {
-          stmt.executeUpdate("INSERT INTO \"NEXT_TXN_ID\" VALUES(1)");
-          stmt.executeUpdate("INSERT INTO \"NEXT_LOCK_ID\" VALUES(1)");
-          stmt.executeUpdate("INSERT INTO \"NEXT_COMPACTION_QUEUE_ID\" VALUES(1)");
-        } catch (SQLException e) {
-          if (!getTableNotExistsErrorCodes().contains(e.getSQLState())) {
-            LOG.error("Error initializing NEXT_TXN_ID");
-            success = false;
-          }
+        resetTxnSequence(conn, stmt);
+        stmt.executeUpdate("INSERT INTO \"NEXT_LOCK_ID\" VALUES(1)");
+        stmt.executeUpdate("INSERT INTO \"NEXT_COMPACTION_QUEUE_ID\" VALUES(1)");
+      } catch (SQLException e) {
+        if (!getTableNotExistsErrorCodes().contains(e.getSQLState())) {
+          LOG.error("Error initializing sequence values", e);
+          success = false;
         }
-        /*
-         * Don't drop NOTIFICATION_LOG, SEQUENCE_TABLE and NOTIFICATION_SEQUENCE as its used by other
-         * table which are not txn related to generate primary key. So if these tables are dropped
-         *  and other tables are not dropped, then it will create key duplicate error while inserting
-         *  to other table.
-         */
-      } finally {
-        closeResources(conn, stmt, null);
-      }
-      if(success) {
-        return;
       }
+      /*
+       * Don't clean NOTIFICATION_LOG, SEQUENCE_TABLE and NOTIFICATION_SEQUENCE: they are used to
+       * generate primary keys for other tables that are not txn related. If these tables were
+       * cleaned while those other tables were not, inserting into the other tables would fail
+       * with duplicate key errors.
+       */
+    } finally {
+      closeResources(conn, stmt, null);
+    }
+    if(success) {
+      return;
     }
     throw new RuntimeException("Failed to clean up txn tables");
   }
 
+  /**
+   * Resets the TXNS.TXN_ID generator and seeds TXNS with a TXN_ID 0 row in state 'c' (presumably so that
+   * MAX(TXN_ID)-based id lookups never run against an empty table). Does nothing for OTHER products.
+   */
+  private static void resetTxnSequence(Connection conn, Statement stmt) throws SQLException, MetaException {
+    DatabaseProduct databaseProduct = determineDatabaseProduct(conn.getMetaData().getDatabaseProductName());
+    String insertSentinel = "INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\","
+        + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")"
+        + "  VALUES(0, 'c', 0, 0, '', '')";
+    switch (databaseProduct) {
+    case DERBY:
+      stmt.execute("ALTER TABLE \"TXNS\" ALTER \"TXN_ID\" RESTART WITH 1");
+      stmt.execute(insertSentinel);
+      break;
+    case MYSQL:
+      stmt.execute("ALTER TABLE \"TXNS\" AUTO_INCREMENT=1");
+      // NO_AUTO_VALUE_ON_ZERO makes MySQL store the explicit TXN_ID 0 instead of generating a new id
+      stmt.execute("SET SQL_MODE='NO_AUTO_VALUE_ON_ZERO,ANSI_QUOTES'");
+      stmt.execute(insertSentinel);
+      break;
+    case POSTGRES:
+      stmt.execute(insertSentinel);
+      stmt.execute("ALTER SEQUENCE \"TXNS_TXN_ID_seq\" RESTART");
+      break;
+    case ORACLE:
+      stmt.execute("ALTER TABLE \"TXNS\" MODIFY \"TXN_ID\" GENERATED BY DEFAULT AS IDENTITY (START WITH 1)");
+      // Oracle treats '' as NULL, so '_' placeholders are used (presumably TXN_USER/TXN_HOST are NOT NULL)
+      stmt.execute("INSERT INTO \"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\","
+          + " \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\")"
+          + "  VALUES(0, 'c', 0, 0, '_', '_')");
+      break;
+    case SQLSERVER:
+      stmt.execute("DBCC CHECKIDENT ('txns', RESEED, 0)");
+      stmt.execute("SET IDENTITY_INSERT TXNS ON");
+      stmt.execute(insertSentinel);
+      stmt.execute("SET IDENTITY_INSERT TXNS OFF"); // session-scoped: let later inserts use generated ids
+      break;
+    case OTHER:
+    default:
+      break;
+    }
+  }
+
   private static boolean truncateTable(Connection conn, Statement stmt, String name) {
     try {
       // We can not use actual truncate due to some foreign keys, but we don't expect much data during tests
       String dbProduct = conn.getMetaData().getDatabaseProductName();
       DatabaseProduct databaseProduct = determineDatabaseProduct(dbProduct);
-      if (databaseProduct == POSTGRES) {
+      if (databaseProduct == POSTGRES || databaseProduct == MYSQL) {
         stmt.execute("DELETE FROM \"" + name + "\"");
       } else {
         stmt.execute("DELETE FROM " + name);
@@ -503,4 +547,28 @@ public final class TxnDbUtil {
     }
     return affectedRowsByQuery;
   }
+
+  /**
+   * Checks if the DBMS supports {@link java.sql.Statement#getGeneratedKeys()} for multi-row inserts.
+   * @param dbProduct DBMS type
+   * @return true if generated keys can be retrieved after a multi-row (batched) insert
+   * @throws MetaException if dbProduct is OTHER or otherwise not recognised
+   */
+  public static boolean supportsGetGeneratedKeys(DatabaseProduct dbProduct) throws MetaException {
+    switch (dbProduct) {
+    case DERBY:
+    case SQLSERVER:
+      // getGeneratedKeys is not supported for multi-row inserts on these products
+      return false;
+    case ORACLE:
+    case MYSQL:
+    case POSTGRES:
+      return true;
+    case OTHER:
+    default:
+      String msg = "Unknown database product: " + dbProduct.toString();
+      LOG.error(msg);
+      throw new MetaException(msg);
+    }
+  }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 10a02b1..8fded60 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -55,6 +55,7 @@ import java.util.stream.Collectors;
 
 import javax.sql.DataSource;
 
+import org.apache.commons.lang3.time.StopWatch;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.commons.lang3.tuple.Pair;
@@ -73,7 +74,59 @@ import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
 import org.apache.hadoop.hive.metastore.TransactionalMetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.LockTypeComparator;
-import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.AddDynamicPartitions;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.CheckLockRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.CreationMetadata;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.HeartbeatRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeRequest;
+import org.apache.hadoop.hive.metastore.api.HeartbeatTxnRangeResponse;
+import org.apache.hadoop.hive.metastore.api.HiveObjectType;
+import org.apache.hadoop.hive.metastore.api.InitializeTableWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockResponse;
+import org.apache.hadoop.hive.metastore.api.LockState;
+import org.apache.hadoop.hive.metastore.api.LockType;
+import org.apache.hadoop.hive.metastore.api.Materialization;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.ReplLastIdInfo;
+import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
+import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnOpenException;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+import org.apache.hadoop.hive.metastore.api.UnlockRequest;
+import org.apache.hadoop.hive.metastore.api.WriteEventInfo;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
@@ -174,8 +227,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   static final protected char MINOR_TYPE = 'i';
 
   // Transaction states
-  static final protected char TXN_ABORTED = 'a';
-  static final protected char TXN_OPEN = 'o';
+  protected static final char TXN_ABORTED = 'a';
+  protected static final char TXN_OPEN = 'o';
+  // TXNS rows in this state are skipped when getOpenTxns/getOpenTxnsInfo build their open/aborted lists.
+  protected static final char TXN_COMMITTED = 'c';
+
+  // Temporary state for rows inserted by openTxns when getGeneratedKeys cannot be used: the new rows are
+  // looked up by this state and then switched to TXN_OPEN before openTxns commits.
+  private static final char TXN_TMP = '_';
+
   //todo: make these like OperationType and remove above char constants
   enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN}
 
@@ -189,6 +246,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   private static DataSource connPool;
   private static DataSource connPoolMutex;
 
+  // SQLState set on the SQLException that openTxns throws when TXN_OPENTXN_TIMEOUT is exceeded, instead of
+  // throwing RetryException directly. NOTE(review): presumably recognised by checkRetryable — confirm.
+  private static final String MANUAL_RETRY = "ManualRetry";
+
   // Query definitions
   private static final String HIVE_LOCKS_INSERT_QRY = "INSERT INTO \"HIVE_LOCKS\" ( " +
       "\"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", \"HL_TXNID\", \"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", " +
@@ -204,11 +263,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   private static final String COMPL_TXN_COMPONENTS_INSERT_QUERY = "INSERT INTO \"COMPLETED_TXN_COMPONENTS\" " +
           "(\"CTC_TXNID\"," + " \"CTC_DATABASE\", \"CTC_TABLE\", \"CTC_PARTITION\", \"CTC_WRITEID\", \"CTC_UPDATE_DELETE\")" +
           " VALUES (%s, ?, ?, ?, ?, %s)";
+  // Single-row TXNS insert. TXN_ID is omitted so the database generates it (identity/sequence).
+  // Both %s placeholders are filled with the DB-specific epoch expression (TXN_STARTED, TXN_LAST_HEARTBEAT);
+  // the ? parameters are TXN_STATE, TXN_USER, TXN_HOST and TXN_TYPE, in that order.
+  private static final String TXNS_INSERT_QRY = "INSERT INTO \"TXNS\" " +
+      "(\"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", \"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\") "
+      + "VALUES(?,%s,%s,?,?,?)";
   private static final String SELECT_LOCKS_FOR_LOCK_ID_QUERY = "SELECT \"HL_LOCK_EXT_ID\", \"HL_LOCK_INT_ID\", " +
-          "\"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" " +
-          "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?";
+      "\"HL_DB\", \"HL_TABLE\", \"HL_PARTITION\", \"HL_LOCK_STATE\", \"HL_LOCK_TYPE\", \"HL_TXNID\" " +
+      "FROM \"HIVE_LOCKS\" WHERE \"HL_LOCK_EXT_ID\" = ?";
   private static final String SELECT_TIMED_OUT_LOCKS_QUERY = "SELECT DISTINCT \"HL_LOCK_EXT_ID\" FROM \"HIVE_LOCKS\" " +
-          "WHERE \"HL_LAST_HEARTBEAT\" < %s - ? AND \"HL_TXNID\" = 0";
+      "WHERE \"HL_LAST_HEARTBEAT\" < %s - ? AND \"HL_TXNID\" = 0";
 
 
   private List<TransactionalMetaStoreEventListener> transactionalListeners;
@@ -273,9 +335,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   protected Configuration conf;
   private static DatabaseProduct dbProduct;
   private static SQLGenerator sqlGenerator;
+  // Timeout for opening a transaction (ConfVars.TXN_OPENTXN_TIMEOUT), in milliseconds. Static, so it is
+  // shared by all TxnHandler instances; it can be changed through setOpenTxnTimeOutMillis.
+  private static long openTxnTimeOutMillis;
 
   // (End user) Transaction timeout, in milliseconds.
   private long timeout;
+  // (The timeout for opening a transaction is held in the static openTxnTimeOutMillis declared above.)
 
   private int maxBatchSize;
   private String identifierQuoteString; // quotes to use for quoting tables, where necessary
@@ -357,6 +421,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     maxOpenTxns = MetastoreConf.getIntVar(conf, ConfVars.MAX_OPEN_TXNS);
     maxBatchSize = MetastoreConf.getIntVar(conf, ConfVars.JDBC_MAX_BATCH_SIZE);
 
+    openTxnTimeOutMillis = MetastoreConf.getTimeVar(conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS);
+
     try {
       transactionalListeners = MetaStoreServerUtils.getMetaStoreListeners(
               TransactionalMetaStoreEventListener.class,
@@ -377,59 +443,74 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   @RetrySemantics.ReadOnly
   public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
     try {
-      // We need to figure out the current transaction number and the list of
-      // open transactions.  To avoid needing a transaction on the underlying
-      // database we'll look at the current transaction number first.  If it
-      // subsequently shows up in the open list that's ok.
+      // We need to figure out the HighWaterMark and the list of open transactions.
       Connection dbConn = null;
       Statement stmt = null;
       ResultSet rs = null;
       try {
-        /**
-         * This method can run at READ_COMMITTED as long as long as
-         * {@link #openTxns(org.apache.hadoop.hive.metastore.api.OpenTxnRequest)} is atomic.
-         * More specifically, as long as advancing TransactionID in NEXT_TXN_ID is atomic with
-         * adding corresponding entries into TXNS.  The reason is that any txnid below HWM
-         * is either in TXNS and thus considered open (Open/Aborted) or it's considered Committed.
+        /*
+         * This method needs guarantees from
+         * {@link #openTxns(OpenTxnRequest)} and {@link #commitTxn(CommitTxnRequest)}.
+         * It looks at the TXNS table and finds each transaction between the max(txn_id) as HighWaterMark
+         * and the max(txn_id) before the TXN_OPENTXN_TIMEOUT period as LowWaterMark.
+         * Every txn_id that is missing between these is considered open, since it may still appear later.
+         * openTxns must ensure that no new transaction is opened with a txn_id below LWM, and
+         * commitTxn must ensure that no committed transaction is removed before the time period expires.
          */
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        String s = "SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\"";
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, no record found in next_txn_id");
-        }
-        long hwm = rs.getLong(1);
-        if (rs.wasNull()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, null record found in next_txn_id");
-        }
-        close(rs);
         List<TxnInfo> txnInfos = new ArrayList<>();
-        //need the WHERE clause below to ensure consistent results with READ_COMMITTED
-        s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\" FROM " +
-            "\"TXNS\" WHERE \"TXN_ID\" <= " + hwm;
+
+        String s =
+            "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", "
+                + "(" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")"
+                + "FROM \"TXNS\" ORDER BY \"TXN_ID\"";
         LOG.debug("Going to execute query<" + s + ">");
         rs = stmt.executeQuery(s);
+        /*
+         * We can use the maximum txn_id from the TXNS table as high water mark, since commitTxn and the Initiator
+         * guarantee that the transaction with the highest txn_id will never be removed from the TXNS table.
+         * A pending openTxns call that has already acquired its sequence id but not yet committed its insert
+         * into the TXNS table will either have a lower txn_id than HWM and be listed in the open txn list,
+         * or have a higher txn_id and not affect this getOpenTxnsInfo() call.
+         */
+        long hwm = 0;
+        long openTxnLowBoundary = 0;
+
         while (rs.next()) {
+          long txnId = rs.getLong(1);
+          long age = rs.getLong(7);
+          hwm = txnId;
+          if (age < getOpenTxnTimeOutMillis()) {
+            // We will consider every gap as an open transaction from the previous txnId
+            openTxnLowBoundary++;
+            while (txnId > openTxnLowBoundary) {
+              // Add an empty open transaction for every missing value
+              txnInfos.add(new TxnInfo(openTxnLowBoundary, TxnState.OPEN, null, null));
+              openTxnLowBoundary++;
+            }
+          } else {
+            openTxnLowBoundary = txnId;
+          }
           char c = rs.getString(2).charAt(0);
           TxnState state;
           switch (c) {
-            case TXN_ABORTED:
-              state = TxnState.ABORTED;
-              break;
+          case TXN_COMMITTED:
+            // This is only here, to avoid adding this txnId as possible gap
+            continue;
 
-            case TXN_OPEN:
-              state = TxnState.OPEN;
-              break;
+          case TXN_ABORTED:
+            state = TxnState.ABORTED;
+            break;
 
-            default:
-              throw new MetaException("Unexpected transaction state " + c +
-                " found in txns table");
+          case TXN_OPEN:
+            state = TxnState.OPEN;
+            break;
+
+          default:
+            throw new MetaException("Unexpected transaction state " + c + " found in txns table");
           }
-          TxnInfo txnInfo = new TxnInfo(rs.getLong(1), state, rs.getString(3), rs.getString(4));
+          TxnInfo txnInfo = new TxnInfo(txnId, state, rs.getString(3), rs.getString(4));
           txnInfo.setStartedTime(rs.getLong(5));
           txnInfo.setLastHeartbeatTime(rs.getLong(6));
           txnInfos.add(txnInfo);
@@ -441,8 +522,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         checkRetryable(dbConn, e, "getOpenTxnsInfo");
-        throw new MetaException("Unable to select from transaction database: " + getMessage(e)
-          + StringUtils.stringifyException(e));
+        throw new MetaException(
+            "Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e));
       } finally {
         close(rs, stmt, dbConn);
       }
@@ -455,42 +536,47 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   @RetrySemantics.ReadOnly
   public GetOpenTxnsResponse getOpenTxns() throws MetaException {
     try {
-      // We need to figure out the current transaction number and the list of
-      // open transactions.  To avoid needing a transaction on the underlying
-      // database we'll look at the current transaction number first.  If it
-      // subsequently shows up in the open list that's ok.
+      // We need to figure out the current transaction number and the list of open transactions.
       Connection dbConn = null;
       Statement stmt = null;
       ResultSet rs = null;
       try {
-        /**
+        /*
          * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
          */
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        String s = "SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\"";
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, no record found in next_txn_id");
-        }
-        long hwm = rs.getLong(1);
-        if (rs.wasNull()) {
-          throw new MetaException("Transaction tables not properly " +
-            "initialized, null record found in next_txn_id");
-        }
-        close(rs);
+
         List<Long> openList = new ArrayList<>();
-        //need the WHERE clause below to ensure consistent results with READ_COMMITTED
-        s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" <= " + hwm + " ORDER BY \"TXN_ID\"";
+        String s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\", "
+            + "(" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")"
+            + " FROM \"TXNS\" ORDER BY \"TXN_ID\"";
         LOG.debug("Going to execute query<" + s + ">");
         rs = stmt.executeQuery(s);
+        long hwm = 0;
+        long openTxnLowBoundary = 0;
         long minOpenTxn = Long.MAX_VALUE;
         BitSet abortedBits = new BitSet();
         while (rs.next()) {
           long txnId = rs.getLong(1);
+          long age = rs.getLong(4);
+          hwm = txnId;
+          if (age < getOpenTxnTimeOutMillis()) {
+            // We will consider every gap as an open transaction from the previous txnId
+            openTxnLowBoundary++;
+            while (txnId > openTxnLowBoundary) {
+              // Add an empty open transaction for every missing value
+              openList.add(openTxnLowBoundary);
+              minOpenTxn = Math.min(minOpenTxn, openTxnLowBoundary);
+              openTxnLowBoundary++;
+            }
+          } else {
+            openTxnLowBoundary = txnId;
+          }
           char txnState = rs.getString(2).charAt(0);
+          if (txnState == TXN_COMMITTED) {
+            continue;
+          }
           if (txnState == TXN_OPEN) {
             minOpenTxn = Math.min(minOpenTxn, txnId);
           }
@@ -506,7 +592,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         dbConn.rollback();
         ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray());
         GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer);
-        if(minOpenTxn < Long.MAX_VALUE) {
+        if (minOpenTxn < Long.MAX_VALUE) {
           otr.setMin_open_txn(minOpenTxn);
         }
         return otr;
@@ -554,13 +640,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       Connection dbConn = null;
       Statement stmt = null;
       try {
-        lockInternal();
-        /**
+        /*
          * To make {@link #getOpenTxns()}/{@link #getOpenTxnsInfo()} work correctly, this operation must ensure
-         * that advancing the counter in NEXT_TXN_ID and adding appropriate entries to TXNS is atomic.
-         * Also, advancing the counter must work when multiple metastores are running.
-         * SELECT ... FOR UPDATE is used to prevent
-         * concurrent DB transactions being rolled back due to Write-Write conflict on NEXT_TXN_ID.
+         * that looking at the TXNS table every open transaction could be identified below a given High Water Mark.
+         * One way to do it, would be to serialize the openTxns call with a S4U lock, but that would cause
+         * performance degradation with high transaction load.
+         * To enable parallel openTxn calls, we define a time period (TXN_OPENTXN_TIMEOUT) and consider every
+         * transaction missing from the TXNS table in that period open, and prevent opening transaction outside
+         * the period.
+         * Example: At t[0] there is one open transaction in the TXNS table, T[1].
+         * T[2] acquires the next sequence at t[1] but only commits into the TXNS table at t[10].
+         * T[3] acquires its sequence at t[2], and commits into the TXNS table at t[3].
+         * Then T[3] calculates its snapshot at t[4] and puts T[1] and also T[2] in the snapshot's
+         * open transaction list. T[1] because it is presented as open in TXNS,
+         * T[2] because it is a missing sequence.
          *
          * In the current design, there can be several metastore instances running in a given Warehouse.
          * This makes ideas like reserving a range of IDs to save trips to DB impossible.  For example,
@@ -573,35 +666,67 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          * set could support a write-through cache for added performance.
          */
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        // Make sure the user has not requested an insane amount of txns.
-        int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH);
-        if (numTxns > maxTxns) numTxns = maxTxns;
-
         stmt = dbConn.createStatement();
-        List<Long> txnIds = openTxns(dbConn, stmt, rqst);
+        /*
+         * openTxns and commitTxn must be mutually exclusive when committing a non-read-only transaction.
+         * This is achieved by requesting a shared table lock here, and an exclusive one at commit.
+         * Since table locks work in Derby, we don't need the lockInternal call here.
+         * Example: suppose we have two transactions, each running an update like x = x+1.
+         * T[3,3] used a value from a snapshot with T[2,2]. If we allowed committing T[3,3]
+         * and opening T[4] in parallel, T[4] could use the value from a snapshot with T[2,2],
+         * and we would have a lost update problem.
+         */
+        acquireTxnLock(stmt, true);
+        // Measure the time from acquiring the sequence value, till committing in the TXNS table
+        StopWatch generateTransactionWatch = new StopWatch();
+        generateTransactionWatch.start();
+
+        List<Long> txnIds = openTxns(dbConn, rqst);
 
         LOG.debug("Going to commit");
         dbConn.commit();
+        generateTransactionWatch.stop();
+        long elapsedMillis = generateTransactionWatch.getTime(TimeUnit.MILLISECONDS);
+        TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT;
+        if (txnType != TxnType.READ_ONLY && elapsedMillis >= openTxnTimeOutMillis) {
+          /*
+           * The commit was too slow, we can not allow this to continue (except if it is read only,
+           * since that can not cause dirty reads).
+           * When calculating the snapshot for a given transaction, we look back for possible open transactions
+           * (that are not yet committed in the TXNS table), for TXN_OPENTXN_TIMEOUT period.
+           * We cannot allow a write transaction that was slower than TXN_OPENTXN_TIMEOUT to continue,
+           * because other running transactions may not have considered this transactionId open,
+           * which could cause dirty reads.
+           */
+          LOG.error("OpenTxnTimeOut exceeded commit duration {}, deleting transactionIds: {}", elapsedMillis, txnIds);
+          deleteInvalidOpenTransactions(dbConn, txnIds);
+          /*
+           * We do not throw RetryException directly, to not circumvent the max retry limit
+           */
+          throw new SQLException("OpenTxnTimeOut exceeded", MANUAL_RETRY);
+        }
         return new OpenTxnsResponse(txnIds);
       } catch (SQLException e) {
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         checkRetryable(dbConn, e, "openTxns(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database "
-          + StringUtils.stringifyException(e));
+        throw new MetaException("Unable to select from transaction database " + StringUtils.stringifyException(e));
       } finally {
         close(null, stmt, dbConn);
-        unlockInternal();
       }
     } catch (RetryException e) {
       return openTxns(rqst);
     }
   }
 
-  private List<Long> openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst)
+  private List<Long> openTxns(Connection dbConn, OpenTxnRequest rqst)
           throws SQLException, MetaException {
     int numTxns = rqst.getNum_txns();
-    ResultSet rs = null;
+    // Make sure the user has not requested an insane amount of txns.
+    int maxTxns = MetastoreConf.getIntVar(conf, ConfVars.TXN_MAX_OPEN_BATCH);
+    if (numTxns > maxTxns) {
+      numTxns = maxTxns;
+    }
     List<PreparedStatement> insertPreparedStmts = null;
     TxnType txnType = rqst.isSetTxn_type() ? rqst.getTxn_type() : TxnType.DEFAULT;
     try {
@@ -620,51 +745,54 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         txnType = TxnType.REPL_CREATED;
       }
 
-      String s = sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\"");
-      LOG.debug("Going to execute query <" + s + ">");
-      rs = stmt.executeQuery(s);
-      if (!rs.next()) {
-        throw new MetaException("Transaction database not properly " +
-                "configured, can't find next transaction id.");
-      }
-      long first = rs.getLong(1);
-      s = "UPDATE \"NEXT_TXN_ID\" SET \"NTXN_NEXT\" = " + (first + numTxns);
-      LOG.debug("Going to execute update <" + s + ">");
-      stmt.executeUpdate(s);
-
       List<Long> txnIds = new ArrayList<>(numTxns);
-
-      List<String> rows = new ArrayList<>();
-      List<String> params = new ArrayList<>();
-      params.add(rqst.getUser());
-      params.add(rqst.getHostname());
-      List<List<String>> paramsList = new ArrayList<>(numTxns);
-
-      for (long i = first; i < first + numTxns; i++) {
-        txnIds.add(i);
-        rows.add(i + "," + quoteChar(TXN_OPEN) + "," + TxnDbUtil.getEpochFn(dbProduct) + ","
-                + TxnDbUtil.getEpochFn(dbProduct) + ",?,?," + txnType.getValue());
-        paramsList.add(params);
-      }
-      insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
-            "\"TXNS\" (\"TXN_ID\", \"TXN_STATE\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", "
-                + "\"TXN_USER\", \"TXN_HOST\", \"TXN_TYPE\")",
-              rows, paramsList);
-      for (PreparedStatement pst : insertPreparedStmts) {
-        pst.execute();
+      /*
+       * getGeneratedKeys is not supported by every DBMS after executing a multi-row insert,
+       * but it is supported by every DBMS in use for a single-row insert, even if the metadata says otherwise.
+       * If getGeneratedKeys is not supported, the rows are first inserted with the temporary state TXN_TMP,
+       * then their keys are selected back by that state and the state is switched to TXN_OPEN.
+       */
+      boolean genKeySupport = TxnDbUtil.supportsGetGeneratedKeys(dbProduct);
+      genKeySupport = genKeySupport || (numTxns == 1);
+
+      String insertQuery = String.format(TXNS_INSERT_QRY, TxnDbUtil.getEpochFn(dbProduct),
+          TxnDbUtil.getEpochFn(dbProduct));
+      LOG.debug("Going to execute insert <" + insertQuery + ">");
+      try (PreparedStatement ps = dbConn.prepareStatement(insertQuery, new String[] {"TXN_ID"})) {
+        String state = genKeySupport ? Character.toString(TXN_OPEN) : Character.toString(TXN_TMP);
+        if (numTxns == 1) {
+          ps.setString(1, state);
+          ps.setString(2, rqst.getUser());
+          ps.setString(3, rqst.getHostname());
+          ps.setInt(4, txnType.getValue());
+          txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, false));
+        } else {
+          for (int i = 0; i < numTxns; ++i) {
+            ps.setString(1, state);
+            ps.setString(2, rqst.getUser());
+            ps.setString(3, rqst.getHostname());
+            ps.setInt(4, txnType.getValue());
+            ps.addBatch();
+
+            if ((i + 1) % maxBatchSize == 0) {
+              txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, true));
+            }
+          }
+          if (numTxns % maxBatchSize != 0) {
+            txnIds.addAll(executeTxnInsertBatchAndExtractGeneratedKeys(dbConn, genKeySupport, ps, true));
+          }
+        }
       }
 
+      assert txnIds.size() == numTxns;
+
       if (rqst.isSetReplPolicy()) {
-        List<String> rowsRepl = new ArrayList<>();
-        for (PreparedStatement pst : insertPreparedStmts) {
-          pst.close();
-        }
-        insertPreparedStmts.clear();
-        params.clear();
-        paramsList.clear();
+        List<String> rowsRepl = new ArrayList<>(numTxns);
+        List<String> params = new ArrayList<>(1);
+        List<List<String>> paramsList = new ArrayList<>(numTxns);
         params.add(rqst.getReplPolicy());
         for (int i = 0; i < numTxns; i++) {
-          rowsRepl.add( "?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
+          rowsRepl.add("?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
           paramsList.add(params);
         }
 
@@ -687,10 +815,125 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           pst.close();
         }
       }
-      close(rs);
     }
   }
 
+  /**
+   * Executes the pending TXNS insert(s) held by {@code ps} and returns the TXN_IDs assigned to the new rows.
+   *
+   * @param dbConn connection the insert runs on
+   * @param genKeySupport if true, the ids are read via {@link PreparedStatement#getGeneratedKeys()};
+   *     otherwise the rows must have been inserted with state TXN_TMP: they are selected by that state
+   *     (ascending TXN_ID) and then switched to TXN_OPEN
+   * @param ps prepared TXNS insert whose parameters (or batch) are already set
+   * @param batch true to run {@link PreparedStatement#executeBatch()}, false for a single {@code execute()}
+   * @return the transaction ids created by this call
+   * @throws SQLException on any database error
+   */
+  private List<Long> executeTxnInsertBatchAndExtractGeneratedKeys(Connection dbConn, boolean genKeySupport,
+      PreparedStatement ps, boolean batch) throws SQLException {
+    List<Long> txnIds = new ArrayList<>();
+    if (batch) {
+      ps.executeBatch();
+    } else {
+      // For slight performance advantage we do not use the executeBatch, when we only have one row
+      ps.execute();
+    }
+    if (genKeySupport) {
+      try (ResultSet generatedKeys = ps.getGeneratedKeys()) {
+        while (generatedKeys.next()) {
+          txnIds.add(generatedKeys.getLong(1));
+        }
+      }
+    } else {
+      // ORDER BY makes the result deterministic: callers pair txnIds.get(i) with per-request data by
+      // index (e.g. the REPL source txn ids), and without it the row order is unspecified.
+      try (PreparedStatement pstmt = dbConn.prepareStatement(
+          "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = ? ORDER BY \"TXN_ID\"")) {
+        pstmt.setString(1, Character.toString(TXN_TMP));
+        try (ResultSet rs = pstmt.executeQuery()) {
+          while (rs.next()) {
+            txnIds.add(rs.getLong(1));
+          }
+        }
+      }
+      try (PreparedStatement pstmt = dbConn
+          .prepareStatement("UPDATE \"TXNS\" SET \"TXN_STATE\" = ? WHERE \"TXN_STATE\" = ?")) {
+        pstmt.setString(1, Character.toString(TXN_OPEN));
+        pstmt.setString(2, Character.toString(TXN_TMP));
+        pstmt.executeUpdate();
+      }
+    }
+    return txnIds;
+  }
+
+  /**
+   * Deletes the given transactions from TXNS and commits the deletion.
+   * <p>
+   * Used by openTxns when a write transaction's ids were committed into TXNS later than TXN_OPENTXN_TIMEOUT.
+   * The commit here is required: openTxns then throws a SQLException to force a retry, and its handler
+   * rolls the connection back, which would otherwise undo these deletes and leave the abandoned
+   * transactions in TXNS as open.
+   *
+   * @param dbConn connection to run the deletes on
+   * @param txnIds transaction ids to remove; nothing is done if the list is empty
+   * @throws MetaException if the deletes fail with a non-retryable error
+   */
+  private void deleteInvalidOpenTransactions(Connection dbConn, List<Long> txnIds) throws MetaException {
+    if (txnIds.isEmpty()) {
+      return;
+    }
+    try {
+      try (Statement stmt = dbConn.createStatement()) {
+        List<String> queries = new ArrayList<>();
+        StringBuilder prefix = new StringBuilder("DELETE FROM \"TXNS\" WHERE ");
+        StringBuilder suffix = new StringBuilder();
+        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnIds, "\"TXN_ID\"", false, false);
+        for (String s : queries) {
+          LOG.debug("Going to execute update <" + s + ">");
+          stmt.executeUpdate(s);
+        }
+        LOG.debug("Going to commit");
+        dbConn.commit();
+      } catch (SQLException e) {
+        LOG.debug("Going to rollback");
+        rollbackDBConn(dbConn);
+        checkRetryable(dbConn, e, "deleteInvalidOpenTransactions(" + txnIds + ")");
+        throw new MetaException("Unable to delete from transaction database " + StringUtils.stringifyException(e));
+      }
+    } catch (RetryException ex) {
+      deleteInvalidOpenTransactions(dbConn, txnIds);
+    }
+  }
+
+  /** Returns the timeout for opening a transaction, in milliseconds (shared by all TxnHandler instances). */
+  @Override
+  public long getOpenTxnTimeOutMillis() {
+    return TxnHandler.openTxnTimeOutMillis;
+  }
+
+  /**
+   * Overrides the timeout for opening a transaction, in milliseconds. The value lives in a static field,
+   * so the change applies to every TxnHandler instance, not just this one.
+   */
+  @Override
+  public void setOpenTxnTimeOutMillis(long openTxnTimeOutMillis) {
+    TxnHandler.openTxnTimeOutMillis = openTxnTimeOutMillis;
+  }
+
+  /**
+   * Finds the largest TXN_ID whose TXN_STARTED lies more than openTxnTimeOutMillis before the
+   * database-side epoch expression ({@code TxnDbUtil.getEpochFn}).
+   *
+   * @param dbConn connection to run the query on
+   * @return MAX(TXN_ID) over the TXNS rows started before (now - openTxnTimeOutMillis)
+   * @throws MetaException if no such row exists, i.e. MAX(TXN_ID) is NULL
+   * @throws SQLException if the query fails
+   */
+  protected long getOpenTxnTimeoutLowBoundaryTxnId(Connection dbConn) throws MetaException, SQLException {
+    String lowBoundaryQuery = "SELECT MAX(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STARTED\" < ("
+        + TxnDbUtil.getEpochFn(dbProduct) + " - " + openTxnTimeOutMillis + ")";
+    try (Statement statement = dbConn.createStatement()) {
+      LOG.debug("Going to execute query <" + lowBoundaryQuery + ">");
+      try (ResultSet result = statement.executeQuery(lowBoundaryQuery)) {
+        result.next();
+        long lowBoundary = result.getLong(1);
+        if (result.wasNull()) {
+          // Should be impossible: the newest TXNS row started before (epoch - TXN_OPENTXN_TIMEOUT)
+          // is never deleted, so at least one row always matches.
+          throw new MetaException("Transaction tables not properly initialized, null record found in MAX(TXN_ID)");
+        }
+        return lowBoundary;
+      }
+    }
+  }
+
+  /**
+   * Returns the high water mark: the largest TXN_ID currently stored in TXNS, whatever its state.
+   *
+   * @param stmt statement used to run the query
+   * @return MAX(TXN_ID) of the TXNS table
+   * @throws SQLException if the query fails
+   * @throws MetaException if MAX(TXN_ID) is NULL, i.e. TXNS is empty
+   */
+  private long getHighWaterMark(Statement stmt) throws SQLException, MetaException {
+    final String hwmQuery = "SELECT MAX(\"TXN_ID\") FROM \"TXNS\"";
+    LOG.debug("Going to execute query <" + hwmQuery + ">");
+    try (ResultSet hwmResult = stmt.executeQuery(hwmQuery)) {
+      hwmResult.next();
+      final long highWaterMark = hwmResult.getLong(1);
+      if (hwmResult.wasNull()) {
+        // Should be impossible: the newest TXNS row started before (epoch - TXN_OPENTXN_TIMEOUT)
+        // is never deleted, so TXNS is never empty.
+        throw new MetaException("Transaction tables not properly initialized, null record found in MAX(TXN_ID)");
+      }
+      return highWaterMark;
+    }
+  }
+
   private List<Long> getTargetTxnIdList(String replPolicy, List<Long> sourceTxnIdList, Connection dbConn)
           throws SQLException {
     PreparedStatement pst = null;
@@ -716,7 +959,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       }
       LOG.debug("targetTxnid for srcTxnId " + sourceTxnIdList.toString() + " is " + targetTxnIdList.toString());
       return targetTxnIdList;
-    }  catch (SQLException e) {
+    } catch (SQLException e) {
       LOG.warn("failed to get target txn ids " + e.getMessage());
       throw e;
     } finally {
@@ -1159,7 +1402,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            * even if it includes all of its columns
            *
            * First insert into write_set using a temporary commitID, which will be updated in a separate call,
-           * see: {@link #updateCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long, long)}}.
+           * see: {@link #updateWSCommitIdAndCleanUpMetadata(Statement, long, TxnType, Long, long)}.
            * This should decrease the scope of the S4U lock on the next_txn_id table.
            */
           Savepoint undoWriteSetForCurrentTxn = dbConn.setSavepoint();
@@ -1173,13 +1416,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            * at the same time and no new txns start until all 3 commit.
            * We could've incremented the sequence for commitId as well but it doesn't add anything functionally.
            */
-          try (ResultSet commitIdRs = stmt.executeQuery(sqlGenerator.addForUpdateClause("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\""))) {
-            if (!commitIdRs.next()) {
-              throw new IllegalStateException("No rows found in NEXT_TXN_ID");
-            }
-            commitId = commitIdRs.getLong(1);
-          }
-
+          acquireTxnLock(stmt, false);
+          commitId = getHighWaterMark(stmt);
           /**
            * see if there are any overlapping txns that wrote the same element, i.e. have a conflict
            * Since entire commit operation is mutexed wrt other start/commit ops,
@@ -1211,9 +1449,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               throw new TxnAbortedException(msg);
             }
           }
-        }
-        else {
-          /**
+        } else {
+          /*
            * current txn didn't update/delete anything (may have inserted), so just proceed with commit
            *
            * We only care about commit id for write txns, so for RO (when supported) txns we don't
@@ -1223,6 +1460,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            * If RO < W, then there is no reads-from relationship.
            * In replication flow we don't expect any write write conflict as it should have been handled at source.
            */
+          assert true;
         }
 
         if (txnRecord.type != TxnType.READ_ONLY && !rqst.isSetReplPolicy()) {
@@ -1253,7 +1491,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           }
           deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
         }
-        updateCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId, tempCommitId);
+        updateWSCommitIdAndCleanUpMetadata(stmt, txnid, txnRecord.type, commitId, tempCommitId);
         if (rqst.isSetKeyValue()) {
           updateKeyValueAssociatedWithTxn(rqst, stmt);
         }
@@ -1272,8 +1510,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
-        closeStmt(stmt);
-        closeDbConn(dbConn);
+        close(null, stmt, dbConn);
         unlockInternal();
       }
     } catch (RetryException e) {
@@ -1338,7 +1575,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
-  private void updateCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType, Long commitId, long tempId) throws SQLException {
+  private void updateWSCommitIdAndCleanUpMetadata(Statement stmt, long txnid, TxnType txnType,
+      Long commitId, long tempId) throws SQLException {
     List<String> queryBatch = new ArrayList<>(5);
     // update write_set with real commitId
     if (commitId != null) {
@@ -1350,7 +1588,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       queryBatch.add("DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid);
     }
     queryBatch.add("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid);
-    queryBatch.add("DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid);
+    // DO NOT remove the transaction from the TXNS table, the cleaner will remove it when appropriate
+    queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_COMMITTED) + " WHERE \"TXN_ID\" = " + txnid);
     queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
 
     // execute all in one batch
@@ -1431,7 +1670,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         if (numAbortedWrites > 0) {
           // Allocate/Map one txn per aborted writeId and abort the txn to mark writeid as aborted.
-          List<Long> txnIds = openTxns(dbConn, stmt,
+          // We don't take the txnLock: all of these transactions are aborted within this same RDBMS transaction,
+          // so they will not affect the commitTxn in any way
+          List<Long> txnIds = openTxns(dbConn,
                   new OpenTxnRequest(numAbortedWrites, rqst.getUser(), rqst.getHostName()));
           assert(numAbortedWrites == txnIds.size());
 
@@ -1491,7 +1732,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         }
         closeStmt(pStmt);
         close(rs, stmt, dbConn);
-        if(handle != null) {
+        if (handle != null) {
           handle.releaseLocks();
         }
         unlockInternal();
@@ -1533,7 +1774,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       String[] names = TxnUtils.getDbTableName(fullTableName);
       assert names.length == 2;
       List<String> params = Arrays.asList(names[0], names[1]);
-      String s = "SELECT \"T2W_TXNID\" FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\" = ? AND \"T2W_TABLE\" = ? AND \"T2W_WRITEID\" = " + writeId;
+      String s =
+          "SELECT \"T2W_TXNID\" FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_DATABASE\" = ? AND "
+              + "\"T2W_TABLE\" = ? AND \"T2W_WRITEID\" = "+ writeId;
       pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
       LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">", quoteString(names[0]),
               quoteString(names[1]));
@@ -2001,46 +2244,37 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   @Override
   @RetrySemantics.SafeToRetry
-  public void performWriteSetGC() {
+  public void performWriteSetGC() throws MetaException {
     Connection dbConn = null;
     Statement stmt = null;
     ResultSet rs = null;
     try {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
       stmt = dbConn.createStatement();
-      rs = stmt.executeQuery("SELECT \"NTXN_NEXT\" - 1 FROM \"NEXT_TXN_ID\"");
-      if(!rs.next()) {
-        throw new IllegalStateException("NEXT_TXN_ID is empty: DB is corrupted");
-      }
-      long highestAllocatedTxnId = rs.getLong(1);
-      close(rs);
+      // WRITE_SET rows committed below commitHighWaterMark no longer overlap any open txn and can be removed
+      long minOpenTxn; // lowest TXN_ID in OPEN state, Long.MAX_VALUE if there is none
       rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + quoteChar(TXN_OPEN));
-      if(!rs.next()) {
+      if (!rs.next()) {
         throw new IllegalStateException("Scalar query returned no rows?!?!!");
       }
-      long commitHighWaterMark;//all currently open txns (if any) have txnid >= than commitHighWaterMark
-      long lowestOpenTxnId = rs.getLong(1);
-      if(rs.wasNull()) {
-        //if here then there are no Open txns and  highestAllocatedTxnId must be
-        //resolved (i.e. committed or aborted), either way
-        //there are no open txns with id <= highestAllocatedTxnId
-        //the +1 is there because "delete ..." below has < (which is correct for the case when
-        //there is an open txn
-        //Concurrency: even if new txn starts (or starts + commits) it is still true that
-        //there are no currently open txns that overlap with any committed txn with
-        //commitId <= commitHighWaterMark (as set on next line).  So plain READ_COMMITTED is enough.
-        commitHighWaterMark = highestAllocatedTxnId + 1;
-      }
-      else {
-        commitHighWaterMark = lowestOpenTxnId;
+      minOpenTxn = rs.getLong(1);
+      if (rs.wasNull()) {
+        minOpenTxn = Long.MAX_VALUE;
       }
+      long lowWaterMark = getOpenTxnTimeoutLowBoundaryTxnId(dbConn);
+      /*
+       * Find the highest txn id below which every transaction is committed or aborted: the lowest open
+       * transaction in TXNS, capped by the TXN_OPENTXN_TIMEOUT boundary (+1), because it is guaranteed
+       * that no further open transaction can appear below that boundary.
+       */
+      long commitHighWaterMark = Long.min(minOpenTxn, lowWaterMark + 1);
+      LOG.debug("Perform WriteSet GC with minOpenTxn {}, lowWaterMark {}", minOpenTxn, lowWaterMark);
       int delCnt = stmt.executeUpdate("DELETE FROM \"WRITE_SET\" WHERE \"WS_COMMIT_ID\" < " + commitHighWaterMark);
       LOG.info("Deleted {} obsolete rows from WRITE_SET", delCnt);
       dbConn.commit();
     } catch (SQLException ex) {
       LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex);
-    }
-    finally {
+    } finally {
       close(rs, stmt, dbConn);
     }
   }
@@ -4154,7 +4388,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   private void checkQFileTestHack(){
     boolean hackOn = MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEST) ||
-      MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST);
+        MetastoreConf.getBoolVar(conf, ConfVars.HIVE_IN_TEZ_TEST);
     if (hackOn) {
       LOG.info("Hacking in canned values for transaction manager");
       // Set up the transaction/locking db in the derby metastore
@@ -4525,7 +4759,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   }
 
   /**
-   * Returns the state of the transaction iff it's able to determine it.  Some cases where it cannot:
+   * Returns the state of the transaction if it's able to determine it. Some cases where it cannot:
    * 1. txnid was Aborted/Committed and then GC'd (compacted)
    * 2. txnid was committed but it didn't modify anything (nothing in COMPLETED_TXN_COMPONENTS)
    */
@@ -4550,6 +4784,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       if (txnState == TXN_ABORTED) {
         return TxnStatus.ABORTED;
       }
+      if (txnState == TXN_COMMITTED) {
+        return TxnStatus.COMMITTED;
+      }
       assert txnState == TXN_OPEN : "we found it in TXNS but it's not ABORTED, so must be OPEN";
     }
     return TxnStatus.OPEN;
@@ -4929,11 +5166,15 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   static boolean isRetryable(Configuration conf, Exception ex) {
     if(ex instanceof SQLException) {
       SQLException sqlException = (SQLException)ex;
-      if("08S01".equalsIgnoreCase(sqlException.getSQLState())) {
+      if (MANUAL_RETRY.equalsIgnoreCase(sqlException.getSQLState())) {
+        // Manual retry exception was thrown
+        return true;
+      }
+      if ("08S01".equalsIgnoreCase(sqlException.getSQLState())) {
         //in MSSQL this means Communication Link Failure
         return true;
       }
-      if("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) ||
+      if ("ORA-08176".equalsIgnoreCase(sqlException.getSQLState()) ||
         sqlException.getMessage().contains("consistent read failure; rollback data not available")) {
         return true;
       }
@@ -5112,10 +5353,25 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       return acquireLock(key);
     }
   }
+
+  @Override
   public void acquireLock(String key, LockHandle handle) {
     //the idea is that this will use LockHandle.dbConn
     throw new NotImplementedException("acquireLock(String, LockHandle) is not implemented");
   }
+
+  /**
+   * Acquire the global txn lock, used to mutex the openTxn and commitTxn.
+   * @param stmt Statement to execute the lock on
+   * @param shared true to take the lock in shared mode, false to take it in exclusive mode
+   * @throws SQLException if executing the lock statement fails
+   * @throws MetaException propagated from generating the database specific lock statement
+   */
+  private void acquireTxnLock(Statement stmt, boolean shared) throws SQLException, MetaException {
+    stmt.execute(sqlGenerator.createTxnLockStatement(shared));
+    LOG.debug("TXN lock locked by {} in mode {}", quoteString(TxnHandler.hostname), shared ? "SHARED" : "EXCLUSIVE");
+  }
+
   private static final class LockHandleImpl implements LockHandle {
     private final Connection dbConn;
     private final Statement stmt;
@@ -5152,6 +5408,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
+
   private static class NoPoolConnectionPool implements DataSource {
     // Note that this depends on the fact that no-one in this class calls anything but
     // getConnection.  If you want to use any of the Logger or wrap calls you'll have to
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 87130a5..e8ac71a 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -44,7 +44,7 @@ public interface TxnStore extends Configurable {
   /**
    * Prefix for key when committing with a key/value.
    */
-  public static final String TXN_KEY_START = "_meta";
+  String TXN_KEY_START = "_meta";
 
   enum MUTEX_KEY {
     Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock,
@@ -380,18 +380,19 @@ public interface TxnStore extends Configurable {
 
   /**
    * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by
-   * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)).
+   * min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)).
    */
   @RetrySemantics.SafeToRetry
   void cleanTxnToWriteIdTable() throws MetaException;
 
   /**
-   * Clean up aborted transactions from txns that have no components in txn_components.  The reson such
-   * txns exist can be that now work was done in this txn (e.g. Streaming opened TransactionBatch and
-   * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called.
+   * Clean up aborted or committed transactions from txns that have no components in txn_components.  The reason such
+   * txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and
+   * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called,
+   * or because their deletion from TXNS was delayed by the TXN_OPENTXN_TIMEOUT window.
    */
   @RetrySemantics.SafeToRetry
-  void cleanEmptyAbortedTxns() throws MetaException;
+  void cleanEmptyAbortedAndCommittedTxns() throws MetaException;
 
   /**
    * This will take all entries assigned to workers
@@ -442,7 +443,7 @@ public interface TxnStore extends Configurable {
    * transaction metadata once it becomes unnecessary.  
    */
   @RetrySemantics.SafeToRetry
-  void performWriteSetGC();
+  void performWriteSetGC() throws MetaException;
 
   /**
    * Determine if there are enough consecutive failures compacting a table or partition that no
@@ -461,6 +462,12 @@ public interface TxnStore extends Configurable {
   @VisibleForTesting
   long setTimeout(long milliseconds);
 
+  @VisibleForTesting
+  long getOpenTxnTimeOutMillis();
+
+  @VisibleForTesting
+  void setOpenTxnTimeOutMillis(long openTxnTimeOutMillis);
+
   @RetrySemantics.Idempotent
   MutexAPI getMutexAPI();
 
@@ -473,7 +480,7 @@ public interface TxnStore extends Configurable {
    */
   interface MutexAPI {
     /**
-     * The {@code key} is name of the lock. Will acquire and exclusive lock or block.  It retuns
+     * The {@code key} is name of the lock. Will acquire an exclusive lock or block.  It returns
      * a handle which must be used to release the lock.  Each invocation returns a new handle.
      */
     LockHandle acquireLock(String key) throws MetaException;
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
index 366b6f0..727abce 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql
@@ -234,9 +234,8 @@ CREATE TABLE "APP"."CTLGS" (
     "CREATE_TIME" INTEGER);
 
 -- Insert a default value.  The location is TBD.  Hive will fix this when it starts
-INSERT INTO "APP"."CTLGS"
- ("CTLG_ID", "NAME", "DESC", "LOCATION_URI", "CREATE_TIME")
- VALUES (1, 'hive', 'Default catalog for Hive', 'TBD', NULL);
+INSERT INTO "APP"."CTLGS" ("CTLG_ID", "NAME", "DESC", "LOCATION_URI", "CREATE_TIME")
+VALUES (1, 'hive', 'Default catalog for Hive', 'TBD', NULL);
 
 -- ----------------------------------------------
 -- DML Statements
@@ -524,7 +523,7 @@ ALTER TABLE "APP"."SDS" ADD CONSTRAINT "SQL110318025505550" CHECK (IS_COMPRESSED
 -- Transaction and Lock Tables
 -- ----------------------------
 CREATE TABLE TXNS (
-  TXN_ID bigint PRIMARY KEY,
+  TXN_ID bigint PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
   TXN_STATE char(1) NOT NULL,
   TXN_STARTED bigint NOT NULL,
   TXN_LAST_HEARTBEAT bigint NOT NULL,
@@ -536,6 +535,9 @@ CREATE TABLE TXNS (
   TXN_TYPE integer
 );
 
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+  VALUES(0, 'c', 0, 0, '', '');
+
 CREATE TABLE TXN_COMPONENTS (
   TC_TXNID bigint NOT NULL REFERENCES TXNS (TXN_ID),
   TC_DATABASE varchar(128) NOT NULL,
@@ -559,10 +561,10 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
 
 CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
 
-CREATE TABLE NEXT_TXN_ID (
-  NTXN_NEXT bigint NOT NULL
+CREATE TABLE TXN_LOCK_TBL (
+  TXN_LOCK bigint NOT NULL
 );
-INSERT INTO NEXT_TXN_ID VALUES(1);
+INSERT INTO TXN_LOCK_TBL VALUES(1);
 
 CREATE TABLE HIVE_LOCKS (
   HL_LOCK_EXT_ID bigint NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
index 8a3cd56..db2f43d 100644
--- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql
@@ -68,5 +68,19 @@ ALTER TABLE "APP"."DBS" ADD COLUMN "DB_MANAGED_LOCATION_URI" VARCHAR(4000);
 ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint;
 DROP TABLE MIN_HISTORY_LEVEL;
 
+-- HIVE-23048
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+  SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '_', '_' FROM COMPLETED_TXN_COMPONENTS;
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+  VALUES (1000000000, 'c', 0, 0, '_', '_');
+ALTER TABLE TXNS ADD COLUMN TXN_ID_TMP bigint;
+UPDATE TXNS SET TXN_ID_TMP=TXN_ID;
+ALTER TABLE TXNS DROP COLUMN TXN_ID;
+ALTER TABLE TXNS ADD COLUMN TXN_ID BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY (START WITH 1000000001, INCREMENT BY 1);
+UPDATE TXNS SET TXN_ID=TXN_ID_TMP;
+ALTER TABLE TXNS DROP COLUMN TXN_ID_TMP;
+
+RENAME TABLE NEXT_TXN_ID TO TXN_LOCK_TBL;
+RENAME COLUMN TXN_LOCK_TBL.NTXN_NEXT TO TXN_LOCK;
 -- This needs to be the last thing done.  Insert any changes above this line.
 UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
index 2e01777..6906bdf 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql
@@ -1096,28 +1096,31 @@ CREATE TABLE NEXT_LOCK_ID(
 
 INSERT INTO NEXT_LOCK_ID VALUES(1);
 
-CREATE TABLE NEXT_TXN_ID(
-	NTXN_NEXT bigint NOT NULL
+CREATE TABLE TXN_LOCK_TBL(
+    TXN_LOCK bigint NOT NULL
 );
 
-INSERT INTO NEXT_TXN_ID VALUES(1);
+INSERT INTO TXN_LOCK_TBL VALUES(1);
 
 CREATE TABLE TXNS(
-	TXN_ID bigint NOT NULL,
-	TXN_STATE char(1) NOT NULL,
-	TXN_STARTED bigint NOT NULL,
-	TXN_LAST_HEARTBEAT bigint NOT NULL,
-	TXN_USER nvarchar(128) NOT NULL,
-	TXN_HOST nvarchar(128) NOT NULL,
+    TXN_ID bigint NOT NULL IDENTITY(1,1),
+    TXN_STATE char(1) NOT NULL,
+    TXN_STARTED bigint NOT NULL,
+    TXN_LAST_HEARTBEAT bigint NOT NULL,
+    TXN_USER nvarchar(128) NOT NULL,
+    TXN_HOST nvarchar(128) NOT NULL,
     TXN_AGENT_INFO nvarchar(128) NULL,
     TXN_META_INFO nvarchar(128) NULL,
     TXN_HEARTBEAT_COUNT int NULL,
     TXN_TYPE int NULL,
 PRIMARY KEY CLUSTERED
 (
-	TXN_ID ASC
+    TXN_ID ASC
 )
 );
+SET IDENTITY_INSERT TXNS ON;
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+  VALUES(0, 'c', 0, 0, '', '');
 
 CREATE TABLE TXN_COMPONENTS(
 	TC_TXNID bigint NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
index 9f39515..7583c5c 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql
@@ -68,9 +68,56 @@ INSERT INTO NOTIFICATION_SEQUENCE (NNI_ID, NEXT_EVENT_ID) SELECT 1,1 WHERE NOT E
 ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI nvarchar(4000);
 
 -- HIVE-23107
-ALTER TABLE COMPACTION_QUEUE bigint CQ_NEXT_TXN_ID NOT NULL;
+ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint NULL;
 DROP TABLE MIN_HISTORY_LEVEL;
 
+-- HIVE-23048
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+  SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS;
+
+CREATE TABLE TMP_TXNS(
+    TXN_ID bigint NOT NULL IDENTITY(1,1),
+    TXN_STATE char(1) NOT NULL,
+    TXN_STARTED bigint NOT NULL,
+    TXN_LAST_HEARTBEAT bigint NOT NULL,
+    TXN_USER nvarchar(128) NOT NULL,
+    TXN_HOST nvarchar(128) NOT NULL,
+    TXN_AGENT_INFO nvarchar(128) NULL,
+    TXN_META_INFO nvarchar(128) NULL,
+    TXN_HEARTBEAT_COUNT int NULL,
+    TXN_TYPE int NULL,
+PRIMARY KEY CLUSTERED
+(
+   TXN_ID ASC
+)
+);
+
+SET IDENTITY_INSERT TMP_TXNS ON;
+INSERT INTO TMP_TXNS (TXN_ID,TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST, TXN_AGENT_INFO, TXN_META_INFO, TXN_HEARTBEAT_COUNT, TXN_TYPE)
+SELECT TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST, TXN_AGENT_INFO, TXN_META_INFO, TXN_HEARTBEAT_COUNT, TXN_TYPE FROM TXNS WITH (TABLOCKX);
+
+SET IDENTITY_INSERT TMP_TXNS OFF;
+
+CREATE TABLE TMP_TXN_COMPONENTS(
+    TC_TXNID bigint NOT NULL,
+    TC_DATABASE nvarchar(128) NOT NULL,
+    TC_TABLE nvarchar(128) NULL,
+    TC_PARTITION nvarchar(767) NULL,
+    TC_OPERATION_TYPE char(1) NOT NULL,
+    TC_WRITEID bigint
+);
+INSERT INTO TMP_TXN_COMPONENTS SELECT * FROM TXN_COMPONENTS;
+
+DROP TABLE TXN_COMPONENTS;
+DROP TABLE TXNS;
+
+Exec sp_rename 'TMP_TXNS', 'TXNS';
+Exec sp_rename 'TMP_TXN_COMPONENTS', 'TXN_COMPONENTS';
+Exec sp_rename 'NEXT_TXN_ID', 'TXN_LOCK_TBL';
+Exec sp_rename 'TXN_LOCK_TBL.NTXN_NEXT', 'TXN_LOCK', 'COLUMN';
+ALTER TABLE TXN_COMPONENTS WITH CHECK ADD FOREIGN KEY(TC_TXNID) REFERENCES TXNS (TXN_ID);
+CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID);
+
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
index 0512a45..b7f423c 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql
@@ -989,7 +989,7 @@ CREATE TABLE IF NOT EXISTS WM_MAPPING
 -- Transaction and Lock Tables
 -- ----------------------------
 CREATE TABLE TXNS (
-  TXN_ID bigint PRIMARY KEY,
+  TXN_ID bigint PRIMARY KEY AUTO_INCREMENT,
   TXN_STATE char(1) NOT NULL,
   TXN_STARTED bigint NOT NULL,
   TXN_LAST_HEARTBEAT bigint NOT NULL,
@@ -1001,6 +1001,9 @@ CREATE TABLE TXNS (
   TXN_TYPE int
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
 
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+  VALUES(0, 'c', 0, 0, '', '');
+
 CREATE TABLE TXN_COMPONENTS (
   TC_TXNID bigint NOT NULL,
   TC_DATABASE varchar(128) NOT NULL,
@@ -1025,10 +1028,10 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
 
 CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION) USING BTREE;
 
-CREATE TABLE NEXT_TXN_ID (
-  NTXN_NEXT bigint NOT NULL
+CREATE TABLE TXN_LOCK_TBL (
+  TXN_LOCK bigint NOT NULL
 ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
-INSERT INTO NEXT_TXN_ID VALUES(1);
+INSERT INTO TXN_LOCK_TBL VALUES(1);
 
 CREATE TABLE HIVE_LOCKS (
   HL_LOCK_EXT_ID bigint NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
index 4b82e36..78a841a 100644
--- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql
@@ -72,6 +72,23 @@ ALTER TABLE DBS ADD COLUMN DB_MANAGED_LOCATION_URI VARCHAR(4000) CHARACTER SET l
 ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint;
 DROP TABLE MIN_HISTORY_LEVEL;
 
+-- HIVE-23048
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+  SELECT IFNULL(MAX(CTC_TXNID),0), 'c', 0, 0, '', '' FROM COMPLETED_TXN_COMPONENTS;
+ALTER TABLE TXNS ADD COLUMN TXN_ID_TMP BIGINT;
+UPDATE TXNS SET TXN_ID_TMP=TXN_ID;
+SET FOREIGN_KEY_CHECKS = 0;
+ALTER TABLE TXNS MODIFY TXN_ID BIGINT AUTO_INCREMENT;
+SET FOREIGN_KEY_CHECKS = 1;
+UPDATE TXNS SET TXN_ID=TXN_ID_TMP;
+ALTER TABLE TXNS DROP COLUMN TXN_ID_TMP;
+SELECT MAX(TXN_ID) + 1 INTO @AutoInc FROM TXNS;
+SET @s:=CONCAT('ALTER TABLE TXNS AUTO_INCREMENT=', @AutoInc);
+PREPARE stmt FROM @s;
+EXECUTE stmt;
+DEALLOCATE PREPARE stmt;
+RENAME TABLE NEXT_TXN_ID TO TXN_LOCK_TBL;
+ALTER TABLE TXN_LOCK_TBL CHANGE NTXN_NEXT TXN_LOCK bigint NOT NULL;
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE;
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
index db398e5..0082dcd 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql
@@ -972,7 +972,7 @@ ALTER TABLE MV_TABLES_USED ADD CONSTRAINT MV_TABLES_USED_FK2 FOREIGN KEY (TBL_ID
 -- Transaction and lock tables
 ------------------------------
 CREATE TABLE TXNS (
-  TXN_ID NUMBER(19) PRIMARY KEY,
+  TXN_ID NUMBER(19) GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
   TXN_STATE char(1) NOT NULL,
   TXN_STARTED NUMBER(19) NOT NULL,
   TXN_LAST_HEARTBEAT NUMBER(19) NOT NULL,
@@ -984,6 +984,9 @@ CREATE TABLE TXNS (
   TXN_TYPE number(10)
 ) ROWDEPENDENCIES;
 
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+  VALUES(0, 'c', 0, 0, '_', '_');
+
 CREATE TABLE TXN_COMPONENTS (
   TC_TXNID NUMBER(19) NOT NULL REFERENCES TXNS (TXN_ID),
   TC_DATABASE VARCHAR2(128) NOT NULL,
@@ -1007,10 +1010,10 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS (
 
 CREATE INDEX COMPLETED_TXN_COMPONENTS_INDEX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION);
 
-CREATE TABLE NEXT_TXN_ID (
-  NTXN_NEXT NUMBER(19) NOT NULL
+CREATE TABLE TXN_LOCK_TBL (
+  TXN_LOCK NUMBER(19) NOT NULL
 );
-INSERT INTO NEXT_TXN_ID VALUES(1);
+INSERT INTO TXN_LOCK_TBL VALUES(1);
 
 CREATE TABLE HIVE_LOCKS (
   HL_LOCK_EXT_ID NUMBER(19) NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
index 1be83fc..51f52a4 100644
--- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql
@@ -72,6 +72,21 @@ ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI VARCHAR2(4000) NULL;
 ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID NUMBER(19);
 DROP TABLE MIN_HISTORY_LEVEL;
 
+-- HIVE-23048
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+  SELECT COALESCE(MAX(CTC_TXNID),0), 'c', 0, 0, '_', '_' FROM COMPLETED_TXN_COMPONENTS;
+INSERT INTO TXNS (TXN_ID, TXN_STATE, TXN_STARTED, TXN_LAST_HEARTBEAT, TXN_USER, TXN_HOST)
+  VALUES (1000000000, 'c', 0, 0, '_', '_');
+-- DECLARE max_txn NUMBER;
+-- BEGIN
+--    SELECT MAX(TXN_ID) + 1 INTO max_txn FROM TXNS;
+--    EXECUTE IMMEDIATE 'CREATE SEQUENCE TXNS_TXN_ID_SEQ INCREMENT BY 1 START WITH ' || max_txn || ' CACHE 20';
+-- END;
+CREATE SEQUENCE TXNS_TXN_ID_SEQ INCREMENT BY 1 START WITH 1000000001 CACHE 20;
+ALTER TABLE TXNS MODIFY TXN_ID default TXNS_TXN_ID_SEQ.nextval;
+
+RENAME NEXT_TXN_ID TO TXN_LOCK_TBL;
+ALTER TABLE TXN_LOCK_TBL RENAME COLUMN NTXN_NEXT TO TXN_LOCK;
 -- These lines need to be last.  Insert any changes above.
 UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual;
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
index e6e3016..717e707 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql
@@ -1658,7 +1658,7 @@ GRANT ALL ON SCHEMA public TO PUBLIC;
 -- Transaction and lock tables
 ------------------------------
 CREATE TABLE "TXNS" (
-  "TXN_ID" bigint PRIMARY KEY,
+  "TXN_ID" bigserial PRIMARY KEY,
   "TXN_STATE" char(1) NOT NULL,
   "TXN_STARTED" bigint NOT NULL,
   "TXN_LAST_HEARTBEAT" bigint NOT NULL,
@@ -1669,6 +1669,8 @@ CREATE TABLE "TXNS" (
   "TXN_HEARTBEAT_COUNT" integer,
   "TXN_TYPE" integer
 );
+INSERT INTO "TXNS" ("TXN_ID", "TXN_STATE", "TXN_STARTED", "TXN_LAST_HEARTBEAT", "TXN_USER", "TXN_HOST")
+  VALUES(0, 'c', 0, 0, '', '');
 
 CREATE TABLE "TXN_COMPONENTS" (
   "TC_TXNID" bigint NOT NULL REFERENCES "TXNS" ("TXN_ID"),
@@ -1693,10 +1695,10 @@ CREATE TABLE "COMPLETED_TXN_COMPONENTS" (
 
 CREATE INDEX COMPLETED_TXN_COMPONENTS_INDEX ON "COMPLETED_TXN_COMPONENTS" USING btree ("CTC_DATABASE", "CTC_TABLE", "CTC_PARTITION");
 
-CREATE TABLE "NEXT_TXN_ID" (
-  "NTXN_NEXT" bigint NOT NULL
+CREATE TABLE "TXN_LOCK_TBL" (
+  "TXN_LOCK" bigint NOT NULL
 );
-INSERT INTO "NEXT_TXN_ID" VALUES(1);
+INSERT INTO "TXN_LOCK_TBL" VALUES(1);
 
 CREATE TABLE "HIVE_LOCKS" (
   "HL_LOCK_EXT_ID" bigint NOT NULL,
diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
index b90cecb..63a5c44 100644
--- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
+++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql
@@ -203,6 +203,15 @@ ALTER TABLE "DBS" ADD "DB_MANAGED_LOCATION_URI" character varying(4000);
 ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_NEXT_TXN_ID" bigint;
 DROP TABLE "MIN_HISTORY_LEVEL";
 
+-- HIVE-23048: TXN_ID becomes sequence-generated; the 'c' row below keeps MAX(CTC_TXNID) so the sequence starts above it.
+INSERT INTO "TXNS" ("TXN_ID", "TXN_STATE", "TXN_STARTED", "TXN_LAST_HEARTBEAT", "TXN_USER", "TXN_HOST")
+  SELECT COALESCE(MAX("CTC_TXNID"),0), 'c', 0, 0, '', '' FROM "COMPLETED_TXN_COMPONENTS";
+CREATE SEQUENCE "TXNS_TXN_ID_SEQ" MINVALUE 0 OWNED BY "TXNS"."TXN_ID";
+select setval('"TXNS_TXN_ID_SEQ"',  (SELECT MAX("TXN_ID") FROM "TXNS"));
+ALTER TABLE "TXNS" ALTER "TXN_ID" SET DEFAULT nextval('"TXNS_TXN_ID_SEQ"');
+-- Rename NEXT_TXN_ID and its column so upgraded databases match TXN_LOCK_TBL("TXN_LOCK") of the 4.0.0 schema.
+ALTER TABLE "NEXT_TXN_ID" RENAME TO "TXN_LOCK_TBL";
+ALTER TABLE "TXN_LOCK_TBL" RENAME COLUMN "NTXN_NEXT" TO "TXN_LOCK";
 -- These lines need to be last. Insert any changes above.
 UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1;
 SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0';
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java
index c1a1629..2ebba39 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/DbInstallBase.java
@@ -36,6 +36,7 @@ public abstract class DbInstallBase {
     Assert.assertEquals(0, getRule().createUser());
     Assert.assertEquals(0, getRule().installAVersion(FIRST_VERSION));
     Assert.assertEquals(0, getRule().upgradeToLatest());
+    Assert.assertEquals(0, getRule().validateSchema()); // the upgraded schema must also pass schematool -validate
   }
 
   protected abstract DatabaseRule getRule();
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java
index a6d22d1..e06011f 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/DatabaseRule.java
@@ -68,11 +68,11 @@ public abstract class DatabaseRule extends ExternalResource {
   public DatabaseRule setVerbose(boolean verbose) {
     this.verbose = verbose;
     return this;
-  };
+  }
 
   public String getDb() {
     return HIVE_DB;
-  };
+  }
 
   /**
    * URL to use when connecting as root rather than Hive
@@ -147,7 +147,7 @@ public abstract class DatabaseRule extends ExternalResource {
 
   protected String getDockerContainerName(){
     return String.format("metastore-test-%s-install", getDbType());
-  };
+  }
 
   private ProcessResults runCmd(String[] cmd, long secondsToWait)
       throws IOException, InterruptedException {
@@ -275,7 +275,7 @@ public abstract class DatabaseRule extends ExternalResource {
         "-dbType",
         getDbType(),
         "-userName",
-        HIVE_USER,
+        getHiveUser(), // overridable per database; the Oracle rule prefixes it with c##
         "-passWord",
         getHivePassword(),
         "-url",
@@ -289,4 +289,20 @@ public abstract class DatabaseRule extends ExternalResource {
     createUser();
     installLatest();
   }
+
+  public int validateSchema() { // runs schematool -validate as the Hive user; returns MetastoreSchemaTool.run's result
+    return new MetastoreSchemaTool().setVerbose(verbose).run(buildArray(
+        "-validate",
+        "-dbType",
+        getDbType(),
+        "-userName",
+        getHiveUser(),
+        "-passWord",
+        getHivePassword(),
+        "-url",
+        getJdbcUrl(),
+        "-driver",
+        getJdbcDriver()
+    ));
+  }
 }
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java
index 0b070e1..21c5de1 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/dbinstall/rules/Oracle.java
@@ -24,7 +24,7 @@ public class Oracle extends DatabaseRule {
 
   @Override
   public String getDockerImageName() {
-    return "orangehrm/oracle-xe-11g";
+    return "pvargacl/oracle-xe-18.4.0"; // NOTE(review): image appears to come from an individual's Docker Hub namespace; consider an officially maintained one
   }
 
   @Override
@@ -32,10 +32,6 @@ public class Oracle extends DatabaseRule {
     return buildArray(
         "-p",
         "1521:1521",
-        "-e",
-        "DEFAULT_SYS_PASS=" + getDbRootPassword(),
-        "-e",
-        "ORACLE_ALLOW_REMOTE=true",
         "-d"
     );
   }
@@ -72,11 +68,16 @@ public class Oracle extends DatabaseRule {
 
   @Override
   public boolean isContainerReady(String logOutput) {
-    return logOutput.contains("Oracle started successfully!");
+    return logOutput.contains("DATABASE IS READY TO USE!"); // presumably the readiness banner of the 18.4.0 image; re-check if the image changes
   }
 
   @Override
   public String getHivePassword() {
     return HIVE_PASSWORD;
   }
+
+  @Override
+  public String getHiveUser() {
+    return "c##"+ super.getHiveUser(); // presumably Oracle's C## prefix for common users in 12c+ container databases; confirm
+  }
 }
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java
new file mode 100644
index 0000000..8e1794a
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestOpenTxn.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProvider;
+import org.apache.hadoop.hive.metastore.datasource.DataSourceProviderFactory;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+
+/**
+ * Tests {@link TxnStore#openTxns} and {@link TxnStore#getOpenTxns}, in particular how gaps in
+ * the TXN_ID sequence (rows missing from TXNS) are counted among the open transactions.
+ */
+public class TestOpenTxn {
+
+  private final Configuration conf = MetastoreConf.newMetastoreConf();
+  private TxnStore txnHandler;
+
+  @Before
+  public void setUp() throws Exception {
+    // This will init the metastore db
+    txnHandler = TxnUtils.getTxnStore(conf);
+    TxnDbUtil.prepDb(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    TxnDbUtil.cleanDb(conf);
+  }
+
+  /** The first transaction opened on a freshly prepared database gets id 1. */
+  @Test
+  public void testSingleOpen() throws MetaException {
+    OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+    long txnId = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+    Assert.assertEquals(1, txnId);
+  }
+
+  /**
+   * The TXNS row of the second transaction is deleted before a third is opened; the missing id
+   * is still expected among the open transactions, so three ids are reported.
+   */
+  @Test
+  public void testGap() throws Exception {
+    OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+    txnHandler.openTxns(openTxnRequest);
+    long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+    deleteTransaction(second);
+    txnHandler.openTxns(openTxnRequest);
+    GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+    Assert.assertEquals(3, openTxns.getOpen_txnsSize());
+  }
+
+  /**
+   * Same scenario as {@link #testGap()}, but the first transaction is opened one second before
+   * the others.
+   */
+  @Test
+  public void testGapWithOldOpen() throws Exception {
+    OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+    txnHandler.openTxns(openTxnRequest);
+    // NOTE(review): the sleep presumably ages the first transaction so a time-dependent branch
+    // of the gap handling is exercised; confirm against TxnHandler before shortening it.
+    Thread.sleep(1000);
+    long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+    deleteTransaction(second);
+    txnHandler.openTxns(openTxnRequest);
+    GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+    Assert.assertEquals(3, openTxns.getOpen_txnsSize());
+  }
+
+  /**
+   * A committed transaction followed by a gap: only the gap and the newly opened transaction
+   * are expected among the open transactions.
+   */
+  @Test
+  public void testGapWithOldCommit() throws Exception {
+    OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+    long first = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+    txnHandler.commitTxn(new CommitTxnRequest(first));
+    long second = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+    deleteTransaction(second);
+    txnHandler.openTxns(openTxnRequest);
+    GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+    Assert.assertEquals(2, openTxns.getOpen_txnsSize());
+  }
+
+  /**
+   * A committed transaction followed by a gap of ten ids: all ten missing ids plus the newly
+   * opened transaction are expected among the open transactions.
+   */
+  @Test
+  public void testMultiGapWithOldCommit() throws Exception {
+    OpenTxnRequest openTxnRequest = new OpenTxnRequest(1, "me", "localhost");
+    long first = txnHandler.openTxns(openTxnRequest).getTxn_ids().get(0);
+    txnHandler.commitTxn(new CommitTxnRequest(first));
+    long second = txnHandler.openTxns(new OpenTxnRequest(10, "me", "localhost")).getTxn_ids().get(0);
+    deleteTransaction(second, second + 9);
+    txnHandler.openTxns(openTxnRequest);
+    GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+    Assert.assertEquals(11, openTxns.getOpen_txnsSize());
+  }
+
+  /** Deletes the TXNS row with the given id, leaving a single-id gap. */
+  private void deleteTransaction(long txnId) throws Exception {
+    deleteTransaction(txnId, txnId);
+  }
+
+  /**
+   * Deletes the TXNS rows with ids in {@code [minTxnId, maxTxnId]} directly through JDBC,
+   * bypassing the TxnStore API, to simulate gaps in the TXN_ID sequence.
+   *
+   * <p>The connection and statement are closed even if the delete fails, and the DataSource
+   * created for this call is closed as well when it supports it.
+   */
+  private void deleteTransaction(long minTxnId, long maxTxnId) throws Exception {
+    DataSourceProvider dsp = DataSourceProviderFactory.tryGetDataSourceProviderOrNull(conf);
+    Assert.assertNotNull("No DataSourceProvider is configured", dsp);
+    DataSource ds = dsp.create(conf);
+    try (Connection dbConn = ds.getConnection();
+        PreparedStatement stmt =
+            dbConn.prepareStatement("DELETE FROM TXNS WHERE TXN_ID >= ? AND TXN_ID <= ?")) {
+      stmt.setLong(1, minTxnId);
+      stmt.setLong(2, maxTxnId);
+      stmt.executeUpdate();
+      dbConn.commit();
+    } finally {
+      // A new DataSource (presumably a connection pool) is created on every call; close it so
+      // repeated calls do not leave pools and their connections open.
+      if (ds instanceof AutoCloseable) {
+        ((AutoCloseable) ds).close();
+      }
+    }
+  }
+
+}