You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ek...@apache.org on 2016/05/18 17:36:50 UTC

hive git commit: HIVE-13691 No record with CQ_ID=0 found in COMPACTION_QUEUE (Eugene Koifman, reviewed by Wei Zheng)

Repository: hive
Updated Branches:
  refs/heads/master 3726ce590 -> 4959ff5bb


HIVE-13691 No record with CQ_ID=0 found in COMPACTION_QUEUE (Eugene Koifman, reviewed by Wei Zheng)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4959ff5b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4959ff5b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4959ff5b

Branch: refs/heads/master
Commit: 4959ff5bb7dc590e21f680b9d9be0f2270414309
Parents: 3726ce5
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Wed May 18 10:36:45 2016 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Wed May 18 10:36:45 2016 -0700

----------------------------------------------------------------------
 .../metastore/txn/CompactionTxnHandler.java     | 27 +++++--
 .../hadoop/hive/metastore/txn/TxnHandler.java   | 36 +++++----
 .../hadoop/hive/ql/txn/compactor/Initiator.java |  5 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 84 +++++++++++++-------
 4 files changed, 99 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index ab7da68..d2d6462 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -691,7 +691,7 @@ class CompactionTxnHandler extends TxnHandler {
   }
 
   /**
-   * For any given compactable entity (partition, table if not partitioned) the history of compactions
+   * For any given compactable entity (partition; table if not partitioned) the history of compactions
    * may look like "sssfffaaasffss", for example.  The idea is to retain the tail (most recent) of the
    * history such that a configurable number of each type of state is present.  Any other entries
    * can be purged.  This scheme has advantage of always retaining the last failure/success even if
@@ -793,7 +793,7 @@ class CompactionTxnHandler extends TxnHandler {
           "CC_DATABASE = " + quoteString(ci.dbname) + " and " +
           "CC_TABLE = " + quoteString(ci.tableName) +
           (ci.partName != null ? "and CC_PARTITION = " + quoteString(ci.partName) : "") +
-          " order by CC_ID desc");
+          " and CC_STATE != " + quoteChar(ATTEMPTED_STATE) + " order by CC_ID desc");
         int numFailed = 0;
         int numTotal = 0;
         int failedThreshold = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
@@ -824,8 +824,8 @@ class CompactionTxnHandler extends TxnHandler {
   /**
    * If there is an entry in compaction_queue with ci.id, remove it
    * Make entry in completed_compactions with status 'f'.
-   *
-   * but what abount markCleaned() which is called when table is had been deleted...
+   * If there is no entry in compaction_queue, it means Initiator failed to even schedule a compaction,
+   * which we record as ATTEMPTED_STATE entry in history.
    */
   public void markFailed(CompactionInfo ci) throws MetaException {//todo: this should not throw
     //todo: this should take "comment" as parameter to set in CC_META_INFO to provide some context for the failure
@@ -845,12 +845,27 @@ class CompactionTxnHandler extends TxnHandler {
           int updCnt = stmt.executeUpdate(s);
         }
         else {
-          throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
+          if(ci.id > 0) {
+            //the record with valid CQ_ID has disappeared - this is a sign of something wrong
+            throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
+          }
+        }
+        if(ci.id == 0) {
+          //The failure occurred before we even made an entry in COMPACTION_QUEUE
+          //generate ID so that we can make an entry in COMPLETED_COMPACTIONS
+          ci.id = generateCompactionQueueId(stmt);
+          //mostly this indicates that the Initiator is paying attention to some table even though
+          //compactions are not happening.
+          ci.state = ATTEMPTED_STATE;
+          //this is not strictly accurate, but 'type' cannot be null.
+          ci.type = CompactionType.MINOR;
+        }
+        else {
+          ci.state = FAILED_STATE;
         }
         close(rs, stmt, null);
 
         pStmt = dbConn.prepareStatement("insert into COMPLETED_COMPACTIONS(CC_ID, CC_DATABASE, CC_TABLE, CC_PARTITION, CC_STATE, CC_TYPE, CC_WORKER_ID, CC_START, CC_END, CC_RUN_AS, CC_HIGHEST_TXN_ID, CC_META_INFO, CC_HADOOP_JOB_ID) VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?)");
-        ci.state = FAILED_STATE;
         CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
         int updCount = pStmt.executeUpdate();
         LOG.debug("Going to commit");

http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index f061767..bc818e0 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.metastore.txn;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Service;
 import com.jolbox.bonecp.BoneCPConfig;
 import com.jolbox.bonecp.BoneCPDataSource;
 import org.apache.commons.dbcp.ConnectionFactory;
@@ -1252,6 +1253,21 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
+  long generateCompactionQueueId(Statement stmt) throws SQLException, MetaException { // returns the current ncq_next value and stores value + 1 in its place
+    // Read the current counter; addForUpdateClause decorates the select (presumably to lock the row - confirm per database)
+    String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID");
+    LOG.debug("going to execute query <" + s + ">");
+    ResultSet rs = stmt.executeQuery(s);
+    if (!rs.next()) { // the counter table returned no row at all
+      throw new IllegalStateException("Transaction tables not properly initiated, " +
+        "no record found in next_compaction_queue_id");
+    }
+    long id = rs.getLong(1); // this is the value handed back to the caller
+    s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1); // advance the counter past the id being returned
+    LOG.debug("Going to execute update <" + s + ">");
+    stmt.executeUpdate(s); // runs on the caller's statement; nothing is committed or closed here
+    return id;
+  }
   public long compact(CompactionRequest rqst) throws MetaException {
     // Put a compaction request in the queue.
     try {
@@ -1261,21 +1277,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
         lockInternal();
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
-
-        // Get the id for the next entry in the queue
-        String s = addForUpdateClause("select ncq_next from NEXT_COMPACTION_QUEUE_ID");
-        LOG.debug("going to execute query <" + s + ">");
-        ResultSet rs = stmt.executeQuery(s);
-        if (!rs.next()) {
-          LOG.debug("Going to rollback");
-          dbConn.rollback();
-          throw new MetaException("Transaction tables not properly initiated, " +
-            "no record found in next_compaction_queue_id");
-        }
-        long id = rs.getLong(1);
-        s = "update NEXT_COMPACTION_QUEUE_ID set ncq_next = " + (id + 1);
-        LOG.debug("Going to execute update <" + s + ">");
-        stmt.executeUpdate(s);
+        
+        long id = generateCompactionQueueId(stmt);
 
         StringBuilder buf = new StringBuilder("insert into COMPACTION_QUEUE (cq_id, cq_database, " +
           "cq_table, ");
@@ -1315,7 +1318,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
           buf.append(rqst.getRunas());
         }
         buf.append("')");
-        s = buf.toString();
+        String s = buf.toString();
         LOG.debug("Going to execute update <" + s + ">");
         stmt.executeUpdate(s);
         LOG.debug("Going to commit");
@@ -1366,6 +1369,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
             case READY_FOR_CLEANING: e.setState(CLEANING_RESPONSE); break;
             case FAILED_STATE: e.setState(FAILED_RESPONSE); break;
             case SUCCEEDED_STATE: e.setState(SUCCEEDED_RESPONSE); break;
+            case ATTEMPTED_STATE: e.setState(ATTEMPTED_RESPONSE); break;
             default:
               //do nothing to handle RU/D if we add another status
           }

