You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sa...@apache.org on 2018/10/25 04:57:18 UTC

hive git commit: HIVE-20607: TxnHandler should use PreparedStatement to execute direct SQL queries (Sankar Hariappan, reviewed by Daniel Dai)

Repository: hive
Updated Branches:
  refs/heads/branch-3 507a6f7a9 -> 09b92d3c8


HIVE-20607: TxnHandler should use PreparedStatement to execute direct SQL queries (Sankar Hariappan, reviewed by Daniel Dai)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/09b92d3c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/09b92d3c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/09b92d3c

Branch: refs/heads/branch-3
Commit: 09b92d3c864b00df99923f03a843a8179bd874a0
Parents: 507a6f7
Author: Sankar Hariappan <sa...@apache.org>
Authored: Thu Oct 25 10:26:52 2018 +0530
Committer: Sankar Hariappan <sa...@apache.org>
Committed: Thu Oct 25 10:26:52 2018 +0530

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |  44 +-
 .../hive/ql/lockmgr/TestDbTxnManager.java       |   4 +-
 .../hive/metastore/tools/SQLGenerator.java      | 110 +++-
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 556 ++++++++++++-------
 4 files changed, 484 insertions(+), 230 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/09b92d3c/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 909ed56..e5da5d6 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -23,7 +23,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -747,11 +747,14 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
 
       String s = sqlGenerator.addForUpdateClause("select \"WNL_FILES\", \"WNL_ID\" from" +
                       " \"TXN_WRITE_NOTIFICATION_LOG\" " +
-                      "where \"WNL_DATABASE\" = " + quoteString(dbName) +
-                      "and \"WNL_TABLE\" = " + quoteString(tblName) +  " and \"WNL_PARTITION\" = " +
-                      quoteString(partition) + " and \"WNL_TXNID\" = " + Long.toString(acidWriteEvent.getTxnId()));
-      LOG.debug("Going to execute query <" + s + ">");
-      rs = stmt.executeQuery(s);
+                      "where \"WNL_DATABASE\" = ? " +
+                      "and \"WNL_TABLE\" = ? " +  " and \"WNL_PARTITION\" = ? " +
+                      "and \"WNL_TXNID\" = " + Long.toString(acidWriteEvent.getTxnId()));
+      List<String> params = Arrays.asList(dbName, tblName, partition);
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+      LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
+              quoteString(dbName), quoteString(tblName), quoteString(partition));
+      rs = pst.executeQuery();
       if (!rs.next()) {
         // if rs is empty then no lock is taken and thus it can not cause deadlock.
         long nextNLId = getNextNLId(stmt, sqlGenerator,
@@ -760,6 +763,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
                 "(\"WNL_ID\", \"WNL_TXNID\", \"WNL_WRITEID\", \"WNL_DATABASE\", \"WNL_TABLE\", " +
                 "\"WNL_PARTITION\", \"WNL_TABLE_OBJ\", \"WNL_PARTITION_OBJ\", " +
                 "\"WNL_FILES\", \"WNL_EVENT_TIME\") VALUES (?,?,?,?,?,?,?,?,?,?)";
+        closeStmt(pst);
         int currentTime = now();
         pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s));
         pst.setLong(1, nextNLId);
@@ -792,6 +796,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
                 " \"WNL_FILES\" = ? ," +
                 " \"WNL_EVENT_TIME\" = ?" +
                 " where \"WNL_ID\" = ?";
+        closeStmt(pst);
         pst = dbConn.prepareStatement(sqlGenerator.addEscapeCharacters(s));
         pst.setString(1, tableObj);
         pst.setString(2, partitionObj);
@@ -825,6 +830,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
       return;
     }
     Statement stmt = null;
