You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/10/25 04:57:18 UTC
hive git commit: HIVE-20607: TxnHandler should use PreparedStatement
to execute direct SQL queries (Sankar Hariappan, reviewed by Daniel Dai)
Repository: hive
Updated Branches:
refs/heads/branch-3 507a6f7a9 -> 09b92d3c8
HIVE-20607: TxnHandler should use PreparedStatement to execute direct SQL queries (Sankar Hariappan, reviewed by Daniel Dai)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/09b92d3c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/09b92d3c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/09b92d3c
Branch: refs/heads/branch-3
Commit: 09b92d3c864b00df99923f03a843a8179bd874a0
Parents: 507a6f7
Author: Sankar Hariappan <sa...@apache.org>
Authored: Thu Oct 25 10:26:52 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Thu Oct 25 10:26:52 2018 +0530
----------------------------------------------------------------------
.../listener/DbNotificationListener.java | 44 +-
.../hive/ql/lockmgr/TestDbTxnManager.java | 4 +-
.../hive/metastore/tools/SQLGenerator.java | 110 +++-
.../hadoop/hive/metastore/txn/TxnHandler.java | 556 ++++++++++++-------
4 files changed, 484 insertions(+), 230 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/09b92d3c/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 909ed56..e5da5d6 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -23,7 +23,7 @@ import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
@@ -747,11 +747,14 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
String s = sqlGenerator.addForUpdateClause("select \"WNL_FILES\", \"WNL_ID\" from" +
" \"TXN_WRITE_NOTIFICATION_LOG\" " +
- "where \"WNL_DATABASE\" = " + quoteString(dbName) +
- "and \"WNL_TABLE\" = " + quoteString(tblName) + " and \"WNL_PARTITION\" = " +
- quoteString(partition) + " and \"WNL_TXNID\" = " + Long.toString(acidWriteEvent.getTxnId()));
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
+ "where \"WNL_DATABASE\" = ? " +
+ "and \"WNL_TABLE\" = ? " + " and \"WNL_PARTITION\" = ? " +
+ "and \"WNL_TXNID\" = " + Long.toString(acidWriteEvent.getTxnId()));
+ List<String> params = Arrays.asList(dbName, tblName, partition);
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+ LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
+ quoteString(dbName), quoteString(tblName), quoteString(partition));
+ rs = pst.executeQuery();
if (!rs.next()) {
// if rs is empty then no lock is taken and thus it can not cause deadlock.
long nextNLId = getNextNLId(stmt, sqlGenerator,
@@ -760,6 +763,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
"(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\", \"WNL_DATABASE\", \"WNL_TABLE\", " +
"\"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\", " +
"\"WNL_FILES\", \"WNL_EVENT_TIME\") VALUES (?,?,?,?,?,?,?,?,?,?)";
+ closeStmt(pst);
int currentTime = now();
pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s));
pst.setLong(1, nextNLId);
@@ -792,6 +796,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
" \"WNL_FILES\" = ? ," +
" \"WNL_EVENT_TIME\" = ?" +
" where \"WNL_ID\" = ?";
+ closeStmt(pst);
pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s));
pst.setString(1, tableObj);
pst.setString(2, partitionObj);
@@ -825,6 +830,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
return;
}
Statement stmt = null;
+ PreparedStatement pst = null;
ResultSet rs = null;
try {
stmt = dbConn.createStatement();
@@ -851,21 +857,20 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
long nextNLId = getNextNLId(stmt, sqlGenerator,
"org.apache.hadoop.hive.metastore.model.MNotificationLog");
- List<String> insert = new ArrayList<>();
+ String insertVal = "(" + nextNLId + "," + nextEventId + "," + now() + ", ?, ?," +
+ quoteString(" ") + ",?, ?)";
- insert.add(0, nextNLId + "," + nextEventId + "," + now() + "," +
- quoteString(event.getEventType()) + "," + quoteString(event.getDbName()) + "," +
- quoteString(" ") + "," + quoteString(event.getMessage()) + "," +
- quoteString(event.getMessageFormat()));
+ s = "insert into \"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", \"EVENT_TIME\", " +
+ " \"EVENT_TYPE\", \"DB_NAME\", " +
+ " \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\") VALUES " + insertVal;
+ List<String> params = Arrays.asList(
+ event.getEventType(), event.getDbName(), event.getMessage(), event.getMessageFormat());
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
- List<String> sql = sqlGenerator.createInsertValuesStmt(
- "\"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", \"EVENT_TIME\", " +
- " \"EVENT_TYPE\", \"DB_NAME\"," +
- " \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\")", insert);
- for (String q : sql) {
- LOG.info("Going to execute insert <" + q + ">");
- stmt.execute(q);
- }
+ LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">",
+ quoteString(event.getEventType()), quoteString(event.getDbName()),
+ quoteString(event.getMessage()), quoteString(event.getMessageFormat()));
+ pst.execute();
// Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
if (event.isSetEventId()) {
@@ -878,6 +883,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
throw e;
} finally {
closeStmt(stmt);
+ closeStmt(pst);
close(rs);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/09b92d3c/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 2576ba2..cc86afe 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -98,8 +98,8 @@ public class TestDbTxnManager {
public void testSingleReadPartition() throws Exception {
addPartitionInput(newTable(true));
QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
- txnMgr.openTxn(ctx, null);
- txnMgr.acquireLocks(qp, ctx, null);
+ txnMgr.openTxn(ctx, "fred");
+ txnMgr.acquireLocks(qp, ctx, "fred");
List<HiveLock> locks = ctx.getHiveLocks();
Assert.assertEquals(1, locks.size());
Assert.assertEquals(1,
http://git-wip-us.apache.org/repos/asf/hive/blob/09b92d3c/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
index d0ac7db..49b737e 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -47,18 +50,80 @@ public final class SQLGenerator {
}
/**
- * Genereates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB
+ * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB
+ *
+ * @param tblColumns e.g. "T(a,b,c)"
+ * @param rows e.g. list of Strings like 3,4,'d'
+ * @param paramsList List of parameters which in turn is list of Strings to be set in PreparedStatement object
+ * @return List PreparedStatement objects for fully formed INSERT INTO ... statements
+ */
+ public List<PreparedStatement> createInsertValuesPreparedStmt(Connection dbConn,
+ String tblColumns, List<String> rows,
+ List<List<String>> paramsList)
+ throws SQLException {
+ if (rows == null || rows.size() == 0) {
+ return Collections.emptyList();
+ }
+ assert((paramsList == null) || (rows.size() == paramsList.size()));
+
+ List<Integer> rowsCountInStmts = new ArrayList<>();
+ List<String> insertStmts = createInsertValuesStmt(tblColumns, rows, rowsCountInStmts);
+ assert(insertStmts.size() == rowsCountInStmts.size());
+
+ List<PreparedStatement> preparedStmts = new ArrayList<>();
+ int paramsListFromIdx = 0;
+ try {
+ for (int stmtIdx = 0; stmtIdx < insertStmts.size(); stmtIdx++) {
+ String sql = insertStmts.get(stmtIdx);
+ PreparedStatement pStmt = prepareStmtWithParameters(dbConn, sql, null);
+ if (paramsList != null) {
+ int paramIdx = 1;
+ int paramsListToIdx = paramsListFromIdx + rowsCountInStmts.get(stmtIdx);
+ for (int paramsListIdx = paramsListFromIdx; paramsListIdx < paramsListToIdx; paramsListIdx++) {
+ List<String> params = paramsList.get(paramsListIdx);
+ for (int i = 0; i < params.size(); i++, paramIdx++) {
+ pStmt.setString(paramIdx, params.get(i));
+ }
+ }
+ paramsListFromIdx = paramsListToIdx;
+ }
+ preparedStmts.add(pStmt);
+ }
+ } catch (SQLException e) {
+ for (PreparedStatement pst : preparedStmts) {
+ pst.close();
+ }
+ throw e;
+ }
+ return preparedStmts;
+ }
+
+ /**
+ * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB
*
* @param tblColumns e.g. "T(a,b,c)"
* @param rows e.g. list of Strings like 3,4,'d'
* @return fully formed INSERT INTO ... statements
*/
public List<String> createInsertValuesStmt(String tblColumns, List<String> rows) {
+ return createInsertValuesStmt(tblColumns, rows, null);
+ }
+
+ /**
+ * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB
+ *
+ * @param tblColumns e.g. "T(a,b,c)"
+ * @param rows e.g. list of Strings like 3,4,'d'
+ * @param rowsCountInStmts Output the number of rows in each insert statement returned.
+ * @return fully formed INSERT INTO ... statements
+ */
+ private List<String> createInsertValuesStmt(String tblColumns, List<String> rows, List<Integer> rowsCountInStmts) {
if (rows == null || rows.size() == 0) {
return Collections.emptyList();
}
List<String> insertStmts = new ArrayList<>();
StringBuilder sb = new StringBuilder();
+ int numRowsInCurrentStmt = 0;
switch (dbProduct) {
case ORACLE:
if (rows.size() > 1) {
@@ -69,15 +134,23 @@ public final class SQLGenerator {
if (numRows > 0) {
sb.append(" select * from dual");
insertStmts.add(sb.toString());
+ if (rowsCountInStmts != null) {
+ rowsCountInStmts.add(numRowsInCurrentStmt);
+ }
+ numRowsInCurrentStmt = 0;
}
sb.setLength(0);
sb.append("insert all ");
}
sb.append("into ").append(tblColumns).append(" values(").append(rows.get(numRows))
.append(") ");
+ numRowsInCurrentStmt++;
}
sb.append("select * from dual");
insertStmts.add(sb.toString());
+ if (rowsCountInStmts != null) {
+ rowsCountInStmts.add(numRowsInCurrentStmt);
+ }
return insertStmts;
}
//fall through
@@ -89,13 +162,21 @@ public final class SQLGenerator {
if (numRows % MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) {
if (numRows > 0) {
insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma
+ if (rowsCountInStmts != null) {
+ rowsCountInStmts.add(numRowsInCurrentStmt);
+ }
+ numRowsInCurrentStmt = 0;
}
sb.setLength(0);
sb.append("insert into ").append(tblColumns).append(" values");
}
sb.append('(').append(rows.get(numRows)).append("),");
+ numRowsInCurrentStmt++;
}
insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma
+ if (rowsCountInStmts != null) {
+ rowsCountInStmts.add(numRowsInCurrentStmt);
+ }
return insertStmts;
default:
String msg = "Unrecognized database product name <" + dbProduct + ">";
@@ -171,6 +252,33 @@ public final class SQLGenerator {
}
}
+ /**
+ * Make PreparedStatement object with list of String type parameters to be set.
+ * It is assumed the input sql string have the number of "?" equal to number of parameters
+ * passed as input.
+ * @param dbConn - Connection object
+ * @param sql - SQL statement with "?" for input parameters.
+ * @param parameters - List of String type parameters to be set in PreparedStatement object
+ * @return PreparedStatement type object
+ * @throws SQLException
+ */
+ public PreparedStatement prepareStmtWithParameters(Connection dbConn, String sql, List<String> parameters)
+ throws SQLException {
+ PreparedStatement pst = dbConn.prepareStatement(addEscapeCharacters(sql));
+ if ((parameters == null) || parameters.isEmpty()) {
+ return pst;
+ }
+ try {
+ for (int i = 1; i <= parameters.size(); i++) {
+ pst.setString(i, parameters.get(i - 1));
+ }
+ } catch (SQLException e) {
+ pst.close();
+ throw e;
+ }
+ return pst;
+ }
+
public DatabaseProduct getDbProduct() {
return dbProduct;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/09b92d3c/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 4df75fb..b969efb 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -21,6 +21,7 @@ import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.Driver;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
@@ -574,10 +575,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throws SQLException, MetaException {
int numTxns = rqst.getNum_txns();
ResultSet rs = null;
+ List<PreparedStatement> insertPreparedStmts = null;
TxnType txnType = TxnType.DEFAULT;
try {
if (rqst.isSetReplPolicy()) {
- List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), stmt);
+ List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), dbConn);
if (!targetTxnIdList.isEmpty()) {
if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) {
@@ -607,16 +609,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
List<Long> txnIds = new ArrayList<>(numTxns);
List<String> rows = new ArrayList<>();
+ List<String> params = new ArrayList<>();
+ params.add(rqst.getUser());
+ params.add(rqst.getHostname());
+ List<List<String>> paramsList = new ArrayList<>(numTxns);
for (long i = first; i < first + numTxns; i++) {
txnIds.add(i);
- rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ","
- + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname()) + "," + txnType.getValue());
+ rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ",?,?," + txnType.getValue());
+ paramsList.add(params);
}
- List<String> queries = sqlGenerator.createInsertValuesStmt(
- "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host, txn_type)", rows);
- for (String q : queries) {
- LOG.debug("Going to execute update <" + q + ">");
- stmt.execute(q);
+ insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+ "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host, txn_type)",
+ rows, paramsList);
+ for (PreparedStatement pst : insertPreparedStmts) {
+ pst.execute();
}
// Need to register minimum open txnid for current transactions into MIN_HISTORY table.
@@ -648,18 +654,23 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (rqst.isSetReplPolicy()) {
List<String> rowsRepl = new ArrayList<>();
-
+ for (PreparedStatement pst : insertPreparedStmts) {
+ pst.close();
+ }
+ insertPreparedStmts.clear();
+ params.clear();
+ paramsList.clear();
+ params.add(rqst.getReplPolicy());
for (int i = 0; i < numTxns; i++) {
- rowsRepl.add(
- quoteString(rqst.getReplPolicy()) + "," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
+ rowsRepl.add( "?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
+ paramsList.add(params);
}
- List<String> queriesRepl = sqlGenerator.createInsertValuesStmt(
- "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl);
-
- for (String query : queriesRepl) {
- LOG.info("Going to execute insert <" + query + ">");
- stmt.execute(query);
+ insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+ "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl,
+ paramsList);
+ for (PreparedStatement pst : insertPreparedStmts) {
+ pst.execute();
}
}
@@ -669,12 +680,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
return txnIds;
} finally {
+ if (insertPreparedStmts != null) {
+ for (PreparedStatement pst : insertPreparedStmts) {
+ pst.close();
+ }
+ }
close(rs);
}
}
- private List<Long> getTargetTxnIdList(String replPolicy, List<Long> sourceTxnIdList, Statement stmt)
+ private List<Long> getTargetTxnIdList(String replPolicy, List<Long> sourceTxnIdList, Connection dbConn)
throws SQLException {
+ PreparedStatement pst = null;
ResultSet rs = null;
try {
List<String> inQueries = new ArrayList<>();
@@ -682,15 +699,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
StringBuilder suffix = new StringBuilder();
List<Long> targetTxnIdList = new ArrayList<>();
prefix.append("select RTM_TARGET_TXN_ID from REPL_TXN_MAP where ");
- suffix.append(" and RTM_REPL_POLICY = " + quoteString(replPolicy));
+ suffix.append(" and RTM_REPL_POLICY = ?");
TxnUtils.buildQueryWithINClause(conf, inQueries, prefix, suffix, sourceTxnIdList,
"RTM_SRC_TXN_ID", false, false);
+ List<String> params = Arrays.asList(replPolicy);
for (String query : inQueries) {
- LOG.debug("Going to execute select <" + query + ">");
- rs = stmt.executeQuery(query);
+ LOG.debug("Going to execute select <" + query.replaceAll("\\?", "{}") + ">", quoteString(replPolicy));
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, query, params);
+ rs = pst.executeQuery();
while (rs.next()) {
targetTxnIdList.add(rs.getLong(1));
}
+ closeStmt(pst);
}
LOG.debug("targetTxnid for srcTxnId " + sourceTxnIdList.toString() + " is " + targetTxnIdList.toString());
return targetTxnIdList;
@@ -698,6 +718,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
LOG.warn("failed to get target txn ids " + e.getMessage());
throw e;
} finally {
+ closeStmt(pst);
close(rs);
}
}
@@ -707,12 +728,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
public long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException {
try {
Connection dbConn = null;
- Statement stmt = null;
try {
lockInternal();
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
- List<Long> targetTxnIds = getTargetTxnIdList(replPolicy, Collections.singletonList(sourceTxnId), stmt);
+ List<Long> targetTxnIds = getTargetTxnIdList(replPolicy, Collections.singletonList(sourceTxnId), dbConn);
if (targetTxnIds.isEmpty()) {
LOG.info("Txn {} not present for repl policy {}", sourceTxnId, replPolicy);
return -1;
@@ -726,7 +745,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to get target transaction id "
+ StringUtils.stringifyException(e));
} finally {
- close(null, stmt, dbConn);
+ closeDbConn(dbConn);
unlockInternal();
}
} catch (RetryException e) {
@@ -734,6 +753,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
+ private void deleteReplTxnMapEntry(Connection dbConn, long sourceTxnId, String replPolicy) throws SQLException {
+ String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + " and RTM_REPL_POLICY = ?";
+ try (PreparedStatement pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(replPolicy))) {
+ LOG.info("Going to execute <" + s.replaceAll("\\?", "{}") + ">", quoteString(replPolicy));
+ pst.executeUpdate();
+ }
+ }
+
@Override
@RetrySemantics.Idempotent
public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException {
@@ -750,7 +777,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (rqst.isSetReplPolicy()) {
sourceTxnId = rqst.getTxnid();
List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
- Collections.singletonList(sourceTxnId), stmt);
+ Collections.singletonList(sourceTxnId), dbConn);
if (targetTxnIds.isEmpty()) {
// Idempotent case where txn was already closed or abort txn event received without
// corresponding open txn event.
@@ -768,10 +795,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (rqst.isSetReplPolicy()) {
// in case of replication, idempotent is taken care by getTargetTxnId
LOG.warn("Invalid state ABORTED for transactions started using replication replay task");
- String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
- " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
- LOG.info("Going to execute <" + s + ">");
- stmt.executeUpdate(s);
+ deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
}
LOG.info("abortTxn(" + JavaUtils.txnIdToString(txnid) +
") requested by it is already " + TxnStatus.ABORTED);
@@ -781,10 +805,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
if (rqst.isSetReplPolicy()) {
- String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
- " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
- LOG.info("Going to execute <" + s + ">");
- stmt.executeUpdate(s);
+ deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
}
if (transactionalListeners != null) {
@@ -883,6 +904,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
try {
Connection dbConn = null;
Statement stmt = null;
+ List<PreparedStatement> insertPreparedStmts = null;
ResultSet lockHandle = null;
ResultSet commitIdRs = null, rs;
try {
@@ -893,7 +915,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (rqst.isSetReplPolicy()) {
sourceTxnId = rqst.getTxnid();
List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
- Collections.singletonList(sourceTxnId), stmt);
+ Collections.singletonList(sourceTxnId), dbConn);
if (targetTxnIds.isEmpty()) {
// Idempotent case where txn was already closed or commit txn event received without
// corresponding open txn event.
@@ -1042,8 +1064,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
"ctc_table, ctc_partition, ctc_writeid, ctc_update_delete) select tc_txnid, tc_database, tc_table, " +
"tc_partition, tc_writeid, '" + isUpdateDelete + "' from TXN_COMPONENTS where tc_txnid = " + txnid;
LOG.debug("Going to execute insert <" + s + ">");
- int modCount = 0;
- if ((modCount = stmt.executeUpdate(s)) < 1) {
+
+ if ((stmt.executeUpdate(s)) < 1) {
//this can be reasonable for an empty txn START/COMMIT or read-only txn
//also an IUD with DP that didn't match any rows.
LOG.info("Expected to move at least one record from txn_components to " +
@@ -1052,27 +1074,29 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
} else {
if (rqst.isSetWriteEventInfos()) {
List<String> rows = new ArrayList<>();
+ List<List<String>> paramsList = new ArrayList<>();
for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
- rows.add(txnid + "," + quoteString(writeEventInfo.getDatabase()) + "," +
- quoteString(writeEventInfo.getTable()) + "," +
- quoteString(writeEventInfo.getPartition()) + "," +
+ rows.add(txnid + ", ?, ?, ?," +
writeEventInfo.getWriteId() + "," +
quoteChar(isUpdateDelete));
+ List<String> params = new ArrayList<>();
+ params.add(writeEventInfo.getDatabase());
+ params.add(writeEventInfo.getTable());
+ params.add(writeEventInfo.getPartition());
+ paramsList.add(params);
}
- List<String> queries = sqlGenerator.createInsertValuesStmt("COMPLETED_TXN_COMPONENTS " +
- "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, ctc_writeid, ctc_update_delete)", rows);
- for (String q : queries) {
- LOG.debug("Going to execute insert <" + q + "> ");
- stmt.execute(q);
+ insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+ "COMPLETED_TXN_COMPONENTS " +
+ "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, ctc_writeid, ctc_update_delete)",
+ rows, paramsList);
+ for (PreparedStatement pst : insertPreparedStmts) {
+ pst.execute();
}
}
-
- s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
- " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
- LOG.info("Repl going to execute <" + s + ">");
- stmt.executeUpdate(s);
+ deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
}
+ // cleanup all txn related metadata
s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
@@ -1106,6 +1130,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
+ if (insertPreparedStmts != null) {
+ for (PreparedStatement pst : insertPreparedStmts) {
+ closeStmt(pst);
+ }
+ }
close(commitIdRs);
close(lockHandle, stmt, dbConn);
unlockInternal();
@@ -1133,8 +1162,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
try {
Connection dbConn = null;
Statement stmt = null;
+ PreparedStatement pStmt = null;
+ List<PreparedStatement> insertPreparedStmts = null;
ResultSet rs = null;
TxnStore.MutexAPI.LockHandle handle = null;
+ List<String> params = Arrays.asList(dbName, tblName);
try {
lockInternal();
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -1142,11 +1174,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// Check if this txn state is already replicated for this given table. If yes, then it is
// idempotent case and just return.
- String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
- + " and nwi_table = " + quoteString(tblName);
- LOG.debug("Going to execute query <" + sql + ">");
-
- rs = stmt.executeQuery(sql);
+ String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = ? and nwi_table = ?";
+ pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, sql, params);
+ LOG.debug("Going to execute query <" + sql.replaceAll("\\?", "{}") + ">",
+ quoteString(dbName), quoteString(tblName));
+ rs = pStmt.executeQuery();
if (rs.next()) {
LOG.info("Idempotent flow: WriteId state <" + validWriteIdList + "> is already applied for the table: "
+ dbName + "." + tblName);
@@ -1162,19 +1194,21 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// Map each aborted write id with each allocated txn.
List<String> rows = new ArrayList<>();
+ List<List<String>> paramsList = new ArrayList<>();
int i = 0;
for (long txn : txnIds) {
long writeId = abortedWriteIds.get(i++);
- rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId);
+ rows.add(txn + ", ?, ?, " + writeId);
+ paramsList.add(params);
LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
}
// Insert entries to TXN_TO_WRITE_ID for aborted write ids
- List<String> inserts = sqlGenerator.createInsertValuesStmt(
- "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);
- for (String insert : inserts) {
- LOG.debug("Going to execute insert <" + insert + ">");
- stmt.execute(insert);
+ insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+ "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows,
+ paramsList);
+ for (PreparedStatement pst : insertPreparedStmts) {
+ pst.execute();
}
// Abort all the allocated txns so that the mapped write ids are referred as aborted ones.
@@ -1189,11 +1223,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
long nextWriteId = validWriteIdList.getHighWatermark() + 1;
// First allocation of write id (hwm+1) should add the table to the next_write_id meta table.
- sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
- + quoteString(dbName) + "," + quoteString(tblName) + ","
+ sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (?, ?, "
+ Long.toString(nextWriteId) + ")";
- LOG.debug("Going to execute insert <" + sql + ">");
- stmt.execute(sql);
+ closeStmt(pStmt);
+ pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, sql, params);
+ LOG.debug("Going to execute insert <" + sql.replaceAll("\\?", "{}") + ">",
+ quoteString(dbName), quoteString(tblName));
+ pStmt.execute();
LOG.info("WriteId state <" + validWriteIdList + "> is applied for the table: " + dbName + "." + tblName);
LOG.debug("Going to commit");
@@ -1205,6 +1241,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
+ if (insertPreparedStmts != null) {
+ for (PreparedStatement pst : insertPreparedStmts) {
+ closeStmt(pst);
+ }
+ }
+ closeStmt(pStmt);
close(rs, stmt, dbConn);
if(handle != null) {
handle.releaseLocks();
@@ -1246,7 +1288,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throws NoSuchTxnException, MetaException {
try {
Connection dbConn = null;
- Statement stmt = null;
ValidTxnList validTxnList;
// We should prepare the valid write ids list based on validTxnList of current txn.
@@ -1263,12 +1304,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
*/
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
// Get the valid write id list for all the tables read by the current txn
List<TableValidWriteIds> tblValidWriteIdsList = new ArrayList<>();
for (String fullTableName : rqst.getFullTableNames()) {
- tblValidWriteIdsList.add(getValidWriteIdsForTable(stmt, fullTableName, validTxnList));
+ tblValidWriteIdsList.add(getValidWriteIdsForTable(dbConn, fullTableName, validTxnList));
}
LOG.debug("Going to rollback");
@@ -1282,7 +1322,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to select from transaction database, "
+ StringUtils.stringifyException(e));
} finally {
- close(null, stmt, dbConn);
+ closeDbConn(dbConn);
}
} catch (RetryException e) {
return getValidWriteIds(rqst);
@@ -1291,10 +1331,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// Method to get the Valid write ids list for the given table
// Input fullTableName is expected to be of format <db_name>.<table_name>
- private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullTableName,
+ private TableValidWriteIds getValidWriteIdsForTable(Connection dbConn, String fullTableName,
ValidTxnList validTxnList) throws SQLException {
+ PreparedStatement pst = null;
ResultSet rs = null;
String[] names = TxnUtils.getDbTableName(fullTableName);
+ assert(names.length == 2);
+ List<String> params = Arrays.asList(names[0], names[1]);
try {
// Need to initialize to 0 to make sure if nobody modified this table, then current txn
// shouldn't read any data.
@@ -1309,11 +1352,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// Find the writeId high water mark based upon txnId high water mark. If found, then, need to
// traverse through all write Ids less than writeId HWM to make exceptions list.
// The writeHWM = min(NEXT_WRITE_ID.nwi_next-1, max(TXN_TO_WRITE_ID.t2w_writeid under txnHwm))
- String s = "select max(t2w_writeid) from TXN_TO_WRITE_ID where t2w_txnid <= " + txnHwm
- + " and t2w_database = " + quoteString(names[0])
- + " and t2w_table = " + quoteString(names[1]);
- LOG.debug("Going to execute query<" + s + ">");
- rs = stmt.executeQuery(s);
+ String s = "select max(t2w_writeid) from TXN_TO_WRITE_ID where t2w_txnid <= " + Long.toString(txnHwm)
+ + " and t2w_database = ? and t2w_table = ?";
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+ LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">",
+ quoteString(names[0]), quoteString(names[1]));
+ rs = pst.executeQuery();
if (rs.next()) {
writeIdHwm = rs.getLong(1);
}
@@ -1322,10 +1366,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (writeIdHwm <= 0) {
// Need to subtract 1 as nwi_next would be the next write id to be allocated but we need highest
// allocated write id.
- s = "select nwi_next-1 from NEXT_WRITE_ID where nwi_database = " + quoteString(names[0])
- + " and nwi_table = " + quoteString(names[1]);
- LOG.debug("Going to execute query<" + s + ">");
- rs = stmt.executeQuery(s);
+ s = "select nwi_next-1 from NEXT_WRITE_ID where nwi_database = ? and nwi_table = ?";
+ closeStmt(pst);
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+ LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">",
+ quoteString(names[0]), quoteString(names[1]));
+ rs = pst.executeQuery();
if (rs.next()) {
long maxWriteId = rs.getLong(1);
if (maxWriteId > 0) {
@@ -1339,13 +1385,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// then will be added to invalid list. The results should be sorted in ascending order based
// on write id. The sorting is needed as exceptions list in ValidWriteIdList would be looked-up
// using binary search.
- s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_writeid <= " + writeIdHwm
- + " and t2w_database = " + quoteString(names[0])
- + " and t2w_table = " + quoteString(names[1])
- + " order by t2w_writeid asc";
-
- LOG.debug("Going to execute query<" + s + ">");
- rs = stmt.executeQuery(s);
+ s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_writeid <= " + Long.toString(writeIdHwm)
+ + " and t2w_database = ? and t2w_table = ? order by t2w_writeid asc";
+ closeStmt(pst);
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+ LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">",
+ quoteString(names[0]), quoteString(names[1]));
+ rs = pst.executeQuery();
while (rs.next()) {
long txnId = rs.getLong(1);
long writeId = rs.getLong(2);
@@ -1371,6 +1417,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
return owi;
} finally {
+ closeStmt(pst);
close(rs);
}
}
@@ -1385,6 +1432,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
try {
Connection dbConn = null;
Statement stmt = null;
+ PreparedStatement pStmt = null;
+ List<PreparedStatement> insertPreparedStmts = null;
ResultSet rs = null;
TxnStore.MutexAPI.LockHandle handle = null;
List<TxnToWriteId> txnToWriteIds = new ArrayList<>();
@@ -1404,7 +1453,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
for (TxnToWriteId txnToWriteId : srcTxnToWriteIds) {
srcTxnIds.add(txnToWriteId.getTxnId());
}
- txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, stmt);
+ txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, dbConn);
if (srcTxnIds.size() != txnIds.size()) {
// Idempotent case where txn was already closed but gets allocate write id event.
// So, just ignore it and return empty list.
@@ -1435,8 +1484,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// The write id would have been already allocated in case of multi-statement txns where
// first write on a table will allocate write id and rest of the writes should re-use it.
prefix.append("select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where"
- + " t2w_database = " + quoteString(dbName)
- + " and t2w_table = " + quoteString(tblName) + " and ");
+ + " t2w_database = ? and t2w_table = ?" + " and ");
suffix.append("");
TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
txnIds, "t2w_txnid", false, false);
@@ -1444,9 +1492,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
long allocatedTxnsCount = 0;
long txnId;
long writeId = 0;
+ List<String> params = Arrays.asList(dbName, tblName);
for (String query : queries) {
- LOG.debug("Going to execute query <" + query + ">");
- rs = stmt.executeQuery(query);
+ pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, params);
+ LOG.debug("Going to execute query <" + query.replaceAll("\\?", "{}") + ">",
+ quoteString(dbName), quoteString(tblName));
+ rs = pStmt.executeQuery();
while (rs.next()) {
// If table write ID is already allocated for the given transaction, then just use it
txnId = rs.getLong(1);
@@ -1455,6 +1506,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
allocatedTxnsCount++;
LOG.info("Reused already allocated writeID: " + writeId + " for txnId: " + txnId);
}
+ closeStmt(pStmt);
}
// Batch allocation should always happen atomically. Either write ids for all txns is allocated or none.
@@ -1479,58 +1531,69 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// Get the next write id for the given table and update it with new next write id.
// This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID
String s = sqlGenerator.addForUpdateClause(
- "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
- + " and nwi_table = " + quoteString(tblName));
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
+ "select nwi_next from NEXT_WRITE_ID where nwi_database = ? and nwi_table = ?");
+ closeStmt(pStmt);
+ pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+ LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
+ quoteString(dbName), quoteString(tblName));
+ rs = pStmt.executeQuery();
if (!rs.next()) {
// First allocation of write id should add the table to the next_write_id meta table
// The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here
// For repl flow, we need to force set the incoming write id.
writeId = (srcWriteId > 0) ? srcWriteId : 1;
- s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
- + quoteString(dbName) + "," + quoteString(tblName) + "," + (writeId + numOfWriteIds) + ")";
- LOG.debug("Going to execute insert <" + s + ">");
- stmt.execute(s);
+ s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (?, ?, "
+ + Long.toString(writeId + numOfWriteIds) + ")";
+ closeStmt(pStmt);
+ pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+ LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">",
+ quoteString(dbName), quoteString(tblName));
+ pStmt.execute();
} else {
long nextWriteId = rs.getLong(1);
writeId = (srcWriteId > 0) ? srcWriteId : nextWriteId;
// Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated
- s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + numOfWriteIds)
- + " where nwi_database = " + quoteString(dbName)
- + " and nwi_table = " + quoteString(tblName);
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
+ s = "update NEXT_WRITE_ID set nwi_next = " + Long.toString(writeId + numOfWriteIds)
+ + " where nwi_database = ? and nwi_table = ?";
+ closeStmt(pStmt);
+ pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+ LOG.debug("Going to execute update <" + s.replaceAll("\\?", "{}") + ">",
+ quoteString(dbName), quoteString(tblName));
+ pStmt.executeUpdate();
// For repl flow, if the source write id is mismatching with target next write id, then current
// metadata in TXN_TO_WRITE_ID is stale for this table and hence need to clean-up TXN_TO_WRITE_ID.
// This is possible in case of first incremental repl after bootstrap where concurrent write
// and drop table was performed at source during bootstrap dump.
if ((srcWriteId > 0) && (srcWriteId != nextWriteId)) {
- s = "delete from TXN_TO_WRITE_ID where t2w_database = " + quoteString(dbName)
- + " and t2w_table = " + quoteString(tblName);
- LOG.debug("Going to execute delete <" + s + ">");
- stmt.executeUpdate(s);
+ s = "delete from TXN_TO_WRITE_ID where t2w_database = ? and t2w_table = ?";
+ closeStmt(pStmt);
+ pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+ LOG.debug("Going to execute delete <" + s.replaceAll("\\?", "{}") + ">",
+ quoteString(dbName), quoteString(tblName));
+ pStmt.executeUpdate();
}
}
// Map the newly allocated write ids against the list of txns which doesn't have pre-allocated
// write ids
List<String> rows = new ArrayList<>();
+ List<List<String>> paramsList = new ArrayList<>();
for (long txn : txnIds) {
- rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId);
+ rows.add(txn + ", ?, ?, " + writeId);
txnToWriteIds.add(new TxnToWriteId(txn, writeId));
+ paramsList.add(params);
LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
writeId++;
}
// Insert entries to TXN_TO_WRITE_ID for newly allocated write ids
- List<String> inserts = sqlGenerator.createInsertValuesStmt(
- "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);
- for (String insert : inserts) {
- LOG.debug("Going to execute insert <" + insert + ">");
- stmt.execute(insert);
+ insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+ "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows,
+ paramsList);
+ for (PreparedStatement pst : insertPreparedStmts) {
+ pst.execute();
}
if (transactionalListeners != null) {
@@ -1550,6 +1613,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
+ if (insertPreparedStmts != null) {
+ for (PreparedStatement pst : insertPreparedStmts) {
+ closeStmt(pst);
+ }
+ }
+ closeStmt(pStmt);
close(rs, stmt, dbConn);
if(handle != null) {
handle.releaseLocks();
@@ -1565,12 +1634,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throws MetaException {
try {
Connection dbConn = null;
- Statement stmt = null;
+ PreparedStatement pst = null;
TxnStore.MutexAPI.LockHandle handle = null;
try {
lockInternal();
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
//since this is on conversion from non-acid to acid, NEXT_WRITE_ID should not have an entry
@@ -1579,11 +1647,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// First allocation of write id should add the table to the next_write_id meta table
// The initial value for write id should be 1 and hence we add 1 with number of write ids
// allocated here
- String s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
- + quoteString(rqst.getDbName()) + "," + quoteString(rqst.getTblName()) + "," +
- Long.toString(rqst.getSeeWriteId() + 1) + ")";
- LOG.debug("Going to execute insert <" + s + ">");
- stmt.execute(s);
+ String s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (?, ?, "
+ + Long.toString(rqst.getSeeWriteId() + 1) + ")";
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(rqst.getDbName(), rqst.getTblName()));
+ LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">",
+ quoteString(rqst.getDbName()), quoteString(rqst.getTblName()));
+ pst.execute();
LOG.debug("Going to commit");
dbConn.commit();
} catch (SQLException e) {
@@ -1593,7 +1662,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to update transaction database "
+ StringUtils.stringifyException(e));
} finally {
- close(null, stmt, dbConn);
+ close(null, pst, dbConn);
if(handle != null) {
handle.releaseLocks();
}
@@ -1677,7 +1746,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
commitHighWaterMark = lowestOpenTxnId;
}
int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark);
- LOG.info("Deleted " + delCnt + " obsolete rows from WRTIE_SET");
+ LOG.info("Deleted {} obsolete rows from WRITE_SET", delCnt);
dbConn.commit();
} catch (SQLException ex) {
LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex);
@@ -1713,12 +1782,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// We are composing a query that returns a single row if an update happened after
// the materialization was created. Otherwise, query returns 0 rows.
Connection dbConn = null;
- Statement stmt = null;
+ PreparedStatement pst = null;
ResultSet rs = null;
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
- stmt.setMaxRows(1);
+ List<String> params = new ArrayList<>();
StringBuilder query = new StringBuilder();
// compose a query that select transactions containing an update...
query.append("select ctc_update_delete from COMPLETED_TXN_COMPONENTS where ctc_update_delete='Y' AND (");
@@ -1730,7 +1798,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
query.append("OR");
}
String[] names = TxnUtils.getDbTableName(fullyQualifiedName);
- query.append(" (ctc_database=" + quoteString(names[0]) + " AND ctc_table=" + quoteString(names[1]));
+ assert(names.length == 2);
+ query.append(" (ctc_database=? AND ctc_table=?");
+ params.add(names[0]);
+ params.add(names[1]);
ValidWriteIdList tblValidWriteIdList =
validReaderWriteIdList.getTableValidWriteIdList(fullyQualifiedName);
if (tblValidWriteIdList == null) {
@@ -1756,7 +1827,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (LOG.isDebugEnabled()) {
LOG.debug("Going to execute query <" + s + ">");
}
- rs = stmt.executeQuery(s);
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+ pst.setMaxRows(1);
+ rs = pst.executeQuery();
return new Materialization(rs.next());
} catch (SQLException ex) {
@@ -1764,7 +1837,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to retrieve materialization invalidation information due to " +
StringUtils.stringifyException(ex));
} finally {
- close(rs, stmt, dbConn);
+ close(rs, pst, dbConn);
}
}
@@ -1778,7 +1851,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
TxnStore.MutexAPI.LockHandle handle = null;
Connection dbConn = null;
- Statement stmt = null;
+ PreparedStatement pst = null;
ResultSet rs = null;
try {
lockInternal();
@@ -1789,13 +1862,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
*/
handle = getMutexAPI().acquireLock(MUTEX_KEY.MaterializationRebuild.name());
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
+ List<String> params = Arrays.asList(dbName, tableName);
String selectQ = "select mrl_txn_id from MATERIALIZATION_REBUILD_LOCKS where" +
- " mrl_db_name =" + quoteString(dbName) +
- " AND mrl_tbl_name=" + quoteString(tableName);
- LOG.debug("Going to execute query <" + selectQ + ">");
- rs = stmt.executeQuery(selectQ);
+ " mrl_db_name = ? AND mrl_tbl_name = ?";
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, selectQ, params);
+ LOG.debug("Going to execute query <" + selectQ.replaceAll("\\?", "{}") + ">",
+ quoteString(dbName), quoteString(tableName));
+ rs = pst.executeQuery();
if(rs.next()) {
LOG.info("Ignoring request to rebuild " + dbName + "/" + tableName +
" since it is already being rebuilt");
@@ -1803,9 +1877,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
String insertQ = "insert into MATERIALIZATION_REBUILD_LOCKS " +
"(mrl_txn_id, mrl_db_name, mrl_tbl_name, mrl_last_heartbeat) values (" + txnId +
- ", '" + dbName + "', '" + tableName + "', " + Instant.now().toEpochMilli() + ")";
- LOG.debug("Going to execute update <" + insertQ + ">");
- stmt.executeUpdate(insertQ);
+ ", ?, ?, " + Instant.now().toEpochMilli() + ")";
+ closeStmt(pst);
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, insertQ, params);
+ LOG.debug("Going to execute update <" + insertQ.replaceAll("\\?", "{}") + ">",
+ quoteString(dbName), quoteString(tableName));
+ pst.executeUpdate();
LOG.debug("Going to commit");
dbConn.commit();
return new LockResponse(txnId, LockState.ACQUIRED);
@@ -1814,7 +1891,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to retrieve materialization invalidation information due to " +
StringUtils.stringifyException(ex));
} finally {
- close(rs, stmt, dbConn);
+ close(rs, pst, dbConn);
if(handle != null) {
handle.releaseLocks();
}
@@ -1827,19 +1904,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throws MetaException {
try {
Connection dbConn = null;
- Statement stmt = null;
- ResultSet rs = null;
+ PreparedStatement pst = null;
try {
lockInternal();
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
String s = "update MATERIALIZATION_REBUILD_LOCKS" +
" set mrl_last_heartbeat = " + Instant.now().toEpochMilli() +
" where mrl_txn_id = " + txnId +
- " AND mrl_db_name =" + quoteString(dbName) +
- " AND mrl_tbl_name=" + quoteString(tableName);
- LOG.debug("Going to execute update <" + s + ">");
- int rc = stmt.executeUpdate(s);
+ " AND mrl_db_name = ?" +
+ " AND mrl_tbl_name = ?";
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tableName));
+ LOG.debug("Going to execute update <" + s.replaceAll("\\?", "{}") + ">",
+ quoteString(dbName), quoteString(tableName));
+ int rc = pst.executeUpdate();
if (rc < 1) {
LOG.debug("Going to rollback");
dbConn.rollback();
@@ -1860,7 +1937,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to heartbeat rebuild lock due to " +
StringUtils.stringifyException(e));
} finally {
- close(rs, stmt, dbConn);
+ close(null, pst, dbConn);
unlockInternal();
}
} catch (RetryException e) {
@@ -1995,6 +2072,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
Connection dbConn = null;
try {
Statement stmt = null;
+ PreparedStatement pStmt = null;
+ List<PreparedStatement> insertPreparedStmts = null;
ResultSet rs = null;
ResultSet lockHandle = null;
try {
@@ -2032,6 +2111,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (txnid > 0) {
List<String> rows = new ArrayList<>();
+ List<List<String>> paramsList = new ArrayList<>();
// For each component in this lock request,
// add an entry to the txn_components table
for (LockComponent lc : rqst.getComponent()) {
@@ -2091,30 +2171,41 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
// may return empty result sets.
// Get the write id allocated by this txn for the given table writes
s = "select t2w_writeid from TXN_TO_WRITE_ID where"
- + " t2w_database = " + quoteString(dbName)
- + " and t2w_table = " + quoteString(tblName)
- + " and t2w_txnid = " + txnid;
- LOG.debug("Going to execute query <" + s + ">");
- rs = stmt.executeQuery(s);
+ + " t2w_database = ? and t2w_table = ? and t2w_txnid = " + txnid;
+ pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tblName));
+ LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
+ quoteString(dbName), quoteString(tblName));
+ rs = pStmt.executeQuery();
if (rs.next()) {
writeId = rs.getLong(1);
}
}
- rows.add(txnid + ", '" + dbName + "', " +
- (tblName == null ? "null" : "'" + tblName + "'") + ", " +
- (partName == null ? "null" : "'" + partName + "'")+ "," +
+ rows.add(txnid + ", ?, " +
+ (tblName == null ? "null" : "?") + ", " +
+ (partName == null ? "null" : "?")+ "," +
quoteString(OpertaionType.fromDataOperationType(lc.getOperationType()).toString())+ "," +
(writeId == null ? "null" : writeId));
+ List<String> params = new ArrayList<>();
+ params.add(dbName);
+ if (tblName != null) {
+ params.add(tblName);
+ }
+ if (partName != null) {
+ params.add(partName);
+ }
+ paramsList.add(params);
}
- List<String> queries = sqlGenerator.createInsertValuesStmt(
- "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", rows);
- for(String query : queries) {
- LOG.debug("Going to execute update <" + query + ">");
- int modCount = stmt.executeUpdate(query);
+ insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+ "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)",
+ rows, paramsList);
+ for(PreparedStatement pst : insertPreparedStmts) {
+ int modCount = pst.executeUpdate();
+ closeStmt(pst);
}
+ insertPreparedStmts = null;
}
-
List<String> rows = new ArrayList<>();
+ List<List<String>> paramsList = new ArrayList<>();
long intLockId = 0;
for (LockComponent lc : rqst.getComponent()) {
if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
@@ -2146,24 +2237,40 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
break;
}
long now = getDbTime(dbConn);
- rows.add(extLockId + ", " + intLockId + "," + txnid + ", " +
- quoteString(dbName) + ", " +
- valueOrNullLiteral(tblName) + ", " +
- valueOrNullLiteral(partName) + ", " +
+ rows.add(extLockId + ", " + intLockId + "," + txnid + ", ?, " +
+ ((tblName == null) ? "null" : "?") + ", " +
+ ((partName == null) ? "null" : "?") + ", " +
quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " +
//for locks associated with a txn, we always heartbeat txn and timeout based on that
(isValidTxn(txnid) ? 0 : now) + ", " +
- valueOrNullLiteral(rqst.getUser()) + ", " +
- valueOrNullLiteral(rqst.getHostname()) + ", " +
- valueOrNullLiteral(rqst.getAgentInfo()));// + ")";
+ ((rqst.getUser() == null) ? "null" : "?") + ", " +
+ ((rqst.getHostname() == null) ? "null" : "?") + ", " +
+ ((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")";
+ List<String> params = new ArrayList<>();
+ params.add(dbName);
+ if (tblName != null) {
+ params.add(tblName);
+ }
+ if (partName != null) {
+ params.add(partName);
+ }
+ if (rqst.getUser() != null) {
+ params.add(rqst.getUser());
+ }
+ if (rqst.getHostname() != null) {
+ params.add(rqst.getHostname());
+ }
+ if (rqst.getAgentInfo() != null) {
+ params.add(rqst.getAgentInfo());
+ }
+ paramsList.add(params);
}
- List<String> queries = sqlGenerator.createInsertValuesStmt(
+ insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
"HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, " +
"hl_table, hl_partition,hl_lock_state, hl_lock_type, " +
- "hl_last_heartbeat, hl_user, hl_host, hl_agent_info)", rows);
- for(String query : queries) {
- LOG.debug("Going to execute update <" + query + ">");
- int modCount = stmt.executeUpdate(query);
+ "hl_last_heartbeat, hl_user, hl_host, hl_agent_info)", rows, paramsList);
+ for(PreparedStatement pst : insertPreparedStmts) {
+ int modCount = pst.executeUpdate();
}
dbConn.commit();
success = true;
@@ -2175,7 +2282,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to update transaction database " +
StringUtils.stringifyException(e));
} finally {
+ if (insertPreparedStmts != null) {
+ for (PreparedStatement pst : insertPreparedStmts) {
+ closeStmt(pst);
+ }
+ }
close(lockHandle);
+ closeStmt(pStmt);
close(rs, stmt, null);
if (!success) {
/* This needs to return a "live" connection to be used by operation that follows it.
@@ -2375,10 +2488,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
ShowLocksResponse rsp = new ShowLocksResponse();
List<ShowLocksResponseElement> elems = new ArrayList<>();
List<LockInfoExt> sortedList = new ArrayList<>();
- Statement stmt = null;
+ PreparedStatement pst = null;
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
"hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," +
@@ -2388,22 +2500,26 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
String dbName = rqst.getDbname();
String tableName = rqst.getTablename();
String partName = rqst.getPartname();
+ List<String> params = new ArrayList<>();
StringBuilder filter = new StringBuilder();
if (dbName != null && !dbName.isEmpty()) {
- filter.append("hl_db=").append(quoteString(dbName));
+ filter.append("hl_db=?");
+ params.add(dbName);
}
if (tableName != null && !tableName.isEmpty()) {
if (filter.length() > 0) {
filter.append(" and ");
}
- filter.append("hl_table=").append(quoteString(tableName));
+ filter.append("hl_table=?");
+ params.add(tableName);
}
if (partName != null && !partName.isEmpty()) {
if (filter.length() > 0) {
filter.append(" and ");
}
- filter.append("hl_partition=").append(quoteString(partName));
+ filter.append("hl_partition=?");
+ params.add(partName);
}
String whereClause = filter.toString();
@@ -2411,8 +2527,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
s = s + " where " + whereClause;
}
- LOG.debug("Doing to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+ LOG.debug("Going to execute query <" + s + ">");
+ ResultSet rs = pst.executeQuery();
while (rs.next()) {
ShowLocksResponseElement e = new ShowLocksResponseElement();
e.setLockid(rs.getLong(1));
@@ -2457,7 +2574,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to select from transaction database " +
StringUtils.stringifyException(e));
} finally {
- closeStmt(stmt);
+ closeStmt(pst);
closeDbConn(dbConn);
}
//this ensures that "SHOW LOCKS" prints the locks in the same order as they are examined
@@ -2584,6 +2701,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
return id;
}
}
+
@Override
@RetrySemantics.Idempotent
public CompactionResponse compact(CompactionRequest rqst) throws MetaException {
@@ -2591,6 +2709,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
try {
Connection dbConn = null;
Statement stmt = null;
+ PreparedStatement pst = null;
TxnStore.MutexAPI.LockHandle handle = null;
try {
lockInternal();
@@ -2605,20 +2724,24 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
long id = generateCompactionQueueId(stmt);
+ List<String> params = new ArrayList<>();
StringBuilder sb = new StringBuilder("select cq_id, cq_state from COMPACTION_QUEUE where").
append(" cq_state IN(").append(quoteChar(INITIATED_STATE)).
append(",").append(quoteChar(WORKING_STATE)).
- append(") AND cq_database=").append(quoteString(rqst.getDbname())).
- append(" AND cq_table=").append(quoteString(rqst.getTablename())).append(" AND ");
+ append(") AND cq_database=?").
+ append(" AND cq_table=?").append(" AND ");
+ params.add(rqst.getDbname());
+ params.add(rqst.getTablename());
if(rqst.getPartitionname() == null) {
sb.append("cq_partition is null");
- }
- else {
- sb.append("cq_partition=").append(quoteString(rqst.getPartitionname()));
+ } else {
+ sb.append("cq_partition=?");
+ params.add(rqst.getPartitionname());
}
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(), params);
LOG.debug("Going to execute query <" + sb.toString() + ">");
- ResultSet rs = stmt.executeQuery(sb.toString());
+ ResultSet rs = pst.executeQuery();
if(rs.next()) {
long enqueuedId = rs.getLong(1);
String state = compactorStateToResponse(rs.getString(2).charAt(0));
@@ -2628,6 +2751,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
return new CompactionResponse(enqueuedId, state, false);
}
close(rs);
+ closeStmt(pst);
+ params.clear();
StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " +
"cq_table, ");
String partName = rqst.getPartitionname();
@@ -2639,14 +2764,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
if (rqst.getRunas() != null) buf.append(", cq_run_as");
buf.append(") values (");
buf.append(id);
- buf.append(", '");
- buf.append(rqst.getDbname());
- buf.append("', '");
- buf.append(rqst.getTablename());
- buf.append("', '");
+ buf.append(", ?");
+ buf.append(", ?");
+ buf.append(", ");
+ params.add(rqst.getDbname());
+ params.add(rqst.getTablename());
if (partName != null) {
- buf.append(partName);
- buf.append("', '");
+ buf.append("?, '");
+ params.add(partName);
+ } else {
+ buf.append("'");
}
buf.append(INITIATED_STATE);
buf.append("', '");
@@ -2664,18 +2791,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
dbConn.rollback();
throw new MetaException("Unexpected compaction type " + rqst.getType().toString());
}
+ buf.append("'");
if (rqst.getProperties() != null) {
- buf.append("', '");
- buf.append(new StringableMap(rqst.getProperties()).toString());
+ buf.append(", ?");
+ params.add(new StringableMap(rqst.getProperties()).toString());
}
if (rqst.getRunas() != null) {
- buf.append("', '");
- buf.append(rqst.getRunas());
+ buf.append(", ?");
+ params.add(rqst.getRunas());
}
- buf.append("')");
+ buf.append(")");
String s = buf.toString();
+ pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
+ pst.executeUpdate();
LOG.debug("Going to commit");
dbConn.commit();
return new CompactionResponse(id, INITIATED_RESPONSE, true);
@@ -2686,6 +2815,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to select from transaction database " +
StringUtils.stringifyException(e));
} finally {
+ closeStmt(pst);
closeStmt(stmt);
closeDbConn(dbConn);
if(handle != null) {
@@ -2794,6 +2924,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
Connection dbConn = null;
Statement stmt = null;
ResultSet lockHandle = null;
+ List<PreparedStatement> insertPreparedStmts = null;
try {
try {
lockInternal();
@@ -2813,18 +2944,22 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
Long writeId = rqst.getWriteid();
List<String> rows = new ArrayList<>();
+ List<List<String>> paramsList = new ArrayList<>();
for (String partName : rqst.getPartitionnames()) {
- rows.add(rqst.getTxnid() + "," + quoteString(normalizeCase(rqst.getDbname()))
- + "," + quoteString(normalizeCase(rqst.getTablename())) +
- "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + "," + writeId);
+ rows.add(rqst.getTxnid() + ",?,?,?," + quoteChar(ot.sqlConst) + "," + writeId);
+ List<String> params = new ArrayList<>();
+ params.add(normalizeCase(rqst.getDbname()));
+ params.add(normalizeCase(rqst.getTablename()));
+ params.add(partName);
+ paramsList.add(params);
}
int modCount = 0;
//record partitions that were written to
- List<String> queries = sqlGenerator.createInsertValuesStmt(
- "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", rows);
- for(String query : queries) {
- LOG.debug("Going to execute update <" + query + ">");
- modCount = stmt.executeUpdate(query);
+ insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+ "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)",
+ rows, paramsList);
+ for(PreparedStatement pst : insertPreparedStmts) {
+ modCount = pst.executeUpdate();
}
LOG.debug("Going to commit");
dbConn.commit();
@@ -2835,6 +2970,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
throw new MetaException("Unable to insert into from transaction database " +
StringUtils.stringifyException(e));
} finally {
+ if (insertPreparedStmts != null) {
+ for(PreparedStatement pst : insertPreparedStmts) {
+ closeStmt(pst);
+ }
+ }
close(lockHandle, stmt, dbConn);
unlockInternal();
}