Posted to commits@hive.apache.org by pv...@apache.org on 2019/11/25 10:51:35 UTC

[hive] branch master updated: HIVE-21917: COMPLETED_TXN_COMPONENTS table is never cleaned up unless Compactor runs (Denys Kuzmenko reviewed by Craig Condit and Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 47d3b59  HIVE-21917: COMPLETED_TXN_COMPONENTS table is never cleaned up unless Compactor runs (Denys Kuzmenko reviewed by Craig Condit and Peter Vary)
47d3b59 is described below

commit 47d3b59da6517ac69c5e0a341be457a162d4faad
Author: denys kuzmenko <dk...@cloudera.com>
AuthorDate: Mon Nov 25 11:50:34 2019 +0100

    HIVE-21917: COMPLETED_TXN_COMPONENTS table is never cleaned up unless Compactor runs (Denys Kuzmenko reviewed by Craig Condit and Peter Vary)
---
 .../hadoop/hive/ql/txn/compactor/Initiator.java    | 10 +++++--
 .../metastore/txn/TestCompactionTxnHandler.java    | 19 ++++++++++++
 .../hive/metastore/txn/CompactionTxnHandler.java   | 34 +++++++++++++++++-----
 .../hadoop/hive/metastore/txn/TxnHandler.java      | 24 +++++++++++++++
 .../apache/hadoop/hive/metastore/txn/TxnStore.java |  8 +++--
 5 files changed, 81 insertions(+), 14 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 610cf05..7a0e324 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -68,10 +68,10 @@ public class Initiator extends MetaStoreCompactorThread {
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
 
   static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
-  Map<String, String> tblNameOwnersCache = new HashMap<>();
-
+  private Map<String, String> tblNameOwnersCache = new HashMap<>();
 
   private long checkInterval;
+  private long prevStart = -1;
 
   @Override
   public void run() {
@@ -95,9 +95,13 @@ public class Initiator extends MetaStoreCompactorThread {
         try {
           handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
           startedAt = System.currentTimeMillis();
+
+          long compactionInterval = (prevStart < 0) ? prevStart : (startedAt - prevStart)/1000;
+          prevStart = startedAt;
+
           //todo: add method to only get current i.e. skip history - more efficient
           ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
-          Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold)
+          Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold, compactionInterval)
                   .stream().filter(ci -> checkCompactionElig(ci)).collect(Collectors.toSet());
           LOG.debug("Found " + potentials.size() + " potential compactions, " +
               "checking to see if we should compact any of them");
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index b28b577..e589554 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -49,6 +49,7 @@ import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
 
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertFalse;
@@ -379,6 +380,24 @@ public class TestCompactionTxnHandler {
     }
     assertTrue(sawMyTable);
     assertTrue(sawYourTable);
+
+    potentials = txnHandler.findPotentialCompactions(100, 1);
+    assertEquals(2, potentials.size());
+
+    //simulate auto-compaction interval
+    TimeUnit.SECONDS.sleep(2);
+
+    potentials = txnHandler.findPotentialCompactions(100, 1);
+    assertEquals(0, potentials.size());
+
+    //simulate prev failed compaction
+    CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MINOR);
+    txnHandler.compact(rqst);
+    CompactionInfo ci = txnHandler.findNextToCompact("fred");
+    txnHandler.markFailed(ci);
+
+    potentials = txnHandler.findPotentialCompactions(100, 1);
+    assertEquals(1, potentials.size());
   }
 
   // TODO test changes to mark cleaned to clean txns and txn_components
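The added assertions exercise both branches of the new predicate: entries age
out of the check-interval window, while a failed (or merely attempted) last
compaction keeps a table visible regardless of age. A condensed walk-through
of the calls above (counts as asserted by the test):

    // Two fresh COMPLETED_TXN_COMPONENTS entries, 1-second window: both match.
    potentials = txnHandler.findPotentialCompactions(100, 1);  // 2 results
    TimeUnit.SECONDS.sleep(2);  // both entries age past the 1-second window
    potentials = txnHandler.findPotentialCompactions(100, 1);  // 0 results
    // After markFailed(), the latest compaction of "mytable" is in FAILED
    // state, so the table re-qualifies despite its age.
    potentials = txnHandler.findPotentialCompactions(100, 1);  // 1 result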
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 8253ccb..aded6f5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -53,14 +53,19 @@ class CompactionTxnHandler extends TxnHandler {
    * This will look through the completed_txn_components table and look for partitions or tables
    * that may be ready for compaction.  Also, look through txns and txn_components tables for
    * aborted transactions that we should add to the list.
-   * @param maxAborted Maximum number of aborted queries to allow before marking this as a
-   *                   potential compaction.
+   * @param abortedThreshold  number of aborted transactions above which a table or partition is flagged as a potential compaction target.
    * @return list of CompactionInfo structs.  These will not have id, type,
    * or runAs set since these are only potential compactions not actual ones.
    */
   @Override
   @RetrySemantics.ReadOnly
-  public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
+  public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold) throws MetaException {
+    return findPotentialCompactions(abortedThreshold, -1);
+  }
+
+  @Override
+  @RetrySemantics.ReadOnly
+  public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long checkInterval) throws MetaException {
     Connection dbConn = null;
     Set<CompactionInfo> response = new HashSet<>();
     Statement stmt = null;
@@ -70,8 +75,21 @@ class CompactionTxnHandler extends TxnHandler {
         dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
         stmt = dbConn.createStatement();
         // Check for completed transactions
-        String s = "select distinct ctc_database, ctc_table, " +
-          "ctc_partition from COMPLETED_TXN_COMPONENTS";
+        String s = "select distinct tc.ctc_database, tc.ctc_table, tc.ctc_partition " +
+          "from COMPLETED_TXN_COMPONENTS tc " + (checkInterval > 0 ?
+          "left join ( " +
+          "  select c1.* from COMPLETED_COMPACTIONS c1 " +
+          "  inner join ( " +
+          "    select max(cc_id) cc_id from COMPLETED_COMPACTIONS " +
+          "    group by cc_database, cc_table, cc_partition" +
+          "  ) c2 " +
+          "  on c1.cc_id = c2.cc_id " +
+          "  where c1.cc_state IN (" + quoteChar(ATTEMPTED_STATE) + "," + quoteChar(FAILED_STATE) + ")" +
+          ") c " +
+          "on tc.ctc_database = c.cc_database and tc.ctc_table = c.cc_table " +
+          "  and (tc.ctc_partition = c.cc_partition or (tc.ctc_partition is null and c.cc_partition is null)) " +
+          "where c.cc_id is not null or " + isWithinCheckInterval("tc.ctc_timestamp", checkInterval) : "");
+
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
         while (rs.next()) {
@@ -88,7 +106,7 @@ class CompactionTxnHandler extends TxnHandler {
           "from TXNS, TXN_COMPONENTS " +
           "where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " +
           "group by tc_database, tc_table, tc_partition " +
-          "having count(*) > " + maxAborted;
+          "having count(*) > " + abortedThreshold;
 
         LOG.debug("Going to execute query <" + s + ">");
         rs = stmt.executeQuery(s);
@@ -105,14 +123,14 @@ class CompactionTxnHandler extends TxnHandler {
         dbConn.rollback();
       } catch (SQLException e) {
         LOG.error("Unable to connect to transaction database " + e.getMessage());
-        checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + maxAborted + ")");
+        checkRetryable(dbConn, e, "findPotentialCompactions(abortedThreshold:" + abortedThreshold + ")");
       } finally {
         close(rs, stmt, dbConn);
       }
       return response;
     }
     catch (RetryException e) {
-      return findPotentialCompactions(maxAborted);
+      return findPotentialCompactions(abortedThreshold, checkInterval);
     }
   }
   /**
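For readability, with checkInterval > 0 the string concatenation above renders
to roughly the following query (illustrative only; assumes checkInterval = 60,
the Derby dialect for the timestamp predicate, and the usual single-character
state codes 'a' for ATTEMPTED_STATE and 'f' for FAILED_STATE):

    select distinct tc.ctc_database, tc.ctc_table, tc.ctc_partition
    from COMPLETED_TXN_COMPONENTS tc
    left join (
      select c1.* from COMPLETED_COMPACTIONS c1
      inner join (
        select max(cc_id) cc_id from COMPLETED_COMPACTIONS
        group by cc_database, cc_table, cc_partition
      ) c2 on c1.cc_id = c2.cc_id
      where c1.cc_state IN ('a','f')
    ) c
    on tc.ctc_database = c.cc_database and tc.ctc_table = c.cc_table
      and (tc.ctc_partition = c.cc_partition
           or (tc.ctc_partition is null and c.cc_partition is null))
    where c.cc_id is not null
       or {fn TIMESTAMPDIFF(sql_tsi_second, tc.ctc_timestamp, current_timestamp)} <= 60

The inner join keeps only the latest compaction per database/table/partition,
so an entity is returned either because it changed within the last
checkInterval seconds or because its most recent compaction attempt did not
succeed.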
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 2680387..5fb6d86 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -3917,6 +3917,30 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
     }
   }
 
+  protected String isWithinCheckInterval(String expr, long interval) throws MetaException {
+    String condition;
+    switch (dbProduct) {
+      case DERBY:
+        condition = " {fn TIMESTAMPDIFF(sql_tsi_second, " + expr + ", current_timestamp)} <= " + interval;
+        break;
+      case MYSQL:
+      case POSTGRES:
+        condition = expr + " >= current_timestamp - interval '" + interval + "' second";
+        break;
+      case SQLSERVER:
+        condition = "DATEDIFF(second, " + expr + ", current_timestamp) <= " + interval;
+        break;
+      case ORACLE:
+        condition = expr + " >= current_timestamp - numtodsinterval(" + interval + " , 'second')";
+        break;
+      default:
+        String msg = "Unknown database product: " + dbProduct.toString();
+        LOG.error(msg);
+        throw new MetaException(msg);
+    }
+    return condition;
+  }
+
   /**
    * Determine the String that should be used to quote identifiers.
    * @param conn Active connection
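The helper builds a dialect-specific "modified within the last N seconds"
predicate. A sketch of the strings a hypothetical call produces per dialect
(expr = "tc.ctc_timestamp", interval = 60):

    String predicate = isWithinCheckInterval("tc.ctc_timestamp", 60);
    // DERBY:     {fn TIMESTAMPDIFF(sql_tsi_second, tc.ctc_timestamp, current_timestamp)} <= 60
    // MYSQL /
    // POSTGRES:  tc.ctc_timestamp >= current_timestamp - interval '60' second
    // SQLSERVER: DATEDIFF(second, tc.ctc_timestamp, current_timestamp) <= 60
    // ORACLE:    tc.ctc_timestamp >= current_timestamp - numtodsinterval(60 , 'second')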
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index e840758..41d2e79 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -314,13 +314,15 @@ public interface TxnStore extends Configurable {
    * This will look through the completed_txn_components table and look for partitions or tables
    * that may be ready for compaction.  Also, look through txns and txn_components tables for
    * aborted transactions that we should add to the list.
-   * @param maxAborted Maximum number of aborted queries to allow before marking this as a
-   *                   potential compaction.
+   * @param abortedThreshold  number of aborted transactions above which a table or partition is flagged as a potential compaction target.
    * @return list of CompactionInfo structs.  These will not have id, type,
    * or runAs set since these are only potential compactions not actual ones.
    */
   @RetrySemantics.ReadOnly
-  Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException;
+  Set<CompactionInfo> findPotentialCompactions(int abortedThreshold) throws MetaException;
+
+  @RetrySemantics.ReadOnly
+  Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long checkInterval) throws MetaException;
 
   /**
    * This updates COMPACTION_QUEUE.  Set runAs username for the case where the request was