+    PreparedStatement pst = null;
     ResultSet rs = null;
     try {
       stmt = dbConn.createStatement();
@@ -851,21 +857,20 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
       long nextNLId = getNextNLId(stmt, sqlGenerator,
               "org.apache.hadoop.hive.metastore.model.MNotificationLog");
 
-      List<String> insert = new ArrayList<>();
+      String insertVal = "(" + nextNLId + "," + nextEventId + "," + now() + ", ?, ?," +
+              quoteString(" ") + ",?, ?)";
 
-      insert.add(0, nextNLId + "," + nextEventId + "," + now() + "," +
-              quoteString(event.getEventType()) + "," + quoteString(event.getDbName()) + "," +
-              quoteString(" ") + "," + quoteString(event.getMessage()) + "," +
-              quoteString(event.getMessageFormat()));
+      s = "insert into \"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", \"EVENT_TIME\", " +
+              " \"EVENT_TYPE\", \"DB_NAME\", " +
+              " \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\") VALUES " + insertVal;
+      List<String> params = Arrays.asList(
+              event.getEventType(), event.getDbName(), event.getMessage(), event.getMessageFormat());
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
 
-      List<String> sql = sqlGenerator.createInsertValuesStmt(
-              "\"NOTIFICATION_LOG\" (\"NL_ID\", \"EVENT_ID\", \"EVENT_TIME\", " +
-                      " \"EVENT_TYPE\", \"DB_NAME\"," +
-                      " \"TBL_NAME\", \"MESSAGE\", \"MESSAGE_FORMAT\")", insert);
-      for (String q : sql) {
-        LOG.info("Going to execute insert <" + q + ">");
-        stmt.execute(q);
-      }
+      LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">",
+              quoteString(event.getEventType()), quoteString(event.getDbName()),
+              quoteString(event.getMessage()), quoteString(event.getMessageFormat()));
+      pst.execute();
 
       // Set the DB_NOTIFICATION_EVENT_ID for future reference by other listeners.
       if (event.isSetEventId()) {
@@ -878,6 +883,7 @@ public class DbNotificationListener extends TransactionalMetaStoreEventListener
       throw e;
     } finally {
       closeStmt(stmt);
+      closeStmt(pst);
       close(rs);
     }
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/09b92d3c/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
index 2576ba2..cc86afe 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
@@ -98,8 +98,8 @@ public class TestDbTxnManager {
   public void testSingleReadPartition() throws Exception {
     addPartitionInput(newTable(true));
     QueryPlan qp = new MockQueryPlan(this, HiveOperation.QUERY);
-    txnMgr.openTxn(ctx, null);
-    txnMgr.acquireLocks(qp, ctx, null);
+    txnMgr.openTxn(ctx, "fred");
+    txnMgr.acquireLocks(qp, ctx, "fred");
     List<HiveLock> locks = ctx.getHiveLocks();
     Assert.assertEquals(1, locks.size());
     Assert.assertEquals(1,

http://git-wip-us.apache.org/repos/asf/hive/blob/09b92d3c/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
index d0ac7db..49b737e 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/tools/SQLGenerator.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -47,18 +50,80 @@ public final class SQLGenerator {
   }
 
   /**
-   * Genereates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB
+   * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB
+   *
+   * @param tblColumns   e.g. "T(a,b,c)"
+   * @param rows         e.g. list of Strings like 3,4,'d'
+   * @param paramsList   List of parameters which in turn is list of Strings to be set in PreparedStatement object
+   * @return List of PreparedStatement objects for fully formed INSERT INTO ... statements
+   */
+  public List<PreparedStatement> createInsertValuesPreparedStmt(Connection dbConn,
+                                                                String tblColumns, List<String> rows,
+                                                                List<List<String>> paramsList)
+          throws SQLException {
+    if (rows == null || rows.size() == 0) {
+      return Collections.emptyList();
+    }
+    assert((paramsList == null) || (rows.size() == paramsList.size()));
+
+    List<Integer> rowsCountInStmts = new ArrayList<>();
+    List<String> insertStmts = createInsertValuesStmt(tblColumns, rows, rowsCountInStmts);
+    assert(insertStmts.size() == rowsCountInStmts.size());
+
+    List<PreparedStatement> preparedStmts = new ArrayList<>();
+    int paramsListFromIdx = 0;
+    try {
+      for (int stmtIdx = 0; stmtIdx < insertStmts.size(); stmtIdx++) {
+        String sql = insertStmts.get(stmtIdx);
+        PreparedStatement pStmt = prepareStmtWithParameters(dbConn, sql, null);
+        if (paramsList != null) {
+          int paramIdx = 1;
+          int paramsListToIdx = paramsListFromIdx + rowsCountInStmts.get(stmtIdx);
+          for (int paramsListIdx = paramsListFromIdx; paramsListIdx < paramsListToIdx; paramsListIdx++) {
+            List<String> params = paramsList.get(paramsListIdx);
+            for (int i = 0; i < params.size(); i++, paramIdx++) {
+              pStmt.setString(paramIdx, params.get(i));
+            }
+          }
+          paramsListFromIdx = paramsListToIdx;
+        }
+        preparedStmts.add(pStmt);
+      }
+    } catch (SQLException e) {
+      for (PreparedStatement pst : preparedStmts) {
+        pst.close();
+      }
+      throw e;
+    }
+    return preparedStmts;
+  }
+
+  /**
+   * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB
    *
    * @param tblColumns e.g. "T(a,b,c)"
    * @param rows       e.g. list of Strings like 3,4,'d'
    * @return fully formed INSERT INTO ... statements
    */
   public List<String> createInsertValuesStmt(String tblColumns, List<String> rows) {
+    return createInsertValuesStmt(tblColumns, rows, null);
+  }
+
+  /**
+   * Generates "Insert into T(a,b,c) values(1,2,'f'),(3,4,'c')" for appropriate DB
+   *
+   * @param tblColumns e.g. "T(a,b,c)"
+   * @param rows       e.g. list of Strings like 3,4,'d'
+   * @param rowsCountInStmts Output the number of rows in each insert statement returned.
+   * @return fully formed INSERT INTO ... statements
+   */
+  private List<String> createInsertValuesStmt(String tblColumns, List<String> rows, List<Integer> rowsCountInStmts) {
     if (rows == null || rows.size() == 0) {
       return Collections.emptyList();
     }
     List<String> insertStmts = new ArrayList<>();
     StringBuilder sb = new StringBuilder();
+    int numRowsInCurrentStmt = 0;
     switch (dbProduct) {
     case ORACLE:
       if (rows.size() > 1) {
@@ -69,15 +134,23 @@ public final class SQLGenerator {
             if (numRows > 0) {
               sb.append(" select * from dual");
               insertStmts.add(sb.toString());
+              if (rowsCountInStmts != null) {
+                rowsCountInStmts.add(numRowsInCurrentStmt);
+              }
+              numRowsInCurrentStmt = 0;
             }
             sb.setLength(0);
             sb.append("insert all ");
           }
           sb.append("into ").append(tblColumns).append(" values(").append(rows.get(numRows))
               .append(") ");
+          numRowsInCurrentStmt++;
         }
         sb.append("select * from dual");
         insertStmts.add(sb.toString());
+        if (rowsCountInStmts != null) {
+          rowsCountInStmts.add(numRowsInCurrentStmt);
+        }
         return insertStmts;
       }
       //fall through
@@ -89,13 +162,21 @@ public final class SQLGenerator {
         if (numRows % MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_VALUES_CLAUSE) == 0) {
           if (numRows > 0) {
             insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma
+            if (rowsCountInStmts != null) {
+              rowsCountInStmts.add(numRowsInCurrentStmt);
+            }
+            numRowsInCurrentStmt = 0;
           }
           sb.setLength(0);
           sb.append("insert into ").append(tblColumns).append(" values");
         }
         sb.append('(').append(rows.get(numRows)).append("),");
+        numRowsInCurrentStmt++;
       }
       insertStmts.add(sb.substring(0, sb.length() - 1));//exclude trailing comma
+      if (rowsCountInStmts != null) {
+        rowsCountInStmts.add(numRowsInCurrentStmt);
+      }
       return insertStmts;
     default:
       String msg = "Unrecognized database product name <" + dbProduct + ">";
@@ -171,6 +252,33 @@ public final class SQLGenerator {
     }
   }
 
+  /**
+   * Make PreparedStatement object with list of String type parameters to be set.
+   * It is assumed the input sql string has the number of "?" equal to the number of parameters
+   * passed as input.
+   * @param dbConn - Connection object
+   * @param sql - SQL statement with "?" for input parameters.
+   * @param parameters - List of String type parameters to be set in PreparedStatement object
+   * @return PreparedStatement type object
+   * @throws SQLException
+   */
+  public PreparedStatement prepareStmtWithParameters(Connection dbConn, String sql, List<String> parameters)
+          throws SQLException {
+    PreparedStatement pst = dbConn.prepareStatement(addEscapeCharacters(sql));
+    if ((parameters == null) || parameters.isEmpty()) {
+      return pst;
+    }
+    try {
+      for (int i = 1; i <= parameters.size(); i++) {
+        pst.setString(i, parameters.get(i - 1));
+      }
+    } catch (SQLException e) {
+      pst.close();
+      throw e;
+    }
+    return pst;
+  }
+
   public DatabaseProduct getDbProduct() {
     return dbProduct;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/09b92d3c/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 4df75fb..b969efb 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -21,6 +21,7 @@ import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 import java.sql.Connection;
 import java.sql.Driver;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLFeatureNotSupportedException;
@@ -574,10 +575,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           throws SQLException, MetaException {
     int numTxns = rqst.getNum_txns();
     ResultSet rs = null;
+    List<PreparedStatement> insertPreparedStmts = null;
     TxnType txnType = TxnType.DEFAULT;
     try {
       if (rqst.isSetReplPolicy()) {
-        List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), stmt);
+        List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), dbConn);
 
         if (!targetTxnIdList.isEmpty()) {
           if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) {
@@ -607,16 +609,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       List<Long> txnIds = new ArrayList<>(numTxns);
 
       List<String> rows = new ArrayList<>();
+      List<String> params = new ArrayList<>();
+      params.add(rqst.getUser());
+      params.add(rqst.getHostname());
+      List<List<String>> paramsList = new ArrayList<>(numTxns);
       for (long i = first; i < first + numTxns; i++) {
         txnIds.add(i);
-        rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ","
-                + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname()) + "," + txnType.getValue());
+        rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + ",?,?," + txnType.getValue());
+        paramsList.add(params);
       }
-      List<String> queries = sqlGenerator.createInsertValuesStmt(
-            "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host, txn_type)", rows);
-      for (String q : queries) {
-        LOG.debug("Going to execute update <" + q + ">");
-        stmt.execute(q);
+      insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+            "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host, txn_type)",
+              rows, paramsList);
+      for (PreparedStatement pst : insertPreparedStmts) {
+        pst.execute();
       }
 
       // Need to register minimum open txnid for current transactions into MIN_HISTORY table.
@@ -648,18 +654,23 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
       if (rqst.isSetReplPolicy()) {
         List<String> rowsRepl = new ArrayList<>();
-
+        for (PreparedStatement pst : insertPreparedStmts) {
+          pst.close();
+        }
+        insertPreparedStmts.clear();
+        params.clear();
+        paramsList.clear();
+        params.add(rqst.getReplPolicy());
         for (int i = 0; i < numTxns; i++) {
-          rowsRepl.add(
-                  quoteString(rqst.getReplPolicy()) + "," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
+          rowsRepl.add( "?," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i));
+          paramsList.add(params);
         }
 
-        List<String> queriesRepl = sqlGenerator.createInsertValuesStmt(
-                "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl);
-
-        for (String query : queriesRepl) {
-          LOG.info("Going to execute insert <" + query + ">");
-          stmt.execute(query);
+        insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+                "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl,
+                paramsList);
+        for (PreparedStatement pst : insertPreparedStmts) {
+          pst.execute();
         }
       }
 
@@ -669,12 +680,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       }
       return txnIds;
     } finally {
+      if (insertPreparedStmts != null) {
+        for (PreparedStatement pst : insertPreparedStmts) {
+          pst.close();
+        }
+      }
       close(rs);
     }
   }
 
