You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/12/13 13:26:35 UTC

[hive] branch master updated: HIVE-22327: Repl: Ignore read-only transactions in notification log (Denys Kuzmenko reviewed by mahesh kumar behera and Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 25efd17  HIVE-22327: Repl: Ignore read-only transactions in notification log (Denys Kuzmenko reviewed by mahesh kumar behera and Peter Vary)
25efd17 is described below

commit 25efd174077ab9f0dce76f7abf16faec9fbf1428
Author: denys kuzmenko <dk...@cloudera.com>
AuthorDate: Fri Dec 13 14:25:35 2019 +0100

    HIVE-22327: Repl: Ignore read-only transactions in notification log (Denys Kuzmenko reviewed by mahesh kumar behera and Peter Vary)
---
 .../hcatalog/listener/DbNotificationListener.java  |  10 ++
 .../listener/TestDbNotificationListener.java       |  85 +++++++++++++++-
 .../hadoop/hive/metastore/txn/TestTxnHandler.java  |  15 ++-
 .../hive/metastore/events/AbortTxnEvent.java       |  24 ++++-
 .../hive/metastore/events/AllocWriteIdEvent.java   |   4 +
 .../hive/metastore/events/CommitTxnEvent.java      |  22 ++++-
 .../hadoop/hive/metastore/events/OpenTxnEvent.java |  25 ++++-
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 109 +++++++++++++--------
 8 files changed, 242 insertions(+), 52 deletions(-)

diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 5f9d809..3a8cb39 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
 import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
 import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
@@ -553,6 +554,9 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
 
   @Override
   public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException {
+    if (openTxnEvent.getTxnType() == TxnType.READ_ONLY) {
+      return;
+    }
     int lastTxnIdx = openTxnEvent.getTxnIds().size() - 1;
     OpenTxnMessage msg =
         MessageBuilder.getInstance().buildOpenTxnMessage(openTxnEvent.getTxnIds().get(0),
@@ -571,6 +575,9 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGenerator sqlGenerator)
           throws MetaException {
+    if (commitTxnEvent.getTxnType() == TxnType.READ_ONLY) {
+      return;
+    }
     CommitTxnMessage msg =
         MessageBuilder.getInstance().buildCommitTxnMessage(commitTxnEvent.getTxnId());
 
@@ -588,6 +595,9 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
   @Override
   public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, SQLGenerator sqlGenerator)
           throws MetaException {
+    if (abortTxnEvent.getTxnType() == TxnType.READ_ONLY) {
+      return;
+    }
     AbortTxnMessage msg =
         MessageBuilder.getInstance().buildAbortTxnMessage(abortTxnEvent.getTxnId());
     NotificationEvent event =
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index edf861f..66bdee1 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -25,10 +25,10 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
 
 import java.util.concurrent.TimeUnit;
-import java.lang.reflect.Field;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -63,6 +63,7 @@ import org.apache.hadoop.hive.metastore.api.ResourceUri;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
 import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
@@ -1058,6 +1059,88 @@ public class TestDbNotificationListener {
   }
 
   @Test
+  public void openTxn() throws Exception {
+    msClient.openTxn("me", TxnType.READ_ONLY);
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(0, rsp.getEventsSize());
+
+    msClient.openTxn("me", TxnType.DEFAULT);
+    rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(0);
+    assertEquals(firstEventId + 1, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(EventType.OPEN_TXN.toString(), event.getEventType());
+  }
+
+  @Test
+  public void abortTxn() throws Exception {
+    long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+    long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    msClient.abortTxns(Collections.singletonList(txnId1));
+    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+    assertEquals(0, rsp.getEventsSize());
+
+    msClient.abortTxns(Collections.singletonList(txnId2));
+    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(0);
+    assertEquals(firstEventId + 2, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
+  }
+
+  @Test
+  public void rollbackTxn() throws Exception {
+    long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+    long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    msClient.rollbackTxn(txnId1);
+    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+    assertEquals(0, rsp.getEventsSize());
+
+    msClient.rollbackTxn(txnId2);
+    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(0);
+    assertEquals(firstEventId + 2, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
+  }
+
+  @Test
+  public void commitTxn() throws Exception {
+    long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+    long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+    NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    msClient.commitTxn(txnId1);
+    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+    assertEquals(0, rsp.getEventsSize());
+
+    msClient.commitTxn(txnId2);
+    rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+    assertEquals(1, rsp.getEventsSize());
+
+    NotificationEvent event = rsp.getEvents().get(0);
+    assertEquals(firstEventId + 2, event.getEventId());
+    assertTrue(event.getEventTime() >= startTime);
+    assertEquals(EventType.COMMIT_TXN.toString(), event.getEventType());
+  }
+
+  @Test
   public void insertTable() throws Exception {
     String defaultDbName = "default";
     String tblName = "inserttbl";
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index be37b2a..01e5b82 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -210,7 +210,7 @@ public class TestTxnHandler {
       Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(3) + " is already committed.", ex.getMessage());
     }
     Assert.assertTrue(gotException);
-    
+
     gotException = false;
     try {
       txnHandler.abortTxn(new AbortTxnRequest(4));
@@ -223,6 +223,19 @@ public class TestTxnHandler {
   }
 
   @Test
+  public void testAbortTxns() throws Exception {
+    OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost"));
+    List<Long> txnList = openedTxns.getTxn_ids();
+    txnHandler.abortTxns(new AbortTxnsRequest(txnList));
+
+    GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
+    assertEquals(3, txnsInfo.getOpen_txns().size());
+    txnsInfo.getOpen_txns().forEach(txn ->
+      assertEquals(TxnState.ABORTED, txn.getState())
+    );
+  }
+
+  @Test
   public void testAbortInvalidTxn() throws Exception {
     boolean caught = false;
     try {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AbortTxnEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AbortTxnEvent.java
index fe4b974..5e00fcc 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AbortTxnEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AbortTxnEvent.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.events;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 
 /**
  * AbortTxnEvent
@@ -31,15 +32,25 @@ import org.apache.hadoop.hive.metastore.IHMSHandler;
 public class AbortTxnEvent extends ListenerEvent {
 
   private final Long txnId;
+  private final TxnType txnType;
+
+  public AbortTxnEvent(Long transactionId, IHMSHandler handler) {
+    this(transactionId, null, handler);
+  }
+
+  public AbortTxnEvent(Long transactionId, TxnType txnType) {
+    this(transactionId, txnType, null);
+  }
 
   /**
-   *
    * @param transactionId Unique identification for the transaction that got rolledback.
+   * @param txnType type of transaction
    * @param handler handler that is firing the event
    */
-  public AbortTxnEvent(Long transactionId, IHMSHandler handler) {
+  public AbortTxnEvent(Long transactionId, TxnType txnType, IHMSHandler handler) {
     super(true, handler);
-    txnId = transactionId;
+    this.txnId = transactionId;
+    this.txnType = txnType;
   }
 
   /**
@@ -48,4 +59,11 @@ public class AbortTxnEvent extends ListenerEvent {
   public Long getTxnId() {
     return txnId;
   }
+
+  /**
+   * @return txnType
+   */
+  public TxnType getTxnType() {
+    return txnType;
+  }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AllocWriteIdEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AllocWriteIdEvent.java
index 84a9a4e..2a719f2 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AllocWriteIdEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AllocWriteIdEvent.java
@@ -36,6 +36,10 @@ public class AllocWriteIdEvent extends ListenerEvent {
   private final String tableName;
   private final String dbName;
 
+  public AllocWriteIdEvent(List<TxnToWriteId> txnToWriteIdList, String dbName, String tableName) {
+    this(txnToWriteIdList, dbName, tableName, null);
+  }
+
   public AllocWriteIdEvent(List<TxnToWriteId> txnToWriteIdList, String dbName, String tableName, IHMSHandler handler) {
     super(true, handler);
     this.txnToWriteIdList = txnToWriteIdList;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CommitTxnEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CommitTxnEvent.java
index ba382cd..b357dbb 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CommitTxnEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CommitTxnEvent.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.events;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.api.TxnType;
 
 /**
  * CommitTxnEvent
@@ -31,15 +32,25 @@ import org.apache.hadoop.hive.metastore.IHMSHandler;
 public class CommitTxnEvent extends ListenerEvent {
 
   private final Long txnId;
+  private final TxnType txnType;
+
+  public CommitTxnEvent(Long transactionId, IHMSHandler handler) {
+    this(transactionId, null, handler);
+  }
+
+  public CommitTxnEvent(Long transactionId, TxnType txnType) {
+    this(transactionId, txnType, null);
+  }
 
   /**
-   *
    * @param transactionId Unique identification for the transaction just got committed.
+   * @param txnType type of transaction
    * @param handler handler that is firing the event
    */
-  public CommitTxnEvent(Long transactionId, IHMSHandler handler) {
+  public CommitTxnEvent(Long transactionId, TxnType txnType, IHMSHandler handler) {
     super(true, handler);
     this.txnId = transactionId;
+    this.txnType = txnType;
   }
 
   /**
@@ -48,4 +59,11 @@ public class CommitTxnEvent extends ListenerEvent {
   public Long getTxnId() {
     return txnId;
   }
+
+  /**
+   * @return txnType
+   */
+  public TxnType getTxnType() {
+    return txnType;
+  }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java
index d935ed1..289dfd0 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hive.metastore.IHMSHandler;
 import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+
 import java.util.List;
 
 /**
@@ -31,15 +33,27 @@ import java.util.List;
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class OpenTxnEvent extends ListenerEvent {
-  private List<Long> txnIds;
+
+  private final List<Long> txnIds;
+  private final TxnType txnType;
+
+  public OpenTxnEvent(List<Long> txnIds, IHMSHandler handler) {
+    this(txnIds, null, handler);
+  }
+
+  public OpenTxnEvent(List<Long> txnIds, TxnType txnType) {
+    this(txnIds, txnType, null);
+  }
 
   /**
    * @param txnIds List of unique identification for the transaction just opened.
+   * @param txnType type of transaction
    * @param handler handler that is firing the event
    */
-  public OpenTxnEvent(List<Long> txnIds, IHMSHandler handler) {
+  public OpenTxnEvent(List<Long> txnIds, TxnType txnType, IHMSHandler handler) {
     super(true, handler);
     this.txnIds = Lists.newArrayList(txnIds);
+    this.txnType = txnType;
   }
 
   /**
@@ -48,4 +62,11 @@ public class OpenTxnEvent extends ListenerEvent {
   public List<Long> getTxnIds() {
     return txnIds;
   }
+
+  /**
+   * @return txnType
+   */
+  public TxnType getTxnType() {
+    return txnType;
+  }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 9733256..64a5429 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -677,7 +677,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
       if (transactionalListeners != null) {
         MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator);
+                EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, txnType), dbConn, sqlGenerator);
       }
       return txnIds;
     } finally {
@@ -790,9 +790,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           txnid = targetTxnIds.get(0);
         }
 
-        if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
-          TxnStatus status = findTxnState(txnid,stmt);
-          if(status == TxnStatus.ABORTED) {
+        TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+        if (txnRecord == null) {
+          TxnStatus status = findTxnState(txnid, stmt);
+          if (status == TxnStatus.ABORTED) {
             if (rqst.isSetReplPolicy()) {
               // in case of replication, idempotent is taken care by getTargetTxnId
               LOG.warn("Invalid state ABORTED for transactions started using replication replay task");
@@ -804,6 +805,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           }
           raiseTxnUnexpectedState(status, txnid);
         }
+        abortTxns(dbConn, Collections.singletonList(txnid), true);
 
         if (rqst.isSetReplPolicy()) {
           deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
@@ -811,7 +813,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         if (transactionalListeners != null) {
           MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                  EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, null), dbConn, sqlGenerator);
+                  EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, txnRecord.type), dbConn, sqlGenerator);
         }
 
         LOG.debug("Going to commit");
@@ -833,24 +835,46 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   @Override
   @RetrySemantics.Idempotent
-  public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException {
-    List<Long> txnids = rqst.getTxn_ids();
+  public void abortTxns(AbortTxnsRequest rqst) throws MetaException {
+    List<Long> txnIds = rqst.getTxn_ids();
     try {
       Connection dbConn = null;
+      Statement stmt = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        int numAborted = abortTxns(dbConn, txnids, false);
-        if (numAborted != txnids.size()) {
+        stmt = dbConn.createStatement();
+
+        List<String> queries = new ArrayList<>();
+        StringBuilder prefix = new StringBuilder("select TXN_ID, TXN_TYPE from TXNS where TXN_STATE = ")
+          .append(quoteChar(TXN_OPEN)).append(" and TXN_TYPE != ").append(TxnType.READ_ONLY.getValue())
+          .append(" and ");
+
+        TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(),
+            txnIds, "TXN_ID", false, false);
+
+        Map<Long, TxnType> nonReadOnlyTxns = new HashMap<>();
+        for (String query : queries) {
+          LOG.debug("Going to execute query<" + query + ">");
+          try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) {
+            while (rs.next()) {
+              TxnType txnType = TxnType.findByValue(rs.getInt(2));
+              nonReadOnlyTxns.put(rs.getLong(1), txnType);
+            }
+          }
+        }
+        int numAborted = abortTxns(dbConn, txnIds, false);
+        if (numAborted != txnIds.size()) {
           LOG.warn("Abort Transactions command only aborted " + numAborted + " out of " +
-              txnids.size() + " transactions. It's possible that the other " +
-              (txnids.size() - numAborted) +
+              txnIds.size() + " transactions. It's possible that the other " +
+              (txnIds.size() - numAborted) +
               " transactions have been aborted or committed, or the transaction ids are invalid.");
         }
 
-        for (Long txnId : txnids) {
-          if (transactionalListeners != null) {
+        if (transactionalListeners != null){
+          for (Long txnId : txnIds) {
             MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                    EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId, null), dbConn, sqlGenerator);
+                    EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId,
+                nonReadOnlyTxns.getOrDefault(txnId, TxnType.READ_ONLY)), dbConn, sqlGenerator);
           }
         }
         LOG.debug("Going to commit");
@@ -862,6 +886,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database "
             + StringUtils.stringifyException(e));
       } finally {
+        closeStmt(stmt);
         closeDbConn(dbConn);
       }
     } catch (RetryException e) {
@@ -1065,7 +1090,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       Connection dbConn = null;
       Statement stmt = null;
       List<PreparedStatement> insertPreparedStmts = null;
-      ResultSet lockHandle = null;
       ResultSet commitIdRs = null, rs;
       try {
         lockInternal();
@@ -1097,11 +1121,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          * should not normally run concurrently (for same txn) but could due to bugs in the client
          * which could then corrupt internal transaction manager state.  Also competes with abortTxn().
          */
-        lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
-        if (lockHandle == null) {
+        TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+        if (txnRecord == null) {
           //if here, txn was not found (in expected state)
           TxnStatus actualTxnStatus = findTxnState(txnid, stmt);
-          if(actualTxnStatus == TxnStatus.COMMITTED) {
+          if (actualTxnStatus == TxnStatus.COMMITTED) {
             if (rqst.isSetReplPolicy()) {
               // in case of replication, idempotent is taken care by getTargetTxnId
               LOG.warn("Invalid state COMMITTED for transactions started using replication replay task");
@@ -1114,8 +1138,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             return;
           }
           raiseTxnUnexpectedState(actualTxnStatus, txnid);
-          shouldNeverHappen(txnid);
-          //dbConn is rolled back in finally{}
         }
 
         String conflictSQLSuffix = null;
@@ -1152,7 +1174,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
            * but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct
            * even if it includes all of it's columns
            */
-          int numCompsWritten = stmt.executeUpdate(
+          stmt.executeUpdate(
             "insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" +
             " select distinct tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix);
           /**
@@ -1325,7 +1347,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         if (transactionalListeners != null) {
           MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
-                  EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, null), dbConn, sqlGenerator);
+                  EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, txnRecord.type), dbConn, sqlGenerator);
         }
 
         LOG.debug("Going to commit");
@@ -1343,8 +1365,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             closeStmt(pst);
           }
         }
-        close(commitIdRs);
-        close(lockHandle, stmt, dbConn);
+        close(commitIdRs, stmt, dbConn);
         unlockInternal();
       }
     } catch (RetryException e) {
@@ -1841,7 +1862,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         if (transactionalListeners != null) {
           MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
                   EventMessage.EventType.ALLOC_WRITE_ID,
-                  new AllocWriteIdEvent(txnToWriteIds, dbName, tblName, null),
+                  new AllocWriteIdEvent(txnToWriteIds, dbName, tblName),
                   dbConn, sqlGenerator);
         }
 
@@ -2305,22 +2326,27 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
    * and then executeUpdate().  One other alternative would be to actually update the row in TXNS but
    * to the same value as before thus forcing db to acquire write lock for duration of the transaction.
    *
-   * There is no real reason to return the ResultSet here other than to make sure the reference to it
-   * is retained for duration of intended lock scope and is not GC'd thus (unlikely) causing lock
-   * to be released.
+   * SELECT ... FOR UPDATE locks the row until the transaction commits or rolls back.
+   * Second connection using `SELECT ... FOR UPDATE` will suspend until the lock is released.
    * @param txnState the state this txn is expected to be in.  may be null
    * @return null if no row was found
    * @throws SQLException
    * @throws MetaException
    */
-  private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException {
-    String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : "");
-    ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query));
-    if(rs.next()) {
-      return rs;
+  private TxnRecord lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException {
+    String query = "select TXN_TYPE from TXNS where TXN_ID = " + txnId
+      + (txnState != null ? " and TXN_STATE = " + quoteChar(txnState) : "");
+    try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) {
+      return rs.next() ? new TxnRecord(rs.getInt(1)) : null;
+    }
+  }
+
+  private static final class TxnRecord {
+    private final TxnType type;
+
+    private TxnRecord(int txnType) {
+      this.type = TxnType.findByValue(txnType);
     }
-    close(rs);
-    return null;
   }
 
   /**
@@ -2340,7 +2366,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       PreparedStatement pStmt = null;
       List<PreparedStatement> insertPreparedStmts = null;
       ResultSet rs = null;
-      ResultSet lockHandle = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -2348,8 +2373,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         stmt = dbConn.createStatement();
         if (isValidTxn(txnid)) {
           //this also ensures that txn is still there in expected state
-          lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
-          if(lockHandle == null) {
+          TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+          if (txnRecord == null) {
             ensureValidTxn(dbConn, txnid, stmt);
             shouldNeverHappen(txnid);
           }
@@ -2552,7 +2577,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             closeStmt(pst);
           }
         }
-        close(lockHandle);
         closeStmt(pStmt);
         close(rs, stmt, null);
         if (!success) {
@@ -3227,15 +3251,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       throws NoSuchTxnException,  TxnAbortedException, MetaException {
     Connection dbConn = null;
     Statement stmt = null;
-    ResultSet lockHandle = null;
     List<PreparedStatement> insertPreparedStmts = null;
     try {
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        lockHandle = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN);
-        if(lockHandle == null) {
+        TxnRecord txnRecord = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN);
+        if (txnRecord == null) {
           //ensures txn is still there and in expected state
           ensureValidTxn(dbConn, rqst.getTxnid(), stmt);
           shouldNeverHappen(rqst.getTxnid());
@@ -3279,7 +3302,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             closeStmt(pst);
           }
         }
-        close(lockHandle, stmt, dbConn);
+        close(null, stmt, dbConn);
         unlockInternal();
       }
     } catch (RetryException e) {