You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/11/25 10:51:35 UTC
[hive] branch master updated: HIVE-21917: COMPLETED_TXN_COMPONENTS
table is never cleaned up unless Compactor runs (Denys Kuzmenko reviewed by
Craig Condit and Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 47d3b59 HIVE-21917: COMPLETED_TXN_COMPONENTS table is never cleaned up unless Compactor runs (Denys Kuzmenko reviewed by Craig Condit and Peter Vary)
47d3b59 is described below
commit 47d3b59da6517ac69c5e0a341be457a162d4faad
Author: denys kuzmenko <dk...@cloudera.com>
AuthorDate: Mon Nov 25 11:50:34 2019 +0100
HIVE-21917: COMPLETED_TXN_COMPONENTS table is never cleaned up unless Compactor runs (Denys Kuzmenko reviewed by Craig Condit and Peter Vary)
---
.../hadoop/hive/ql/txn/compactor/Initiator.java | 10 +++++--
.../metastore/txn/TestCompactionTxnHandler.java | 19 ++++++++++++
.../hive/metastore/txn/CompactionTxnHandler.java | 34 +++++++++++++++++-----
.../hadoop/hive/metastore/txn/TxnHandler.java | 24 +++++++++++++++
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 8 +++--
5 files changed, 81 insertions(+), 14 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 610cf05..7a0e324 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -68,10 +68,10 @@ public class Initiator extends MetaStoreCompactorThread {
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
- Map<String, String> tblNameOwnersCache = new HashMap<>();
-
+ private Map<String, String> tblNameOwnersCache = new HashMap<>();
private long checkInterval;
+ private long prevStart = -1;
@Override
public void run() {
@@ -95,9 +95,13 @@ public class Initiator extends MetaStoreCompactorThread {
try {
handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Initiator.name());
startedAt = System.currentTimeMillis();
+
+ long compactionInterval = (prevStart < 0) ? prevStart : (startedAt - prevStart)/1000;
+ prevStart = startedAt;
+
//todo: add method to only get current i.e. skip history - more efficient
ShowCompactResponse currentCompactions = txnHandler.showCompact(new ShowCompactRequest());
- Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold)
+ Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold, compactionInterval)
.stream().filter(ci -> checkCompactionElig(ci)).collect(Collectors.toSet());
LOG.debug("Found " + potentials.size() + " potential compactions, " +
"checking to see if we should compact any of them");
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index b28b577..e589554 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -49,6 +49,7 @@ import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertFalse;
@@ -379,6 +380,24 @@ public class TestCompactionTxnHandler {
}
assertTrue(sawMyTable);
assertTrue(sawYourTable);
+
+ potentials = txnHandler.findPotentialCompactions(100, 1);
+ assertEquals(2, potentials.size());
+
+ //simulate auto-compaction interval
+ TimeUnit.SECONDS.sleep(2);
+
+ potentials = txnHandler.findPotentialCompactions(100, 1);
+ assertEquals(0, potentials.size());
+
+ //simulate prev failed compaction
+ CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MINOR);
+ txnHandler.compact(rqst);
+ CompactionInfo ci = txnHandler.findNextToCompact("fred");
+ txnHandler.markFailed(ci);
+
+ potentials = txnHandler.findPotentialCompactions(100, 1);
+ assertEquals(1, potentials.size());
}
// TODO test changes to mark cleaned to clean txns and txn_components
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 8253ccb..aded6f5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -53,14 +53,19 @@ class CompactionTxnHandler extends TxnHandler {
* This will look through the completed_txn_components table and look for partitions or tables
* that may be ready for compaction. Also, look through txns and txn_components tables for
* aborted transactions that we should add to the list.
- * @param maxAborted Maximum number of aborted queries to allow before marking this as a
- * potential compaction.
+ * @param abortedThreshold number of aborted queries forming a potential compaction request.
* @return list of CompactionInfo structs. These will not have id, type,
* or runAs set since these are only potential compactions not actual ones.
*/
@Override
@RetrySemantics.ReadOnly
- public Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException {
+ public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold) throws MetaException {
+ return findPotentialCompactions(abortedThreshold, -1);
+ }
+
+ @Override
+ @RetrySemantics.ReadOnly
+ public Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long checkInterval) throws MetaException {
Connection dbConn = null;
Set<CompactionInfo> response = new HashSet<>();
Statement stmt = null;
@@ -70,8 +75,21 @@ class CompactionTxnHandler extends TxnHandler {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
stmt = dbConn.createStatement();
// Check for completed transactions
- String s = "select distinct ctc_database, ctc_table, " +
- "ctc_partition from COMPLETED_TXN_COMPONENTS";
+ String s = "select distinct tc.ctc_database, tc.ctc_table, tc.ctc_partition " +
+ "from COMPLETED_TXN_COMPONENTS tc " + (checkInterval > 0 ?
+ "left join ( " +
+ " select c1.* from COMPLETED_COMPACTIONS c1 " +
+ " inner join ( " +
+ " select max(cc_id) cc_id from COMPLETED_COMPACTIONS " +
+ " group by cc_database, cc_table, cc_partition" +
+ " ) c2 " +
+ " on c1.cc_id = c2.cc_id " +
+ " where c1.cc_state IN (" + quoteChar(ATTEMPTED_STATE) + "," + quoteChar(FAILED_STATE) + ")" +
+ ") c " +
+ "on tc.ctc_database = c.cc_database and tc.ctc_table = c.cc_table " +
+ " and (tc.ctc_partition = c.cc_partition or (tc.ctc_partition is null and c.cc_partition is null)) " +
+ "where c.cc_id is not null or " + isWithinCheckInterval("tc.ctc_timestamp", checkInterval) : "");
+
LOG.debug("Going to execute query <" + s + ">");
rs = stmt.executeQuery(s);
while (rs.next()) {
@@ -88,7 +106,7 @@ class CompactionTxnHandler extends TxnHandler {
"from TXNS, TXN_COMPONENTS " +
"where txn_id = tc_txnid and txn_state = '" + TXN_ABORTED + "' " +
"group by tc_database, tc_table, tc_partition " +
- "having count(*) > " + maxAborted;
+ "having count(*) > " + abortedThreshold;
LOG.debug("Going to execute query <" + s + ">");
rs = stmt.executeQuery(s);
@@ -105,14 +123,14 @@ class CompactionTxnHandler extends TxnHandler {
dbConn.rollback();
} catch (SQLException e) {
LOG.error("Unable to connect to transaction database " + e.getMessage());
- checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + maxAborted + ")");
+ checkRetryable(dbConn, e, "findPotentialCompactions(maxAborted:" + abortedThreshold + ")");
} finally {
close(rs, stmt, dbConn);
}
return response;
}
catch (RetryException e) {
- return findPotentialCompactions(maxAborted);
+ return findPotentialCompactions(abortedThreshold, checkInterval);
}
}
/**
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 2680387..5fb6d86 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -3917,6 +3917,40 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
}
}
+ /**
+ * Builds a dialect-specific SQL predicate that holds when the timestamp expression
+ * {@code expr} is at most {@code interval} seconds older than current_timestamp.
+ * @param expr SQL expression expected to evaluate to a timestamp (e.g. a column reference)
+ * @param interval look-back window in seconds
+ * @return SQL condition fragment to be embedded in a WHERE clause
+ * @throws MetaException if the configured database product is not recognized
+ */
+ protected String isWithinCheckInterval(String expr, long interval) throws MetaException {
+ String condition;
+ switch (dbProduct) {
+ case DERBY:
+ condition = " {fn TIMESTAMPDIFF(sql_tsi_second, " + expr + ", current_timestamp)} <= " + interval;
+ break;
+ case MYSQL:
+ case POSTGRES:
+ // '>=' is required here: '=>' is not a comparison operator in MySQL or PostgreSQL.
+ condition = expr + " >= current_timestamp - interval '" + interval + "' second";
+ break;
+ case SQLSERVER:
+ condition = "DATEDIFF(second, " + expr + ", current_timestamp) <= " + interval;
+ break;
+ case ORACLE:
+ // '>=' is required here: in Oracle '=>' only denotes named-parameter association.
+ condition = expr + " >= current_timestamp - numtodsinterval(" + interval + " , 'second')";
+ break;
+ default:
+ String msg = "Unknown database product: " + dbProduct.toString();
+ LOG.error(msg);
+ throw new MetaException(msg);
+ }
+ return condition;
+ }
+
/**
* Determine the String that should be used to quote identifiers.
* @param conn Active connection
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index e840758..41d2e79 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -314,13 +314,15 @@ public interface TxnStore extends Configurable {
* This will look through the completed_txn_components table and look for partitions or tables
* that may be ready for compaction. Also, look through txns and txn_components tables for
* aborted transactions that we should add to the list.
- * @param maxAborted Maximum number of aborted queries to allow before marking this as a
- * potential compaction.
+ * @param abortedThreshold number of aborted queries forming a potential compaction request.
* @return list of CompactionInfo structs. These will not have id, type,
* or runAs set since these are only potential compactions not actual ones.
*/
@RetrySemantics.ReadOnly
- Set<CompactionInfo> findPotentialCompactions(int maxAborted) throws MetaException;
+ Set<CompactionInfo> findPotentialCompactions(int abortedThreshold) throws MetaException;
+
+ @RetrySemantics.ReadOnly
+ Set<CompactionInfo> findPotentialCompactions(int abortedThreshold, long checkInterval) throws MetaException;
/**
* This updates COMPACTION_QUEUE. Set runAs username for the case where the request was