-  private List<Long> getTargetTxnIdList(String replPolicy, List<Long> sourceTxnIdList, Statement stmt)
+  private List<Long> getTargetTxnIdList(String replPolicy, List<Long> sourceTxnIdList, Connection dbConn)
           throws SQLException {
+    PreparedStatement pst = null;
     ResultSet rs = null;
     try {
       List<String> inQueries = new ArrayList<>();
@@ -682,15 +699,18 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       StringBuilder suffix = new StringBuilder();
       List<Long> targetTxnIdList = new ArrayList<>();
       prefix.append("select RTM_TARGET_TXN_ID from REPL_TXN_MAP where ");
-      suffix.append(" and RTM_REPL_POLICY = " + quoteString(replPolicy));
+      suffix.append(" and RTM_REPL_POLICY = ?");
       TxnUtils.buildQueryWithINClause(conf, inQueries, prefix, suffix, sourceTxnIdList,
               "RTM_SRC_TXN_ID", false, false);
+      List<String> params = Arrays.asList(replPolicy);
       for (String query : inQueries) {
-        LOG.debug("Going to execute select <" + query + ">");
-        rs = stmt.executeQuery(query);
+        LOG.debug("Going to execute select <" + query.replaceAll("\\?", "{}") + ">", quoteString(replPolicy));
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, query, params);
+        rs = pst.executeQuery();
         while (rs.next()) {
           targetTxnIdList.add(rs.getLong(1));
         }
+        closeStmt(pst);
       }
       LOG.debug("targetTxnid for srcTxnId " + sourceTxnIdList.toString() + " is " + targetTxnIdList.toString());
       return targetTxnIdList;
@@ -698,6 +718,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       LOG.warn("failed to get target txn ids " + e.getMessage());
       throw e;
     } finally {
+      closeStmt(pst);
       close(rs);
     }
   }
