You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/12/13 13:26:35 UTC
[hive] branch master updated: HIVE-22327: Repl: Ignore read-only
transactions in notification log (Denys Kuzmenko reviewed by mahesh kumar
behera and Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 25efd17 HIVE-22327: Repl: Ignore read-only transactions in notification log (Denys Kuzmenko reviewed by mahesh kumar behera and Peter Vary)
25efd17 is described below
commit 25efd174077ab9f0dce76f7abf16faec9fbf1428
Author: denys kuzmenko <dk...@cloudera.com>
AuthorDate: Fri Dec 13 14:25:35 2019 +0100
HIVE-22327: Repl: Ignore read-only transactions in notification log (Denys Kuzmenko reviewed by mahesh kumar behera and Peter Vary)
---
.../hcatalog/listener/DbNotificationListener.java | 10 ++
.../listener/TestDbNotificationListener.java | 85 +++++++++++++++-
.../hadoop/hive/metastore/txn/TestTxnHandler.java | 15 ++-
.../hive/metastore/events/AbortTxnEvent.java | 24 ++++-
.../hive/metastore/events/AllocWriteIdEvent.java | 4 +
.../hive/metastore/events/CommitTxnEvent.java | 22 ++++-
.../hadoop/hive/metastore/events/OpenTxnEvent.java | 25 ++++-
.../hadoop/hive/metastore/txn/TxnHandler.java | 109 +++++++++++++--------
8 files changed, 242 insertions(+), 52 deletions(-)
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 5f9d809..3a8cb39 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.metastore.api.SQLNotNullConstraint;
import org.apache.hadoop.hive.metastore.api.SQLPrimaryKey;
import org.apache.hadoop.hive.metastore.api.SQLUniqueConstraint;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
@@ -553,6 +554,9 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
@Override
public void onOpenTxn(OpenTxnEvent openTxnEvent, Connection dbConn, SQLGenerator sqlGenerator) throws MetaException {
+ if (openTxnEvent.getTxnType() == TxnType.READ_ONLY) {
+ return;
+ }
int lastTxnIdx = openTxnEvent.getTxnIds().size() - 1;
OpenTxnMessage msg =
MessageBuilder.getInstance().buildOpenTxnMessage(openTxnEvent.getTxnIds().get(0),
@@ -571,6 +575,9 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
@Override
public void onCommitTxn(CommitTxnEvent commitTxnEvent, Connection dbConn, SQLGenerator sqlGenerator)
throws MetaException {
+ if (commitTxnEvent.getTxnType() == TxnType.READ_ONLY) {
+ return;
+ }
CommitTxnMessage msg =
MessageBuilder.getInstance().buildCommitTxnMessage(commitTxnEvent.getTxnId());
@@ -588,6 +595,9 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
@Override
public void onAbortTxn(AbortTxnEvent abortTxnEvent, Connection dbConn, SQLGenerator sqlGenerator)
throws MetaException {
+ if (abortTxnEvent.getTxnType() == TxnType.READ_ONLY) {
+ return;
+ }
AbortTxnMessage msg =
MessageBuilder.getInstance().buildAbortTxnMessage(abortTxnEvent.getTxnId());
NotificationEvent event =
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index edf861f..66bdee1 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -25,10 +25,10 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
import java.util.concurrent.TimeUnit;
-import java.lang.reflect.Field;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -63,6 +63,7 @@ import org.apache.hadoop.hive.metastore.api.ResourceUri;
import org.apache.hadoop.hive.metastore.api.SerDeInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.events.AddPartitionEvent;
import org.apache.hadoop.hive.metastore.events.AlterPartitionEvent;
@@ -1058,6 +1059,88 @@ public class TestDbNotificationListener {
}
@Test
+ public void openTxn() throws Exception {
+ msClient.openTxn("me", TxnType.READ_ONLY);
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(0, rsp.getEventsSize());
+
+ msClient.openTxn("me", TxnType.DEFAULT);
+ rsp = msClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(0);
+ assertEquals(firstEventId + 1, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(EventType.OPEN_TXN.toString(), event.getEventType());
+ }
+
+ @Test
+ public void abortTxn() throws Exception {
+ long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+ long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ msClient.abortTxns(Collections.singletonList(txnId1));
+ rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+ assertEquals(0, rsp.getEventsSize());
+
+ msClient.abortTxns(Collections.singletonList(txnId2));
+ rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(0);
+ assertEquals(firstEventId + 2, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
+ }
+
+ @Test
+ public void rollbackTxn() throws Exception {
+ long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+ long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ msClient.rollbackTxn(txnId1);
+ rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+ assertEquals(0, rsp.getEventsSize());
+
+ msClient.rollbackTxn(txnId2);
+ rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(0);
+ assertEquals(firstEventId + 2, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(EventType.ABORT_TXN.toString(), event.getEventType());
+ }
+
+ @Test
+ public void commitTxn() throws Exception {
+ long txnId1 = msClient.openTxn("me", TxnType.READ_ONLY);
+ long txnId2 = msClient.openTxn("me", TxnType.DEFAULT);
+
+ NotificationEventResponse rsp = msClient.getNextNotification(firstEventId, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ msClient.commitTxn(txnId1);
+ rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+ assertEquals(0, rsp.getEventsSize());
+
+ msClient.commitTxn(txnId2);
+ rsp = msClient.getNextNotification(firstEventId + 1, 0, null);
+ assertEquals(1, rsp.getEventsSize());
+
+ NotificationEvent event = rsp.getEvents().get(0);
+ assertEquals(firstEventId + 2, event.getEventId());
+ assertTrue(event.getEventTime() >= startTime);
+ assertEquals(EventType.COMMIT_TXN.toString(), event.getEventType());
+ }
+
+ @Test
public void insertTable() throws Exception {
String defaultDbName = "default";
String tblName = "inserttbl";
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
index be37b2a..01e5b82 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestTxnHandler.java
@@ -210,7 +210,7 @@ public class TestTxnHandler {
Assert.assertEquals("Transaction " + JavaUtils.txnIdToString(3) + " is already committed.", ex.getMessage());
}
Assert.assertTrue(gotException);
-
+
gotException = false;
try {
txnHandler.abortTxn(new AbortTxnRequest(4));
@@ -223,6 +223,19 @@ public class TestTxnHandler {
}
@Test
+ public void testAbortTxns() throws Exception {
+ OpenTxnsResponse openedTxns = txnHandler.openTxns(new OpenTxnRequest(3, "me", "localhost"));
+ List<Long> txnList = openedTxns.getTxn_ids();
+ txnHandler.abortTxns(new AbortTxnsRequest(txnList));
+
+ GetOpenTxnsInfoResponse txnsInfo = txnHandler.getOpenTxnsInfo();
+ assertEquals(3, txnsInfo.getOpen_txns().size());
+ txnsInfo.getOpen_txns().forEach(txn ->
+ assertEquals(TxnState.ABORTED, txn.getState())
+ );
+ }
+
+ @Test
public void testAbortInvalidTxn() throws Exception {
boolean caught = false;
try {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AbortTxnEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AbortTxnEvent.java
index fe4b974..5e00fcc 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AbortTxnEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AbortTxnEvent.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.events;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.api.TxnType;
/**
* AbortTxnEvent
@@ -31,15 +32,25 @@ import org.apache.hadoop.hive.metastore.IHMSHandler;
public class AbortTxnEvent extends ListenerEvent {
private final Long txnId;
+ private final TxnType txnType;
+
+ public AbortTxnEvent(Long transactionId, IHMSHandler handler) {
+ this(transactionId, null, handler);
+ }
+
+ public AbortTxnEvent(Long transactionId, TxnType txnType) {
+ this(transactionId, txnType, null);
+ }
/**
- *
* @param transactionId Unique identification for the transaction that got rolled back.
+ * @param txnType type of transaction
* @param handler handler that is firing the event
*/
- public AbortTxnEvent(Long transactionId, IHMSHandler handler) {
+ public AbortTxnEvent(Long transactionId, TxnType txnType, IHMSHandler handler) {
super(true, handler);
- txnId = transactionId;
+ this.txnId = transactionId;
+ this.txnType = txnType;
}
/**
@@ -48,4 +59,11 @@ public class AbortTxnEvent extends ListenerEvent {
public Long getTxnId() {
return txnId;
}
+
+ /**
+ * @return txnType
+ */
+ public TxnType getTxnType() {
+ return txnType;
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AllocWriteIdEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AllocWriteIdEvent.java
index 84a9a4e..2a719f2 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AllocWriteIdEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/AllocWriteIdEvent.java
@@ -36,6 +36,10 @@ public class AllocWriteIdEvent extends ListenerEvent {
private final String tableName;
private final String dbName;
+ public AllocWriteIdEvent(List<TxnToWriteId> txnToWriteIdList, String dbName, String tableName) {
+ this(txnToWriteIdList, dbName, tableName, null);
+ }
+
public AllocWriteIdEvent(List<TxnToWriteId> txnToWriteIdList, String dbName, String tableName, IHMSHandler handler) {
super(true, handler);
this.txnToWriteIdList = txnToWriteIdList;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CommitTxnEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CommitTxnEvent.java
index ba382cd..b357dbb 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CommitTxnEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/CommitTxnEvent.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.events;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.api.TxnType;
/**
* CommitTxnEvent
@@ -31,15 +32,25 @@ import org.apache.hadoop.hive.metastore.IHMSHandler;
public class CommitTxnEvent extends ListenerEvent {
private final Long txnId;
+ private final TxnType txnType;
+
+ public CommitTxnEvent(Long transactionId, IHMSHandler handler) {
+ this(transactionId, null, handler);
+ }
+
+ public CommitTxnEvent(Long transactionId, TxnType txnType) {
+ this(transactionId, txnType, null);
+ }
/**
- *
* @param transactionId Unique identification for the transaction that just got committed.
+ * @param txnType type of transaction
* @param handler handler that is firing the event
*/
- public CommitTxnEvent(Long transactionId, IHMSHandler handler) {
+ public CommitTxnEvent(Long transactionId, TxnType txnType, IHMSHandler handler) {
super(true, handler);
this.txnId = transactionId;
+ this.txnType = txnType;
}
/**
@@ -48,4 +59,11 @@ public class CommitTxnEvent extends ListenerEvent {
public Long getTxnId() {
return txnId;
}
+
+ /**
+ * @return txnType
+ */
+ public TxnType getTxnType() {
+ return txnType;
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java
index d935ed1..289dfd0 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/events/OpenTxnEvent.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hive.metastore.IHMSHandler;
import com.google.common.collect.Lists;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+
import java.util.List;
/**
@@ -31,15 +33,27 @@ import java.util.List;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class OpenTxnEvent extends ListenerEvent {
- private List<Long> txnIds;
+
+ private final List<Long> txnIds;
+ private final TxnType txnType;
+
+ public OpenTxnEvent(List<Long> txnIds, IHMSHandler handler) {
+ this(txnIds, null, handler);
+ }
+
+ public OpenTxnEvent(List<Long> txnIds, TxnType txnType) {
+ this(txnIds, txnType, null);
+ }
/**
* @param txnIds List of unique identifiers for the transactions just opened.
+ * @param txnType type of transaction
* @param handler handler that is firing the event
*/
- public OpenTxnEvent(List<Long> txnIds, IHMSHandler handler) {
+ public OpenTxnEvent(List<Long> txnIds, TxnType txnType, IHMSHandler handler) {
super(true, handler);
this.txnIds = Lists.newArrayList(txnIds);
+ this.txnType = txnType;
}
/**
@@ -48,4 +62,11 @@ public class OpenTxnEvent extends ListenerEvent {
public List<Long> getTxnIds() {
return txnIds;
}
+
+ /**
+ * @return txnType
+ */
+ public TxnType getTxnType() {
+ return txnType;
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 9733256..64a5429 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -677,7 +677,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (transactionalListeners != null) {
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
- EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator);
+ EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, txnType), dbConn, sqlGenerator);
}
return txnIds;
} finally {
@@ -790,9 +790,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
txnid = targetTxnIds.get(0);
}
- if (abortTxns(dbConn, Collections.singletonList(txnid), true) != 1) {
- TxnStatus status = findTxnState(txnid,stmt);
- if(status == TxnStatus.ABORTED) {
+ TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+ if (txnRecord == null) {
+ TxnStatus status = findTxnState(txnid, stmt);
+ if (status == TxnStatus.ABORTED) {
if (rqst.isSetReplPolicy()) {
// in case of replication, idempotency is taken care of by getTargetTxnId
LOG.warn("Invalid state ABORTED for transactions started using replication replay task");
@@ -804,6 +805,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
raiseTxnUnexpectedState(status, txnid);
}
+ abortTxns(dbConn, Collections.singletonList(txnid), true);
if (rqst.isSetReplPolicy()) {
deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
@@ -811,7 +813,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (transactionalListeners != null) {
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
- EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, null), dbConn, sqlGenerator);
+ EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnid, txnRecord.type), dbConn, sqlGenerator);
}
LOG.debug("Going to commit");
@@ -833,24 +835,46 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
@Override
@RetrySemantics.Idempotent
- public void abortTxns(AbortTxnsRequest rqst) throws NoSuchTxnException, MetaException {
- List<Long> txnids = rqst.getTxn_ids();
+ public void abortTxns(AbortTxnsRequest rqst) throws MetaException {
+ List<Long> txnIds = rqst.getTxn_ids();
try {
Connection dbConn = null;
+ Statement stmt = null;
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- int numAborted = abortTxns(dbConn, txnids, false);
- if (numAborted != txnids.size()) {
+ stmt = dbConn.createStatement();
+
+ List<String> queries = new ArrayList<>();
+ StringBuilder prefix = new StringBuilder("select TXN_ID, TXN_TYPE from TXNS where TXN_STATE = ")
+ .append(quoteChar(TXN_OPEN)).append(" and TXN_TYPE != ").append(TxnType.READ_ONLY.getValue())
+ .append(" and ");
+
+ TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(),
+ txnIds, "TXN_ID", false, false);
+
+ Map<Long, TxnType> nonReadOnlyTxns = new HashMap<>();
+ for (String query : queries) {
+ LOG.debug("Going to execute query<" + query + ">");
+ try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) {
+ while (rs.next()) {
+ TxnType txnType = TxnType.findByValue(rs.getInt(2));
+ nonReadOnlyTxns.put(rs.getLong(1), txnType);
+ }
+ }
+ }
+ int numAborted = abortTxns(dbConn, txnIds, false);
+ if (numAborted != txnIds.size()) {
LOG.warn("Abort Transactions command only aborted " + numAborted + " out of " +
- txnids.size() + " transactions. It's possible that the other " +
- (txnids.size() - numAborted) +
+ txnIds.size() + " transactions. It's possible that the other " +
+ (txnIds.size() - numAborted) +
" transactions have been aborted or committed, or the transaction ids are invalid.");
}
- for (Long txnId : txnids) {
- if (transactionalListeners != null) {
+ if (transactionalListeners != null){
+ for (Long txnId : txnIds) {
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
- EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId, null), dbConn, sqlGenerator);
+ EventMessage.EventType.ABORT_TXN, new AbortTxnEvent(txnId,
+ nonReadOnlyTxns.getOrDefault(txnId, TxnType.READ_ONLY)), dbConn, sqlGenerator);
}
}
LOG.debug("Going to commit");
@@ -862,6 +886,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
+ closeStmt(stmt);
closeDbConn(dbConn);
}
} catch (RetryException e) {
@@ -1065,7 +1090,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
Connection dbConn = null;
Statement stmt = null;
List<PreparedStatement> insertPreparedStmts = null;
- ResultSet lockHandle = null;
ResultSet commitIdRs = null, rs;
try {
lockInternal();
@@ -1097,11 +1121,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* should not normally run concurrently (for same txn) but could due to bugs in the client
* which could then corrupt internal transaction manager state. Also competes with abortTxn().
*/
- lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
- if (lockHandle == null) {
+ TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+ if (txnRecord == null) {
//if here, txn was not found (in expected state)
TxnStatus actualTxnStatus = findTxnState(txnid, stmt);
- if(actualTxnStatus == TxnStatus.COMMITTED) {
+ if (actualTxnStatus == TxnStatus.COMMITTED) {
if (rqst.isSetReplPolicy()) {
// in case of replication, idempotency is taken care of by getTargetTxnId
LOG.warn("Invalid state COMMITTED for transactions started using replication replay task");
@@ -1114,8 +1138,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
return;
}
raiseTxnUnexpectedState(actualTxnStatus, txnid);
- shouldNeverHappen(txnid);
- //dbConn is rolled back in finally{}
}
String conflictSQLSuffix = null;
@@ -1152,7 +1174,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* but we want to add a PK on WRITE_SET which won't have unique rows w/o this distinct
* even if it includes all of its columns
*/
- int numCompsWritten = stmt.executeUpdate(
+ stmt.executeUpdate(
"insert into WRITE_SET (ws_database, ws_table, ws_partition, ws_txnid, ws_commit_id, ws_operation_type)" +
" select distinct tc_database, tc_table, tc_partition, tc_txnid, " + commitId + ", tc_operation_type " + conflictSQLSuffix);
/**
@@ -1325,7 +1347,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (transactionalListeners != null) {
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
- EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, null), dbConn, sqlGenerator);
+ EventMessage.EventType.COMMIT_TXN, new CommitTxnEvent(txnid, txnRecord.type), dbConn, sqlGenerator);
}
LOG.debug("Going to commit");
@@ -1343,8 +1365,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
closeStmt(pst);
}
}
- close(commitIdRs);
- close(lockHandle, stmt, dbConn);
+ close(commitIdRs, stmt, dbConn);
unlockInternal();
}
} catch (RetryException e) {
@@ -1841,7 +1862,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (transactionalListeners != null) {
MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners,
EventMessage.EventType.ALLOC_WRITE_ID,
- new AllocWriteIdEvent(txnToWriteIds, dbName, tblName, null),
+ new AllocWriteIdEvent(txnToWriteIds, dbName, tblName),
dbConn, sqlGenerator);
}
@@ -2305,22 +2326,27 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* and then executeUpdate(). One other alternative would be to actually update the row in TXNS but
* to the same value as before thus forcing db to acquire write lock for duration of the transaction.
*
- * There is no real reason to return the ResultSet here other than to make sure the reference to it
- * is retained for duration of intended lock scope and is not GC'd thus (unlikely) causing lock
- * to be released.
+ * SELECT ... FOR UPDATE locks the row until the transaction commits or rolls back.
+ * A second connection using `SELECT ... FOR UPDATE` will block until the lock is released.
* @param txnState the state this txn is expected to be in. may be null
* @return null if no row was found
* @throws SQLException
* @throws MetaException
*/
- private ResultSet lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException {
- String query = "select TXN_STATE from TXNS where TXN_ID = " + txnId + (txnState != null ? " AND TXN_STATE=" + quoteChar(txnState) : "");
- ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query));
- if(rs.next()) {
- return rs;
+ private TxnRecord lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException {
+ String query = "select TXN_TYPE from TXNS where TXN_ID = " + txnId
+ + (txnState != null ? " and TXN_STATE = " + quoteChar(txnState) : "");
+ try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) {
+ return rs.next() ? new TxnRecord(rs.getInt(1)) : null;
+ }
+ }
+
+ private static final class TxnRecord {
+ private final TxnType type;
+
+ private TxnRecord(int txnType) {
+ this.type = TxnType.findByValue(txnType);
}
- close(rs);
- return null;
}
/**
@@ -2340,7 +2366,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
PreparedStatement pStmt = null;
List<PreparedStatement> insertPreparedStmts = null;
ResultSet rs = null;
- ResultSet lockHandle = null;
try {
lockInternal();
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -2348,8 +2373,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
stmt = dbConn.createStatement();
if (isValidTxn(txnid)) {
//this also ensures that txn is still there in expected state
- lockHandle = lockTransactionRecord(stmt, txnid, TXN_OPEN);
- if(lockHandle == null) {
+ TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+ if (txnRecord == null) {
ensureValidTxn(dbConn, txnid, stmt);
shouldNeverHappen(txnid);
}
@@ -2552,7 +2577,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
closeStmt(pst);
}
}
- close(lockHandle);
closeStmt(pStmt);
close(rs, stmt, null);
if (!success) {
@@ -3227,15 +3251,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throws NoSuchTxnException, TxnAbortedException, MetaException {
Connection dbConn = null;
Statement stmt = null;
- ResultSet lockHandle = null;
List<PreparedStatement> insertPreparedStmts = null;
try {
try {
lockInternal();
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- lockHandle = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN);
- if(lockHandle == null) {
+ TxnRecord txnRecord = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN);
+ if (txnRecord == null) {
//ensures txn is still there and in expected state
ensureValidTxn(dbConn, rqst.getTxnid(), stmt);
shouldNeverHappen(rqst.getTxnid());
@@ -3279,7 +3302,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
closeStmt(pst);
}
}
- close(lockHandle, stmt, dbConn);
+ close(null, stmt, dbConn);
unlockInternal();
}
} catch (RetryException e) {