You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2015/10/05 21:41:22 UTC

[03/23] hive git commit: HIVE-11998 - Improve Compaction process logging (Eugene Koifman, reviewed by Jason Dere)

HIVE-11998 - Improve Compaction process logging (Eugene Koifman, reviewed by Jason Dere)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a1bac802
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a1bac802
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a1bac802

Branch: refs/heads/llap
Commit: a1bac802a21efef8a2c10d616b2aeb680ffedd9c
Parents: b1eb0c0
Author: Eugene Koifman <ek...@hortonworks.com>
Authored: Fri Oct 2 10:12:29 2015 -0700
Committer: Eugene Koifman <ek...@hortonworks.com>
Committed: Fri Oct 2 10:12:29 2015 -0700

----------------------------------------------------------------------
 .../metastore/txn/CompactionTxnHandler.java     | 36 ++++++++++++++------
 .../hive/ql/txn/AcidHouseKeeperService.java     |  5 +--
 .../hadoop/hive/ql/txn/compactor/Cleaner.java   |  3 +-
 .../hive/ql/txn/compactor/CompactorThread.java  |  9 ++---
 .../hadoop/hive/ql/txn/compactor/Initiator.java |  5 ++-
 5 files changed, 38 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/a1bac802/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 328a65c..44ee5c6 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -122,8 +122,9 @@ public class CompactionTxnHandler extends TxnHandler {
         stmt = dbConn.createStatement();
         String s = "update COMPACTION_QUEUE set cq_run_as = '" + user + "' where cq_id = " + cq_id;
         LOG.debug("Going to execute update <" + s + ">");
-        if (stmt.executeUpdate(s) != 1) {
-          LOG.error("Unable to update compaction record");
+        int updCnt = stmt.executeUpdate(s);
+        if (updCnt != 1) {
+          LOG.error("Unable to set cq_run_as=" + user + " for compaction record with cq_id=" + cq_id + ".  updCnt=" + updCnt);
           LOG.debug("Going to rollback");
           dbConn.rollback();
         }
@@ -182,8 +183,10 @@ public class CompactionTxnHandler extends TxnHandler {
         s = "update COMPACTION_QUEUE set cq_worker_id = '" + workerId + "', " +
           "cq_start = " + now + ", cq_state = '" + WORKING_STATE + "' where cq_id = " + info.id;
         LOG.debug("Going to execute update <" + s + ">");
-        if (stmt.executeUpdate(s) != 1) {
-          LOG.error("Unable to update compaction record");
+        int updCount = stmt.executeUpdate(s);
+        if (updCount != 1) {
+          LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " +
+            info + ". updCnt=" + updCount);
           LOG.debug("Going to rollback");
           dbConn.rollback();
         }
@@ -221,8 +224,9 @@ public class CompactionTxnHandler extends TxnHandler {
         String s = "update COMPACTION_QUEUE set cq_state = '" + READY_FOR_CLEANING + "', " +
           "cq_worker_id = null where cq_id = " + info.id;
         LOG.debug("Going to execute update <" + s + ">");
-        if (stmt.executeUpdate(s) != 1) {
-          LOG.error("Unable to update compaction record");
+        int updCnt = stmt.executeUpdate(s);
+        if (updCnt != 1) {
+          LOG.error("Unable to set cq_state=" + READY_FOR_CLEANING + " for compaction record: " + info + ". updCnt=" + updCnt);
           LOG.debug("Going to rollback");
           dbConn.rollback();
         }
@@ -298,6 +302,17 @@ public class CompactionTxnHandler extends TxnHandler {
   /**
    * This will remove an entry from the queue after
    * it has been compacted.
+   * 
+   * TODO: possibly a problem?  Worker will start with DB in state X (wrt this partition).
+   * While it's working, more txns may happen against the partition it's compacting.
+   * Then this will delete state both up to X and after X.  There may be new delta files
+   * created between compaction starting and cleaning.  These will not be compacted until more
+   * transactions happen.  So ideally this should only delete state
+   * up to the TXN_ID that was compacted (i.e. the HWM in Worker?).  Then this could also run
+   * at READ_COMMITTED.
+   * 
+   * Also, by using this method when Worker fails, we prevent future compactions from
+   * running until more data is written to the table or compaction is invoked explicitly.
    * @param info info on the compaction entry to remove
    */
   public void markCleaned(CompactionInfo info) throws MetaException {
@@ -309,8 +324,9 @@ public class CompactionTxnHandler extends TxnHandler {
         stmt = dbConn.createStatement();
         String s = "delete from COMPACTION_QUEUE where cq_id = " + info.id;
         LOG.debug("Going to execute update <" + s + ">");
-        if (stmt.executeUpdate(s) != 1) {
-          LOG.error("Unable to delete compaction record");
+        int updCount = stmt.executeUpdate(s);
+        if (updCount != 1) {
+          LOG.error("Unable to delete compaction record: " + info +  ".  Update count=" + updCount);
           LOG.debug("Going to rollback");
           dbConn.rollback();
         }
@@ -348,7 +364,7 @@ public class CompactionTxnHandler extends TxnHandler {
             else buf.append(", ");
             buf.append(id);
           }
-
+          //because 1 txn may include different partitions/tables even in auto commit mode
           buf.append(") and tc_database = '");
           buf.append(info.dbname);
           buf.append("' and tc_table = '");
@@ -415,7 +431,7 @@ public class CompactionTxnHandler extends TxnHandler {
           String bufStr = buf.toString();
           LOG.debug("Going to execute update <" + bufStr + ">");
           int rc = stmt.executeUpdate(bufStr);
-          LOG.debug("Removed " + rc + " records from txns");
+          LOG.info("Removed " + rc + "  empty Aborted transactions: " + txnids + " from TXNS");
           LOG.debug("Going to commit");
           dbConn.commit();
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/a1bac802/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
index d22ca8d..23a77e6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/AcidHouseKeeperService.java
@@ -84,9 +84,10 @@ public class AcidHouseKeeperService implements HouseKeeperService {
     @Override
     public void run() {
       try {
+        long startTime = System.currentTimeMillis();
         txnHandler.performTimeOuts();
-        owner.isAliveCounter.incrementAndGet();
-        LOG.info("timeout reaper ran");
+        int count = owner.isAliveCounter.incrementAndGet();
+        LOG.info("timeout reaper ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds.  isAliveCounter=" + count);
       }
       catch(Throwable t) {
         LOG.fatal("Serious error in " + Thread.currentThread().getName() + ": " + t.getMessage(), t);

http://git-wip-us.apache.org/repos/asf/hive/blob/a1bac802/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 16d2c81..622bf54 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -212,7 +212,7 @@ public class Cleaner extends CompactorThread {
       if (runJobAsSelf(ci.runAs)) {
         removeFiles(location, txnList);
       } else {
-        LOG.info("Cleaning as user " + ci.runAs);
+        LOG.info("Cleaning as user " + ci.runAs + " for " + ci.getFullPartitionName());
         UserGroupInformation ugi = UserGroupInformation.createProxyUser(ci.runAs,
             UserGroupInformation.getLoginUser());
         ugi.doAs(new PrivilegedExceptionAction<Object>() {
@@ -245,6 +245,7 @@ public class Cleaner extends CompactorThread {
           ", that hardly seems right.");
       return;
     }
+    LOG.info("About to remove " + filesToDelete.size() + " obsolete directories from " + location);
     FileSystem fs = filesToDelete.get(0).getFileSystem(conf);
 
     for (Path dead : filesToDelete) {

http://git-wip-us.apache.org/repos/asf/hive/blob/a1bac802/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 38cd95e..c956f58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -119,8 +119,8 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
         throw e;
       }
       if (parts.size() != 1) {
-        LOG.error(ci.getFullPartitionName() + " does not refer to a single partition");
-        throw new MetaException("Too many partitions");
+        LOG.error(ci.getFullPartitionName() + " does not refer to a single partition. " + parts);
+        throw new MetaException("Too many partitions for : " + ci.getFullPartitionName());
       }
       return parts.get(0);
     } else {
@@ -179,8 +179,9 @@ abstract class CompactorThread extends Thread implements MetaStoreThread {
         return wrapper.get(0);
       }
     }
-    LOG.error("Unable to stat file as either current user or table owner, giving up");
-    throw new IOException("Unable to stat file");
+    LOG.error("Unable to stat file " + p + " as either current user(" + UserGroupInformation.getLoginUser() +
+      ") or table owner(" + t.getOwner() + "), giving up");
+    throw new IOException("Unable to stat file: " + p);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hive/blob/a1bac802/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 9bf725d..f265311 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -99,7 +99,7 @@ public class Initiator extends CompactorThread {
 
               // check if no compaction set for this table
               if (noAutoCompactSet(t)) {
-                LOG.info("Table " + tableName(t) + " marked true so we will not compact it.");
+                LOG.info("Table " + tableName(t) + " marked " + hive_metastoreConstants.TABLE_NO_AUTO_COMPACT + "=true so we will not compact it.");
                 continue;
               }
 
@@ -297,11 +297,10 @@ public class Initiator extends CompactorThread {
   }
 
   private void requestCompaction(CompactionInfo ci, String runAs, CompactionType type) throws MetaException {
-    String s = "Requesting " + type.toString() + " compaction for " + ci.getFullPartitionName();
-    LOG.info(s);
     CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName, type);
     if (ci.partName != null) rqst.setPartitionname(ci.partName);
     rqst.setRunas(runAs);
+    LOG.info("Requesting compaction: " + rqst);
     txnHandler.compact(rqst);
   }