@@ -707,12 +728,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   public long getTargetTxnId(String replPolicy, long sourceTxnId) throws MetaException {
     try {
       Connection dbConn = null;
-      Statement stmt = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        List<Long> targetTxnIds = getTargetTxnIdList(replPolicy, Collections.singletonList(sourceTxnId), stmt);
+        List<Long> targetTxnIds = getTargetTxnIdList(replPolicy, Collections.singletonList(sourceTxnId), dbConn);
         if (targetTxnIds.isEmpty()) {
           LOG.info("Txn {} not present for repl policy {}", sourceTxnId, replPolicy);
           return -1;
@@ -726,7 +745,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to get target transaction id "
                 + StringUtils.stringifyException(e));
       } finally {
-        close(null, stmt, dbConn);
+        closeDbConn(dbConn);
         unlockInternal();
       }
     } catch (RetryException e) {
@@ -734,6 +753,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
+  private void deleteReplTxnMapEntry(Connection dbConn, long sourceTxnId, String replPolicy) throws SQLException {
+    String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId + " and RTM_REPL_POLICY = ?";
+    try (PreparedStatement pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(replPolicy))) {
+      LOG.info("Going to execute  <" + s.replaceAll("\\?", "{}") + ">", quoteString(replPolicy));
+      pst.executeUpdate();
+    }
+  }
+
   @Override
   @RetrySemantics.Idempotent
   public void abortTxn(AbortTxnRequest rqst) throws NoSuchTxnException, MetaException, TxnAbortedException {
@@ -750,7 +777,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         if (rqst.isSetReplPolicy()) {
           sourceTxnId = rqst.getTxnid();
           List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
-                  Collections.singletonList(sourceTxnId), stmt);
+                  Collections.singletonList(sourceTxnId), dbConn);
           if (targetTxnIds.isEmpty()) {
             // Idempotent case where txn was already closed or abort txn event received without
             // corresponding open txn event.
@@ -768,10 +795,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             if (rqst.isSetReplPolicy()) {
               // in case of replication, idempotent is taken care by getTargetTxnId
               LOG.warn("Invalid state ABORTED for transactions started using replication replay task");
-              String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
-                      " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
-              LOG.info("Going to execute  <" + s + ">");
-              stmt.executeUpdate(s);
+              deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
             }
             LOG.info("abortTxn(" + JavaUtils.txnIdToString(txnid) +
               ") requested by it is already " + TxnStatus.ABORTED);
@@ -781,10 +805,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         }
 
         if (rqst.isSetReplPolicy()) {
-          String s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
-              " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
-          LOG.info("Going to execute  <" + s + ">");
-          stmt.executeUpdate(s);
+          deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
         }
 
         if (transactionalListeners != null) {
@@ -883,6 +904,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      List<PreparedStatement> insertPreparedStmts = null;
       ResultSet lockHandle = null;
       ResultSet commitIdRs = null, rs;
       try {
@@ -893,7 +915,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         if (rqst.isSetReplPolicy()) {
           sourceTxnId = rqst.getTxnid();
           List<Long> targetTxnIds = getTargetTxnIdList(rqst.getReplPolicy(),
-                  Collections.singletonList(sourceTxnId), stmt);
+                  Collections.singletonList(sourceTxnId), dbConn);
           if (targetTxnIds.isEmpty()) {
             // Idempotent case where txn was already closed or commit txn event received without
             // corresponding open txn event.
@@ -1042,8 +1064,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
                   "ctc_table, ctc_partition, ctc_writeid, ctc_update_delete) select tc_txnid, tc_database, tc_table, " +
                   "tc_partition, tc_writeid, '" + isUpdateDelete + "' from TXN_COMPONENTS where tc_txnid = " + txnid;
           LOG.debug("Going to execute insert <" + s + ">");
-          int modCount = 0;
-          if ((modCount = stmt.executeUpdate(s)) < 1) {
+
+          if ((stmt.executeUpdate(s)) < 1) {
             //this can be reasonable for an empty txn START/COMMIT or read-only txn
             //also an IUD with DP that didn't match any rows.
             LOG.info("Expected to move at least one record from txn_components to " +
@@ -1052,27 +1074,29 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         } else {
           if (rqst.isSetWriteEventInfos()) {
             List<String> rows = new ArrayList<>();
+            List<List<String>> paramsList = new ArrayList<>();
             for (WriteEventInfo writeEventInfo : rqst.getWriteEventInfos()) {
-              rows.add(txnid + "," + quoteString(writeEventInfo.getDatabase()) + "," +
-                      quoteString(writeEventInfo.getTable()) + "," +
-                      quoteString(writeEventInfo.getPartition()) + "," +
+              rows.add(txnid + ", ?, ?, ?," +
                       writeEventInfo.getWriteId() + "," +
                       quoteChar(isUpdateDelete));
+              List<String> params = new ArrayList<>();
+              params.add(writeEventInfo.getDatabase());
+              params.add(writeEventInfo.getTable());
+              params.add(writeEventInfo.getPartition());
+              paramsList.add(params);
             }
-            List<String> queries = sqlGenerator.createInsertValuesStmt("COMPLETED_TXN_COMPONENTS " +
-                   "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, ctc_writeid, ctc_update_delete)", rows);
-            for (String q : queries) {
-              LOG.debug("Going to execute insert  <" + q + "> ");
-              stmt.execute(q);
+            insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+                    "COMPLETED_TXN_COMPONENTS " +
+                   "(ctc_txnid," + " ctc_database, ctc_table, ctc_partition, ctc_writeid, ctc_update_delete)",
+                    rows, paramsList);
+            for (PreparedStatement pst : insertPreparedStmts) {
+              pst.execute();
             }
           }
-
-          s = "delete from REPL_TXN_MAP where RTM_SRC_TXN_ID = " + sourceTxnId +
-                  " and RTM_REPL_POLICY = " + quoteString(rqst.getReplPolicy());
-          LOG.info("Repl going to execute  <" + s + ">");
-          stmt.executeUpdate(s);
+          deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy());
         }
 
+        // cleanup all txn related metadata
         s = "delete from TXN_COMPONENTS where tc_txnid = " + txnid;
         LOG.debug("Going to execute update <" + s + ">");
         stmt.executeUpdate(s);
@@ -1106,6 +1130,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database "
           + StringUtils.stringifyException(e));
       } finally {
+        if (insertPreparedStmts != null) {
+          for (PreparedStatement pst : insertPreparedStmts) {
+            closeStmt(pst);
+          }
+        }
         close(commitIdRs);
         close(lockHandle, stmt, dbConn);
         unlockInternal();
@@ -1133,8 +1162,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      PreparedStatement pStmt = null;
+      List<PreparedStatement> insertPreparedStmts = null;
       ResultSet rs = null;
       TxnStore.MutexAPI.LockHandle handle = null;
+      List<String> params = Arrays.asList(dbName, tblName);
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
@@ -1142,11 +1174,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         // Check if this txn state is already replicated for this given table. If yes, then it is
         // idempotent case and just return.
-        String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
-                        + " and nwi_table = " + quoteString(tblName);
-        LOG.debug("Going to execute query <" + sql + ">");
-
-        rs = stmt.executeQuery(sql);
+        String sql = "select nwi_next from NEXT_WRITE_ID where nwi_database = ? and nwi_table = ?";
+        pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, sql, params);
+        LOG.debug("Going to execute query <" + sql.replaceAll("\\?", "{}") + ">",
+                quoteString(dbName), quoteString(tblName));
+        rs = pStmt.executeQuery();
         if (rs.next()) {
           LOG.info("Idempotent flow: WriteId state <" + validWriteIdList + "> is already applied for the table: "
                   + dbName + "." + tblName);
@@ -1162,19 +1194,21 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
           // Map each aborted write id with each allocated txn.
           List<String> rows = new ArrayList<>();
+          List<List<String>> paramsList = new ArrayList<>();
           int i = 0;
           for (long txn : txnIds) {
             long writeId = abortedWriteIds.get(i++);
-            rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId);
+            rows.add(txn + ", ?, ?, " + writeId);
+            paramsList.add(params);
             LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
           }
 
           // Insert entries to TXN_TO_WRITE_ID for aborted write ids
-          List<String> inserts = sqlGenerator.createInsertValuesStmt(
-                  "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);
-          for (String insert : inserts) {
-            LOG.debug("Going to execute insert <" + insert + ">");
-            stmt.execute(insert);
+          insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+                  "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows,
+                  paramsList);
+          for (PreparedStatement pst : insertPreparedStmts) {
+            pst.execute();
           }
 
           // Abort all the allocated txns so that the mapped write ids are referred as aborted ones.
@@ -1189,11 +1223,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         long nextWriteId = validWriteIdList.getHighWatermark() + 1;
 
         // First allocation of write id (hwm+1) should add the table to the next_write_id meta table.
-        sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
-                + quoteString(dbName) + "," + quoteString(tblName) + ","
+        sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (?, ?, "
                 + Long.toString(nextWriteId) + ")";
-        LOG.debug("Going to execute insert <" + sql + ">");
-        stmt.execute(sql);
+        closeStmt(pStmt);
+        pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, sql, params);
+        LOG.debug("Going to execute insert <" + sql.replaceAll("\\?", "{}") + ">",
+                quoteString(dbName), quoteString(tblName));
+        pStmt.execute();
 
         LOG.info("WriteId state <" + validWriteIdList + "> is applied for the table: " + dbName + "." + tblName);
         LOG.debug("Going to commit");
@@ -1205,6 +1241,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database "
                 + StringUtils.stringifyException(e));
       } finally {
+        if (insertPreparedStmts != null) {
+          for (PreparedStatement pst : insertPreparedStmts) {
+            closeStmt(pst);
+          }
+        }
+        closeStmt(pStmt);
         close(rs, stmt, dbConn);
         if(handle != null) {
           handle.releaseLocks();
@@ -1246,7 +1288,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           throws NoSuchTxnException, MetaException {
     try {
       Connection dbConn = null;
-      Statement stmt = null;
       ValidTxnList validTxnList;
 
       // We should prepare the valid write ids list based on validTxnList of current txn.
@@ -1263,12 +1304,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
          * This runs at READ_COMMITTED for exactly the same reason as {@link #getOpenTxnsInfo()}
          */
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
 
         // Get the valid write id list for all the tables read by the current txn
         List<TableValidWriteIds> tblValidWriteIdsList = new ArrayList<>();
         for (String fullTableName : rqst.getFullTableNames()) {
-          tblValidWriteIdsList.add(getValidWriteIdsForTable(stmt, fullTableName, validTxnList));
+          tblValidWriteIdsList.add(getValidWriteIdsForTable(dbConn, fullTableName, validTxnList));
         }
 
         LOG.debug("Going to rollback");
@@ -1282,7 +1322,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to select from transaction database, "
                 + StringUtils.stringifyException(e));
       } finally {
-        close(null, stmt, dbConn);
+        closeDbConn(dbConn);
       }
     } catch (RetryException e) {
       return getValidWriteIds(rqst);
@@ -1291,10 +1331,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
   // Method to get the Valid write ids list for the given table
   // Input fullTableName is expected to be of format <db_name>.<table_name>
-  private TableValidWriteIds getValidWriteIdsForTable(Statement stmt, String fullTableName,
+  private TableValidWriteIds getValidWriteIdsForTable(Connection dbConn, String fullTableName,
                                                ValidTxnList validTxnList) throws SQLException {
+    PreparedStatement pst = null;
     ResultSet rs = null;
     String[] names = TxnUtils.getDbTableName(fullTableName);
+    assert(names.length == 2);
+    List<String> params = Arrays.asList(names[0], names[1]);
     try {
       // Need to initialize to 0 to make sure if nobody modified this table, then current txn
       // shouldn't read any data.
@@ -1309,11 +1352,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       // Find the writeId high water mark based upon txnId high water mark. If found, then, need to
       // traverse through all write Ids less than writeId HWM to make exceptions list.
       // The writeHWM = min(NEXT_WRITE_ID.nwi_next-1, max(TXN_TO_WRITE_ID.t2w_writeid under txnHwm))
-      String s = "select max(t2w_writeid) from TXN_TO_WRITE_ID where t2w_txnid <= " + txnHwm
-              + " and t2w_database = " + quoteString(names[0])
-              + " and t2w_table = " + quoteString(names[1]);
-      LOG.debug("Going to execute query<" + s + ">");
-      rs = stmt.executeQuery(s);
+      String s = "select max(t2w_writeid) from TXN_TO_WRITE_ID where t2w_txnid <= " + Long.toString(txnHwm)
+              + " and t2w_database = ? and t2w_table = ?";
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+      LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">",
+              quoteString(names[0]), quoteString(names[1]));
+      rs = pst.executeQuery();
       if (rs.next()) {
         writeIdHwm = rs.getLong(1);
       }
@@ -1322,10 +1366,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       if (writeIdHwm <= 0) {
         // Need to subtract 1 as nwi_next would be the next write id to be allocated but we need highest
         // allocated write id.
-        s = "select nwi_next-1 from NEXT_WRITE_ID where nwi_database = " + quoteString(names[0])
-                + " and nwi_table = " + quoteString(names[1]);
-        LOG.debug("Going to execute query<" + s + ">");
-        rs = stmt.executeQuery(s);
+        s = "select nwi_next-1 from NEXT_WRITE_ID where nwi_database = ? and nwi_table = ?";
+        closeStmt(pst);
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+        LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">",
+                quoteString(names[0]), quoteString(names[1]));
+        rs = pst.executeQuery();
         if (rs.next()) {
           long maxWriteId = rs.getLong(1);
           if (maxWriteId > 0) {
@@ -1339,13 +1385,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       // then will be added to invalid list. The results should be sorted in ascending order based
       // on write id. The sorting is needed as exceptions list in ValidWriteIdList would be looked-up
       // using binary search.
-      s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_writeid <= " + writeIdHwm
-              + " and t2w_database = " + quoteString(names[0])
-              + " and t2w_table = " + quoteString(names[1])
-              + " order by t2w_writeid asc";
-
-      LOG.debug("Going to execute query<" + s + ">");
-      rs = stmt.executeQuery(s);
+      s = "select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where t2w_writeid <= " + Long.toString(writeIdHwm)
+              + " and t2w_database = ? and t2w_table = ? order by t2w_writeid asc";
+      closeStmt(pst);
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+      LOG.debug("Going to execute query<" + s.replaceAll("\\?", "{}") + ">",
+              quoteString(names[0]), quoteString(names[1]));
+      rs = pst.executeQuery();
       while (rs.next()) {
         long txnId = rs.getLong(1);
         long writeId = rs.getLong(2);
@@ -1371,6 +1417,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       }
       return owi;
     } finally {
+      closeStmt(pst);
       close(rs);
     }
   }
@@ -1385,6 +1432,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      PreparedStatement pStmt = null;
+      List<PreparedStatement> insertPreparedStmts = null;
       ResultSet rs = null;
       TxnStore.MutexAPI.LockHandle handle = null;
       List<TxnToWriteId> txnToWriteIds = new ArrayList<>();
@@ -1404,7 +1453,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           for (TxnToWriteId txnToWriteId :  srcTxnToWriteIds) {
             srcTxnIds.add(txnToWriteId.getTxnId());
           }
-          txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, stmt);
+          txnIds = getTargetTxnIdList(rqst.getReplPolicy(), srcTxnIds, dbConn);
           if (srcTxnIds.size() != txnIds.size()) {
             // Idempotent case where txn was already closed but gets allocate write id event.
             // So, just ignore it and return empty list.
@@ -1435,8 +1484,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         // The write id would have been already allocated in case of multi-statement txns where
         // first write on a table will allocate write id and rest of the writes should re-use it.
         prefix.append("select t2w_txnid, t2w_writeid from TXN_TO_WRITE_ID where"
-                        + " t2w_database = " + quoteString(dbName)
-                        + " and t2w_table = " + quoteString(tblName) + " and ");
+                        + " t2w_database = ? and t2w_table = ?" + " and ");
         suffix.append("");
         TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix,
                 txnIds, "t2w_txnid", false, false);
@@ -1444,9 +1492,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         long allocatedTxnsCount = 0;
         long txnId;
         long writeId = 0;
+        List<String> params = Arrays.asList(dbName, tblName);
         for (String query : queries) {
-          LOG.debug("Going to execute query <" + query + ">");
-          rs = stmt.executeQuery(query);
+          pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, query, params);
+          LOG.debug("Going to execute query <" + query.replaceAll("\\?", "{}") + ">",
+                  quoteString(dbName), quoteString(tblName));
+          rs = pStmt.executeQuery();
           while (rs.next()) {
             // If table write ID is already allocated for the given transaction, then just use it
             txnId = rs.getLong(1);
@@ -1455,6 +1506,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             allocatedTxnsCount++;
             LOG.info("Reused already allocated writeID: " + writeId + " for txnId: " + txnId);
           }
+          closeStmt(pStmt);
         }
 
         // Batch allocation should always happen atomically. Either write ids for all txns is allocated or none.
@@ -1479,58 +1531,69 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         // Get the next write id for the given table and update it with new next write id.
         // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID
         String s = sqlGenerator.addForUpdateClause(
-                "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName)
-                        + " and nwi_table = " + quoteString(tblName));
-        LOG.debug("Going to execute query <" + s + ">");
-        rs = stmt.executeQuery(s);
+                "select nwi_next from NEXT_WRITE_ID where nwi_database = ? and nwi_table = ?");
+        closeStmt(pStmt);
+        pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+        LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
+                quoteString(dbName), quoteString(tblName));
+        rs = pStmt.executeQuery();
         if (!rs.next()) {
           // First allocation of write id should add the table to the next_write_id meta table
           // The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here
           // For repl flow, we need to force set the incoming write id.
           writeId = (srcWriteId > 0) ? srcWriteId : 1;
-          s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
-                  + quoteString(dbName) + "," + quoteString(tblName) + "," + (writeId + numOfWriteIds) + ")";
-          LOG.debug("Going to execute insert <" + s + ">");
-          stmt.execute(s);
+          s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (?, ?, "
+                  + Long.toString(writeId + numOfWriteIds) + ")";
+          closeStmt(pStmt);
+          pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+          LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">",
+                  quoteString(dbName), quoteString(tblName));
+          pStmt.execute();
         } else {
           long nextWriteId = rs.getLong(1);
           writeId = (srcWriteId > 0) ? srcWriteId : nextWriteId;
 
           // Update the NEXT_WRITE_ID for the given table after incrementing by number of write ids allocated
-          s = "update NEXT_WRITE_ID set nwi_next = " + (writeId + numOfWriteIds)
-                  + " where nwi_database = " + quoteString(dbName)
-                  + " and nwi_table = " + quoteString(tblName);
-          LOG.debug("Going to execute update <" + s + ">");
-          stmt.executeUpdate(s);
+          s = "update NEXT_WRITE_ID set nwi_next = " + Long.toString(writeId + numOfWriteIds)
+                  + " where nwi_database = ? and nwi_table = ?";
+          closeStmt(pStmt);
+          pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+          LOG.debug("Going to execute update <" + s.replaceAll("\\?", "{}") + ">",
+                  quoteString(dbName), quoteString(tblName));
+          pStmt.executeUpdate();
 
           // For repl flow, if the source write id is mismatching with target next write id, then current
           // metadata in TXN_TO_WRITE_ID is stale for this table and hence need to clean-up TXN_TO_WRITE_ID.
           // This is possible in case of first incremental repl after bootstrap where concurrent write
           // and drop table was performed at source during bootstrap dump.
           if ((srcWriteId > 0) && (srcWriteId != nextWriteId)) {
-            s = "delete from TXN_TO_WRITE_ID where t2w_database = " + quoteString(dbName)
-                    + " and t2w_table = " + quoteString(tblName);
-            LOG.debug("Going to execute delete <" + s + ">");
-            stmt.executeUpdate(s);
+            s = "delete from TXN_TO_WRITE_ID where t2w_database = ? and t2w_table = ?";
+            closeStmt(pStmt);
+            pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+            LOG.debug("Going to execute delete <" + s.replaceAll("\\?", "{}") + ">",
+                    quoteString(dbName), quoteString(tblName));
+            pStmt.executeUpdate();
           }
         }
 
         // Map the newly allocated write ids against the list of txns which doesn't have pre-allocated
         // write ids
         List<String> rows = new ArrayList<>();
+        List<List<String>> paramsList = new ArrayList<>();
         for (long txn : txnIds) {
-          rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId);
+          rows.add(txn + ", ?, ?, " + writeId);
           txnToWriteIds.add(new TxnToWriteId(txn, writeId));
+          paramsList.add(params);
           LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn);
           writeId++;
         }
 
         // Insert entries to TXN_TO_WRITE_ID for newly allocated write ids
