You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2022/06/03 12:00:45 UTC
[hive] branch master updated: HIVE-26267: Addendum to HIVE-26107: Issue with the PreparedStatement in TxnHandler#compact (Laszlo Vegh, reviewed by Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 5f49636c4a HIVE-26267: Addendum to HIVE-26107: Issue with the PreparedStatement in TxnHandler#compact (Laszlo Vegh, reviewed by Denys Kuzmenko)
5f49636c4a is described below
commit 5f49636c4ab2df9996aa8a874908926556c6eb5e
Author: veghlaci05 <90...@users.noreply.github.com>
AuthorDate: Fri Jun 3 14:00:36 2022 +0200
HIVE-26267: Addendum to HIVE-26107: Issue with the PreparedStatement in TxnHandler#compact (Laszlo Vegh, reviewed by Denys Kuzmenko)
Closes #3325
---
.../hadoop/hive/metastore/txn/TxnHandler.java | 224 +++++++++++----------
1 file changed, 113 insertions(+), 111 deletions(-)
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index dabb554eea..cb9ced7f78 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -3689,9 +3689,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
public CompactionResponse compact(CompactionRequest rqst) throws MetaException {
// Put a compaction request in the queue.
try {
- Connection dbConn = null;
- Statement stmt = null;
- PreparedStatement pst = null;
TxnStore.MutexAPI.LockHandle handle = null;
try {
lockInternal();
@@ -3701,122 +3698,127 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* compactions for any resource.
*/
handle = getMutexAPI().acquireLock(MUTEX_KEY.CompactionScheduler.name());
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
- long id = generateCompactionQueueId(stmt);
-
- GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(
- Collections.singletonList(getFullTableName(rqst.getDbname(), rqst.getTablename())));
- final ValidCompactorWriteIdList tblValidWriteIds =
- TxnUtils.createValidCompactWriteIdList(getValidWriteIds(request).getTblValidWriteIds().get(0));
- LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
+ try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED)) {
+ try (Statement stmt = dbConn.createStatement()) {
+
+ long id = generateCompactionQueueId(stmt);
+
+ GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(
+ Collections.singletonList(getFullTableName(rqst.getDbname(), rqst.getTablename())));
+ final ValidCompactorWriteIdList tblValidWriteIds =
+ TxnUtils.createValidCompactWriteIdList(getValidWriteIds(request).getTblValidWriteIds().get(0));
+ LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
+
+ StringBuilder sb = new StringBuilder("SELECT \"CQ_ID\", \"CQ_STATE\" FROM \"COMPACTION_QUEUE\" WHERE").
+ append(" (\"CQ_STATE\" IN(").
+ append(quoteChar(INITIATED_STATE)).append(",").append(quoteChar(WORKING_STATE)).
+ append(") OR (\"CQ_STATE\" = ").append(quoteChar(READY_FOR_CLEANING)).
+ append(" AND \"CQ_HIGHEST_WRITE_ID\" = ?))").
+ append(" AND \"CQ_DATABASE\"=?").
+ append(" AND \"CQ_TABLE\"=?").append(" AND ");
+ if(rqst.getPartitionname() == null) {
+ sb.append("\"CQ_PARTITION\" is null");
+ } else {
+ sb.append("\"CQ_PARTITION\"=?");
+ }
- List<String> params = new ArrayList<>();
- StringBuilder sb = new StringBuilder("SELECT \"CQ_ID\", \"CQ_STATE\" FROM \"COMPACTION_QUEUE\" WHERE").
- append(" (\"CQ_STATE\" IN(").
- append(quoteChar(INITIATED_STATE)).append(",").append(quoteChar(WORKING_STATE)).
- append(") OR (\"CQ_STATE\" = ").append(quoteChar(READY_FOR_CLEANING)).
- append(" AND \"CQ_HIGHEST_WRITE_ID\" = ?))").
- append(" AND \"CQ_DATABASE\"=?").
- append(" AND \"CQ_TABLE\"=?").append(" AND ");
- params.add(Long.toString(tblValidWriteIds.getHighWatermark()));
- params.add(rqst.getDbname());
- params.add(rqst.getTablename());
- if(rqst.getPartitionname() == null) {
- sb.append("\"CQ_PARTITION\" is null");
- } else {
- sb.append("\"CQ_PARTITION\"=?");
- params.add(rqst.getPartitionname());
- }
+ try (PreparedStatement pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(sb.toString()))) {
+ pst.setLong(1, tblValidWriteIds.getHighWatermark());
+ pst.setString(2, rqst.getDbname());
+ pst.setString(3, rqst.getTablename());
+ if (rqst.getPartitionname() != null) {
+ pst.setString(4, rqst.getPartitionname());
+ }
+ LOG.debug("Going to execute query <" + sb + ">");
+ try (ResultSet rs = pst.executeQuery()) {
+ if(rs.next()) {
+ long enqueuedId = rs.getLong(1);
+ String state = compactorStateToResponse(rs.getString(2).charAt(0));
+ LOG.info("Ignoring request to compact " + rqst.getDbname() + "/" + rqst.getTablename() +
+ "/" + rqst.getPartitionname() + " since it is already " + quoteString(state) +
+ " with id=" + enqueuedId);
+ CompactionResponse resp = new CompactionResponse(-1, REFUSED_RESPONSE, false);
+ resp.setErrormessage("Compaction is already scheduled with state=" + quoteString(state) +
+ " and id=" + enqueuedId);
+ return resp;
+ }
+ }
+ }
+ List<String> params = new ArrayList<>();
+ StringBuilder buf = new StringBuilder("INSERT INTO \"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_DATABASE\", " +
+ "\"CQ_TABLE\", ");
+ String partName = rqst.getPartitionname();
+ if (partName != null) buf.append("\"CQ_PARTITION\", ");
+ buf.append("\"CQ_STATE\", \"CQ_TYPE\", \"CQ_ENQUEUE_TIME\"");
+ if (rqst.getProperties() != null) {
+ buf.append(", \"CQ_TBLPROPERTIES\"");
+ }
+ if (rqst.getRunas() != null) {
+ buf.append(", \"CQ_RUN_AS\"");
+ }
+ if (rqst.getInitiatorId() != null) {
+ buf.append(", \"CQ_INITIATOR_ID\"");
+ }
+ if (rqst.getInitiatorVersion() != null) {
+ buf.append(", \"CQ_INITIATOR_VERSION\"");
+ }
+ buf.append(") values (");
+ buf.append(id);
+ buf.append(", ?");
+ buf.append(", ?");
+ buf.append(", ");
+ params.add(rqst.getDbname());
+ params.add(rqst.getTablename());
+ if (partName != null) {
+ buf.append("?, '");
+ params.add(partName);
+ } else {
+ buf.append("'");
+ }
+ buf.append(INITIATED_STATE);
+ buf.append("', '");
+ buf.append(thriftCompactionType2DbType(rqst.getType()));
+ buf.append("',");
+ buf.append(getEpochFn(dbProduct));
+ if (rqst.getProperties() != null) {
+ buf.append(", ?");
+ params.add(new StringableMap(rqst.getProperties()).toString());
+ }
+ if (rqst.getRunas() != null) {
+ buf.append(", ?");
+ params.add(rqst.getRunas());
+ }
+ if (rqst.getInitiatorId() != null) {
+ buf.append(", ?");
+ params.add(rqst.getInitiatorId());
+ }
+ if (rqst.getInitiatorVersion() != null) {
+ buf.append(", ?");
+ params.add(rqst.getInitiatorVersion());
+ }
+ buf.append(")");
+ String s = buf.toString();
- pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(), params);
- LOG.debug("Going to execute query <" + sb + ">");
- ResultSet rs = pst.executeQuery();
- if(rs.next()) {
- long enqueuedId = rs.getLong(1);
- String state = compactorStateToResponse(rs.getString(2).charAt(0));
- LOG.info("Ignoring request to compact " + rqst.getDbname() + "/" + rqst.getTablename() +
- "/" + rqst.getPartitionname() + " since it is already " + quoteString(state) +
- " with id=" + enqueuedId);
- CompactionResponse resp = new CompactionResponse(-1, REFUSED_RESPONSE, false);
- resp.setErrormessage("Compaction is already scheduled with state=" + quoteString(state) +
- " and id=" + enqueuedId);
- return resp;
- }
- close(rs);
- closeStmt(pst);
- params.clear();
- StringBuilder buf = new StringBuilder("INSERT INTO \"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_DATABASE\", " +
- "\"CQ_TABLE\", ");
- String partName = rqst.getPartitionname();
- if (partName != null) buf.append("\"CQ_PARTITION\", ");
- buf.append("\"CQ_STATE\", \"CQ_TYPE\", \"CQ_ENQUEUE_TIME\"");
- if (rqst.getProperties() != null) {
- buf.append(", \"CQ_TBLPROPERTIES\"");
- }
- if (rqst.getRunas() != null) {
- buf.append(", \"CQ_RUN_AS\"");
- }
- if (rqst.getInitiatorId() != null) {
- buf.append(", \"CQ_INITIATOR_ID\"");
- }
- if (rqst.getInitiatorVersion() != null) {
- buf.append(", \"CQ_INITIATOR_VERSION\"");
- }
- buf.append(") values (");
- buf.append(id);
- buf.append(", ?");
- buf.append(", ?");
- buf.append(", ");
- params.add(rqst.getDbname());
- params.add(rqst.getTablename());
- if (partName != null) {
- buf.append("?, '");
- params.add(partName);
- } else {
- buf.append("'");
- }
- buf.append(INITIATED_STATE);
- buf.append("', '");
- buf.append(thriftCompactionType2DbType(rqst.getType()));
- buf.append("',");
- buf.append(getEpochFn(dbProduct));
- if (rqst.getProperties() != null) {
- buf.append(", ?");
- params.add(new StringableMap(rqst.getProperties()).toString());
- }
- if (rqst.getRunas() != null) {
- buf.append(", ?");
- params.add(rqst.getRunas());
- }
- if (rqst.getInitiatorId() != null) {
- buf.append(", ?");
- params.add(rqst.getInitiatorId());
- }
- if (rqst.getInitiatorVersion() != null) {
- buf.append(", ?");
- params.add(rqst.getInitiatorVersion());
+ try (PreparedStatement pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params)) {
+ LOG.debug("Going to execute update <" + s + ">");
+ pst.executeUpdate();
+ }
+ LOG.debug("Going to commit");
+ dbConn.commit();
+ return new CompactionResponse(id, INITIATED_RESPONSE, true);
+ } catch (SQLException e) {
+ LOG.debug("Going to rollback: ", e);
+ dbConn.rollback();
+ throw e;
+ }
}
- buf.append(")");
- String s = buf.toString();
- pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
- LOG.debug("Going to execute update <" + s + ">");
- pst.executeUpdate();
- LOG.debug("Going to commit");
- dbConn.commit();
- return new CompactionResponse(id, INITIATED_RESPONSE, true);
} catch (SQLException e) {
- LOG.debug("Going to rollback: ", e);
- rollbackDBConn(dbConn);
checkRetryable(e, "COMPACT(" + rqst + ")");
- throw new MetaException("Unable to select from transaction database " +
+ throw new MetaException("Unable to put the compaction request into the queue: " +
StringUtils.stringifyException(e));
} finally {
- closeStmt(pst);
- closeStmt(stmt);
- closeDbConn(dbConn);
- if(handle != null) {
+ if (handle != null) {
handle.releaseLocks();
}
unlockInternal();