http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 949cbd5..a55fa1c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -126,8 +126,9 @@ public class Initiator extends CompactorThread {
                 continue;
               }
               if(txnHandler.checkFailedCompactions(ci)) {
-                //todo: make 'a' state entry in completed_compactions
-                LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last 3 attempts to compact it failed.");
+                LOG.warn("Will not initiate compaction for " + ci.getFullPartitionName() + " since last "
+                  + HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD + " attempts to compact it failed.");
+                txnHandler.markFailed(ci);
                 continue;
               }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/4959ff5b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 903337d..d80a03e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -492,6 +492,15 @@ public class TestTxnCommands2 {
     Assert.assertEquals("Insert overwrite partition failed", stringifyValues(updatedData), rs2);
     //insert overwrite not supported for ACID tables
   }
+  private static void checkCompactionState(CompactionsByState expected, CompactionsByState actual) { // asserts that all six per-state counters and the total agree
+    Assert.assertEquals(TxnStore.ATTEMPTED_RESPONSE, expected.attempted, actual.attempted); // the state constant is the first argument (presumably the failure message - confirm Assert is JUnit's)
+    Assert.assertEquals(TxnStore.FAILED_RESPONSE, expected.failed, actual.failed);
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, expected.initiated, actual.initiated);
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, expected.readyToClean, actual.readyToClean);
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, expected.succeeded, actual.succeeded);
+    Assert.assertEquals(TxnStore.WORKING_RESPONSE, expected.working, actual.working);
+    Assert.assertEquals("total", expected.total, actual.total); // total is compared as its own field, not recomputed from the counters above
+  }
   /**
    * HIVE-12353
    * @throws Exception
@@ -519,58 +528,60 @@ public class TestTxnCommands2 {
       txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
       runWorker(hiveConf);
     }
-    //this should not schedule a new compaction due to prior failures
+    //this should not schedule a new compaction due to prior failures, but will create Attempted entry
     Initiator init = new Initiator();
     init.setThreadId((int)init.getId());
     init.setHiveConf(hiveConf);
     init.init(stop, new AtomicBoolean());
     init.run();
-
-    CompactionsByState cbs = countCompacts(txnHandler);
-    Assert.assertEquals("Unexpected number of failed compactions", numFailedCompactions, cbs.failed);
-    Assert.assertEquals("Unexpected total number of compactions", numFailedCompactions, cbs.total);
+    int numAttemptedCompactions = 1;
+    checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler));
     
     hiveConf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 10, TimeUnit.MILLISECONDS);
     AcidCompactionHistoryService compactionHistoryService = new AcidCompactionHistoryService();
     runHouseKeeperService(compactionHistoryService, hiveConf);//should not remove anything from history
-    cbs = countCompacts(txnHandler);
-    Assert.assertEquals("Number of failed compactions after History clean", numFailedCompactions, cbs.failed);
-    Assert.assertEquals("Total number of compactions after History clean", numFailedCompactions, cbs.total);
+    checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler));
 
     txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MAJOR));
     runWorker(hiveConf);//will fail
     txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
     runWorker(hiveConf);//will fail
-    cbs = countCompacts(txnHandler);
-    Assert.assertEquals("Unexpected num failed1", numFailedCompactions + 2, cbs.failed);
-    Assert.assertEquals("Unexpected num total1", numFailedCompactions + 2, cbs.total);
+    init.run();
+    numAttemptedCompactions++;
+    init.run();
+    numAttemptedCompactions++;
+    checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions + 2,0,0,0,0,numFailedCompactions + 2 + numAttemptedCompactions), countCompacts(txnHandler));
+
     runHouseKeeperService(compactionHistoryService, hiveConf);//should remove history so that we have
     //COMPACTOR_HISTORY_RETENTION_FAILED failed compacts left (and no other since we only have failed ones here)
-    cbs = countCompacts(txnHandler);
-    Assert.assertEquals("Unexpected num failed2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
-    Assert.assertEquals("Unexpected num total2", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.total);
-    
+    checkCompactionState(new CompactionsByState(
+      hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+      hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,0,0,0,
+      hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)), countCompacts(txnHandler));
     
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, false);
     txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MINOR));
-    //at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated
-    cbs = countCompacts(txnHandler);
-    Assert.assertEquals("Unexpected num failed3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
-    Assert.assertEquals("Unexpected num initiated", 1, cbs.initiated);
-    Assert.assertEquals("Unexpected num total3", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
+    //at this point "show compactions" should have (COMPACTOR_HISTORY_RETENTION_FAILED) failed + 1 initiated (explicitly by user)
+    checkCompactionState(new CompactionsByState(
+      hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+      hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),1,0,0,0,
+      hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) +
+        hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler));
 
     runWorker(hiveConf);//will succeed and transition to Initiated->Working->Ready for Cleaning
-    cbs = countCompacts(txnHandler);
-    Assert.assertEquals("Unexpected num failed4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
-    Assert.assertEquals("Unexpected num ready to clean", 1, cbs.readyToClean);
-    Assert.assertEquals("Unexpected num total4", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
-    
+    checkCompactionState(new CompactionsByState(
+      hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+      hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,1,0,0,
+      hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) +
+        hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler));
+
     runCleaner(hiveConf); // transition to Success state
     runHouseKeeperService(compactionHistoryService, hiveConf);//should not purge anything as all items within retention sizes
-    cbs = countCompacts(txnHandler);
-    Assert.assertEquals("Unexpected num failed5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED), cbs.failed);
-    Assert.assertEquals("Unexpected num succeeded", 1, cbs.succeeded);
-    Assert.assertEquals("Unexpected num total5", hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) + 1, cbs.total);
+    checkCompactionState(new CompactionsByState(
+      hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
+      hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,0,1,0,
+      hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED) +
+        hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler));
   }
 
   /**
@@ -624,6 +635,18 @@ public class TestTxnCommands2 {
     private int succeeded;
     private int working;
     private int total;
+    CompactionsByState() { // empty tally: every counter and the total start at zero
+      this(0,0,0,0,0,0,0);
+    }
+    CompactionsByState(int attempted, int failed, int initiated, int readyToClean, int succeeded, int working, int total) { // all seven arguments are positional ints, so order matters at call sites
+      this.attempted = attempted;
+      this.failed = failed;
+      this.initiated = initiated;
+      this.readyToClean = readyToClean;
+      this.succeeded = succeeded;
+      this.working = working;
+      this.total = total; // stored exactly as given; not derived from the other counters
+    }
   }
   private static CompactionsByState countCompacts(TxnStore txnHandler) throws MetaException {
     ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
@@ -648,6 +671,9 @@ public class TestTxnCommands2 {
       else if(TxnStore.ATTEMPTED_RESPONSE.equals(compact.getState())) {
         compactionsByState.attempted++;
       }
+      else {
+        throw new IllegalStateException("Unexpected state: " + compact.getState());
+      }
     }
     return compactionsByState;
   }