-        List<String> inserts = sqlGenerator.createInsertValuesStmt(
-                "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows);
-        for (String insert : inserts) {
-          LOG.debug("Going to execute insert <" + insert + ">");
-          stmt.execute(insert);
+        insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+                "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows,
+                paramsList);
+        for (PreparedStatement pst : insertPreparedStmts) {
+          pst.execute();
         }
 
         if (transactionalListeners != null) {
@@ -1550,6 +1613,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database "
                 + StringUtils.stringifyException(e));
       } finally {
+        if (insertPreparedStmts != null) {
+          for (PreparedStatement pst : insertPreparedStmts) {
+            closeStmt(pst);
+          }
+        }
+        closeStmt(pStmt);
         close(rs, stmt, dbConn);
         if(handle != null) {
           handle.releaseLocks();
@@ -1565,12 +1634,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       throws MetaException {
     try {
       Connection dbConn = null;
-      Statement stmt = null;
+      PreparedStatement pst = null;
       TxnStore.MutexAPI.LockHandle handle = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
 
         handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name());
         //since this is on conversion from non-acid to acid, NEXT_WRITE_ID should not have an entry
@@ -1579,11 +1647,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         // First allocation of write id should add the table to the next_write_id meta table
         // The initial value for write id should be 1 and hence we add 1 with number of write ids
         // allocated here
-        String s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values ("
-            + quoteString(rqst.getDbName()) + "," + quoteString(rqst.getTblName()) + "," +
-            Long.toString(rqst.getSeeWriteId() + 1) + ")";
-        LOG.debug("Going to execute insert <" + s + ">");
-        stmt.execute(s);
+        String s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (?, ?, "
+                + Long.toString(rqst.getSeeWriteId() + 1) + ")";
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(rqst.getDbName(), rqst.getTblName()));
+        LOG.debug("Going to execute insert <" + s.replaceAll("\\?", "{}") + ">",
+                quoteString(rqst.getDbName()), quoteString(rqst.getTblName()));
+        pst.execute();
         LOG.debug("Going to commit");
         dbConn.commit();
       } catch (SQLException e) {
@@ -1593,7 +1662,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database "
             + StringUtils.stringifyException(e));
       } finally {
-        close(null, stmt, dbConn);
+        close(null, pst, dbConn);
         if(handle != null) {
           handle.releaseLocks();
         }
@@ -1677,7 +1746,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         commitHighWaterMark = lowestOpenTxnId;
       }
       int delCnt = stmt.executeUpdate("delete from WRITE_SET where ws_commit_id < " + commitHighWaterMark);
-      LOG.info("Deleted " + delCnt + " obsolete rows from WRTIE_SET");
+      LOG.info("Deleted {} obsolete rows from WRITE_SET", delCnt);
       dbConn.commit();
     } catch (SQLException ex) {
       LOG.warn("WriteSet GC failed due to " + getMessage(ex), ex);
@@ -1713,12 +1782,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     // We are composing a query that returns a single row if an update happened after
     // the materialization was created. Otherwise, query returns 0 rows.
     Connection dbConn = null;
-    Statement stmt = null;
+    PreparedStatement pst = null;
     ResultSet rs = null;
     try {
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      stmt = dbConn.createStatement();
-      stmt.setMaxRows(1);
+      List<String> params = new ArrayList<>();
       StringBuilder query = new StringBuilder();
       // compose a query that select transactions containing an update...
       query.append("select ctc_update_delete from COMPLETED_TXN_COMPONENTS where ctc_update_delete='Y' AND (");
@@ -1730,7 +1798,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           query.append("OR");
         }
         String[] names = TxnUtils.getDbTableName(fullyQualifiedName);
-        query.append(" (ctc_database=" + quoteString(names[0]) + " AND ctc_table=" + quoteString(names[1]));
+        assert(names.length == 2);
+        query.append(" (ctc_database=? AND ctc_table=?");
+        params.add(names[0]);
+        params.add(names[1]);
         ValidWriteIdList tblValidWriteIdList =
             validReaderWriteIdList.getTableValidWriteIdList(fullyQualifiedName);
         if (tblValidWriteIdList == null) {
@@ -1756,7 +1827,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Going to execute query <" + s + ">");
       }
-      rs = stmt.executeQuery(s);
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+      pst.setMaxRows(1);
+      rs = pst.executeQuery();
 
       return new Materialization(rs.next());
     } catch (SQLException ex) {
@@ -1764,7 +1837,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       throw new MetaException("Unable to retrieve materialization invalidation information due to " +
           StringUtils.stringifyException(ex));
     } finally {
-      close(rs, stmt, dbConn);
+      close(rs, pst, dbConn);
     }
   }
 
