You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by st...@apache.org on 2017/11/01 01:58:37 UTC

hive git commit: HIVE-17635: Add unit tests to CompactionTxnHandler and use PreparedStatements for queries (Andrew Sherman, reviewed by Sahil Takiar)

Repository: hive
Updated Branches:
  refs/heads/master 5b8ffe2d9 -> 2a2f64270


HIVE-17635: Add unit tests to CompactionTxnHandler and use PreparedStatements for queries (Andrew Sherman, reviewed by Sahil Takiar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2a2f6427
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2a2f6427
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2a2f6427

Branch: refs/heads/master
Commit: 2a2f6427014045b9119714d205d7b8face9f7d92
Parents: 5b8ffe2
Author: Andrew Sherman <as...@cloudera.com>
Authored: Tue Oct 31 18:57:52 2017 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Tue Oct 31 18:57:52 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hive/beeline/HiveSchemaTool.java |  34 ++--
 .../metastore/txn/TestCompactionTxnHandler.java |  63 +++++++
 .../metastore/txn/CompactionTxnHandler.java     | 168 +++++++++++++------
 .../hadoop/hive/metastore/txn/TxnUtils.java     |  52 +++++-
 .../hadoop/hive/metastore/txn/TestTxnUtils.java |  38 ++++-
 5 files changed, 280 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java b/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java
index 5350311..04576ae 100644
--- a/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java
+++ b/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java
@@ -667,27 +667,31 @@ public class HiveSchemaTool {
       for (String seqName : seqNameToTable.keySet()) {
         String tableName = seqNameToTable.get(seqName).getLeft();
         String tableKey = seqNameToTable.get(seqName).getRight();
+        String fullSequenceName = "org.apache.hadoop.hive.metastore.model." + seqName;
         String seqQuery = needsQuotedIdentifier ?
-            ("select t.\"NEXT_VAL\" from \"SEQUENCE_TABLE\" t WHERE t.\"SEQUENCE_NAME\"='org.apache.hadoop.hive.metastore.model." + seqName + "' order by t.\"SEQUENCE_NAME\" ")
-            : ("select t.NEXT_VAL from SEQUENCE_TABLE t WHERE t.SEQUENCE_NAME='org.apache.hadoop.hive.metastore.model." + seqName + "' order by t.SEQUENCE_NAME ");
+            ("select t.\"NEXT_VAL\" from \"SEQUENCE_TABLE\" t WHERE t.\"SEQUENCE_NAME\"=? order by t.\"SEQUENCE_NAME\" ")
+            : ("select t.NEXT_VAL from SEQUENCE_TABLE t WHERE t.SEQUENCE_NAME=? order by t.SEQUENCE_NAME ");
         String maxIdQuery = needsQuotedIdentifier ?
             ("select max(\"" + tableKey + "\") from \"" + tableName + "\"")
             : ("select max(" + tableKey + ") from " + tableName);
 
-          ResultSet res = stmt.executeQuery(maxIdQuery);
-          if (res.next()) {
-             long maxId = res.getLong(1);
-             if (maxId > 0) {
-               ResultSet resSeq = stmt.executeQuery(seqQuery);
-               if (!resSeq.next()) {
-                 isValid = false;
-                 System.err.println("Missing SEQUENCE_NAME " + seqName + " from SEQUENCE_TABLE");
-               } else if (resSeq.getLong(1) < maxId) {
-                 isValid = false;
-                 System.err.println("NEXT_VAL for " + seqName + " in SEQUENCE_TABLE < max("+ tableKey + ") in " + tableName);
-               }
-             }
+        ResultSet res = stmt.executeQuery(maxIdQuery);
+        if (res.next()) {
+          long maxId = res.getLong(1);
+          if (maxId > 0) {
+            PreparedStatement pStmt = conn.prepareStatement(seqQuery);
+            pStmt.setString(1, fullSequenceName);
+            ResultSet resSeq = pStmt.executeQuery();
+            if (!resSeq.next()) {
+              isValid = false;
+              System.err.println("Missing SEQUENCE_NAME " + seqName + " from SEQUENCE_TABLE");
+            } else if (resSeq.getLong(1) < maxId) {
+              isValid = false;
+              System.err.println("NEXT_VAL for " + seqName + " in SEQUENCE_TABLE < max(" +
+                  tableKey + ") in " + tableName);
+            }
           }
+        }
       }
 
       System.out.println((isValid ? "Succeeded" :"Failed") + " in sequence number validation for SEQUENCE_TABLE.");

http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 96005b4..34a1600 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -49,6 +49,7 @@ import java.util.SortedSet;
 import java.util.TreeSet;
 
 import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
 import static junit.framework.Assert.assertNotNull;
 import static junit.framework.Assert.assertNull;
 import static junit.framework.Assert.assertTrue;
@@ -64,6 +65,10 @@ public class TestCompactionTxnHandler {
 
   public TestCompactionTxnHandler() throws Exception {
     TxnDbUtil.setConfValues(conf);
+    // Set config so that TxnUtils.buildQueryWithINClauseStrings() will
+    // produce multiple queries
+    conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 1);
+    conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 10);
     tearDown();
   }
 
@@ -224,6 +229,64 @@ public class TestCompactionTxnHandler {
   }
 
   @Test
+  public void testMarkFailed() throws Exception {
+    CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+    rqst.setPartitionname("ds=today");
+    txnHandler.compact(rqst);
+    assertEquals(0, txnHandler.findReadyToClean().size());
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    assertNotNull(ci);
+
+    assertEquals(0, txnHandler.findReadyToClean().size());
+    txnHandler.markFailed(ci);
+    assertNull(txnHandler.findNextToCompact("fred"));
+    boolean failedCheck = txnHandler.checkFailedCompactions(ci);
+    assertFalse(failedCheck);
+    try {
+      // The first call to markFailed() should have removed the record from
+      // COMPACTION_QUEUE, so a repeated call should fail
+      txnHandler.markFailed(ci);
+      fail("The first call to markFailed() must have failed as this call did "
+          + "not throw the expected exception");
+    } catch (IllegalStateException e) {
+      // This is expected
+      assertTrue(e.getMessage().contains("No record with CQ_ID="));
+    }
+
+    // There are not enough failed compactions yet so checkFailedCompactions() should return false.
+    // But note that any sql error will also result in a return of false.
+    assertFalse(txnHandler.checkFailedCompactions(ci));
+
+    // Add more failed compactions so that the total is exactly COMPACTOR_INITIATOR_FAILED_THRESHOLD
+    for (int i = 1 ; i <  conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); i++) {
+      addFailedCompaction("foo", "bar", CompactionType.MINOR, "ds=today");
+    }
+    // Now checkFailedCompactions() will return true
+    assertTrue(txnHandler.checkFailedCompactions(ci));
+
+    // Now add enough failed compactions to ensure purgeCompactionHistory() will attempt delete;
+    // HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED is enough for this.
+    // But we also want enough to tickle the code in TxnUtils.buildQueryWithINClauseStrings()
+    // so that it produces multiple queries. For that we need at least 290.
+    for (int i = 0 ; i < 300; i++) {
+      addFailedCompaction("foo", "bar", CompactionType.MINOR, "ds=today");
+    }
+    txnHandler.purgeCompactionHistory();
+  }
+
+  private void addFailedCompaction(String dbName, String tableName, CompactionType type,
+      String partitionName) throws MetaException {
+    CompactionRequest rqst;
+    CompactionInfo ci;
+    rqst = new CompactionRequest(dbName, tableName, type);
+    rqst.setPartitionname(partitionName);
+    txnHandler.compact(rqst);
+    ci = txnHandler.findNextToCompact("fred");
+    assertNotNull(ci);
+    txnHandler.markFailed(ci);
+  }
+
+  @Test
   public void testRevokeFromLocalWorkers() throws Exception {
     CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
     txnHandler.compact(rqst);

http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 7f1b331..a90b7d4 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -334,13 +334,13 @@ class CompactionTxnHandler extends TxnHandler {
   public void markCleaned(CompactionInfo info) throws MetaException {
     try {
       Connection dbConn = null;
-      Statement stmt = null;
       PreparedStatement pStmt = null;
       ResultSet rs = null;
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id);
+        pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+        pStmt.setLong(1, info.id);
+        rs = pStmt.executeQuery();
         if(rs.next()) {
           info = CompactionInfo.loadFullFromCompactionQueue(rs);
         }
@@ -348,9 +348,11 @@ class CompactionTxnHandler extends TxnHandler {
           throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE");
         }
         close(rs);
-        String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
+        String s = "delete from COMPACTION_QUEUE where cq_id = ?";
+        pStmt = dbConn.prepareStatement(s);
+        pStmt.setLong(1, info.id);
         LOG.debug("Going to execute update <" + s + ">");
-        int updCount = stmt.executeUpdate(s);
+        int updCount = pStmt.executeUpdate();
         if (updCount != 1) {
           LOG.error("Unable to delete compaction record: " + info +  ".  Update count=" + updCount);
           LOG.debug("Going to rollback");
@@ -364,28 +366,55 @@ class CompactionTxnHandler extends TxnHandler {
         // Remove entries from completed_txn_components as well, so we don't start looking there
         // again but only up to the highest txn ID include in this compaction job.
         //highestTxnId will be NULL in upgrade scenarios
-        s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " +
-          "ctc_table = '" + info.tableName + "'";
+        s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and " +
+            "ctc_table = ?";
+        if (info.partName != null) {
+          s += " and ctc_partition = ?";
+        }
+        if(info.highestTxnId != 0) {
+          s += " and ctc_txnid <= ?";
+        }
+        pStmt = dbConn.prepareStatement(s);
+        int paramCount = 1;
+        pStmt.setString(paramCount++, info.dbname);
+        pStmt.setString(paramCount++, info.tableName);
         if (info.partName != null) {
-          s += " and ctc_partition = '" + info.partName + "'";
+          pStmt.setString(paramCount++, info.partName);
         }
         if(info.highestTxnId != 0) {
-          s += " and ctc_txnid <= " + info.highestTxnId;
+          pStmt.setLong(paramCount++, info.highestTxnId);
         }
         LOG.debug("Going to execute update <" + s + ">");
-        if (stmt.executeUpdate(s) < 1) {
+        if (pStmt.executeUpdate() < 1) {
           LOG.error("Expected to remove at least one row from completed_txn_components when " +
             "marking compaction entry as clean!");
         }
 
         s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
-          TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
-          info.tableName + "'" + (info.highestTxnId == 0 ? "" : " and txn_id <= " + info.highestTxnId);
-        if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
+          TXN_ABORTED + "' and tc_database = ? and tc_table = ?";
+        if (info.highestTxnId != 0) s += " and txn_id <= ?";
+        if (info.partName != null) s += " and tc_partition = ?";
+
+        pStmt = dbConn.prepareStatement(s);
+        paramCount = 1;
+        pStmt.setString(paramCount++, info.dbname);
+        pStmt.setString(paramCount++, info.tableName);
+        if(info.highestTxnId != 0) {
+          pStmt.setLong(paramCount++, info.highestTxnId);
+        }
+        if (info.partName != null) {
+          pStmt.setString(paramCount++, info.partName);
+        }
+
         LOG.debug("Going to execute update <" + s + ">");
-        rs = stmt.executeQuery(s);
+        rs = pStmt.executeQuery();
         List<Long> txnids = new ArrayList<>();
-        while (rs.next()) txnids.add(rs.getLong(1));
+        List<String> questions = new ArrayList<>();
+        while (rs.next()) {
+          long id = rs.getLong(1);
+          txnids.add(id);
+          questions.add("?");
+        }
         // Remove entries from txn_components, as there may be aborted txn components
         if (txnids.size() > 0) {
           List<String> queries = new ArrayList<>();
@@ -397,21 +426,34 @@ class CompactionTxnHandler extends TxnHandler {
           prefix.append("delete from TXN_COMPONENTS where ");
 
           //because 1 txn may include different partitions/tables even in auto commit mode
-          suffix.append(" and tc_database = ");
-          suffix.append(quoteString(info.dbname));
-          suffix.append(" and tc_table = ");
-          suffix.append(quoteString(info.tableName));
+          suffix.append(" and tc_database = ?");
+          suffix.append(" and tc_table = ?");
           if (info.partName != null) {
-            suffix.append(" and tc_partition = ");
-            suffix.append(quoteString(info.partName));
+            suffix.append(" and tc_partition = ?");
           }
 
           // Populate the complete query with provided prefix and suffix
-          TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false);
+          List<Integer> counts = TxnUtils
+              .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "tc_txnid",
+                  true, false);
+          int totalCount = 0;
+          for (int i = 0; i < queries.size(); i++) {
+            String query = queries.get(i);
+            int insertCount = counts.get(i);
 
-          for (String query : queries) {
             LOG.debug("Going to execute update <" + query + ">");
-            int rc = stmt.executeUpdate(query);
+            pStmt = dbConn.prepareStatement(query);
+            for (int j = 0; j < insertCount; j++) {
+              pStmt.setLong(j + 1, txnids.get(totalCount + j));
+            }
+            totalCount += insertCount;
+            paramCount = insertCount + 1;
+            pStmt.setString(paramCount++, info.dbname);
+            pStmt.setString(paramCount++, info.tableName);
+            if (info.partName != null) {
+              pStmt.setString(paramCount++, info.partName);
+            }
+            int rc = pStmt.executeUpdate();
             LOG.debug("Removed " + rc + " records from txn_components");
 
             // Don't bother cleaning from the txns table.  A separate call will do that.  We don't
@@ -430,8 +472,7 @@ class CompactionTxnHandler extends TxnHandler {
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        closeStmt(pStmt);
-        close(rs, stmt, dbConn);
+        close(rs, pStmt, dbConn);
       }
     } catch (RetryException e) {
       markCleaned(info);
@@ -599,34 +640,38 @@ class CompactionTxnHandler extends TxnHandler {
   @RetrySemantics.ReadOnly
   public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException {
     Connection dbConn = null;
-    Statement stmt = null;
+    PreparedStatement pStmt = null;
     ResultSet rs = null;
     try {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         String quote = getIdentifierQuoteString(dbConn);
-        stmt = dbConn.createStatement();
         StringBuilder bldr = new StringBuilder();
         bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote)
           .append(" FROM ")
           .append(quote).append((ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS"))
           .append(quote)
           .append(" WHERE ")
-          .append(quote).append("DB_NAME").append(quote).append(" = '").append(ci.dbname)
-          .append("' AND ").append(quote).append("TABLE_NAME").append(quote)
-          .append(" = '").append(ci.tableName).append("'");
+          .append(quote).append("DB_NAME").append(quote).append(" = ?")
+          .append(" AND ").append(quote).append("TABLE_NAME").append(quote)
+          .append(" = ?");
         if (ci.partName != null) {
-          bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '")
-            .append(ci.partName).append("'");
+          bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = ?");
         }
         String s = bldr.toString();
+        pStmt = dbConn.prepareStatement(s);
+        pStmt.setString(1, ci.dbname);
+        pStmt.setString(2, ci.tableName);
+        if (ci.partName != null) {
+          pStmt.setString(3, ci.partName);
+        }
 
       /*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" :
           "PART_COL_STATS")
          + " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'"
         + (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");*/
         LOG.debug("Going to execute <" + s + ">");
-        rs = stmt.executeQuery(s);
+        rs = pStmt.executeQuery();
         List<String> columns = new ArrayList<>();
         while (rs.next()) {
           columns.add(rs.getString(1));
@@ -642,7 +687,7 @@ class CompactionTxnHandler extends TxnHandler {
         throw new MetaException("Unable to connect to transaction database " +
           StringUtils.stringifyException(e));
       } finally {
-        close(rs, stmt, dbConn);
+        close(rs, pStmt, dbConn);
       }
     } catch (RetryException ex) {
       return findColumnsWithStats(ci);
@@ -725,6 +770,7 @@ class CompactionTxnHandler extends TxnHandler {
   public void purgeCompactionHistory() throws MetaException {
     Connection dbConn = null;
     Statement stmt = null;
+    PreparedStatement pStmt = null;
     ResultSet rs = null;
     List<Long> deleteSet = new ArrayList<>();
     RetentionCounters rc = null;
@@ -764,11 +810,22 @@ class CompactionTxnHandler extends TxnHandler {
         prefix.append("delete from COMPLETED_COMPACTIONS where ");
         suffix.append("");
 
-        TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false);
-
-        for (String query : queries) {
+        List<String> questions = new ArrayList<>(deleteSet.size());
+        for (int  i = 0; i < deleteSet.size(); i++) {
+          questions.add("?");
+        }
+        List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "cc_id", false, false);
+        int totalCount = 0;
+        for (int i = 0; i < queries.size(); i++) {
+          String query = queries.get(i);
+          long insertCount = counts.get(i);
           LOG.debug("Going to execute update <" + query + ">");
-          int count = stmt.executeUpdate(query);
+          pStmt = dbConn.prepareStatement(query);
+          for (int j = 0; j < insertCount; j++) {
+            pStmt.setLong(j + 1, deleteSet.get(totalCount + j));
+          }
+          totalCount += insertCount;
+          int count = pStmt.executeUpdate();
           LOG.debug("Removed " + count + " records from COMPLETED_COMPACTIONS");
         }
         dbConn.commit();
@@ -779,6 +836,7 @@ class CompactionTxnHandler extends TxnHandler {
           StringUtils.stringifyException(e));
       } finally {
         close(rs, stmt, dbConn);
+        closeStmt(pStmt);
       }
     } catch (RetryException ex) {
       purgeCompactionHistory();
@@ -813,17 +871,22 @@ class CompactionTxnHandler extends TxnHandler {
   @RetrySemantics.ReadOnly
   public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException {
     Connection dbConn = null;
-    Statement stmt = null;
+    PreparedStatement pStmt = null;
     ResultSet rs = null;
     try {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
-        stmt = dbConn.createStatement();
-        rs = stmt.executeQuery("select CC_STATE from COMPLETED_COMPACTIONS where " +
-          "CC_DATABASE = " + quoteString(ci.dbname) + " and " +
-          "CC_TABLE = " + quoteString(ci.tableName) +
-          (ci.partName != null ? "and CC_PARTITION = " + quoteString(ci.partName) : "") +
+        pStmt = dbConn.prepareStatement("select CC_STATE from COMPLETED_COMPACTIONS where " +
+          "CC_DATABASE = ? and " +
+          "CC_TABLE = ? " +
+          (ci.partName != null ? "and CC_PARTITION = ?" : "") +
           " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc");
+        pStmt.setString(1, ci.dbname);
+        pStmt.setString(2, ci.tableName);
+        if (ci.partName != null) {
+          pStmt.setString(3, ci.partName);
+        }
+        rs = pStmt.executeQuery();
         int numFailed = 0;
         int numTotal = 0;
         int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
@@ -838,14 +901,14 @@ class CompactionTxnHandler extends TxnHandler {
         return numFailed == failedThreshold;
       }
       catch (SQLException e) {
-        LOG.error("Unable to delete from compaction queue " + e.getMessage());
+        LOG.error("Unable to check for failed compactions " + e.getMessage());
         LOG.debug("Going to rollback");
         rollbackDBConn(dbConn);
         checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")");
         LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
         return false;//weren't able to check
       } finally {
-        close(rs, stmt, dbConn);
+        close(rs, pStmt, dbConn);
       }
     } catch (RetryException e) {
       return checkFailedCompactions(ci);
@@ -869,12 +932,16 @@ class CompactionTxnHandler extends TxnHandler {
       try {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-        rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id);
+        pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+        pStmt.setLong(1, ci.id);
+        rs = pStmt.executeQuery();
         if(rs.next()) {
           ci = CompactionInfo.loadFullFromCompactionQueue(rs);
-          String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id;
+          String s = "delete from COMPACTION_QUEUE where cq_id = ?";
+          pStmt = dbConn.prepareStatement(s);
+          pStmt.setLong(1, ci.id);
           LOG.debug("Going to execute update <" + s + ">");
-          int updCnt = stmt.executeUpdate(s);
+          int updCnt = pStmt.executeUpdate();
         }
         else {
           if(ci.id > 0) {
@@ -897,6 +964,7 @@ class CompactionTxnHandler extends TxnHandler {
           ci.state = FAILED_STATE;
         }
         close(rs, stmt, null);
+        closeStmt(pStmt);
 
         pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
         CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));

http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 2f01233..afb4f6b 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.utils.JavaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
@@ -172,8 +173,9 @@ public class TxnUtils {
    * @param addParens IN:  add a pair of parenthesis outside the IN lists
    *                       e.g. "(id in (1,2,3) OR id in (4,5,6))"
    * @param notIn     IN:  is this for building a 'NOT IN' composite clause?
+   * @return          OUT: a list of the count of IN list values that are in each of the corresponding queries
    */
-  public static void buildQueryWithINClause(Configuration conf,
+  public static List<Integer> buildQueryWithINClause(Configuration conf,
                                             List<String> queries,
                                             StringBuilder prefix,
                                             StringBuilder suffix,
@@ -181,6 +183,47 @@ public class TxnUtils {
                                             String inColumn,
                                             boolean addParens,
                                             boolean notIn) {
+    List<String> inListStrings = new ArrayList<>(inList.size());
+    for (Long aLong : inList) {
+      inListStrings.add(aLong.toString());
+    }
+    return buildQueryWithINClauseStrings(conf, queries, prefix, suffix,
+        inListStrings, inColumn, addParens, notIn);
+
+  }
+  /**
+   * Build a query (or queries if one query is too big but only for the case of 'IN'
+   * composite clause. For the case of 'NOT IN' clauses, multiple queries change
+   * the semantics of the intended query.
+   * E.g., Let's assume that input "inList" parameter has [5, 6] and that
+   * _DIRECT_SQL_MAX_QUERY_LENGTH_ configuration parameter only allows one value in a 'NOT IN' clause,
+   * Then having two delete statements changes the semantics of the inteneded SQL statement.
+   * I.e. 'delete from T where a not in (5)' and 'delete from T where a not in (6)' sequence
+   * is not equal to 'delete from T where a not in (5, 6)'.)
+   * with one or multiple 'IN' or 'NOT IN' clauses with the given input parameters.
+   *
+   * Note that this method currently support only single column for
+   * IN/NOT IN clauses and that only covers OR-based composite 'IN' clause and
+   * AND-based composite 'NOT IN' clause.
+   * For example, for 'IN' clause case, the method will build a query with OR.
+   * E.g., "id in (1,2,3) OR id in (4,5,6)".
+   * For 'NOT IN' case, NOT IN list is broken into multiple 'NOT IN" clauses connected by AND.
+   *
+   * Note that, in this method, "a composite 'IN' clause" is defined as "a list of multiple 'IN'
+   * clauses in a query".
+   *
+   * @param queries   OUT: Array of query strings
+   * @param prefix    IN:  Part of the query that comes before IN list
+   * @param suffix    IN:  Part of the query that comes after IN list
+   * @param inList    IN:  the list with IN list values
+   * @param inColumn  IN:  single column name of IN list operator
+   * @param addParens IN:  add a pair of parenthesis outside the IN lists
+   *                       e.g. "(id in (1,2,3) OR id in (4,5,6))"
+   * @param notIn     IN:  is this for building a 'NOT IN' composite clause?
+   * @return          OUT: a list of the count of IN list values that are in each of the corresponding queries
+   */
+  public static List<Integer> buildQueryWithINClauseStrings(Configuration conf, List<String> queries, StringBuilder prefix,
+      StringBuilder suffix, List<String> inList, String inColumn, boolean addParens, boolean notIn) {
     // Get configuration parameters
     int maxQueryLength = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH);
     int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
@@ -203,6 +246,8 @@ public class TxnUtils {
     StringBuilder newInclausePrefix =
       new StringBuilder(notIn ? " and " + inColumn + " not in (":
 	                        " or " + inColumn + " in (");
+    List<Integer> ret = new ArrayList<>();
+    int currentCount = 0;
 
     // Loop over the given inList elements.
     while( cursor4InListArray < inListSize || !nextItemNeeded) {
@@ -257,9 +302,11 @@ public class TxnUtils {
 
         buf.append(suffix);
         queries.add(buf.toString());
+        ret.add(currentCount);
 
         // Prepare a new query string.
         buf.setLength(0);
+        currentCount = 0;
         cursor4queryOfInClauses = cursor4InClauseElements = 0;
         querySize = 0;
         newInclausePrefixJustAppended = false;
@@ -276,6 +323,7 @@ public class TxnUtils {
         cursor4InClauseElements = 0;
       } else {
         buf.append(nextValue.toString()).append(",");
+        currentCount++;
         nextItemNeeded = true;
         newInclausePrefixJustAppended = false;
         // increment cursor for elements per 'IN'/'NOT IN' clause.
@@ -293,6 +341,8 @@ public class TxnUtils {
     }
     buf.append(suffix);
     queries.add(buf.toString());
+    ret.add(currentCount);
+    return ret;
   }
 
   /*

http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
index 7dd268f..0384f8b 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
@@ -45,6 +45,7 @@ public class TestTxnUtils {
   @Test
   public void testBuildQueryWithINClause() throws Exception {
     List<String> queries = new ArrayList<>();
+    List<Integer> ret;
 
     StringBuilder prefix = new StringBuilder();
     StringBuilder suffix = new StringBuilder();
@@ -61,16 +62,21 @@ public class TestTxnUtils {
     for (long i = 1; i <= 189; i++) {
       inList.add(i);
     }
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(1, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
+    Assert.assertEquals(189L, ret.get(0).longValue());
     runAgainstDerby(queries);
 
     // Case 2 - Max in list members: 10; Max query string length: 1KB
     //          The first query has 2 full batches, and the second query only has 1 batch which only contains 1 member
     queries.clear();
     inList.add((long)190);
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(2, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
+    Assert.assertEquals(189L, ret.get(0).longValue());
+    Assert.assertEquals(1L, ret.get(1).longValue());
     runAgainstDerby(queries);
 
     // Case 3.1 - Max in list members: 1000, Max query string length: 1KB, and exact 1000 members in a single IN clause
@@ -80,16 +86,19 @@ public class TestTxnUtils {
     for (long i = 191; i <= 1000; i++) {
       inList.add(i);
     }
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(5, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
+    Assert.assertEquals(267L, ret.get(0).longValue());
     runAgainstDerby(queries);
 
     // Case 3.2 - Max in list members: 1000, Max query string length: 10KB, and exact 1000 members in a single IN clause
     MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH, 10);
     MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 1000);
     queries.clear();
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(1, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
     runAgainstDerby(queries);
 
     // Case 3.3 - Now with 2000 entries, try the above settings
@@ -98,19 +107,25 @@ public class TestTxnUtils {
     }
     MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH, 1);
     queries.clear();
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(10, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
+    Assert.assertEquals(267L, ret.get(0).longValue());
+    Assert.assertEquals(240L, ret.get(1).longValue());
     runAgainstDerby(queries);
     MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH, 10);
     queries.clear();
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(1, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
+    Assert.assertEquals(2000L, ret.get(0).longValue());
     runAgainstDerby(queries);
 
     // Case 4 - NOT IN list
     queries.clear();
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
     Assert.assertEquals(1, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
     runAgainstDerby(queries);
 
     // Case 5 - Max in list members: 1000; Max query string length: 10KB
@@ -118,16 +133,21 @@ public class TestTxnUtils {
     for (long i = 2001; i <= 4321; i++) {
       inList.add(i);
     }
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
     Assert.assertEquals(3, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
     runAgainstDerby(queries);
 
     // Case 6 - No parenthesis
     queries.clear();
     suffix.setLength(0);
     suffix.append("");
-    TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
+    ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
     Assert.assertEquals(3, queries.size());
+    Assert.assertEquals(queries.size(), ret.size());
+    Assert.assertEquals(2255L, ret.get(0).longValue());
+    Assert.assertEquals(2033L, ret.get(1).longValue());
+    Assert.assertEquals(33L, ret.get(2).longValue());
     runAgainstDerby(queries);
   }