You are viewing a plain-text version of this content. The canonical link to it (a hyperlink on the original page) was not preserved in this copy.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/05/18 17:36:50 UTC
hive git commit: HIVE-13691 No record with CQ_ID=0 found in COMPACTION_QUEUE (Eugene Koifman, reviewed by Wei Zheng)
Repository: hive
Updated Branches:
refs/heads/master 3726ce590 -> 4959ff5bb
HIVE-13691 No record with CQ_ID=0 found in COMPACTION_QUEUE (Eugene Koifman, reviewed by Wei Zheng)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4959ff5b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4959ff5b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4959ff5b
Branch: refs/heads/master
Commit: 4959ff5bb7dc590e21f680b9d9be0f2270414309
Parents: 3726ce5
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Wed May 18 10:36:45 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Wed May 18 10:36:45 2016 -0700
----------------------------------------------------------------------
.../metastore/txn/CompactionTxnHandler.java | 27 +++++--
.../hadoop/hive/metastore/txn/TxnHandler.java | 36 +++++----
.../hadoop/hive/ql/txn/compactor/Initiator.java | 5 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 84 +++++++++++++-------
4 files changed, 99 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index ab7da68..d2d6462 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -691,7 +691,7 @@ class CompactionTxnHandler extends TxnHandler {
}
/**
- * For any given compactable entity (partition, table if not partitioned) the history of compactions
+ * For any given compactable entity (partition; table if not partitioned) the history of compactions
* may look like "sssfffaaasffss", for example. The idea is to retain the tail (most recent) of the
* history such that a configurable number of each type of state is present. Any other entries
* can be purged. This scheme has advantage of always retaining the last failure/success even if
@@ -793,7 +793,7 @@ class CompactionTxnHandler extends TxnHandler {
"CC_DATABASE = " + quoteString(ci.dbname) + " and " +
"CC_TABLE = " + quoteString(ci.tableName) +
(ci.partName != null ? "and CC_PARTITION = " + quoteString(ci.partName) : "") +
- " order by CC_ID desc");
+ " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc");
int numFailed = 0;
int numTotal = 0;
int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
@@ -824,8 +824,8 @@ class CompactionTxnHandler extends TxnHandler {
/**
* If there is an entry in compaction_queue with ci.id, remove it
* Make entry in completed_compactions with status 'f'.
- *
- * but what abount markCleaned() which is called when table is had been deleted...
+ * If there is no entry in compaction_queue, it means Initiator failed to even schedule a compaction,
+ * which we record as ATTEMPTED_STATE entry in history.
*/
public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw
//todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure
@@ -845,12 +845,27 @@ class CompactionTxnHandler extends TxnHandler {
int updCnt = stmt.executeUpdate(s);
}
else {
- throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
+ if(ci.id > 0) {
+ //the record with valid CQ_ID has disappeared - this is a sign of something wrong
+ throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
+ }
+ }
+ if(ci.id == 0) {
+ //The failure occurred before we even made an entry in COMPACTION_QUEUE
+ //generate ID so that we can make an entry in COMPLETED_COMPACTIONS
+ ci.id = generateCompactionQueueId(stmt);
+ //mostly this indicates that the Initiator is paying attention to some table even though
+ //compactions are not happening.
+ ci.state = ATTEMPTED_STATE;
+ //this is not strictly accurate, but 'type' cannot be null.
+ ci.type = CompactionType.MINOR;
+ }
+ else {
+ ci.state = FAILED_STATE;
}
close(rs, stmt, null);
pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
- ci.state = FAILED_STATE;
CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
int updCount = pStmt.executeUpdate();
LOG.debug("Going to commit");
http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index f061767..bc818e0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.metastore.txn;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Service;
import com.jolbox.bonecp.BoneCPConfig;
import com.jolbox.bonecp.BoneCPDataSource;
import org.apache.commons.dbcp.ConnectionFactory;
@@ -1252,6 +1253,21 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
+ long generateCompactionQueueId(Statement stmt) throws SQLException, MetaException {
+ // Get the id for the next entry in the queue
+ String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID");
+ LOG.debug("going to execute query <" + s + ">");
+ ResultSet rs = stmt.executeQuery(s);
+ if (!rs.next()) {
+ throw new IllegalStateException("Transaction tables not properly initiated, " +
+ "no record found in next_compaction_queue_id");
+ }
+ long id = rs.getLong(1);
+ s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1);
+ LOG.debug("Going to execute update <" + s + ">");
+ stmt.executeUpdate(s);
+ return id;
+ }
public long compact(CompactionRequest rqst) throws MetaException {
// Put a compaction request in the queue.
try {
@@ -1261,21 +1277,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
lockInternal();
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
-
- // Get the id for the next entry in the queue
- String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID");
- LOG.debug("going to execute query <" + s + ">");
- ResultSet rs = stmt.executeQuery(s);
- if (!rs.next()) {
- LOG.debug("Going to rollback");
- dbConn.rollback();
- throw new MetaException("Transaction tables not properly initiated, " +
- "no record found in next_compaction_queue_id");
- }
- long id = rs.getLong(1);
- s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1);
- LOG.debug("Going to execute update <" + s + ">");
- stmt.executeUpdate(s);
+
+ long id = generateCompactionQueueId(stmt);
StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " +
"cq_table, ");
@@ -1315,7 +1318,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
buf.append(rqst.getRunas());
}
buf.append("')");
- s = buf.toString();
+ String s = buf.toString();
LOG.debug("Going to execute update <" + s + ">");
stmt.executeUpdate(s);
LOG.debug("Going to commit");
@@ -1366,6 +1369,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
case FAILED_STATE: e.setState(FAILED_RESPONSE); break;
case SUCCEEDED_STATE: e.setState(SUCCEEDED_RESPONSE); break;
+ case ATTEMPTED_STATE: e.setState(ATTEMPTED_RESPONSE); break;
default:
//do nothing to handle RU/D if we add another status
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 949cbd5..a55fa1c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -126,8 +126,9 @@ public class Initiator extends CompactorThread {
continue;
}
if(txnHandler.checkFailedCompactions(ci)) {
- //todo: make 'a' state entry in completed_compactions
- LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last 3 attempts to compact it failed.");
+ LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last "
+ + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " attempts to compact it failed.");
+ txnHandler.markFailed(ci);
continue;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 903337d..d80a03e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -492,6 +492,15 @@ public class TestTxnCommands2 {
Assert.assertEquals("Insert overwrite partition failed", stringifyValues(updatedData), rs2);
//insert overwrite not supported for ACID tables
}
+ private static void checkCompactionState(CompactionsByState expected, CompactionsByState actual) {
+ Assert.assertEquals(TxnStore.ATTEMPTED_RESPONSE, expected.attempted, actual.attempted);
+ Assert.assertEquals(TxnStore.FAILED_RESPONSE, expected.failed, actual.failed);
+ Assert.assertEquals(TxnStore.INITIATED_RESPONSE, expected.initiated, actual.initiated);
+ Assert.assertEquals(TxnStore.CLEANING_RESPONSE, expected.readyToClean, actual.readyToClean);
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, expected.succeeded, actual.succeeded);
+ Assert.assertEquals(TxnStore.WORKING_RESPONSE, expected.working, actual.working);
+ Assert.assertEquals("total", expected.total, actual.total);
+ }
/**
* HIVE-12353
* @throws Exception
@@ -519,58 +528,60 @@ public class TestTxnCommands2 {
txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
runWorker(hiveConf);
}
- //this should not schedule a new compaction due to prior failures
+ //this should not schedule a new compaction due to prior failures, but will create Attempted entry
Initiator init = new Initiator();
init.setThreadId((int)init.getId());
init.setHiveConf(hiveConf);
init.init(stop, new AtomicBoolean());
init.run();
-
- CompactionsByState cbs = countCompacts(txnHandler);
- Assert.assertEquals("Unexpected number of failed compactions", numFailedCompactions, cbs.failed);
- Assert.assertEquals("Unexpected total number of compactions", numFailedCompactions, cbs.total);
+ int numAttemptedCompactions = 1;
+ checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler));
hiveConf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 10, TimeUnit.MILLISECONDS);
AcidCompactionHistoryService compactionHistoryService = new AcidCompactionHistoryService();
runHouseKeeperService(compactionHistoryService, hiveConf);//should not remove anything from history
- cbs = countCompacts(txnHandler);
- Assert.assertEquals("Number of failed compactions after History clean", numFailedCompactions, cbs.failed);
- Assert.assertEquals("Total number of compactions after History clean", numFailedCompactions, cbs.total);
+ checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler));
txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MAJOR));
runWorker(hiveConf);//will fail
txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
runWorker(hiveConf);//will fail
- cbs = countCompacts(txnHandler);
- Assert.assertEquals("Unexpected num failed1", numFailedCompactions + 2, cbs.failed);
- Assert.assertEquals("Unexpected num total1", numFailedCompactions + 2, cbs.total);
+ init.run();
+ numAttemptedCompactions++;
+ init.run();
+ numAttemptedCompactions++;
+ checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions + 2,0,0,0,0,numFailedCompactions + 2 + numAttemptedCompactions), countCompacts(txnHandler));
+
runHouseKeeperService(compactionHistoryService, hiveConf);//should remove history so that we have
//COMPACTOR_HISTORY_RETENTION_FAILED failed compacts left (and no other since we only have failed ones here)
- cbs = countCompacts(txnHandler);
- Assert.assertEquals("Unexpected num failed2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
- Assert.assertEquals("Unexpected num total2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.total);
-
+ checkCompactionState(new CompactionsByState(
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,0,0,0,
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)), countCompacts(txnHandler));
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
- //at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated
- cbs = countCompacts(txnHandler);
- Assert.assertEquals("Unexpected num failed3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
- Assert.assertEquals("Unexpected num initiated", 1, cbs.initiated);
- Assert.assertEquals("Unexpected num total3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
+ //at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated (explicitly by user)
+ checkCompactionState(new CompactionsByState(
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),1,0,0,0,
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) +
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler));
runWorker(hiveConf);//will succeed and transition to Initiated->Working->Ready for Cleaning
- cbs = countCompacts(txnHandler);
- Assert.assertEquals("Unexpected num failed4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
- Assert.assertEquals("Unexpected num ready to clean", 1, cbs.readyToClean);
- Assert.assertEquals("Unexpected num total4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
-
+ checkCompactionState(new CompactionsByState(
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,1,0,0,
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) +
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler));
+
runCleaner(hiveConf); // transition to Success state
runHouseKeeperService(compactionHistoryService, hiveConf);//should not purge anything as all items within retention sizes
- cbs = countCompacts(txnHandler);
- Assert.assertEquals("Unexpected num failed5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
- Assert.assertEquals("Unexpected num succeeded", 1, cbs.succeeded);
- Assert.assertEquals("Unexpected num total5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
+ checkCompactionState(new CompactionsByState(
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,0,1,0,
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) +
+ hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler));
}
/**
@@ -624,6 +635,18 @@ public class TestTxnCommands2 {
private int succeeded;
private int working;
private int total;
+ CompactionsByState() {
+ this(0,0,0,0,0,0,0);
+ }
+ CompactionsByState(int attempted, int failed, int initiated, int readyToClean, int succeeded, int working, int total) {
+ this.attempted = attempted;
+ this.failed = failed;
+ this.initiated = initiated;
+ this.readyToClean = readyToClean;
+ this.succeeded = succeeded;
+ this.working = working;
+ this.total = total;
+ }
}
private static CompactionsByState countCompacts(TxnStore txnHandler) throws MetaException {
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
@@ -648,6 +671,9 @@ public class TestTxnCommands2 {
else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) {
compactionsByState.attempted++;
}
+ else {
+ throw new IllegalStateException("Unexpected state: " + compact.getState());
+ }
}
return compactionsByState;
}