@@ -1778,7 +1851,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
     TxnStore.MutexAPI.LockHandle handle = null;
     Connection dbConn = null;
-    Statement stmt = null;
+    PreparedStatement pst = null;
     ResultSet rs = null;
     try {
       lockInternal();
@@ -1789,13 +1862,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
        */
       handle = getMutexAPI().acquireLock(MUTEX_KEY.MaterializationRebuild.name());
       dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-      stmt = dbConn.createStatement();
 
+      List<String> params = Arrays.asList(dbName, tableName);
       String selectQ = "select mrl_txn_id from MATERIALIZATION_REBUILD_LOCKS where" +
-          " mrl_db_name =" + quoteString(dbName) +
-          " AND mrl_tbl_name=" + quoteString(tableName);
-      LOG.debug("Going to execute query <" + selectQ + ">");
-      rs = stmt.executeQuery(selectQ);
+          " mrl_db_name = ? AND mrl_tbl_name = ?";
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, selectQ, params);
+      LOG.debug("Going to execute query <" + selectQ.replaceAll("\\?", "{}") + ">",
+              quoteString(dbName), quoteString(tableName));
+      rs = pst.executeQuery();
       if(rs.next()) {
         LOG.info("Ignoring request to rebuild " + dbName + "/" + tableName +
             " since it is already being rebuilt");
@@ -1803,9 +1877,12 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       }
       String insertQ = "insert into MATERIALIZATION_REBUILD_LOCKS " +
           "(mrl_txn_id, mrl_db_name, mrl_tbl_name, mrl_last_heartbeat) values (" + txnId +
-          ", '" + dbName + "', '" + tableName + "', " + Instant.now().toEpochMilli() + ")";
-      LOG.debug("Going to execute update <" + insertQ + ">");
-      stmt.executeUpdate(insertQ);
+          ", ?, ?, " + Instant.now().toEpochMilli() + ")";
+      closeStmt(pst);
+      pst = sqlGenerator.prepareStmtWithParameters(dbConn, insertQ, params);
+      LOG.debug("Going to execute update <" + insertQ.replaceAll("\\?", "{}") + ">",
+              quoteString(dbName), quoteString(tableName));
+      pst.executeUpdate();
       LOG.debug("Going to commit");
       dbConn.commit();
       return new LockResponse(txnId, LockState.ACQUIRED);
