You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by st...@apache.org on 2017/11/01 01:58:37 UTC
hive git commit: HIVE-17635: Add unit tests to CompactionTxnHandler
and use PreparedStatements for queries (Andrew Sherman,
reviewed by Sahil Takiar)
Repository: hive
Updated Branches:
refs/heads/master 5b8ffe2d9 -> 2a2f64270
HIVE-17635: Add unit tests to CompactionTxnHandler and use PreparedStatements for queries (Andrew Sherman, reviewed by Sahil Takiar)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2a2f6427
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2a2f6427
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2a2f6427
Branch: refs/heads/master
Commit: 2a2f6427014045b9119714d205d7b8face9f7d92
Parents: 5b8ffe2
Author: Andrew Sherman <as...@cloudera.com>
Authored: Tue Oct 31 18:57:52 2017 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Tue Oct 31 18:57:52 2017 -0700
----------------------------------------------------------------------
.../org/apache/hive/beeline/HiveSchemaTool.java | 34 ++--
.../metastore/txn/TestCompactionTxnHandler.java | 63 +++++++
.../metastore/txn/CompactionTxnHandler.java | 168 +++++++++++++------
.../hadoop/hive/metastore/txn/TxnUtils.java | 52 +++++-
.../hadoop/hive/metastore/txn/TestTxnUtils.java | 38 ++++-
5 files changed, 280 insertions(+), 75 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java
----------------------------------------------------------------------
diff --git a/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java b/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java
index 5350311..04576ae 100644
--- a/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java
+++ b/beeline/src/java/org/apache/hive/beeline/HiveSchemaTool.java
@@ -667,27 +667,31 @@ public class HiveSchemaTool {
for (String seqName : seqNameToTable.keySet()) {
String tableName = seqNameToTable.get(seqName).getLeft();
String tableKey = seqNameToTable.get(seqName).getRight();
+ String fullSequenceName = "org.apache.hadoop.hive.metastore.model." + seqName;
String seqQuery = needsQuotedIdentifier ?
- ("select t.\"NEXT_VAL\" from \"SEQUENCE_TABLE\" t WHERE t.\"SEQUENCE_NAME\"='org.apache.hadoop.hive.metastore.model." + seqName + "' order by t.\"SEQUENCE_NAME\" ")
- : ("select t.NEXT_VAL from SEQUENCE_TABLE t WHERE t.SEQUENCE_NAME='org.apache.hadoop.hive.metastore.model." + seqName + "' order by t.SEQUENCE_NAME ");
+ ("select t.\"NEXT_VAL\" from \"SEQUENCE_TABLE\" t WHERE t.\"SEQUENCE_NAME\"=? order by t.\"SEQUENCE_NAME\" ")
+ : ("select t.NEXT_VAL from SEQUENCE_TABLE t WHERE t.SEQUENCE_NAME=? order by t.SEQUENCE_NAME ");
String maxIdQuery = needsQuotedIdentifier ?
("select max(\"" + tableKey + "\") from \"" + tableName + "\"")
: ("select max(" + tableKey + ") from " + tableName);
- ResultSet res = stmt.executeQuery(maxIdQuery);
- if (res.next()) {
- long maxId = res.getLong(1);
- if (maxId > 0) {
- ResultSet resSeq = stmt.executeQuery(seqQuery);
- if (!resSeq.next()) {
- isValid = false;
- System.err.println("Missing SEQUENCE_NAME " + seqName + " from SEQUENCE_TABLE");
- } else if (resSeq.getLong(1) < maxId) {
- isValid = false;
- System.err.println("NEXT_VAL for " + seqName + " in SEQUENCE_TABLE < max("+ tableKey + ") in " + tableName);
- }
- }
+ ResultSet res = stmt.executeQuery(maxIdQuery);
+ if (res.next()) {
+ long maxId = res.getLong(1);
+ if (maxId > 0) {
+ PreparedStatement pStmt = conn.prepareStatement(seqQuery);
+ pStmt.setString(1, fullSequenceName);
+ ResultSet resSeq = pStmt.executeQuery();
+ if (!resSeq.next()) {
+ isValid = false;
+ System.err.println("Missing SEQUENCE_NAME " + seqName + " from SEQUENCE_TABLE");
+ } else if (resSeq.getLong(1) < maxId) {
+ isValid = false;
+ System.err.println("NEXT_VAL for " + seqName + " in SEQUENCE_TABLE < max(" +
+ tableKey + ") in " + tableName);
+ }
}
+ }
}
System.out.println((isValid ? "Succeeded" :"Failed") + " in sequence number validation for SEQUENCE_TABLE.");
http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 96005b4..34a1600 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -49,6 +49,7 @@ import java.util.SortedSet;
import java.util.TreeSet;
import static junit.framework.Assert.assertEquals;
+import static junit.framework.Assert.assertFalse;
import static junit.framework.Assert.assertNotNull;
import static junit.framework.Assert.assertNull;
import static junit.framework.Assert.assertTrue;
@@ -64,6 +65,10 @@ public class TestCompactionTxnHandler {
public TestCompactionTxnHandler() throws Exception {
TxnDbUtil.setConfValues(conf);
+ // Set config so that TxnUtils.buildQueryWithINClauseStrings() will
+ // produce multiple queries
+ // NOTE(review): the max-query-length value appears to be in KB (TestTxnUtils describes
+ // a setting of 1 as "1KB") -- confirm. At most 10 elements are allowed per IN clause.
+ conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_QUERY_LENGTH, 1);
+ conf.setIntVar(HiveConf.ConfVars.METASTORE_DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 10);
tearDown();
}
@@ -224,6 +229,64 @@ public class TestCompactionTxnHandler {
}
@Test
+ public void testMarkFailed() throws Exception {
+ // Exercises markFailed(), checkFailedCompactions() and purgeCompactionHistory() for a
+ // MINOR compaction of foo.bar partition ds=today. The checks, in order:
+ // - a failed entry is no longer handed out by findNextToCompact();
+ // - marking the same entry failed twice throws IllegalStateException;
+ // - checkFailedCompactions() flips to true once the failure count reaches
+ // COMPACTOR_INITIATOR_FAILED_THRESHOLD;
+ // - enough history rows exist that purgeCompactionHistory() builds multiple IN-clause queries.
+ CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
+ rqst.setPartitionname("ds=today");
+ txnHandler.compact(rqst);
+ assertEquals(0, txnHandler.findReadyToClean().size());
+ // Claim the queued request as worker "fred".
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ assertNotNull(ci);
+
+ assertEquals(0, txnHandler.findReadyToClean().size());
+ txnHandler.markFailed(ci);
+ // After being marked failed, the entry must not be offered to a worker again.
+ assertNull(txnHandler.findNextToCompact("fred"));
+ // NOTE(review): this expects false after one failure, which assumes
+ // COMPACTOR_INITIATOR_FAILED_THRESHOLD > 1 in the test config -- confirm.
+ boolean failedCheck = txnHandler.checkFailedCompactions(ci);
+ assertFalse(failedCheck);
+ try {
+ // The first call to markFailed() should have removed the record from
+ // COMPACTION_QUEUE, so a repeated call should fail
+ txnHandler.markFailed(ci);
+ fail("The first call to markFailed() must have failed as this call did "
+ + "not throw the expected exception");
+ } catch (IllegalStateException e) {
+ // This is expected
+ assertTrue(e.getMessage().contains("No record with CQ_ID="));
+ }
+
+ // There are not enough failed compactions yet so checkFailedCompactions() should return false.
+ // But note that any SQL error will also result in a return of false.
+ assertFalse(txnHandler.checkFailedCompactions(ci));
+
+ // Add more failed compactions so that the total is exactly COMPACTOR_INITIATOR_FAILED_THRESHOLD
+ // (the loop starts at 1 because one failure was already recorded above).
+ for (int i = 1 ; i < conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD); i++) {
+ addFailedCompaction("foo", "bar", CompactionType.MINOR, "ds=today");
+ }
+ // Now checkFailedCompactions() will return true
+ assertTrue(txnHandler.checkFailedCompactions(ci));
+
+ // Now add enough failed compactions to ensure purgeCompactionHistory() will attempt delete;
+ // HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED is enough for this.
+ // But we also want enough to tickle the code in TxnUtils.buildQueryWithINClauseStrings()
+ // so that it produces multiple queries. For that we need at least 290.
+ for (int i = 0 ; i < 300; i++) {
+ addFailedCompaction("foo", "bar", CompactionType.MINOR, "ds=today");
+ }
+ // NOTE(review): nothing is asserted about the purge result; this only verifies that
+ // purgeCompactionHistory() completes without throwing.
+ txnHandler.purgeCompactionHistory();
+ }
+
+ private void addFailedCompaction(String dbName, String tableName, CompactionType type,
+ String partitionName) throws MetaException {
+ // Test helper that records one failed compaction for the given table/partition.
+ // It enqueues a compaction request, claims it as worker "fred", then marks it failed
+ // (markFailed() writes the entry to COMPLETED_COMPACTIONS).
+ // NOTE(review): assumes the request just enqueued is the one findNextToCompact() returns;
+ // confirm whether other requests can be pending when this helper is called.
+ CompactionRequest rqst;
+ CompactionInfo ci;
+ rqst = new CompactionRequest(dbName, tableName, type);
+ rqst.setPartitionname(partitionName);
+ txnHandler.compact(rqst);
+ ci = txnHandler.findNextToCompact("fred");
+ assertNotNull(ci);
+ txnHandler.markFailed(ci);
+ }
+
+ @Test
public void testRevokeFromLocalWorkers() throws Exception {
CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR);
txnHandler.compact(rqst);
http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 7f1b331..a90b7d4 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -334,13 +334,13 @@ class CompactionTxnHandler extends TxnHandler {
public void markCleaned(CompactionInfo info) throws MetaException {
try {
Connection dbConn = null;
- Statement stmt = null;
PreparedStatement pStmt = null;
ResultSet rs = null;
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
- rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + info.id);
+ pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+ pStmt.setLong(1, info.id);
+ rs = pStmt.executeQuery();
if(rs.next()) {
info = CompactionInfo.loadFullFromCompactionQueue(rs);
}
@@ -348,9 +348,11 @@ class CompactionTxnHandler extends TxnHandler {
throw new IllegalStateException("No record with CQ_ID=" + info.id + " found in COMPACTION_QUEUE");
}
close(rs);
- String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
+ String s = "delete from COMPACTION_QUEUE where cq_id = ?";
+ pStmt = dbConn.prepareStatement(s);
+ pStmt.setLong(1, info.id);
LOG.debug("Going to execute update <" + s + ">");
- int updCount = stmt.executeUpdate(s);
+ int updCount = pStmt.executeUpdate();
if (updCount != 1) {
LOG.error("Unable to delete compaction record: " + info + ". Update count=" + updCount);
LOG.debug("Going to rollback");
@@ -364,28 +366,55 @@ class CompactionTxnHandler extends TxnHandler {
// Remove entries from completed_txn_components as well, so we don't start looking there
// again but only up to the highest txn ID include in this compaction job.
//highestTxnId will be NULL in upgrade scenarios
- s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = '" + info.dbname + "' and " +
- "ctc_table = '" + info.tableName + "'";
+ s = "delete from COMPLETED_TXN_COMPONENTS where ctc_database = ? and " +
+ "ctc_table = ?";
+ if (info.partName != null) {
+ s += " and ctc_partition = ?";
+ }
+ if(info.highestTxnId != 0) {
+ s += " and ctc_txnid <= ?";
+ }
+ pStmt = dbConn.prepareStatement(s);
+ int paramCount = 1;
+ pStmt.setString(paramCount++, info.dbname);
+ pStmt.setString(paramCount++, info.tableName);
if (info.partName != null) {
- s += " and ctc_partition = '" + info.partName + "'";
+ pStmt.setString(paramCount++, info.partName);
}
if(info.highestTxnId != 0) {
- s += " and ctc_txnid <= " + info.highestTxnId;
+ pStmt.setLong(paramCount++, info.highestTxnId);
}
LOG.debug("Going to execute update <" + s + ">");
- if (stmt.executeUpdate(s) < 1) {
+ if (pStmt.executeUpdate() < 1) {
LOG.error("Expected to remove at least one row from completed_txn_components when " +
"marking compaction entry as clean!");
}
s = "select distinct txn_id from TXNS, TXN_COMPONENTS where txn_id = tc_txnid and txn_state = '" +
- TXN_ABORTED + "' and tc_database = '" + info.dbname + "' and tc_table = '" +
- info.tableName + "'" + (info.highestTxnId == 0 ? "" : " and txn_id <= " + info.highestTxnId);
- if (info.partName != null) s += " and tc_partition = '" + info.partName + "'";
+ TXN_ABORTED + "' and tc_database = ? and tc_table = ?";
+ if (info.highestTxnId != 0) s += " and txn_id <= ?";
+ if (info.partName != null) s += " and tc_partition = ?";
+
+ pStmt = dbConn.prepareStatement(s);
+ paramCount = 1;
+ pStmt.setString(paramCount++, info.dbname);
+ pStmt.setString(paramCount++, info.tableName);
+ if(info.highestTxnId != 0) {
+ pStmt.setLong(paramCount++, info.highestTxnId);
+ }
+ if (info.partName != null) {
+ pStmt.setString(paramCount++, info.partName);
+ }
+
LOG.debug("Going to execute update <" + s + ">");
- rs = stmt.executeQuery(s);
+ rs = pStmt.executeQuery();
List<Long> txnids = new ArrayList<>();
- while (rs.next()) txnids.add(rs.getLong(1));
+ List<String> questions = new ArrayList<>();
+ while (rs.next()) {
+ long id = rs.getLong(1);
+ txnids.add(id);
+ questions.add("?");
+ }
// Remove entries from txn_components, as there may be aborted txn components
if (txnids.size() > 0) {
List<String> queries = new ArrayList<>();
@@ -397,21 +426,34 @@ class CompactionTxnHandler extends TxnHandler {
prefix.append("delete from TXN_COMPONENTS where ");
//because 1 txn may include different partitions/tables even in auto commit mode
- suffix.append(" and tc_database = ");
- suffix.append(quoteString(info.dbname));
- suffix.append(" and tc_table = ");
- suffix.append(quoteString(info.tableName));
+ suffix.append(" and tc_database = ?");
+ suffix.append(" and tc_table = ?");
if (info.partName != null) {
- suffix.append(" and tc_partition = ");
- suffix.append(quoteString(info.partName));
+ suffix.append(" and tc_partition = ?");
}
// Populate the complete query with provided prefix and suffix
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "tc_txnid", true, false);
+ List<Integer> counts = TxnUtils
+ .buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "tc_txnid",
+ true, false);
+ int totalCount = 0;
+ for (int i = 0; i < queries.size(); i++) {
+ String query = queries.get(i);
+ int insertCount = counts.get(i);
- for (String query : queries) {
LOG.debug("Going to execute update <" + query + ">");
- int rc = stmt.executeUpdate(query);
+ pStmt = dbConn.prepareStatement(query);
+ for (int j = 0; j < insertCount; j++) {
+ pStmt.setLong(j + 1, txnids.get(totalCount + j));
+ }
+ totalCount += insertCount;
+ paramCount = insertCount + 1;
+ pStmt.setString(paramCount++, info.dbname);
+ pStmt.setString(paramCount++, info.tableName);
+ if (info.partName != null) {
+ pStmt.setString(paramCount++, info.partName);
+ }
+ int rc = pStmt.executeUpdate();
LOG.debug("Removed " + rc + " records from txn_components");
// Don't bother cleaning from the txns table. A separate call will do that. We don't
@@ -430,8 +472,7 @@ class CompactionTxnHandler extends TxnHandler {
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
- closeStmt(pStmt);
- close(rs, stmt, dbConn);
+ close(rs, pStmt, dbConn);
}
} catch (RetryException e) {
markCleaned(info);
@@ -599,34 +640,38 @@ class CompactionTxnHandler extends TxnHandler {
@RetrySemantics.ReadOnly
public List<String> findColumnsWithStats(CompactionInfo ci) throws MetaException {
Connection dbConn = null;
- Statement stmt = null;
+ PreparedStatement pStmt = null;
ResultSet rs = null;
try {
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
String quote = getIdentifierQuoteString(dbConn);
- stmt = dbConn.createStatement();
StringBuilder bldr = new StringBuilder();
bldr.append("SELECT ").append(quote).append("COLUMN_NAME").append(quote)
.append(" FROM ")
.append(quote).append((ci.partName == null ? "TAB_COL_STATS" : "PART_COL_STATS"))
.append(quote)
.append(" WHERE ")
- .append(quote).append("DB_NAME").append(quote).append(" = '").append(ci.dbname)
- .append("' AND ").append(quote).append("TABLE_NAME").append(quote)
- .append(" = '").append(ci.tableName).append("'");
+ .append(quote).append("DB_NAME").append(quote).append(" = ?")
+ .append(" AND ").append(quote).append("TABLE_NAME").append(quote)
+ .append(" = ?");
if (ci.partName != null) {
- bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = '")
- .append(ci.partName).append("'");
+ bldr.append(" AND ").append(quote).append("PARTITION_NAME").append(quote).append(" = ?");
}
String s = bldr.toString();
+ pStmt = dbConn.prepareStatement(s);
+ pStmt.setString(1, ci.dbname);
+ pStmt.setString(2, ci.tableName);
+ if (ci.partName != null) {
+ pStmt.setString(3, ci.partName);
+ }
/*String s = "SELECT COLUMN_NAME FROM " + (ci.partName == null ? "TAB_COL_STATS" :
"PART_COL_STATS")
+ " WHERE DB_NAME='" + ci.dbname + "' AND TABLE_NAME='" + ci.tableName + "'"
+ (ci.partName == null ? "" : " AND PARTITION_NAME='" + ci.partName + "'");*/
LOG.debug("Going to execute <" + s + ">");
- rs = stmt.executeQuery(s);
+ rs = pStmt.executeQuery();
List<String> columns = new ArrayList<>();
while (rs.next()) {
columns.add(rs.getString(1));
@@ -642,7 +687,7 @@ class CompactionTxnHandler extends TxnHandler {
throw new MetaException("Unable to connect to transaction database " +
StringUtils.stringifyException(e));
} finally {
- close(rs, stmt, dbConn);
+ close(rs, pStmt, dbConn);
}
} catch (RetryException ex) {
return findColumnsWithStats(ci);
@@ -725,6 +770,7 @@ class CompactionTxnHandler extends TxnHandler {
public void purgeCompactionHistory() throws MetaException {
Connection dbConn = null;
Statement stmt = null;
+ PreparedStatement pStmt = null;
ResultSet rs = null;
List<Long> deleteSet = new ArrayList<>();
RetentionCounters rc = null;
@@ -764,11 +810,22 @@ class CompactionTxnHandler extends TxnHandler {
prefix.append("delete from COMPLETED_COMPACTIONS where ");
suffix.append("");
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, deleteSet, "cc_id", false, false);
-
- for (String query : queries) {
+ List<String> questions = new ArrayList<>(deleteSet.size());
+ for (int i = 0; i < deleteSet.size(); i++) {
+ questions.add("?");
+ }
+ List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions, "cc_id", false, false);
+ int totalCount = 0;
+ for (int i = 0; i < queries.size(); i++) {
+ String query = queries.get(i);
+ long insertCount = counts.get(i);
LOG.debug("Going to execute update <" + query + ">");
- int count = stmt.executeUpdate(query);
+ pStmt = dbConn.prepareStatement(query);
+ for (int j = 0; j < insertCount; j++) {
+ pStmt.setLong(j + 1, deleteSet.get(totalCount + j));
+ }
+ totalCount += insertCount;
+ int count = pStmt.executeUpdate();
LOG.debug("Removed " + count + " records from COMPLETED_COMPACTIONS");
}
dbConn.commit();
@@ -779,6 +836,7 @@ class CompactionTxnHandler extends TxnHandler {
StringUtils.stringifyException(e));
} finally {
close(rs, stmt, dbConn);
+ closeStmt(pStmt);
}
} catch (RetryException ex) {
purgeCompactionHistory();
@@ -813,17 +871,22 @@ class CompactionTxnHandler extends TxnHandler {
@RetrySemantics.ReadOnly
public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException {
Connection dbConn = null;
- Statement stmt = null;
+ PreparedStatement pStmt = null;
ResultSet rs = null;
try {
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
- stmt = dbConn.createStatement();
- rs = stmt.executeQuery("select CC_STATE from COMPLETED_COMPACTIONS where " +
- "CC_DATABASE = " + quoteString(ci.dbname) + " and " +
- "CC_TABLE = " + quoteString(ci.tableName) +
- (ci.partName != null ? "and CC_PARTITION = " + quoteString(ci.partName) : "") +
+ pStmt = dbConn.prepareStatement("select CC_STATE from COMPLETED_COMPACTIONS where " +
+ "CC_DATABASE = ? and " +
+ "CC_TABLE = ? " +
+ (ci.partName != null ? "and CC_PARTITION = ?" : "") +
" and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc");
+ pStmt.setString(1, ci.dbname);
+ pStmt.setString(2, ci.tableName);
+ if (ci.partName != null) {
+ pStmt.setString(3, ci.partName);
+ }
+ rs = pStmt.executeQuery();
int numFailed = 0;
int numTotal = 0;
int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
@@ -838,14 +901,14 @@ class CompactionTxnHandler extends TxnHandler {
return numFailed == failedThreshold;
}
catch (SQLException e) {
- LOG.error("Unable to delete from compaction queue " + e.getMessage());
+ LOG.error("Unable to check for failed compactions " + e.getMessage());
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
checkRetryable(dbConn, e, "checkFailedCompactions(" + ci + ")");
LOG.error("Unable to connect to transaction database " + StringUtils.stringifyException(e));
return false;//weren't able to check
} finally {
- close(rs, stmt, dbConn);
+ close(rs, pStmt, dbConn);
}
} catch (RetryException e) {
return checkFailedCompactions(ci);
@@ -869,12 +932,16 @@ class CompactionTxnHandler extends TxnHandler {
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
- rs = stmt.executeQuery("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = " + ci.id);
+ pStmt = dbConn.prepareStatement("select CQ_ID, CQ_DATABASE, CQ_TABLE, CQ_PARTITION, CQ_STATE, CQ_TYPE, CQ_TBLPROPERTIES, CQ_WORKER_ID, CQ_START, CQ_RUN_AS, CQ_HIGHEST_TXN_ID, CQ_META_INFO, CQ_HADOOP_JOB_ID from COMPACTION_QUEUE WHERE CQ_ID = ?");
+ pStmt.setLong(1, ci.id);
+ rs = pStmt.executeQuery();
if(rs.next()) {
ci = CompactionInfo.loadFullFromCompactionQueue(rs);
- String s = "delete from COMPACTION_QUEUE where cq_id = " + ci.id;
+ String s = "delete from COMPACTION_QUEUE where cq_id = ?";
+ pStmt = dbConn.prepareStatement(s);
+ pStmt.setLong(1, ci.id);
LOG.debug("Going to execute update <" + s + ">");
- int updCnt = stmt.executeUpdate(s);
+ int updCnt = pStmt.executeUpdate();
}
else {
if(ci.id > 0) {
@@ -897,6 +964,7 @@ class CompactionTxnHandler extends TxnHandler {
ci.state = FAILED_STATE;
}
close(rs, stmt, null);
+ closeStmt(pStmt);
pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_TBLPROPERTIES, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?)");
CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 2f01233..afb4f6b 100644
--- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
@@ -172,8 +173,9 @@ public class TxnUtils {
* @param addParens IN: add a pair of parenthesis outside the IN lists
* e.g. "(id in (1,2,3) OR id in (4,5,6))"
* @param notIn IN: is this for building a 'NOT IN' composite clause?
+ * @return OUT: a list of the count of IN list values that are in each of the corresponding queries
*/
- public static void buildQueryWithINClause(Configuration conf,
+ public static List<Integer> buildQueryWithINClause(Configuration conf,
List<String> queries,
StringBuilder prefix,
StringBuilder suffix,
@@ -181,6 +183,47 @@ public class TxnUtils {
String inColumn,
boolean addParens,
boolean notIn) {
+ List<String> inListStrings = new ArrayList<>(inList.size());
+ for (Long aLong : inList) {
+ inListStrings.add(aLong.toString());
+ }
+ return buildQueryWithINClauseStrings(conf, queries, prefix, suffix,
+ inListStrings, inColumn, addParens, notIn);
+
+ }
+ /**
+ * Build a query (or queries if one query is too big but only for the case of 'IN'
+ * composite clause. For the case of 'NOT IN' clauses, multiple queries change
+ * the semantics of the intended query.
+ * E.g., Let's assume that input "inList" parameter has [5, 6] and that
+ * _DIRECT_SQL_MAX_QUERY_LENGTH_ configuration parameter only allows one value in a 'NOT IN' clause,
+ * Then having two delete statements changes the semantics of the intended SQL statement.
+ * I.e. 'delete from T where a not in (5)' and 'delete from T where a not in (6)' sequence
+ * is not equal to 'delete from T where a not in (5, 6)'.)
+ * with one or multiple 'IN' or 'NOT IN' clauses with the given input parameters.
+ *
+ * Note that this method currently supports only a single column for
+ * IN/NOT IN clauses and that only covers OR-based composite 'IN' clause and
+ * AND-based composite 'NOT IN' clause.
+ * For example, for 'IN' clause case, the method will build a query with OR.
+ * E.g., "id in (1,2,3) OR id in (4,5,6)".
+ * For the 'NOT IN' case, the NOT IN list is broken into multiple 'NOT IN' clauses connected by AND.
+ *
+ * Note that, in this method, "a composite 'IN' clause" is defined as "a list of multiple 'IN'
+ * clauses in a query".
+ *
+ * @param queries OUT: Array of query strings
+ * @param prefix IN: Part of the query that comes before IN list
+ * @param suffix IN: Part of the query that comes after IN list
+ * @param inList IN: the list with IN list values
+ * @param inColumn IN: single column name of IN list operator
+ * @param addParens IN: add a pair of parenthesis outside the IN lists
+ * e.g. "(id in (1,2,3) OR id in (4,5,6))"
+ * @param notIn IN: is this for building a 'NOT IN' composite clause?
+ * @return OUT: a list of the count of IN list values that are in each of the corresponding queries
+ */
+ public static List<Integer> buildQueryWithINClauseStrings(Configuration conf, List<String> queries, StringBuilder prefix,
+ StringBuilder suffix, List<String> inList, String inColumn, boolean addParens, boolean notIn) {
// Get configuration parameters
int maxQueryLength = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH);
int batchSize = MetastoreConf.getIntVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE);
@@ -203,6 +246,8 @@ public class TxnUtils {
StringBuilder newInclausePrefix =
new StringBuilder(notIn ? " and " + inColumn + " not in (":
" or " + inColumn + " in (");
+ List<Integer> ret = new ArrayList<>();
+ int currentCount = 0;
// Loop over the given inList elements.
while( cursor4InListArray < inListSize || !nextItemNeeded) {
@@ -257,9 +302,11 @@ public class TxnUtils {
buf.append(suffix);
queries.add(buf.toString());
+ ret.add(currentCount);
// Prepare a new query string.
buf.setLength(0);
+ currentCount = 0;
cursor4queryOfInClauses = cursor4InClauseElements = 0;
querySize = 0;
newInclausePrefixJustAppended = false;
@@ -276,6 +323,7 @@ public class TxnUtils {
cursor4InClauseElements = 0;
} else {
buf.append(nextValue.toString()).append(",");
+ currentCount++;
nextItemNeeded = true;
newInclausePrefixJustAppended = false;
// increment cursor for elements per 'IN'/'NOT IN' clause.
@@ -293,6 +341,8 @@ public class TxnUtils {
}
buf.append(suffix);
queries.add(buf.toString());
+ ret.add(currentCount);
+ return ret;
}
/*
http://git-wip-us.apache.org/repos/asf/hive/blob/2a2f6427/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
----------------------------------------------------------------------
diff --git a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
index 7dd268f..0384f8b 100644
--- a/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
+++ b/standalone-metastore/src/test/java/org/apache/hadoop/hive/metastore/txn/TestTxnUtils.java
@@ -45,6 +45,7 @@ public class TestTxnUtils {
@Test
public void testBuildQueryWithINClause() throws Exception {
List<String> queries = new ArrayList<>();
+ List<Integer> ret;
StringBuilder prefix = new StringBuilder();
StringBuilder suffix = new StringBuilder();
@@ -61,16 +62,21 @@ public class TestTxnUtils {
for (long i = 1; i <= 189; i++) {
inList.add(i);
}
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+ ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
Assert.assertEquals(1, queries.size());
+ Assert.assertEquals(queries.size(), ret.size());
+ Assert.assertEquals(189L, ret.get(0).longValue());
runAgainstDerby(queries);
// Case 2 - Max in list members: 10; Max query string length: 1KB
// The first query has 2 full batches, and the second query only has 1 batch which only contains 1 member
queries.clear();
inList.add((long)190);
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+ ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
Assert.assertEquals(2, queries.size());
+ Assert.assertEquals(queries.size(), ret.size());
+ Assert.assertEquals(189L, ret.get(0).longValue());
+ Assert.assertEquals(1L, ret.get(1).longValue());
runAgainstDerby(queries);
// Case 3.1 - Max in list members: 1000, Max query string length: 1KB, and exact 1000 members in a single IN clause
@@ -80,16 +86,19 @@ public class TestTxnUtils {
for (long i = 191; i <= 1000; i++) {
inList.add(i);
}
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+ ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
Assert.assertEquals(5, queries.size());
+ Assert.assertEquals(queries.size(), ret.size());
+ Assert.assertEquals(267L, ret.get(0).longValue());
runAgainstDerby(queries);
// Case 3.2 - Max in list members: 1000, Max query string length: 10KB, and exact 1000 members in a single IN clause
MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH, 10);
MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_ELEMENTS_IN_CLAUSE, 1000);
queries.clear();
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+ ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
Assert.assertEquals(1, queries.size());
+ Assert.assertEquals(queries.size(), ret.size());
runAgainstDerby(queries);
// Case 3.3 - Now with 2000 entries, try the above settings
@@ -98,19 +107,25 @@ public class TestTxnUtils {
}
MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH, 1);
queries.clear();
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+ ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
Assert.assertEquals(10, queries.size());
+ Assert.assertEquals(queries.size(), ret.size());
+ Assert.assertEquals(267L, ret.get(0).longValue());
+ Assert.assertEquals(240L, ret.get(1).longValue());
runAgainstDerby(queries);
MetastoreConf.setLongVar(conf, ConfVars.DIRECT_SQL_MAX_QUERY_LENGTH, 10);
queries.clear();
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+ ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
Assert.assertEquals(1, queries.size());
+ Assert.assertEquals(queries.size(), ret.size());
+ Assert.assertEquals(2000L, ret.get(0).longValue());
runAgainstDerby(queries);
// Case 4 - NOT IN list
queries.clear();
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
+ ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, true);
Assert.assertEquals(1, queries.size());
+ Assert.assertEquals(queries.size(), ret.size());
runAgainstDerby(queries);
// Case 5 - Max in list members: 1000; Max query string length: 10KB
@@ -118,16 +133,21 @@ public class TestTxnUtils {
for (long i = 2001; i <= 4321; i++) {
inList.add(i);
}
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
+ ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", true, false);
Assert.assertEquals(3, queries.size());
+ Assert.assertEquals(queries.size(), ret.size());
runAgainstDerby(queries);
// Case 6 - No parenthesis
queries.clear();
suffix.setLength(0);
suffix.append("");
- TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
+ ret = TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, inList, "TXN_ID", false, false);
Assert.assertEquals(3, queries.size());
+ Assert.assertEquals(queries.size(), ret.size());
+ Assert.assertEquals(2255L, ret.get(0).longValue());
+ Assert.assertEquals(2033L, ret.get(1).longValue());
+ Assert.assertEquals(33L, ret.get(2).longValue());
runAgainstDerby(queries);
}