You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2020/06/12 12:26:29 UTC
[hive] branch master updated: HIVE-23340: TxnHandler cleanup (Peter
Varga, reviewed by Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 56e59ed HIVE-23340: TxnHandler cleanup (Peter Varga, reviewed by Denys Kuzmenko)
56e59ed is described below
commit 56e59ed6a7c6f430b8ce0f8ac85bd936eabcbda4
Author: Peter Varga <pv...@cloudera.com>
AuthorDate: Fri Jun 12 14:22:36 2020 +0200
HIVE-23340: TxnHandler cleanup (Peter Varga, reviewed by Denys Kuzmenko)
---
.../hive/metastore/txn/CompactionTxnHandler.java | 17 +-
.../apache/hadoop/hive/metastore/txn/OpenTxn.java | 110 ++++++++
.../hadoop/hive/metastore/txn/OpenTxnList.java | 78 ++++++
.../hadoop/hive/metastore/txn/OperationType.java | 65 +++++
.../hadoop/hive/metastore/txn/TxnHandler.java | 305 ++++++---------------
.../hadoop/hive/metastore/txn/TxnStatus.java | 65 +++++
6 files changed, 410 insertions(+), 230 deletions(-)
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index d2efc59..b1bf10a 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -37,6 +37,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+
/**
* Extends the transaction handler with methods needed only by the compactor threads. These
* methods are not available through the thrift interface.
@@ -109,7 +110,7 @@ class CompactionTxnHandler extends TxnHandler {
final String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\","
+ "MIN(\"TXN_STARTED\"), COUNT(*)"
+ "FROM \"TXNS\", \"TXN_COMPONENTS\" "
- + "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = '" + TXN_ABORTED + "' "
+ + "WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.ABORTED + " "
+ "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\""
+ (checkAbortedTimeThreshold ? "" : " HAVING COUNT(*) > " + abortedThreshold);
@@ -400,7 +401,7 @@ class CompactionTxnHandler extends TxnHandler {
* See {@link ql.txn.compactor.Cleaner.removeFiles()}
*/
s = "SELECT DISTINCT \"TXN_ID\" FROM \"TXNS\", \"TXN_COMPONENTS\" WHERE \"TXN_ID\" = \"TC_TXNID\" "
- + "AND \"TXN_STATE\" = '" + TXN_ABORTED + "' AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ?";
+ + "AND \"TXN_STATE\" = " + TxnStatus.ABORTED + " AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ?";
if (info.highestWriteId != 0) s += " AND \"TC_WRITEID\" <= ?";
if (info.partName != null) s += " AND \"TC_PARTITION\" = ?";
@@ -513,8 +514,8 @@ class CompactionTxnHandler extends TxnHandler {
"UNION " +
"SELECT MIN(\"WS_COMMIT_ID\") AS \"ID\" FROM \"WRITE_SET\" " +
"UNION " +
- "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) +
- " OR \"TXN_STATE\" = " + quoteChar(TXN_OPEN) +
+ "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED +
+ " OR \"TXN_STATE\" = " + TxnStatus.OPEN +
") \"RES\"";
LOG.debug("Going to execute query <" + s + ">");
rs = stmt.executeQuery(s);
@@ -576,7 +577,7 @@ class CompactionTxnHandler extends TxnHandler {
String s = "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE " +
"\"TXN_ID\" NOT IN (SELECT \"TC_TXNID\" FROM \"TXN_COMPONENTS\") AND " +
- " (\"TXN_STATE\" = '" + TXN_ABORTED + "' OR \"TXN_STATE\" = '" + TXN_COMMITTED + "') AND "
+ " (\"TXN_STATE\" = " + TxnStatus.ABORTED + " OR \"TXN_STATE\" = " + TxnStatus.COMMITTED + ") AND "
+ " \"TXN_ID\" < " + lowWaterMark;
LOG.debug("Going to execute query <" + s + ">");
rs = stmt.executeQuery(s);
@@ -811,7 +812,7 @@ class CompactionTxnHandler extends TxnHandler {
quoteString(ci.tableName) + "," +
(ci.partName == null ? "" : quoteString(ci.partName) + ",") +
ci.highestWriteId + ", " +
- quoteChar(OperationType.COMPACT.getSqlConst()) + ")";
+ OperationType.COMPACT + ")";
if(LOG.isDebugEnabled()) {
LOG.debug("About to execute: " + sqlText);
}
@@ -1147,7 +1148,7 @@ class CompactionTxnHandler extends TxnHandler {
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- String query = "SELECT COUNT(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN);
+ String query = "SELECT COUNT(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN;
LOG.debug("Going to execute query <" + query + ">");
rs = stmt.executeQuery(query);
if (!rs.next()) {
@@ -1156,7 +1157,7 @@ class CompactionTxnHandler extends TxnHandler {
long numOpenTxns = rs.getLong(1);
if (numOpenTxns > 0) {
query = "SELECT MIN(\"RES\".\"ID\") FROM (" +
- "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) +
+ "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN +
" UNION " +
"SELECT MAX(\"CQ_NEXT_TXN_ID\") AS \"ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = "
+ quoteChar(READY_FOR_CLEANING) +
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OpenTxn.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OpenTxn.java
new file mode 100644
index 0000000..8ef5fa1
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OpenTxn.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+
+/**
+ * Value holder for a single transaction read from the TXNS table by one of the
+ * open-transaction queries declared here.
+ */
+public class OpenTxn {
+
+  // Both statements carry one %s placeholder; it is the expression TXN_STARTED is subtracted from.
+  public static final String OPEN_TXNS_QUERY = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\", "
+      + "(%s - \"TXN_STARTED\") FROM \"TXNS\" ORDER BY \"TXN_ID\"";
+  public static final String OPEN_TXNS_INFO_QUERY = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\", "
+      + "(%s - \"TXN_STARTED\"), \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\" "
+      + "FROM \"TXNS\" ORDER BY \"TXN_ID\"";
+
+  // Always supplied through the constructor.
+  private long txnId;
+  private TxnStatus status;
+  private TxnType type;
+
+  // Optional details; they keep their Java defaults (0 / null) unless a setter is called.
+  private String user;
+  private String host;
+  private long startedTime;
+  private long lastHeartBeatTime;
+
+  /**
+   * Creates a transaction entry with the mandatory attributes only.
+   *
+   * @param txnId id of the transaction
+   * @param status state of the transaction
+   * @param type type of the transaction
+   */
+  public OpenTxn(long txnId, TxnStatus status, TxnType type) {
+    this.txnId = txnId;
+    this.status = status;
+    this.type = type;
+  }
+
+  /** @return id of the transaction */
+  public long getTxnId() {
+    return txnId;
+  }
+
+  /** @return state of the transaction */
+  public TxnStatus getStatus() {
+    return status;
+  }
+
+  /** @return type of the transaction */
+  public TxnType getType() {
+    return type;
+  }
+
+  /** @return user recorded for the transaction, or null when it was never set */
+  public String getUser() {
+    return user;
+  }
+
+  /** @return host recorded for the transaction, or null when it was never set */
+  public String getHost() {
+    return host;
+  }
+
+  /** @return start time recorded for the transaction */
+  public long getStartedTime() {
+    return startedTime;
+  }
+
+  /** @return last heartbeat time recorded for the transaction */
+  public long getLastHeartBeatTime() {
+    return lastHeartBeatTime;
+  }
+
+  public void setTxnId(long txnId) {
+    this.txnId = txnId;
+  }
+
+  public void setStatus(TxnStatus status) {
+    this.status = status;
+  }
+
+  public void setType(TxnType type) {
+    this.type = type;
+  }
+
+  public void setUser(String user) {
+    this.user = user;
+  }
+
+  public void setHost(String host) {
+    this.host = host;
+  }
+
+  public void setStartedTime(long startedTime) {
+    this.startedTime = startedTime;
+  }
+
+  public void setLastHeartBeatTime(long lastHeartBeatTime) {
+    this.lastHeartBeatTime = lastHeartBeatTime;
+  }
+
+  /**
+   * Converts this entry into a {@link TxnInfo}, copying the id, the state (translated with
+   * {@code toTxnState()}), user, host, start time and last heartbeat time.
+   *
+   * @return a newly created {@link TxnInfo}
+   */
+  public TxnInfo toTxnInfo() {
+    TxnInfo converted = new TxnInfo(
+        getTxnId(),
+        getStatus().toTxnState(),
+        getUser(),
+        getHost());
+    converted.setStartedTime(getStartedTime());
+    converted.setLastHeartbeatTime(getLastHeartBeatTime());
+    return converted;
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OpenTxnList.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OpenTxnList.java
new file mode 100644
index 0000000..1ed5759
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OpenTxnList.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.TxnType;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hive.metastore.txn.TxnStatus.ABORTED;
+import static org.apache.hadoop.hive.metastore.txn.TxnStatus.OPEN;
+
+/**
+ * Pairs a high water mark with the list of collected transactions and turns them into
+ * the responses of the open-transaction calls.
+ */
+public class OpenTxnList {
+  private long hwm;
+  private List<OpenTxn> openTxnList;
+
+  /**
+   * @param hwm high water mark to report in the responses
+   * @param openTxnList transactions to report; the list is stored as is, not copied
+   */
+  public OpenTxnList(long hwm, List<OpenTxn> openTxnList) {
+    this.hwm = hwm;
+    this.openTxnList = openTxnList;
+  }
+
+  /**
+   * Builds the detailed response: every stored transaction is converted with
+   * {@link OpenTxn#toTxnInfo()}.
+   *
+   * @return response carrying the high water mark and one info entry per stored transaction
+   */
+  public GetOpenTxnsInfoResponse toOpenTxnsInfoResponse() {
+    return new GetOpenTxnsInfoResponse(
+        getHwm(),
+        openTxnList.stream()
+            .map(OpenTxn::toTxnInfo)
+            .collect(toList()));
+  }
+
+  /**
+   * Builds the compact response: the ids of all transactions that are not READ_ONLY, a bit set
+   * flagging which positions of that id list are aborted, and - if any transaction is in OPEN
+   * state - the smallest OPEN transaction id.
+   *
+   * @return response carrying the high water mark, the id list and the aborted bits
+   */
+  public GetOpenTxnsResponse toOpenTxnsResponse() {
+    final List<Long> reportedIds = new ArrayList<>();
+    final BitSet abortedFlags = new BitSet();
+    long lowestOpenId = Long.MAX_VALUE;
+
+    for (OpenTxn txn : getOpenTxnList()) {
+      // READ_ONLY transactions still take part in the minimum, even though they are not listed.
+      if (txn.getStatus() == OPEN && txn.getTxnId() < lowestOpenId) {
+        lowestOpenId = txn.getTxnId();
+      }
+      if (txn.getType() == TxnType.READ_ONLY) {
+        continue;
+      }
+      // The bit index has to line up with the position the id gets inside reportedIds.
+      final int slot = reportedIds.size();
+      reportedIds.add(txn.getTxnId());
+      if (txn.getStatus() == ABORTED) {
+        abortedFlags.set(slot);
+      }
+    }
+
+    GetOpenTxnsResponse response =
+        new GetOpenTxnsResponse(getHwm(), reportedIds, ByteBuffer.wrap(abortedFlags.toByteArray()));
+    // Long.MAX_VALUE is the "no OPEN transaction seen" marker; leave the field unset then.
+    if (lowestOpenId < Long.MAX_VALUE) {
+      response.setMin_open_txn(lowestOpenId);
+    }
+    return response;
+  }
+
+  /** @return the high water mark given at construction */
+  public long getHwm() {
+    return hwm;
+  }
+
+  /** @return the transaction list given at construction (same instance) */
+  public List<OpenTxn> getOpenTxnList() {
+    return openTxnList;
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OperationType.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OperationType.java
new file mode 100644
index 0000000..39cacd2
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/OperationType.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * These are the valid values for TXN_COMPONENTS.TC_OPERATION_TYPE.
+ * Each constant pairs the single-character value used in SQL with the
+ * {@link DataOperationType} it corresponds to; {@link #COMPACT} has no such counterpart.
+ */
+public enum OperationType {
+  SELECT('s', DataOperationType.SELECT),
+  INSERT('i', DataOperationType.INSERT),
+  UPDATE('u', DataOperationType.UPDATE),
+  DELETE('d', DataOperationType.DELETE),
+  COMPACT('c', null);
+
+  private final char sqlConst;
+  private final DataOperationType dataOperationType;
+
+  // COMPACT is left out on purpose: its DataOperationType is null, and a null key would make
+  // fromDataOperationType(null) silently resolve to COMPACT.
+  private static final Map<DataOperationType, OperationType> DOT_LOOKUP =
+      Arrays.stream(OperationType.values())
+          .filter(type -> type.getDataOperationType() != null)
+          .collect(toMap(OperationType::getDataOperationType, identity()));
+
+  OperationType(char sqlConst, DataOperationType dataOperationType) {
+    this.sqlConst = sqlConst;
+    this.dataOperationType = dataOperationType;
+  }
+
+  /**
+   * Returns the SQL constant wrapped in single quotes, so the enum can be concatenated
+   * directly into a SQL statement as a character literal.
+   */
+  @Override
+  public String toString() {
+    return "'" + getSqlConst() + "'";
+  }
+
+  /**
+   * Maps a {@link DataOperationType} to the matching operation type.
+   *
+   * @param dop the data operation type to translate
+   * @return the operation type declared with {@code dop}
+   * @throws IllegalArgumentException if {@code dop} is null or has no matching operation type
+   */
+  public static OperationType fromDataOperationType(DataOperationType dop) {
+    // ofNullable, not of: a failed lookup yields null, and Optional.of(null) would throw
+    // NullPointerException before orElseThrow could ever run.
+    return Optional.ofNullable(DOT_LOOKUP.get(dop))
+        .orElseThrow(() -> new IllegalArgumentException("Unexpected value: " + dop));
+  }
+
+  /**
+   * @return the unquoted one-character SQL value, e.g. for binding as a statement parameter
+   */
+  public String getSqlConst() {
+    return Character.toString(sqlConst);
+  }
+
+  /**
+   * @return the corresponding {@link DataOperationType}, or null for {@link #COMPACT}
+   */
+  public DataOperationType getDataOperationType() {
+    return dataOperationType;
+  }
+}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 89ddccb..5c1ec5b 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -120,9 +120,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TableValidWriteIds;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
-import org.apache.hadoop.hive.metastore.api.TxnInfo;
import org.apache.hadoop.hive.metastore.api.TxnOpenException;
-import org.apache.hadoop.hive.metastore.api.TxnState;
import org.apache.hadoop.hive.metastore.api.TxnToWriteId;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
@@ -226,15 +224,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
static final protected char MAJOR_TYPE = 'a';
static final protected char MINOR_TYPE = 'i';
- // Transaction states
- protected static final char TXN_ABORTED = 'a';
- protected static final char TXN_OPEN = 'o';
- protected static final char TXN_COMMITTED = 'c';
- private static final char TXN_TMP = '_';
-
- //todo: make these like OperationType and remove above char constants
- enum TxnStatus {OPEN, ABORTED, COMMITTED, UNKNOWN}
+ private static final String TXN_TMP_STATE = "_";
// Lock states
static final protected char LOCK_ACQUIRED = 'a';
@@ -277,53 +268,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
private List<TransactionalMetaStoreEventListener> transactionalListeners;
- /**
- * These are the valid values for TXN_COMPONENTS.TC_OPERATION_TYPE
- */
- enum OperationType {
- SELECT('s'), INSERT('i'), UPDATE('u'), DELETE('d'), COMPACT('c');
- private final char sqlConst;
- OperationType(char sqlConst) {
- this.sqlConst = sqlConst;
- }
- public String toString() {
- return Character.toString(sqlConst);
- }
- public static OperationType fromString(char sqlConst) {
- switch (sqlConst) {
- case 's':
- return SELECT;
- case 'i':
- return INSERT;
- case 'u':
- return UPDATE;
- case 'd':
- return DELETE;
- case 'c':
- return COMPACT;
- default:
- throw new IllegalArgumentException(quoteChar(sqlConst));
- }
- }
- public static OperationType fromDataOperationType(DataOperationType dop) {
- switch (dop) {
- case SELECT:
- return OperationType.SELECT;
- case INSERT:
- return OperationType.INSERT;
- case UPDATE:
- return OperationType.UPDATE;
- case DELETE:
- return OperationType.DELETE;
- default:
- throw new IllegalArgumentException("Unexpected value: " + dop);
- }
- }
- char getSqlConst() {
- return sqlConst;
- }
- }
-
// Maximum number of open transactions that's allowed
private static volatile int maxOpenTxns = 0;
// Whether number of open transactions reaches the threshold
@@ -444,6 +388,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
+ // Loads the transaction list with the additional info columns (infoFields = true) and
+ // converts it into the detailed response.
@Override
@RetrySemantics.ReadOnly
public GetOpenTxnsInfoResponse getOpenTxnsInfo() throws MetaException {
+ return getOpenTxnsList(true).toOpenTxnsInfoResponse();
+ }
+
+ // Loads the transaction list without the additional info columns (infoFields = false) and
+ // converts it into the compact id-list response.
+ @Override
+ @RetrySemantics.ReadOnly
+ public GetOpenTxnsResponse getOpenTxns() throws MetaException {
+ return getOpenTxnsList(false).toOpenTxnsResponse();
+ }
+
+ private OpenTxnList getOpenTxnsList(boolean infoFields) throws MetaException {
try {
// We need to figure out the HighWaterMark and the list of open transactions.
Connection dbConn = null;
@@ -461,14 +415,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
*/
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- List<TxnInfo> txnInfos = new ArrayList<>();
-
- String s =
- "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_USER\", \"TXN_HOST\", \"TXN_STARTED\", \"TXN_LAST_HEARTBEAT\", "
- + "(" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")"
- + "FROM \"TXNS\" ORDER BY \"TXN_ID\"";
- LOG.debug("Going to execute query<" + s + ">");
- rs = stmt.executeQuery(s);
+ List<OpenTxn> txnInfos = new ArrayList<>();
+ String txnsQuery = String.format(infoFields ? OpenTxn.OPEN_TXNS_INFO_QUERY : OpenTxn.OPEN_TXNS_QUERY,
+ TxnDbUtil.getEpochFn(dbProduct));
+ LOG.debug("Going to execute query<" + txnsQuery + ">");
+ rs = stmt.executeQuery(txnsQuery);
/*
* We can use the maximum txn_id from the TXNS table as high water mark, since the commitTxn and the Initiator
* guarantees, that the transaction with the highest txn_id will never be removed from the TXNS table.
@@ -481,134 +432,48 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
while (rs.next()) {
long txnId = rs.getLong(1);
- long age = rs.getLong(7);
+ long age = rs.getLong(4);
hwm = txnId;
if (age < getOpenTxnTimeOutMillis()) {
// We will consider every gap as an open transaction from the previous txnId
openTxnLowBoundary++;
while (txnId > openTxnLowBoundary) {
// Add an empty open transaction for every missing value
- txnInfos.add(new TxnInfo(openTxnLowBoundary, TxnState.OPEN, null, null));
+ txnInfos.add(new OpenTxn(openTxnLowBoundary, TxnStatus.OPEN, TxnType.DEFAULT));
+ LOG.debug("Open transaction added for missing value in TXNS {}",
+ JavaUtils.txnIdToString(openTxnLowBoundary));
openTxnLowBoundary++;
}
} else {
openTxnLowBoundary = txnId;
}
- char c = rs.getString(2).charAt(0);
- TxnState state;
- switch (c) {
- case TXN_COMMITTED:
+ TxnStatus state = TxnStatus.fromString(rs.getString(2));
+ if (state == TxnStatus.COMMITTED) {
// This is only here, to avoid adding this txnId as possible gap
continue;
-
- case TXN_ABORTED:
- state = TxnState.ABORTED;
- break;
-
- case TXN_OPEN:
- state = TxnState.OPEN;
- break;
-
- default:
- throw new MetaException("Unexpected transaction state " + c + " found in txns table");
}
- TxnInfo txnInfo = new TxnInfo(txnId, state, rs.getString(3), rs.getString(4));
- txnInfo.setStartedTime(rs.getLong(5));
- txnInfo.setLastHeartbeatTime(rs.getLong(6));
+ OpenTxn txnInfo = new OpenTxn(txnId, state, TxnType.findByValue(rs.getInt(3)));
+ if (infoFields) {
+ txnInfo.setUser(rs.getString(5));
+ txnInfo.setHost(rs.getString(6));
+ txnInfo.setStartedTime(rs.getLong(7));
+ txnInfo.setLastHeartBeatTime(rs.getLong(8));
+ }
txnInfos.add(txnInfo);
}
- LOG.debug("Going to rollback");
dbConn.rollback();
- return new GetOpenTxnsInfoResponse(hwm, txnInfos);
+ LOG.debug("Got OpenTxnList with hwm: {} and openTxnList size {}.", hwm, txnInfos.size());
+ return new OpenTxnList(hwm, txnInfos);
} catch (SQLException e) {
- LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
- checkRetryable(dbConn, e, "getOpenTxnsInfo");
+ checkRetryable(dbConn, e, "getOpenTxnsList");
throw new MetaException(
"Unable to select from transaction database: " + getMessage(e) + StringUtils.stringifyException(e));
} finally {
close(rs, stmt, dbConn);
}
} catch (RetryException e) {
- return getOpenTxnsInfo();
- }
- }
-
- @Override
- @RetrySemantics.ReadOnly
- public GetOpenTxnsResponse getOpenTxns() throws MetaException {
- try {
- // We need to figure out the current transaction number and the list of open transactions.
- Connection dbConn = null;
- Statement stmt = null;
- ResultSet rs = null;
- try {
- /*
- * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
- */
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
-
- List<Long> openList = new ArrayList<>();
- String s = "SELECT \"TXN_ID\", \"TXN_STATE\", \"TXN_TYPE\", "
- + "(" + TxnDbUtil.getEpochFn(dbProduct) + " - \"TXN_STARTED\")"
- + " FROM \"TXNS\" ORDER BY \"TXN_ID\"";
- LOG.debug("Going to execute query<" + s + ">");
- rs = stmt.executeQuery(s);
- long hwm = 0;
- long openTxnLowBoundary = 0;
- long minOpenTxn = Long.MAX_VALUE;
- BitSet abortedBits = new BitSet();
- while (rs.next()) {
- long txnId = rs.getLong(1);
- long age = rs.getLong(4);
- hwm = txnId;
- if (age < getOpenTxnTimeOutMillis()) {
- // We will consider every gap as an open transaction from the previous txnId
- openTxnLowBoundary++;
- while (txnId > openTxnLowBoundary) {
- // Add an empty open transaction for every missing value
- openList.add(openTxnLowBoundary);
- minOpenTxn = Math.min(minOpenTxn, openTxnLowBoundary);
- openTxnLowBoundary++;
- }
- } else {
- openTxnLowBoundary = txnId;
- }
- char txnState = rs.getString(2).charAt(0);
- if (txnState == TXN_COMMITTED) {
- continue;
- }
- if (txnState == TXN_OPEN) {
- minOpenTxn = Math.min(minOpenTxn, txnId);
- }
- TxnType txnType = TxnType.findByValue(rs.getInt(3));
- if (txnType != TxnType.READ_ONLY) {
- openList.add(txnId);
- if (txnState == TXN_ABORTED) {
- abortedBits.set(openList.size() - 1);
- }
- }
- }
- LOG.debug("Going to rollback");
- dbConn.rollback();
- ByteBuffer byteBuffer = ByteBuffer.wrap(abortedBits.toByteArray());
- GetOpenTxnsResponse otr = new GetOpenTxnsResponse(hwm, openList, byteBuffer);
- if (minOpenTxn < Long.MAX_VALUE) {
- otr.setMin_open_txn(minOpenTxn);
- }
- return otr;
- } catch (SQLException e) {
- LOG.debug("Going to rollback");
- rollbackDBConn(dbConn);
- checkRetryable(dbConn, e, "getOpenTxns");
- throw new MetaException("Unable to select from transaction database, "
- + StringUtils.stringifyException(e));
- } finally {
- close(rs, stmt, dbConn);
- }
- } catch (RetryException e) {
- return getOpenTxns();
+ return getOpenTxnsList(infoFields);
}
}
@@ -761,7 +626,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
TxnDbUtil.getEpochFn(dbProduct));
LOG.debug("Going to execute insert <" + insertQuery + ">");
try (PreparedStatement ps = dbConn.prepareStatement(insertQuery, new String[] {"TXN_ID"})) {
- String state = genKeySupport ? Character.toString(TXN_OPEN) : Character.toString(TXN_TMP);
+ String state = genKeySupport ? TxnStatus.OPEN.getSqlConst() : TXN_TMP_STATE;
if (numTxns == 1) {
ps.setString(1, state);
ps.setString(2, rqst.getUser());
@@ -838,7 +703,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
} else {
try (PreparedStatement pstmt =
dbConn.prepareStatement("SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = ?")) {
- pstmt.setString(1, Character.toString(TXN_TMP));
+ pstmt.setString(1, TXN_TMP_STATE);
try (ResultSet rs = pstmt.executeQuery()) {
while (rs.next()) {
txnIds.add(rs.getLong(1));
@@ -847,8 +712,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
try (PreparedStatement pstmt = dbConn
.prepareStatement("UPDATE \"TXNS\" SET \"TXN_STATE\" = ? WHERE \"TXN_STATE\" = ?")) {
- pstmt.setString(1, Character.toString(TXN_OPEN));
- pstmt.setString(2, Character.toString(TXN_TMP));
+ pstmt.setString(1, TxnStatus.OPEN.getSqlConst());
+ pstmt.setString(2, TXN_TMP_STATE);
pstmt.executeUpdate();
}
}
@@ -1036,7 +901,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
txnid = targetTxnIds.get(0);
}
- TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+ TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TxnStatus.OPEN);
if (txnRecord == null) {
TxnStatus status = findTxnState(txnid, stmt);
if (status == TxnStatus.ABORTED) {
@@ -1091,9 +956,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
stmt = dbConn.createStatement();
List<String> queries = new ArrayList<>();
- StringBuilder prefix = new StringBuilder("SELECT \"TXN_ID\", \"TXN_TYPE\" from \"TXNS\" where \"TXN_STATE\" = ")
- .append(quoteChar(TXN_OPEN)).append(" and \"TXN_TYPE\" != ").append(TxnType.READ_ONLY.getValue())
- .append(" and ");
+ StringBuilder prefix =
+ new StringBuilder("SELECT \"TXN_ID\", \"TXN_TYPE\" from \"TXNS\" where \"TXN_STATE\" = ")
+ .append(TxnStatus.OPEN)
+ .append(" and \"TXN_TYPE\" != ").append(TxnType.READ_ONLY.getValue()).append(" and ");
TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(),
txnIds, "\"TXN_ID\"", false, false);
@@ -1366,7 +1232,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* should not normally run concurrently (for same txn) but could due to bugs in the client
* which could then corrupt internal transaction manager state. Also competes with abortTxn().
*/
- TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+ TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TxnStatus.OPEN);
if (txnRecord == null) {
//if here, txn was not found (in expected state)
TxnStatus actualTxnStatus = findTxnState(txnid, stmt);
@@ -1386,7 +1252,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
String conflictSQLSuffix = "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"=" + txnid + " AND \"TC_OPERATION_TYPE\" IN(" +
- quoteChar(OperationType.UPDATE.sqlConst) + "," + quoteChar(OperationType.DELETE.sqlConst) + ")";
+ OperationType.UPDATE + "," + OperationType.DELETE + ")";
long tempCommitId = generateTemporaryId();
if (txnRecord.type != TxnType.READ_ONLY
@@ -1549,10 +1415,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
//The same happens when Hive splits U=I+D early so it looks like 2 branches of a
//multi-insert stmt (an Insert and a Delete branch). It also 'feels'
// un-serializable to allow concurrent deletes
- " and (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) +
- ", " + quoteChar(OperationType.DELETE.sqlConst) +
- ") AND \"CUR\".\"WS_OPERATION_TYPE\" IN(" + quoteChar(OperationType.UPDATE.sqlConst) + ", "
- + quoteChar(OperationType.DELETE.sqlConst) + "))");
+ " and (\"COMMITTED\".\"WS_OPERATION_TYPE\" IN(" + OperationType.UPDATE +
+ ", " + OperationType.DELETE +
+ ") AND \"CUR\".\"WS_OPERATION_TYPE\" IN(" + OperationType.UPDATE+ ", "
+ + OperationType.DELETE + "))");
LOG.debug("Going to execute query: <" + writeConflictQuery + ">");
return stmt.executeQuery(writeConflictQuery);
}
@@ -1566,7 +1432,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
"' FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid +
//we only track compactor activity in TXN_COMPONENTS to handle the case where the
//compactor txn aborts - so don't bother copying it to COMPLETED_TXN_COMPONENTS
- " AND \"TC_OPERATION_TYPE\" <> " + quoteChar(OperationType.COMPACT.sqlConst);
+ " AND \"TC_OPERATION_TYPE\" <> " + OperationType.COMPACT;
LOG.debug("Going to execute insert <" + s + ">");
if ((stmt.executeUpdate(s)) < 1) {
@@ -1591,7 +1457,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
queryBatch.add("DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid);
// DO NOT remove the transaction from the TXN table, the cleaner will remove it when appropriate
- queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + quoteChar(TXN_COMMITTED) + " WHERE \"TXN_ID\" = " + txnid);
+ queryBatch.add("UPDATE \"TXNS\" SET \"TXN_STATE\" = " + TxnStatus.COMMITTED + " WHERE \"TXN_ID\" = " + txnid);
if (txnType == TxnType.MATER_VIEW_REBUILD) {
queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid);
}
@@ -2235,7 +2101,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
stmt = dbConn.createStatement();
long minOpenTxn;
- rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + quoteChar(TXN_OPEN));
+ rs = stmt.executeQuery("SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\"=" + TxnStatus.OPEN);
if (!rs.next()) {
throw new IllegalStateException("Scalar query returned no rows?!?!!");
}
@@ -2373,7 +2239,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throws MetaException {
if (LOG.isDebugEnabled()) {
- LOG.debug("Acquiring lock for materialization rebuild with txnId={} for {}", txnId, Warehouse.getQualifiedName(dbName,tableName));
+ LOG.debug("Acquiring lock for materialization rebuild with {} for {}",
+ JavaUtils.txnIdToString(txnId), TableName.getDbTable(dbName, tableName));
}
TxnStore.MutexAPI.LockHandle handle = null;
@@ -2447,7 +2314,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (rc < 1) {
LOG.debug("Going to rollback");
dbConn.rollback();
- LOG.info("No lock found for rebuild of " + Warehouse.getQualifiedName(dbName, tableName) +
+ LOG.info("No lock found for rebuild of " + TableName.getDbTable(dbName, tableName) +
" when trying to heartbeat");
// It could not be renewed, return that information
return false;
@@ -2460,7 +2327,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(dbConn, e,
- "heartbeatLockMaterializationRebuild(" + Warehouse.getQualifiedName(dbName, tableName) + ", " + txnId + ")");
+ "heartbeatLockMaterializationRebuild(" + TableName.getDbTable(dbName, tableName) + ", " + txnId + ")");
throw new MetaException("Unable to heartbeat rebuild lock due to " +
StringUtils.stringifyException(e));
} finally {
@@ -2573,9 +2440,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* @throws SQLException
* @throws MetaException
*/
- private TxnRecord lockTransactionRecord(Statement stmt, long txnId, Character txnState) throws SQLException, MetaException {
+ private TxnRecord lockTransactionRecord(Statement stmt, long txnId, TxnStatus txnState)
+ throws SQLException, MetaException {
String query = "SELECT \"TXN_TYPE\" FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnId
- + (txnState != null ? " AND \"TXN_STATE\" = " + quoteChar(txnState) : "");
+ + (txnState != null ? " AND \"TXN_STATE\" = " + txnState : "");
try (ResultSet rs = stmt.executeQuery(sqlGenerator.addForUpdateClause(query))) {
return rs.next() ? new TxnRecord(rs.getInt(1)) : null;
}
@@ -2596,9 +2464,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* 1. We use S4U (with read_committed) to generate the next (ext) lock id. This serializes
* any 2 {@code enqueueLockWithRetry()} calls.
* 2. We use S4U on the relevant TXNS row to block any concurrent abort/commit/etc operations
- * @see #checkLockWithRetry(Connection, long, long)
+ * @see #checkLockWithRetry(Connection, long, long, boolean)
*/
- private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst) throws NoSuchTxnException, TxnAbortedException, MetaException {
+ private ConnectionLockIdPair enqueueLockWithRetry(LockRequest rqst)
+ throws NoSuchTxnException, TxnAbortedException, MetaException {
boolean success = false;
Connection dbConn = null;
try {
@@ -2610,7 +2479,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
stmt = dbConn.createStatement();
if (isValidTxn(txnid)) {
//this also ensures that txn is still there in expected state
- TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TXN_OPEN);
+ TxnRecord txnRecord = lockTransactionRecord(stmt, txnid, TxnStatus.OPEN);
if (txnRecord == null) {
ensureValidTxn(dbConn, txnid, stmt);
shouldNeverHappen(txnid);
@@ -2704,7 +2573,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
pstmt.setString(2, dbName);
pstmt.setString(3, tblName);
pstmt.setString(4, partName);
- pstmt.setString(5, OperationType.fromDataOperationType(lc.getOperationType()).toString());
+ pstmt.setString(5, OperationType.fromDataOperationType(lc.getOperationType()).getSqlConst());
pstmt.setObject(6, writeId.orElse(null));
pstmt.addBatch();
@@ -2889,12 +2758,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* The clients that operate in blocking mode, can't heartbeat a lock until the lock is acquired.
* We should make CheckLockRequest include timestamp or last request to skip unnecessary heartbeats. Thrift change.
*
- * {@link #checkLock(java.sql.Connection, long, long)} must run at SERIALIZABLE (make sure some lock we are checking
- * against doesn't move from W to A in another txn) but this method can heartbeat in
- * separate txn at READ_COMMITTED.
+ * {@link #checkLock(java.sql.Connection, long, long, boolean)} must run at SERIALIZABLE
+ * (make sure some lock we are checking against doesn't move from W to A in another txn)
+ * but this method can heartbeat in separate txn at READ_COMMITTED.
*
* Retry-by-caller note:
- * Retryable because {@link #checkLock(Connection, long, long)} is
+ * Retryable because {@link #checkLock(Connection, long, long, boolean)} is
*/
@Override
@RetrySemantics.SafeToRetry
@@ -3193,7 +3062,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
TxnUtils.buildQueryWithINClause(conf, queries,
new StringBuilder("UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + TxnDbUtil.getEpochFn(dbProduct) +
- " WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + " AND "),
+ " WHERE \"TXN_STATE\" = " + TxnStatus.OPEN + " AND "),
new StringBuilder(""), txnIds, "\"TXN_ID\"", true, false);
int updateCnt = 0;
for (String query : queries) {
@@ -3516,7 +3385,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
lockInternal();
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- TxnRecord txnRecord = lockTransactionRecord(stmt, rqst.getTxnid(), TXN_OPEN);
+ TxnRecord txnRecord = lockTransactionRecord(stmt, rqst.getTxnid(), TxnStatus.OPEN);
if (txnRecord == null) {
//ensures txn is still there and in expected state
ensureValidTxn(dbConn, rqst.getTxnid(), stmt);
@@ -3536,7 +3405,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
pstmt.setString(2, normalizeCase(rqst.getDbname()));
pstmt.setString(3, normalizeCase(rqst.getTablename()));
pstmt.setString(4, partName);
- pstmt.setString(5, Character.toString(ot.sqlConst));
+ pstmt.setString(5, ot.getSqlConst());
pstmt.setObject(6, writeId);
pstmt.addBatch();
@@ -4424,8 +4293,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
StringBuilder suffix = new StringBuilder();
// add update txns queries to query list
- prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = ").append(quoteChar(TXN_ABORTED))
- .append(" WHERE \"TXN_STATE\" = ").append(quoteChar(TXN_OPEN)).append(" AND ");
+ prefix.append("UPDATE \"TXNS\" SET \"TXN_STATE\" = ").append(TxnStatus.ABORTED)
+ .append(" WHERE \"TXN_STATE\" = ").append(TxnStatus.OPEN).append(" AND ");
if (checkHeartbeat) {
suffix.append(" AND \"TXN_LAST_HEARTBEAT\" < ")
.append(TxnDbUtil.getEpochFn(dbProduct)).append("-").append(timeout);
@@ -4735,7 +4604,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
try (Statement stmt = dbConn.createStatement()) {
String s = "UPDATE \"TXNS\" SET \"TXN_LAST_HEARTBEAT\" = " + TxnDbUtil.getEpochFn(dbProduct) +
- " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = '" + TXN_OPEN + "'";
+ " WHERE \"TXN_ID\" = " + txnid + " AND \"TXN_STATE\" = " + TxnStatus.OPEN;
LOG.debug("Going to execute update <" + s + ">");
int rc = stmt.executeUpdate(s);
if (rc < 1) {
@@ -4771,16 +4640,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// could also check WRITE_SET but that seems overkill
return TxnStatus.UNKNOWN;
}
- char txnState = rs.getString(1).charAt(0);
- if (txnState == TXN_ABORTED) {
- return TxnStatus.ABORTED;
- }
- if (txnState == TXN_COMMITTED) {
- return TxnStatus.COMMITTED;
- }
- assert txnState == TXN_OPEN : "we found it in TXNS but it's not ABORTED, so must be OPEN";
+ return TxnStatus.fromString(rs.getString(1));
}
- return TxnStatus.OPEN;
}
/**
@@ -4795,8 +4656,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// Get the count of txns from the given list that are in open state and not read-only.
// If the returned count is same as the input number of txns, then all txns are in open state and not read-only.
- prefix.append("SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN
- + "' AND \"TXN_TYPE\" != " + TxnType.READ_ONLY.getValue() + " AND ");
+ prefix.append("SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN
+ + " AND \"TXN_TYPE\" != " + TxnType.READ_ONLY.getValue() + " AND ");
TxnUtils.buildQueryWithINClause(conf, queries, prefix, new StringBuilder(),
txnIds, "\"TXN_ID\"", false, false);
@@ -4833,10 +4694,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
try (ResultSet rs = stmt.executeQuery(query)) {
while (rs.next()) {
long txnId = rs.getLong(1);
- char txnState = rs.getString(2).charAt(0);
+ TxnStatus txnState = TxnStatus.fromString(rs.getString(2));
TxnType txnType = TxnType.findByValue(rs.getInt(3));
- if (txnState != TXN_OPEN) {
+ if (txnState != TxnStatus.OPEN) {
txnInfo.append("{").append(txnId).append(",").append(txnState).append("}");
} else if (txnType == TxnType.READ_ONLY) {
txnInfo.append("{").append(txnId).append(",read-only}");
@@ -4920,7 +4781,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new NoSuchTxnException("No such transaction " + JavaUtils.txnIdToString(txnid));
}
}
- if (rs.getString(1).charAt(0) == TXN_ABORTED) {
+ if (TxnStatus.fromString(rs.getString(1)) == TxnStatus.ABORTED) {
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
throw new TxnAbortedException("Transaction " + JavaUtils.txnIdToString(txnid)
@@ -5047,8 +4908,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
timeOutLocks(dbConn);
while(true) {
stmt = dbConn.createStatement();
- String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN +
- "' AND \"TXN_LAST_HEARTBEAT\" < " + TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout +
+ String s = " \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN +
+ " AND \"TXN_LAST_HEARTBEAT\" < " + TxnDbUtil.getEpochFn(dbProduct) + "-" + timeout +
" AND \"TXN_TYPE\" != " + TxnType.REPL_CREATED.getValue();
//safety valve for extreme cases
s = sqlGenerator.addLimitClause(10 * TIMED_OUT_TXN_ABORT_BATCH_SIZE, s);
@@ -5108,7 +4969,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- String s = "SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = '" + TXN_OPEN + "'";
+ String s = "SELECT COUNT(*) FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.OPEN;
LOG.debug("Going to execute query <" + s + ">");
rs = stmt.executeQuery(s);
if (!rs.next()) {
@@ -5499,6 +5360,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
public boolean isWrapperFor(Class<?> iface) throws SQLException {
throw new UnsupportedOperationException();
}
- };
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStatus.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStatus.java
new file mode 100644
index 0000000..a9ad560
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStatus.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.hive.metastore.api.TxnState;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
+
+/**
+ * These are the valid values for TXNS.TXN_STATE.
+ *
+ * <p>Each constant carries the single-character code used in the TXN_STATE column and the
+ * matching API-level {@link TxnState}, when one exists. {@link #toString()} renders the code
+ * as a quoted SQL literal, so a constant can be concatenated directly into a query string.
+ */
+public enum TxnStatus {
+  OPEN('o', TxnState.OPEN),
+  ABORTED('a', TxnState.ABORTED),
+  COMMITTED('c', TxnState.COMMITTED),
+  /** Has no {@link TxnState} counterpart; see {@link #toTxnState()}. */
+  UNKNOWN('u', null);
+
+  private final char sqlConst;
+  private final TxnState txnState;
+
+  /** Maps each one-character code to its constant; backs {@link #fromString(String)}. */
+  private static final Map<String, TxnStatus> LOOKUP =
+      Arrays.stream(TxnStatus.values()).collect(toMap(TxnStatus::getSqlConst, identity()));
+
+  TxnStatus(char sqlConst, TxnState txnState) {
+    this.sqlConst = sqlConst;
+    this.txnState = txnState;
+  }
+
+  /**
+   * Returns the state code wrapped in single quotes (for example {@code 'o'}), ready to be
+   * embedded in a SQL statement.
+   */
+  @Override
+  public String toString() {
+    return "'" + getSqlConst() + "'";
+  }
+
+  /**
+   * Returns the bare one-character state code, without quotes.
+   */
+  public String getSqlConst() {
+    return Character.toString(sqlConst);
+  }
+
+  /**
+   * Converts this status to the corresponding {@link TxnState}.
+   *
+   * @return the matching {@link TxnState}, never null
+   * @throws IllegalArgumentException if this status has no counterpart ({@link #UNKNOWN})
+   */
+  public TxnState toTxnState() {
+    // UNKNOWN holds null; Optional.of(null) would throw NullPointerException instead.
+    return Optional.ofNullable(txnState).orElseThrow(
+        () -> new IllegalArgumentException("No TxnState corresponds to " + name()));
+  }
+
+  /**
+   * Resolves the constant for a one-character state code as returned by {@link #getSqlConst()}.
+   *
+   * @param sqlConst the unquoted state code, for example {@code "o"}
+   * @return the matching status
+   * @throws IllegalArgumentException if the code is null or does not match any constant
+   */
+  public static TxnStatus fromString(String sqlConst) {
+    // Map.get returns null for an unknown code, so the result has to go through ofNullable.
+    return Optional.ofNullable(LOOKUP.get(sqlConst)).orElseThrow(
+        () -> new IllegalArgumentException("Unknown TXN_STATE value: " + sqlConst));
+  }
+}