You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2022/06/03 12:00:45 UTC

[hive] branch master updated: HIVE-26267: Addendum to HIVE-26107: Issue with the PreparedStatement in TxnHandler#compact (Laszlo Vegh, reviewed by Denys Kuzmenko)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f49636c4a HIVE-26267: Addendum to HIVE-26107: Issue with the PreparedStatement in TxnHandler#compact (Laszlo Vegh, reviewed by Denys Kuzmenko)
5f49636c4a is described below

commit 5f49636c4ab2df9996aa8a874908926556c6eb5e
Author: veghlaci05 <90...@users.noreply.github.com>
AuthorDate: Fri Jun 3 14:00:36 2022 +0200

    HIVE-26267: Addendum to HIVE-26107: Issue with the PreparedStatement in TxnHandler#compact (Laszlo Vegh, reviewed by Denys Kuzmenko)
    
    Closes #3325
---
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 224 +++++++++++----------
 1 file changed, 113 insertions(+), 111 deletions(-)

diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index dabb554eea..cb9ced7f78 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -3689,9 +3689,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   public CompactionResponse compact(CompactionRequest rqst) throws MetaException {
     // Put a compaction request in the queue.
     try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      PreparedStatement pst = null;
       TxnStore.MutexAPI.LockHandle handle = null;
       try {
         lockInternal();
@@ -3701,122 +3698,127 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          * compactions for any resource.
          */
         handle = getMutexAPI().acquireLock(MUTEX_KEY.CompactionScheduler.name());
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
 
-        long id = generateCompactionQueueId(stmt);
-
-        GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(
-            Collections.singletonList(getFullTableName(rqst.getDbname(), rqst.getTablename())));
-        final ValidCompactorWriteIdList tblValidWriteIds =
-            TxnUtils.createValidCompactWriteIdList(getValidWriteIds(request).getTblValidWriteIds().get(0));
-        LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
+        try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED)) {
+          try (Statement stmt = dbConn.createStatement()) {
+
+            long id = generateCompactionQueueId(stmt);
+
+            GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(
+                Collections.singletonList(getFullTableName(rqst.getDbname(), rqst.getTablename())));
+            final ValidCompactorWriteIdList tblValidWriteIds =
+                TxnUtils.createValidCompactWriteIdList(getValidWriteIds(request).getTblValidWriteIds().get(0));
+            LOG.debug("ValidCompactWriteIdList: " + tblValidWriteIds.writeToString());
+
+            StringBuilder sb = new StringBuilder("SELECT \"CQ_ID\", \"CQ_STATE\" FROM \"COMPACTION_QUEUE\" WHERE").
+                append(" (\"CQ_STATE\" IN(").
+                append(quoteChar(INITIATED_STATE)).append(",").append(quoteChar(WORKING_STATE)).
+                append(") OR (\"CQ_STATE\" = ").append(quoteChar(READY_FOR_CLEANING)).
+                append(" AND \"CQ_HIGHEST_WRITE_ID\" = ?))").
+                append(" AND \"CQ_DATABASE\"=?").
+                append(" AND \"CQ_TABLE\"=?").append(" AND ");
+            if(rqst.getPartitionname() == null) {
+              sb.append("\"CQ_PARTITION\" is null");
+            } else {
+              sb.append("\"CQ_PARTITION\"=?");
+            }
 
-        List<String> params = new ArrayList<>();
-        StringBuilder sb = new StringBuilder("SELECT \"CQ_ID\", \"CQ_STATE\" FROM \"COMPACTION_QUEUE\" WHERE").
-          append(" (\"CQ_STATE\" IN(").
-            append(quoteChar(INITIATED_STATE)).append(",").append(quoteChar(WORKING_STATE)).
-            append(") OR (\"CQ_STATE\" = ").append(quoteChar(READY_FOR_CLEANING)).
-            append(" AND \"CQ_HIGHEST_WRITE_ID\" = ?))").
-            append(" AND \"CQ_DATABASE\"=?").
-            append(" AND \"CQ_TABLE\"=?").append(" AND ");
-        params.add(Long.toString(tblValidWriteIds.getHighWatermark()));
-        params.add(rqst.getDbname());
-        params.add(rqst.getTablename());
-        if(rqst.getPartitionname() == null) {
-          sb.append("\"CQ_PARTITION\" is null");
-        } else {
-          sb.append("\"CQ_PARTITION\"=?");
-          params.add(rqst.getPartitionname());
-        }
+            try (PreparedStatement pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(sb.toString()))) {
+              pst.setLong(1, tblValidWriteIds.getHighWatermark());
+              pst.setString(2, rqst.getDbname());
+              pst.setString(3, rqst.getTablename());
+              if (rqst.getPartitionname() != null) {
+                pst.setString(4, rqst.getPartitionname());
+              }
+              LOG.debug("Going to execute query <" + sb + ">");
+              try (ResultSet rs = pst.executeQuery()) {
+                if(rs.next()) {
+                  long enqueuedId = rs.getLong(1);
+                  String state = compactorStateToResponse(rs.getString(2).charAt(0));
+                  LOG.info("Ignoring request to compact " + rqst.getDbname() + "/" + rqst.getTablename() +
+                      "/" + rqst.getPartitionname() + " since it is already " + quoteString(state) +
+                      " with id=" + enqueuedId);
+                  CompactionResponse resp = new CompactionResponse(-1, REFUSED_RESPONSE, false);
+                  resp.setErrormessage("Compaction is already scheduled with state=" + quoteString(state) +
+                      " and id=" + enqueuedId);
+                  return resp;
+                }
+              }
+            }
+            List<String> params = new ArrayList<>();
+            StringBuilder buf = new StringBuilder("INSERT INTO \"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_DATABASE\", " +
+                "\"CQ_TABLE\", ");
+            String partName = rqst.getPartitionname();
+            if (partName != null) buf.append("\"CQ_PARTITION\", ");
+            buf.append("\"CQ_STATE\", \"CQ_TYPE\", \"CQ_ENQUEUE_TIME\"");
+            if (rqst.getProperties() != null) {
+              buf.append(", \"CQ_TBLPROPERTIES\"");
+            }
+            if (rqst.getRunas() != null) {
+              buf.append(", \"CQ_RUN_AS\"");
+            }
+            if (rqst.getInitiatorId() != null) {
+              buf.append(", \"CQ_INITIATOR_ID\"");
+            }
+            if (rqst.getInitiatorVersion() != null) {
+              buf.append(", \"CQ_INITIATOR_VERSION\"");
+            }
+            buf.append(") values (");
+            buf.append(id);
+            buf.append(", ?");
+            buf.append(", ?");
+            buf.append(", ");
+            params.add(rqst.getDbname());
+            params.add(rqst.getTablename());
+            if (partName != null) {
+              buf.append("?, '");
+              params.add(partName);
+            } else {
+              buf.append("'");
+            }
+            buf.append(INITIATED_STATE);
+            buf.append("', '");
+            buf.append(thriftCompactionType2DbType(rqst.getType()));
+            buf.append("',");
+            buf.append(getEpochFn(dbProduct));
+            if (rqst.getProperties() != null) {
+              buf.append(", ?");
+              params.add(new StringableMap(rqst.getProperties()).toString());
+            }
+            if (rqst.getRunas() != null) {
+              buf.append(", ?");
+              params.add(rqst.getRunas());
+            }
+            if (rqst.getInitiatorId() != null) {
+              buf.append(", ?");
+              params.add(rqst.getInitiatorId());
+            }
+            if (rqst.getInitiatorVersion() != null) {
+              buf.append(", ?");
+              params.add(rqst.getInitiatorVersion());
+            }
+            buf.append(")");
+            String s = buf.toString();
 
-        pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(), params);
-        LOG.debug("Going to execute query <" + sb + ">");
-        ResultSet rs = pst.executeQuery();
-        if(rs.next()) {
-          long enqueuedId = rs.getLong(1);
-          String state = compactorStateToResponse(rs.getString(2).charAt(0));
-          LOG.info("Ignoring request to compact " + rqst.getDbname() + "/" + rqst.getTablename() +
-            "/" + rqst.getPartitionname() + " since it is already " + quoteString(state) +
-            " with id=" + enqueuedId);
-          CompactionResponse resp = new CompactionResponse(-1, REFUSED_RESPONSE, false);
-          resp.setErrormessage("Compaction is already scheduled with state=" + quoteString(state) +
-              " and id=" + enqueuedId);
-          return resp;
-        }
-        close(rs);
-        closeStmt(pst);
-        params.clear();
-        StringBuilder buf = new StringBuilder("INSERT INTO \"COMPACTION_QUEUE\" (\"CQ_ID\", \"CQ_DATABASE\", " +
-          "\"CQ_TABLE\", ");
-        String partName = rqst.getPartitionname();
-        if (partName != null) buf.append("\"CQ_PARTITION\", ");
-        buf.append("\"CQ_STATE\", \"CQ_TYPE\", \"CQ_ENQUEUE_TIME\"");
-        if (rqst.getProperties() != null) {
-          buf.append(", \"CQ_TBLPROPERTIES\"");
-        }
-        if (rqst.getRunas() != null) {
-          buf.append(", \"CQ_RUN_AS\"");
-        }
-        if (rqst.getInitiatorId() != null) {
-          buf.append(", \"CQ_INITIATOR_ID\"");
-        }
-        if (rqst.getInitiatorVersion() != null) {
-          buf.append(", \"CQ_INITIATOR_VERSION\"");
-        }
-        buf.append(") values (");
-        buf.append(id);
-        buf.append(", ?");
-        buf.append(", ?");
-        buf.append(", ");
-        params.add(rqst.getDbname());
-        params.add(rqst.getTablename());
-        if (partName != null) {
-          buf.append("?, '");
-          params.add(partName);
-        } else {
-          buf.append("'");
-        }
-        buf.append(INITIATED_STATE);
-        buf.append("', '");
-        buf.append(thriftCompactionType2DbType(rqst.getType()));
-        buf.append("',");
-        buf.append(getEpochFn(dbProduct));
-        if (rqst.getProperties() != null) {
-          buf.append(", ?");
-          params.add(new StringableMap(rqst.getProperties()).toString());
-        }
-        if (rqst.getRunas() != null) {
-          buf.append(", ?");
-          params.add(rqst.getRunas());
-        }
-        if (rqst.getInitiatorId() != null) {
-          buf.append(", ?");
-          params.add(rqst.getInitiatorId());
-        }
-        if (rqst.getInitiatorVersion() != null) {
-          buf.append(", ?");
-          params.add(rqst.getInitiatorVersion());
+            try (PreparedStatement pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params)) {
+              LOG.debug("Going to execute update <" + s + ">");
+              pst.executeUpdate();
+            }
+            LOG.debug("Going to commit");
+            dbConn.commit();
+            return new CompactionResponse(id, INITIATED_RESPONSE, true);
+          } catch (SQLException e) {
+            LOG.debug("Going to rollback: ", e);
+            dbConn.rollback();
+            throw e;
+          }
         }
-        buf.append(")");
-        String s = buf.toString();
-        pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
-        LOG.debug("Going to execute update <" + s + ">");
-        pst.executeUpdate();
-        LOG.debug("Going to commit");
-        dbConn.commit();
-        return new CompactionResponse(id, INITIATED_RESPONSE, true);
       } catch (SQLException e) {
-        LOG.debug("Going to rollback: ", e);
-        rollbackDBConn(dbConn);
         checkRetryable(e, "COMPACT(" + rqst + ")");
-        throw new MetaException("Unable to select from transaction database " +
+        throw new MetaException("Unable to put the compaction request into the queue: " +
           StringUtils.stringifyException(e));
       } finally {
-        closeStmt(pst);
-        closeStmt(stmt);
-        closeDbConn(dbConn);
-        if(handle != null) {
+        if (handle != null) {
           handle.releaseLocks();
         }
         unlockInternal();