@@ -1814,7 +1891,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       throw new MetaException("Unable to retrieve materialization invalidation information due to " +
           StringUtils.stringifyException(ex));
     } finally {
-      close(rs, stmt, dbConn);
+      close(rs, pst, dbConn);
       if(handle != null) {
         handle.releaseLocks();
       }
@@ -1827,19 +1904,19 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       throws MetaException {
     try {
       Connection dbConn = null;
-      Statement stmt = null;
-      ResultSet rs = null;
+      PreparedStatement pst = null;
       try {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
         String s = "update MATERIALIZATION_REBUILD_LOCKS" +
             " set mrl_last_heartbeat = " + Instant.now().toEpochMilli() +
             " where mrl_txn_id = " + txnId +
-            " AND mrl_db_name =" + quoteString(dbName) +
-            " AND mrl_tbl_name=" + quoteString(tableName);
-        LOG.debug("Going to execute update <" + s + ">");
-        int rc = stmt.executeUpdate(s);
+            " AND mrl_db_name = ?" +
+            " AND mrl_tbl_name = ?";
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tableName));
+        LOG.debug("Going to execute update <" + s.replaceAll("\\?", "{}") + ">",
+                quoteString(dbName), quoteString(tableName));
+        int rc = pst.executeUpdate();
         if (rc < 1) {
           LOG.debug("Going to rollback");
           dbConn.rollback();
@@ -1860,7 +1937,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to heartbeat rebuild lock due to " +
             StringUtils.stringifyException(e));
       } finally {
-        close(rs, stmt, dbConn);
+        close(null, pst, dbConn);
         unlockInternal();
       }
     } catch (RetryException e) {
@@ -1995,6 +2072,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     Connection dbConn = null;
     try {
       Statement stmt = null;
+      PreparedStatement pStmt = null;
+      List<PreparedStatement> insertPreparedStmts = null;
       ResultSet rs = null;
       ResultSet lockHandle = null;
       try {
@@ -2032,6 +2111,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         if (txnid > 0) {
           List<String> rows = new ArrayList<>();
+          List<List<String>> paramsList = new ArrayList<>();
           // For each component in this lock request,
           // add an entry to the txn_components table
           for (LockComponent lc : rqst.getComponent()) {
@@ -2091,30 +2171,41 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               // may return empty result sets.
               // Get the write id allocated by this txn for the given table writes
               s = "select t2w_writeid from TXN_TO_WRITE_ID where"
-                      + " t2w_database = " + quoteString(dbName)
-                      + " and t2w_table = " + quoteString(tblName)
-                      + " and t2w_txnid = " + txnid;
-              LOG.debug("Going to execute query <" + s + ">");
-              rs = stmt.executeQuery(s);
+                      + " t2w_database = ? and t2w_table = ? and t2w_txnid = " + txnid;
+              pStmt = sqlGenerator.prepareStmtWithParameters(dbConn, s, Arrays.asList(dbName, tblName));
+              LOG.debug("Going to execute query <" + s.replaceAll("\\?", "{}") + ">",
+                      quoteString(dbName), quoteString(tblName));
+              rs = pStmt.executeQuery();
               if (rs.next()) {
                 writeId = rs.getLong(1);
               }
             }
-            rows.add(txnid + ", '" + dbName + "', " +
-                    (tblName == null ? "null" : "'" + tblName + "'") + ", " +
-                    (partName == null ? "null" : "'" + partName + "'")+ "," +
+            rows.add(txnid + ", ?, " +
+                    (tblName == null ? "null" : "?") + ", " +
+                    (partName == null ? "null" : "?")+ "," +
                     quoteString(OpertaionType.fromDataOperationType(lc.getOperationType()).toString())+ "," +
                     (writeId == null ? "null" : writeId));
+            List<String> params = new ArrayList<>();
+            params.add(dbName);
+            if (tblName != null) {
+              params.add(tblName);
+            }
+            if (partName != null) {
+              params.add(partName);
+            }
+            paramsList.add(params);
           }
-          List<String> queries = sqlGenerator.createInsertValuesStmt(
-              "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", rows);
-          for(String query : queries) {
-            LOG.debug("Going to execute update <" + query + ">");
-            int modCount = stmt.executeUpdate(query);
+          insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+              "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)",
+                  rows, paramsList);
+          for(PreparedStatement pst : insertPreparedStmts) {
+            int modCount = pst.executeUpdate();
+            closeStmt(pst);
           }
+          insertPreparedStmts = null;
         }
-
         List<String> rows = new ArrayList<>();
+        List<List<String>> paramsList = new ArrayList<>();
         long intLockId = 0;
         for (LockComponent lc : rqst.getComponent()) {
           if(lc.isSetOperationType() && lc.getOperationType() == DataOperationType.UNSET &&
@@ -2146,24 +2237,40 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
               break;
           }
           long now = getDbTime(dbConn);
-            rows.add(extLockId + ", " + intLockId + "," + txnid + ", " +
-            quoteString(dbName) + ", " +
-            valueOrNullLiteral(tblName) + ", " +
-            valueOrNullLiteral(partName) + ", " +
+          rows.add(extLockId + ", " + intLockId + "," + txnid + ", ?, " +
+                  ((tblName == null) ? "null" : "?") + ", " +
+                  ((partName == null) ? "null" : "?") + ", " +
             quoteChar(LOCK_WAITING) + ", " + quoteChar(lockChar) + ", " +
             //for locks associated with a txn, we always heartbeat txn and timeout based on that
             (isValidTxn(txnid) ? 0 : now) + ", " +
-            valueOrNullLiteral(rqst.getUser()) + ", " +
-            valueOrNullLiteral(rqst.getHostname()) + ", " +
-            valueOrNullLiteral(rqst.getAgentInfo()));// + ")";
+                  ((rqst.getUser() == null) ? "null" : "?")  + ", " +
+                  ((rqst.getHostname() == null) ? "null" : "?") + ", " +
+                  ((rqst.getAgentInfo() == null) ? "null" : "?"));// + ")";
+          List<String> params = new ArrayList<>();
+          params.add(dbName);
+          if (tblName != null) {
+            params.add(tblName);
+          }
+          if (partName != null) {
+            params.add(partName);
+          }
+          if (rqst.getUser() != null) {
+            params.add(rqst.getUser());
+          }
+          if (rqst.getHostname() != null) {
+            params.add(rqst.getHostname());
+          }
+          if (rqst.getAgentInfo() != null) {
+            params.add(rqst.getAgentInfo());
+          }
+          paramsList.add(params);
         }
-        List<String> queries = sqlGenerator.createInsertValuesStmt(
+        insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
           "HIVE_LOCKS (hl_lock_ext_id, hl_lock_int_id, hl_txnid, hl_db, " +
             "hl_table, hl_partition,hl_lock_state, hl_lock_type, " +
-            "hl_last_heartbeat, hl_user, hl_host, hl_agent_info)", rows);
-        for(String query : queries) {
-          LOG.debug("Going to execute update <" + query + ">");
-          int modCount = stmt.executeUpdate(query);
+            "hl_last_heartbeat, hl_user, hl_host, hl_agent_info)", rows, paramsList);
+        for(PreparedStatement pst : insertPreparedStmts) {
+          int modCount = pst.executeUpdate();
         }
         dbConn.commit();
         success = true;
@@ -2175,7 +2282,13 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to update transaction database " +
           StringUtils.stringifyException(e));
       } finally {
+        if (insertPreparedStmts != null) {
+          for (PreparedStatement pst : insertPreparedStmts) {
+            closeStmt(pst);
+          }
+        }
         close(lockHandle);
+        closeStmt(pStmt);
         close(rs, stmt, null);
         if (!success) {
           /* This needs to return a "live" connection to be used by operation that follows it.
@@ -2375,10 +2488,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       ShowLocksResponse rsp = new ShowLocksResponse();
       List<ShowLocksResponseElement> elems = new ArrayList<>();
       List<LockInfoExt> sortedList = new ArrayList<>();
-      Statement stmt = null;
+      PreparedStatement pst = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
 
         String s = "select hl_lock_ext_id, hl_txnid, hl_db, hl_table, hl_partition, hl_lock_state, " +
           "hl_lock_type, hl_last_heartbeat, hl_acquired_at, hl_user, hl_host, hl_lock_int_id," +
@@ -2388,22 +2500,26 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         String dbName = rqst.getDbname();
         String tableName = rqst.getTablename();
         String partName = rqst.getPartname();
+        List<String> params = new ArrayList<>();
 
         StringBuilder filter = new StringBuilder();
         if (dbName != null && !dbName.isEmpty()) {
-          filter.append("hl_db=").append(quoteString(dbName));
+          filter.append("hl_db=?");
+          params.add(dbName);
         }
         if (tableName != null && !tableName.isEmpty()) {
           if (filter.length() > 0) {
             filter.append(" and ");
           }
-          filter.append("hl_table=").append(quoteString(tableName));
+          filter.append("hl_table=?");
+          params.add(tableName);
         }
         if (partName != null && !partName.isEmpty()) {
           if (filter.length() > 0) {
             filter.append(" and ");
           }
-          filter.append("hl_partition=").append(quoteString(partName));
+          filter.append("hl_partition=?");
+          params.add(partName);
         }
         String whereClause = filter.toString();
 
@@ -2411,8 +2527,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           s = s + " where " + whereClause;
         }
 
-        LOG.debug("Doing to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
+        LOG.debug("Going to execute query <" + s + ">");
+        ResultSet rs = pst.executeQuery();
         while (rs.next()) {
           ShowLocksResponseElement e = new ShowLocksResponseElement();
           e.setLockid(rs.getLong(1));
@@ -2457,7 +2574,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to select from transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        closeStmt(stmt);
+        closeStmt(pst);
         closeDbConn(dbConn);
       }
       //this ensures that "SHOW LOCKS" prints the locks in the same order as they are examined
@@ -2584,6 +2701,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
       return id;
     }
   }
+
   @Override
   @RetrySemantics.Idempotent
   public CompactionResponse compact(CompactionRequest rqst) throws MetaException {
@@ -2591,6 +2709,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     try {
       Connection dbConn = null;
       Statement stmt = null;
+      PreparedStatement pst = null;
       TxnStore.MutexAPI.LockHandle handle = null;
       try {
         lockInternal();
@@ -2605,20 +2724,24 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         long id = generateCompactionQueueId(stmt);
 
+        List<String> params = new ArrayList<>();
         StringBuilder sb = new StringBuilder("select cq_id, cq_state from COMPACTION_QUEUE where").
           append(" cq_state IN(").append(quoteChar(INITIATED_STATE)).
             append(",").append(quoteChar(WORKING_STATE)).
-          append(") AND cq_database=").append(quoteString(rqst.getDbname())).
-          append(" AND cq_table=").append(quoteString(rqst.getTablename())).append(" AND ");
+          append(") AND cq_database=?").
+          append(" AND cq_table=?").append(" AND ");
+        params.add(rqst.getDbname());
+        params.add(rqst.getTablename());
         if(rqst.getPartitionname() == null) {
           sb.append("cq_partition is null");
-        }
-        else {
-          sb.append("cq_partition=").append(quoteString(rqst.getPartitionname()));
+        } else {
+          sb.append("cq_partition=?");
+          params.add(rqst.getPartitionname());
         }
 
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, sb.toString(), params);
         LOG.debug("Going to execute query <" + sb.toString() + ">");
-        ResultSet rs = stmt.executeQuery(sb.toString());
+        ResultSet rs = pst.executeQuery();
         if(rs.next()) {
           long enqueuedId = rs.getLong(1);
           String state = compactorStateToResponse(rs.getString(2).charAt(0));
@@ -2628,6 +2751,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           return new CompactionResponse(enqueuedId, state, false);
         }
         close(rs);
+        closeStmt(pst);
+        params.clear();
         StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " +
           "cq_table, ");
         String partName = rqst.getPartitionname();
@@ -2639,14 +2764,16 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         if (rqst.getRunas() != null) buf.append(", cq_run_as");
         buf.append(") values (");
         buf.append(id);
-        buf.append(", '");
-        buf.append(rqst.getDbname());
-        buf.append("', '");
-        buf.append(rqst.getTablename());
-        buf.append("', '");
+        buf.append(", ?");
+        buf.append(", ?");
+        buf.append(", ");
+        params.add(rqst.getDbname());
+        params.add(rqst.getTablename());
         if (partName != null) {
-          buf.append(partName);
-          buf.append("', '");
+          buf.append("?, '");
+          params.add(partName);
+        } else {
+          buf.append("'");
         }
         buf.append(INITIATED_STATE);
         buf.append("', '");
@@ -2664,18 +2791,20 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             dbConn.rollback();
             throw new MetaException("Unexpected compaction type " + rqst.getType().toString());
         }
+        buf.append("'");
         if (rqst.getProperties() != null) {
-          buf.append("', '");
-          buf.append(new StringableMap(rqst.getProperties()).toString());
+          buf.append(", ?");
+          params.add(new StringableMap(rqst.getProperties()).toString());
         }
         if (rqst.getRunas() != null) {
-          buf.append("', '");
-          buf.append(rqst.getRunas());
+          buf.append(", ?");
+          params.add(rqst.getRunas());
         }
-        buf.append("')");
+        buf.append(")");
         String s = buf.toString();
+        pst = sqlGenerator.prepareStmtWithParameters(dbConn, s, params);
         LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
+        pst.executeUpdate();
         LOG.debug("Going to commit");
         dbConn.commit();
         return new CompactionResponse(id, INITIATED_RESPONSE, true);
@@ -2686,6 +2815,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to select from transaction database " +
           StringUtils.stringifyException(e));
       } finally {
+        closeStmt(pst);
         closeStmt(stmt);
         closeDbConn(dbConn);
         if(handle != null) {
@@ -2794,6 +2924,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     Connection dbConn = null;
     Statement stmt = null;
     ResultSet lockHandle = null;
+    List<PreparedStatement> insertPreparedStmts = null;
     try {
       try {
         lockInternal();
@@ -2813,18 +2944,22 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
 
         Long writeId = rqst.getWriteid();
         List<String> rows = new ArrayList<>();
+        List<List<String>> paramsList = new ArrayList<>();
         for (String partName : rqst.getPartitionnames()) {
-          rows.add(rqst.getTxnid() + "," + quoteString(normalizeCase(rqst.getDbname()))
-              + "," + quoteString(normalizeCase(rqst.getTablename())) +
-              "," + quoteString(partName) + "," + quoteChar(ot.sqlConst) + "," + writeId);
+          rows.add(rqst.getTxnid() + ",?,?,?," + quoteChar(ot.sqlConst) + "," + writeId);
+          List<String> params = new ArrayList<>();
+          params.add(normalizeCase(rqst.getDbname()));
+          params.add(normalizeCase(rqst.getTablename()));
+          params.add(partName);
+          paramsList.add(params);
         }
         int modCount = 0;
         //record partitions that were written to
-        List<String> queries = sqlGenerator.createInsertValuesStmt(
-            "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)", rows);
-        for(String query : queries) {
-          LOG.debug("Going to execute update <" + query + ">");
-          modCount = stmt.executeUpdate(query);
+        insertPreparedStmts = sqlGenerator.createInsertValuesPreparedStmt(dbConn,
+            "TXN_COMPONENTS (tc_txnid, tc_database, tc_table, tc_partition, tc_operation_type, tc_writeid)",
+                rows, paramsList);
+        for(PreparedStatement pst : insertPreparedStmts) {
+          modCount = pst.executeUpdate();
         }
         LOG.debug("Going to commit");
         dbConn.commit();
@@ -2835,6 +2970,11 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         throw new MetaException("Unable to insert into from transaction database " +
           StringUtils.stringifyException(e));
       } finally {
+        if (insertPreparedStmts != null) {
+          for(PreparedStatement pst : insertPreparedStmts) {
+            closeStmt(pst);
+          }
+        }
         close(lockHandle, stmt, dbConn);
         unlockInternal();
       }