You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by dk...@apache.org on 2020/05/07 11:18:23 UTC
[hive] branch master updated: HIVE-23325: Consolidate acid-related cleanup tasks (Marton Bod, reviewed by Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new f349b8c HIVE-23325: Consolidate acid-related cleanup tasks (Marton Bod, reviewed by Denys Kuzmenko)
f349b8c is described below
commit f349b8c45d94f67add3ec1c64eea6ac88f4e5ed4
Author: Marton Bod <mb...@cloudera.com>
AuthorDate: Thu May 7 13:17:52 2020 +0200
HIVE-23325: Consolidate acid-related cleanup tasks (Marton Bod, reviewed by Denys Kuzmenko)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 8 +-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 12 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 14 +-
.../apache/hadoop/hive/ql/TestTxnCommands3.java | 20 ---
.../hadoop/hive/ql/TestTxnCommandsForMmTable.java | 10 --
.../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 6 +-
.../hive/ql/txn/compactor/TestInitiator.java | 37 ----
.../hadoop/hive/metastore/conf/MetastoreConf.java | 35 ++--
.../hadoop/hive/metastore/HiveMetaStore.java | 2 -
.../MaterializationsRebuildLockCleanerTask.java | 6 +-
.../txn/AcidCompactionHistoryService.java | 71 --------
.../hive/metastore/txn/AcidHouseKeeperService.java | 44 ++++-
.../metastore/txn/AcidOpenTxnsCounterService.java | 17 +-
...eSetService.java => AcidTxnCleanerService.java} | 30 ++--
.../hadoop/hive/metastore/txn/TxnHandler.java | 5 +-
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 4 +-
.../hive/metastore/conf/TestMetastoreConf.java | 9 +-
.../metastore/txn/TestAcidTxnCleanerService.java | 193 +++++++++++++++++++++
18 files changed, 298 insertions(+), 225 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 32b0d91..60ae06a 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2927,25 +2927,25 @@ public class HiveConf extends Configuration {
new RangeValidator(0, 100), "Determines how many attempted compaction records will be " +
"retained in compaction history for a given table/partition."),
/**
- * @deprecated Use MetastoreConf.COMPACTOR_HISTORY_REAPER_INTERVAL
+ * @deprecated Use MetastoreConf.ACID_HOUSEKEEPER_SERVICE_INTERVAL
*/
@Deprecated
COMPACTOR_HISTORY_REAPER_INTERVAL("hive.compactor.history.reaper.interval", "2m",
new TimeValidator(TimeUnit.MILLISECONDS), "Determines how often compaction history reaper runs"),
/**
- * @deprecated Use MetastoreConf.TIMEDOUT_TXN_REAPER_START
+ * @deprecated Use MetastoreConf.ACID_HOUSEKEEPER_SERVICE_START
*/
@Deprecated
HIVE_TIMEDOUT_TXN_REAPER_START("hive.timedout.txn.reaper.start", "100s",
new TimeValidator(TimeUnit.MILLISECONDS), "Time delay of 1st reaper run after metastore start"),
/**
- * @deprecated Use MetastoreConf.TIMEDOUT_TXN_REAPER_INTERVAL
+ * @deprecated Use MetastoreConf.ACID_HOUSEKEEPER_SERVICE_INTERVAL
*/
@Deprecated
HIVE_TIMEDOUT_TXN_REAPER_INTERVAL("hive.timedout.txn.reaper.interval", "180s",
new TimeValidator(TimeUnit.MILLISECONDS), "Time interval describing how often the reaper runs"),
/**
- * @deprecated Use MetastoreConf.WRITE_SET_REAPER_INTERVAL
+ * @deprecated Use MetastoreConf.ACID_HOUSEKEEPER_SERVICE_INTERVAL
*/
@Deprecated
WRITE_SET_REAPER_INTERVAL("hive.writeset.reaper.interval", "60s",
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 2557809..fa2ede3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -114,7 +114,9 @@ public class Initiator extends MetaStoreCompactorThread {
Set<CompactionInfo> potentials = txnHandler.findPotentialCompactions(abortedThreshold,
abortedTimeThreshold, compactionInterval)
- .stream().filter(ci -> checkCompactionElig(ci, currentCompactions)).collect(Collectors.toSet());
+ .stream()
+ .filter(ci -> isEligibleForCompaction(ci, currentCompactions))
+ .collect(Collectors.toSet());
LOG.debug("Found " + potentials.size() + " potential compactions, " +
"checking to see if we should compact any of them");
@@ -153,12 +155,6 @@ public class Initiator extends MetaStoreCompactorThread {
// Check for timed out remote workers.
recoverFailedCompactions(true);
-
- // Clean anything from the txns table that has no components left in txn_components.
- txnHandler.cleanEmptyAbortedAndCommittedTxns();
-
- // Clean TXN_TO_WRITE_ID table for entries under min_uncommitted_txn referred by any open txns.
- txnHandler.cleanTxnToWriteIdTable();
} catch (Throwable t) {
LOG.error("Initiator loop caught unexpected exception this time through the loop: " +
StringUtils.stringifyException(t));
@@ -437,7 +433,7 @@ public class Initiator extends MetaStoreCompactorThread {
return false;
}
- private boolean checkCompactionElig(CompactionInfo ci, ShowCompactResponse currentCompactions) {
+ private boolean isEligibleForCompaction(CompactionInfo ci, ShowCompactResponse currentCompactions) {
LOG.info("Checking to see if we should compact " + ci.getFullPartitionName());
// Check if we already have initiated or are working on a compaction for this partition
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 48bf852..366282a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -47,7 +48,7 @@ import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.apache.hadoop.hive.metastore.txn.AcidCompactionHistoryService;
+import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -1026,6 +1027,7 @@ public class TestTxnCommands2 {
runStatementOnDriver("insert into " + tblName + " values(" + (i + 1) + ", 'foo'),(" + (i + 2) + ", 'bar'),(" + (i + 3) + ", 'baz')");
}
hiveConf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_INITIATOR_ON, true);
int numFailedCompactions = hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
AtomicBoolean stop = new AtomicBoolean(true);
@@ -1045,9 +1047,9 @@ public class TestTxnCommands2 {
checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler));
hiveConf.setTimeVar(HiveConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, 10, TimeUnit.MILLISECONDS);
- AcidCompactionHistoryService compactionHistoryService = new AcidCompactionHistoryService();
- compactionHistoryService.setConf(hiveConf);
- compactionHistoryService.run();
+ MetastoreTaskThread houseKeeper = new AcidHouseKeeperService();
+ houseKeeper.setConf(hiveConf);
+ houseKeeper.run();
checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions,0,0,0,0,numFailedCompactions + numAttemptedCompactions), countCompacts(txnHandler));
txnHandler.compact(new CompactionRequest("default", tblName, CompactionType.MAJOR));
@@ -1060,7 +1062,7 @@ public class TestTxnCommands2 {
numAttemptedCompactions++;
checkCompactionState(new CompactionsByState(numAttemptedCompactions,numFailedCompactions + 2,0,0,0,0,numFailedCompactions + 2 + numAttemptedCompactions), countCompacts(txnHandler));
- compactionHistoryService.run();
+ houseKeeper.run();
//COMPACTOR_HISTORY_RETENTION_FAILED failed compacts left (and no other since we only have failed ones here)
checkCompactionState(new CompactionsByState(
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
@@ -1084,7 +1086,7 @@ public class TestTxnCommands2 {
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED)+ 1), countCompacts(txnHandler));
runCleaner(hiveConf); // transition to Success state
- compactionHistoryService.run();
+ houseKeeper.run();
checkCompactionState(new CompactionsByState(
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED),
hiveConf.getIntVar(HiveConf.ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED),0,0,1,0,
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index 5b8c670..e1f669a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsInfoResponse;
import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
@@ -430,10 +429,6 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
Assert.assertTrue(openResp.toString(), BitSet.valueOf(openResp.getAbortedBits()).get(0));
runCleaner(hiveConf);
- runInitiator(hiveConf);//to make sure any (which is not in this case)
- // 'empty aborted' TXNS metadata is removed
- openResp = txnHandler.getOpenTxns();
- Assert.assertEquals(openResp.toString(), 1, openResp.getOpen_txnsSize());
//we still have 1 aborted (compactor) txn
Assert.assertTrue(openResp.toString(), BitSet.valueOf(openResp.getAbortedBits()).get(0));
Assert.assertEquals(1, TxnDbUtil.countQueryAgent(hiveConf,
@@ -464,10 +459,6 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
//delete metadata about aborted txn from txn_components and files (if any)
runCleaner(hiveConf);
- runInitiator(hiveConf);//to clean 'empty aborted'
- openResp = txnHandler.getOpenTxns();
- //now the aborted compactor txn is gone
- Assert.assertEquals(openResp.toString(), 0, openResp.getOpen_txnsSize());
}
/**
@@ -479,15 +470,9 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'");
runWorker(hiveConf);
- // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain
- hiveConf.set("metastore.txn.opentxn.timeout", "1");
- runInitiator(hiveConf);
- hiveConf.set("metastore.txn.opentxn.timeout", "1000");
- assertOneTxn();
assertTableIsEmpty("TXN_COMPONENTS");
runCleaner(hiveConf);
- assertOneTxn();
assertTableIsEmpty("TXN_COMPONENTS");
}
@@ -504,14 +489,9 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'");
runWorker(hiveConf);
- // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain
- hiveConf.set("metastore.txn.opentxn.timeout", "1");
- runInitiator(hiveConf);
- hiveConf.set("metastore.txn.opentxn.timeout", "1000");
assertTableIsEmpty("TXN_COMPONENTS");
runCleaner(hiveConf);
- assertOneTxn();
assertTableIsEmpty("TXN_COMPONENTS");
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
index eac2c63..535bf11 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommandsForMmTable.java
@@ -465,11 +465,6 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"),
2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS"));
- // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain
- Thread.sleep(1000);
- runInitiator(hiveConf);
- Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
- 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
// Initiate a major compaction request on the table.
runStatementOnDriver("alter table " + TableExtended.MMTBL + " compact 'MAJOR'");
@@ -478,16 +473,11 @@ public class TestTxnCommandsForMmTable extends TxnCommandsBaseForTests {
runWorker(hiveConf);
verifyDirAndResult(2, true);
- // Clean committed after TXN_OPENTXN_TIMEOUT, one transaction should always remain
- Thread.sleep(1000);
- runInitiator(hiveConf);
// Run Cleaner.
runCleaner(hiveConf);
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from COMPLETED_TXN_COMPONENTS"),
0,
TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPLETED_TXN_COMPONENTS"));
- Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXNS"),
- 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXNS"));
verifyDirAndResult(0, true);
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 1687425..2adabe7 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hive.metastore.api.ShowLocksRequest;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.txn.AcidWriteSetService;
+import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.ql.TestTxnCommands2;
import org.junit.Assert;
import org.apache.hadoop.hive.common.FileUtils;
@@ -1167,7 +1167,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
Assert.assertEquals("WRITE_SET mismatch: " + TxnDbUtil.queryToString(conf, "select * from \"WRITE_SET\""),
1, TxnDbUtil.countQueryAgent(conf, "select count(*) from \"WRITE_SET\""));
- AcidWriteSetService houseKeeper = new AcidWriteSetService();
+ MetastoreTaskThread houseKeeper = new AcidHouseKeeperService();
houseKeeper.setConf(conf);
houseKeeper.run();
//since T3 overlaps with Long Running (still open) GC does nothing
@@ -1277,7 +1277,7 @@ public class TestDbTxnManager2 extends DbTxnManagerEndToEndTestBase{
txnMgr.openTxn(ctx, "Long Running");
Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
// Now we can clean the write_set
- MetastoreTaskThread writeSetService = new AcidWriteSetService();
+ MetastoreTaskThread writeSetService = new AcidHouseKeeperService();
writeSetService.setConf(conf);
writeSetService.run();
Assert.assertEquals(0, TxnDbUtil.countQueryAgent(conf, "select count(*) from \"WRITE_SET\""));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 058430f..279de19 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -19,26 +19,21 @@ package org.apache.hadoop.hive.ql.txn.compactor;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
-import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.LockComponent;
import org.apache.hadoop.hive.metastore.api.LockLevel;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockType;
-import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
-import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -202,38 +197,6 @@ public class TestInitiator extends CompactorTest {
Assert.assertEquals(0, rsp.getCompactsSize());
}
- @Test
- public void cleanEmptyAbortedTxns() throws Exception {
- // Test that we are cleaning aborted transactions with no components left in txn_components.
- // Put one aborted transaction with an entry in txn_components to make sure we don't
- // accidently clean it too.
- Table t = newTable("default", "ceat", false);
-
- long txnid = openTxn();
- LockComponent comp = new LockComponent(LockType.SHARED_WRITE, LockLevel.TABLE, "default");
- comp.setTablename("ceat");
- comp.setOperationType(DataOperationType.UPDATE);
- List<LockComponent> components = new ArrayList<LockComponent>(1);
- components.add(comp);
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnid);
- LockResponse res = txnHandler.lock(req);
- txnHandler.abortTxn(new AbortTxnRequest(txnid));
- txnHandler.setOpenTxnTimeOutMillis(30000);
- conf.setIntVar(HiveConf.ConfVars.HIVE_TXN_MAX_OPEN_BATCH, TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50);
- OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(
- TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50, "user", "hostname"));
- txnHandler.abortTxns(new AbortTxnsRequest(resp.getTxn_ids()));
- GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
- Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
- txnHandler.setOpenTxnTimeOutMillis(1);
- startInitiator();
-
- openTxns = txnHandler.getOpenTxns();
- // txnid:1 has txn_components, txnid:TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1 is the last
- Assert.assertEquals(2, openTxns.getOpen_txnsSize());
- }
-
/**
* Test that HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD triggers compaction.
*
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index ab99a84..d1db106 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -86,17 +86,14 @@ public class MetastoreConf {
static final String METASTORE_DELEGATION_MANAGER_CLASS =
"org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager";
@VisibleForTesting
- static final String ACID_COMPACTION_HISTORY_SERVICE_CLASS =
- "org.apache.hadoop.hive.metastore.txn.AcidCompactionHistoryService";
- @VisibleForTesting
static final String ACID_HOUSE_KEEPER_SERVICE_CLASS =
"org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService";
@VisibleForTesting
+ static final String ACID_TXN_CLEANER_SERVICE_CLASS =
+ "org.apache.hadoop.hive.metastore.txn.AcidTxnCleanerService";
+ @VisibleForTesting
static final String ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS =
"org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService";
- @VisibleForTesting
- static final String ACID_WRITE_SET_SERVICE_CLASS =
- "org.apache.hadoop.hive.metastore.txn.AcidWriteSetService";
public static final String METASTORE_AUTHENTICATION_LDAP_USERMEMBERSHIPKEY_NAME =
"metastore.authentication.ldap.userMembershipKey";
@@ -271,6 +268,15 @@ public class MetastoreConf {
public enum ConfVars {
// alpha order, PLEASE!
+ ACID_HOUSEKEEPER_SERVICE_INTERVAL("metastore.acid.housekeeper.interval",
+ "hive.metastore.acid.housekeeper.interval", 60, TimeUnit.SECONDS,
+ "Time interval describing how often the acid housekeeper runs."),
+ ACID_HOUSEKEEPER_SERVICE_START("metastore.acid.housekeeper.start",
+ "hive.metastore.acid.housekeeper.start", 60, TimeUnit.SECONDS,
+ "Time delay of 1st acid housekeeper run after metastore has started."),
+ ACID_TXN_CLEANER_INTERVAL("metastore.acid.txn.cleaner.interval",
+ "hive.metastore.acid.txn.cleaner.interval", 10, TimeUnit.SECONDS,
+ "Time interval describing how often aborted and committed txns are cleaned."),
ADDED_JARS("metastore.added.jars.path", "hive.added.jars.path", "",
"This an internal parameter."),
AGGREGATE_STATS_CACHE_CLEAN_UNTIL("metastore.aggregate.stats.cache.clean.until",
@@ -382,9 +388,6 @@ public class MetastoreConf {
"has an infinite lifetime."),
CLIENT_SOCKET_TIMEOUT("metastore.client.socket.timeout", "hive.metastore.client.socket.timeout", 600,
TimeUnit.SECONDS, "MetaStore Client socket timeout in seconds"),
- COMPACTOR_HISTORY_REAPER_INTERVAL("metastore.compactor.history.reaper.interval",
- "hive.compactor.history.reaper.interval", 2, TimeUnit.MINUTES,
- "Determines how often compaction history reaper runs"),
COMPACTOR_HISTORY_RETENTION_ATTEMPTED("metastore.compactor.history.retention.attempted",
"hive.compactor.history.retention.attempted", 2,
new RangeValidator(0, 100), "Determines how many attempted compaction records will be " +
@@ -1059,12 +1062,11 @@ public class MetastoreConf {
"or in server mode. They must implement " + METASTORE_TASK_THREAD_CLASS),
TASK_THREADS_REMOTE_ONLY("metastore.task.threads.remote", "metastore.task.threads.remote",
ACID_HOUSE_KEEPER_SERVICE_CLASS + "," +
+ ACID_TXN_CLEANER_SERVICE_CLASS + "," +
ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS + "," +
- ACID_COMPACTION_HISTORY_SERVICE_CLASS + "," +
- ACID_WRITE_SET_SERVICE_CLASS + "," +
MATERIALZIATIONS_REBUILD_LOCK_CLEANER_TASK_CLASS + "," +
PARTITION_MANAGEMENT_TASK_CLASS,
- "Command separated list of tasks that will be started in separate threads. These will be" +
+ "Comma-separated list of tasks that will be started in separate threads. These will be" +
" started only when the metastore is running as a separate service. They must " +
"implement " + METASTORE_TASK_THREAD_CLASS),
TCP_KEEP_ALIVE("metastore.server.tcp.keepalive",
@@ -1150,12 +1152,6 @@ public class MetastoreConf {
"metastore. SEQUENTIAL implies that the first valid metastore from the URIs specified " +
"through hive.metastore.uris will be picked. RANDOM implies that the metastore " +
"will be picked randomly"),
- TIMEDOUT_TXN_REAPER_START("metastore.timedout.txn.reaper.start",
- "hive.timedout.txn.reaper.start", 100, TimeUnit.SECONDS,
- "Time delay of 1st reaper run after metastore start"),
- TIMEDOUT_TXN_REAPER_INTERVAL("metastore.timedout.txn.reaper.interval",
- "hive.timedout.txn.reaper.interval", 180, TimeUnit.SECONDS,
- "Time interval describing how often the reaper runs"),
TOKEN_SIGNATURE("metastore.token.signature", "hive.metastore.token.signature", "",
"The delegation token service name to match when selecting a token from the current user's tokens."),
METASTORE_CACHE_CAN_USE_EVENT("metastore.cache.can.use.event", "hive.metastore.cache.can.use.event", false,
@@ -1252,9 +1248,6 @@ public class MetastoreConf {
"hive.metastore.warehouse.external.dir", "",
"Default location for external tables created in the warehouse. " +
"If not set or null, then the normal warehouse location will be used as the default location."),
- WRITE_SET_REAPER_INTERVAL("metastore.writeset.reaper.interval",
- "hive.writeset.reaper.interval", 60, TimeUnit.SECONDS,
- "Frequency of WriteSet reaper runs"),
WM_DEFAULT_POOL_SIZE("metastore.wm.default.pool.size",
"hive.metastore.wm.default.pool.size", 4,
"The size of a default pool to create when creating an empty resource plan;\n" +
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
index 117348c..b13ad96 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
@@ -10293,8 +10293,6 @@ public class HiveMetaStore extends ThriftHiveMetastore {
MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_WORKER_THREADS));
HMSHandler.LOG
.info("hive.metastore.runworker.in = {}", MetastoreConf.getVar(conf, ConfVars.HIVE_METASTORE_RUNWORKER_IN));
- HMSHandler.LOG.info("metastore.compactor.history.reaper.interval = {}",
- MetastoreConf.getTimeVar(conf, ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL, TimeUnit.MINUTES));
HMSHandler.LOG.info("metastore.compactor.history.retention.attempted = {}",
MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_ATTEMPTED));
HMSHandler.LOG.info("metastore.compactor.history.retention.failed = {}",
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
index d35c960..4c2d5e3 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/MaterializationsRebuildLockCleanerTask.java
@@ -71,10 +71,10 @@ public class MaterializationsRebuildLockCleanerTask implements MetastoreTaskThre
LOG.debug("Number of materialization locks deleted: " + removedCnt);
}
}
- } catch(Throwable t) {
- LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
+ } catch (Throwable t) {
+ LOG.error("Unexpected error in thread: {}, message: {}", Thread.currentThread().getName(), t.getMessage(), t);
} finally {
- if(handle != null) {
+ if (handle != null) {
handle.releaseLocks();
}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidCompactionHistoryService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidCompactionHistoryService.java
deleted file mode 100644
index e96a7ba..0000000
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidCompactionHistoryService.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * Purges obsolete items from compaction history data
- */
-public class AcidCompactionHistoryService implements MetastoreTaskThread {
- private static final Logger LOG = LoggerFactory.getLogger(AcidCompactionHistoryService.class);
-
- private Configuration conf;
- private TxnStore txnHandler;
-
- @Override
- public void setConf(Configuration configuration) {
- this.conf = configuration;
- txnHandler = TxnUtils.getTxnStore(conf);
- }
-
- @Override
- public Configuration getConf() {
- return conf;
- }
-
- @Override
- public long runFrequency(TimeUnit unit) {
- return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.COMPACTOR_HISTORY_REAPER_INTERVAL,
- unit);
- }
-
- @Override
- public void run() {
- TxnStore.MutexAPI.LockHandle handle = null;
- try {
- handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.CompactionHistory.name());
- long startTime = System.currentTimeMillis();
- txnHandler.purgeCompactionHistory();
- LOG.debug("History reaper reaper ran for " + (System.currentTimeMillis() - startTime)/1000 +
- "seconds.");
- } catch(Throwable t) {
- LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
- } finally {
- if(handle != null) {
- handle.releaseLocks();
- }
- }
- }
-}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidHouseKeeperService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidHouseKeeperService.java
index c4a488b..177abb7 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidHouseKeeperService.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidHouseKeeperService.java
@@ -17,27 +17,34 @@
*/
package org.apache.hadoop.hive.metastore.txn;
+import org.apache.commons.lang3.Functions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
+import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
+import static org.apache.commons.lang3.Functions.FailableRunnable;
+
/**
* Performs background tasks for Transaction management in Hive.
* Runs inside Hive Metastore Service.
*/
public class AcidHouseKeeperService implements MetastoreTaskThread {
+
private static final Logger LOG = LoggerFactory.getLogger(AcidHouseKeeperService.class);
private Configuration conf;
+ private boolean isCompactorEnabled;
private TxnStore txnHandler;
@Override
public void setConf(Configuration configuration) {
- this.conf = configuration;
+ conf = configuration;
+ isCompactorEnabled = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON);
txnHandler = TxnUtils.getTxnStore(conf);
}
@@ -48,7 +55,7 @@ public class AcidHouseKeeperService implements MetastoreTaskThread {
@Override
public long runFrequency(TimeUnit unit) {
- return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TIMEDOUT_TXN_REAPER_INTERVAL, unit);
+ return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.ACID_HOUSEKEEPER_SERVICE_INTERVAL, unit);
}
@Override
@@ -56,16 +63,35 @@ public class AcidHouseKeeperService implements MetastoreTaskThread {
TxnStore.MutexAPI.LockHandle handle = null;
try {
handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.HouseKeeper.name());
- long startTime = System.currentTimeMillis();
- txnHandler.performTimeOuts();
- LOG.debug("timeout reaper ran for " + (System.currentTimeMillis() - startTime)/1000 +
- "seconds.");
- } catch(Throwable t) {
- LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
+ LOG.info("Starting to run AcidHouseKeeperService.");
+ long start = System.currentTimeMillis();
+ cleanTheHouse();
+ LOG.debug("Total time AcidHouseKeeperService took: {} seconds.", elapsedSince(start));
+ } catch (Throwable t) {
+ LOG.error("Unexpected error in thread: {}, message: {}", Thread.currentThread().getName(), t.getMessage(), t);
} finally {
- if(handle != null) {
+ if (handle != null) {
handle.releaseLocks();
}
}
}
+
+ private void cleanTheHouse() {
+ performTask(txnHandler::performTimeOuts, "Cleaning timed out txns and locks");
+ performTask(txnHandler::performWriteSetGC, "Cleaning obsolete write set entries");
+ performTask(txnHandler::cleanTxnToWriteIdTable, "Cleaning obsolete TXN_TO_WRITE_ID entries");
+ if (isCompactorEnabled) {
+ performTask(txnHandler::purgeCompactionHistory, "Cleaning obsolete compaction history entries");
+ }
+ }
+
+ private void performTask(FailableRunnable<MetaException> task, String description) {
+ long start = System.currentTimeMillis();
+ Functions.run(task);
+ LOG.debug("{} took {} seconds.", description, elapsedSince(start));
+ }
+
+ private long elapsedSince(long start) {
+ return (System.currentTimeMillis() - start) / 1000;
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java
index 2ad5a89..fe78800 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidOpenTxnsCounterService.java
@@ -29,7 +29,9 @@ import java.util.concurrent.TimeUnit;
* Runs inside Hive Metastore Service.
*/
public class AcidOpenTxnsCounterService implements MetastoreTaskThread {
+
private static final Logger LOG = LoggerFactory.getLogger(AcidOpenTxnsCounterService.class);
+ private static final int LOG_INTERVAL_MS = 60 * 1000;
private Configuration conf;
private int isAliveCounter = 0;
@@ -44,18 +46,17 @@ public class AcidOpenTxnsCounterService implements MetastoreTaskThread {
@Override
public void run() {
try {
- long startTime = System.currentTimeMillis();
+ long start = System.currentTimeMillis();
isAliveCounter++;
txnHandler.countOpenTxns();
- if (System.currentTimeMillis() - lastLogTime > 60 * 1000) {
- LOG.info("AcidOpenTxnsCounterService ran for " +
- ((System.currentTimeMillis() - startTime) / 1000) +
- " seconds. isAliveCounter = " + isAliveCounter);
- lastLogTime = System.currentTimeMillis();
+ long now = System.currentTimeMillis();
+ if (now - lastLogTime > LOG_INTERVAL_MS) {
+ LOG.info("Open txn counter ran for {} seconds. isAliveCounter: {}", (now - start) / 1000, isAliveCounter);
+ lastLogTime = now;
}
}
- catch(Throwable t) {
- LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
+ catch (Throwable t) {
+ LOG.error("Unexpected error in thread: {}, message: {}", Thread.currentThread().getName(), t.getMessage(), t);
}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidWriteSetService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidTxnCleanerService.java
similarity index 70%
rename from standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidWriteSetService.java
rename to standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidTxnCleanerService.java
index 5ec513d..d2800bd 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidWriteSetService.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/AcidTxnCleanerService.java
@@ -26,17 +26,19 @@ import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
/**
- * Periodically cleans WriteSet tracking information used in Transaction management
+ * Periodically cleans out empty aborted and committed txns from the TXNS table.
+ * Runs inside Hive Metastore Service.
*/
-public class AcidWriteSetService implements MetastoreTaskThread {
- private static final Logger LOG = LoggerFactory.getLogger(AcidWriteSetService.class);
+public class AcidTxnCleanerService implements MetastoreTaskThread {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AcidTxnCleanerService.class);
private Configuration conf;
private TxnStore txnHandler;
@Override
public void setConf(Configuration configuration) {
- this.conf = configuration;
+ conf = configuration;
txnHandler = TxnUtils.getTxnStore(conf);
}
@@ -47,23 +49,27 @@ public class AcidWriteSetService implements MetastoreTaskThread {
@Override
public long runFrequency(TimeUnit unit) {
- return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.WRITE_SET_REAPER_INTERVAL, unit);
+ return MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.ACID_TXN_CLEANER_INTERVAL, unit);
}
@Override
public void run() {
TxnStore.MutexAPI.LockHandle handle = null;
try {
- handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.WriteSetCleaner.name());
- long startTime = System.currentTimeMillis();
- txnHandler.performWriteSetGC();
- LOG.debug("cleaner ran for " + (System.currentTimeMillis() - startTime)/1000 + "seconds.");
- } catch(Throwable t) {
- LOG.error("Serious error in {}", Thread.currentThread().getName(), ": {}" + t.getMessage(), t);
+ handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.TxnCleaner.name());
+ long start = System.currentTimeMillis();
+ txnHandler.cleanEmptyAbortedAndCommittedTxns();
+ LOG.debug("Txn cleaner service took: {} seconds.", elapsedSince(start));
+ } catch (Throwable t) {
+ LOG.error("Unexpected error in thread: {}, message: {}", Thread.currentThread().getName(), t.getMessage(), t);
} finally {
- if(handle != null) {
+ if (handle != null) {
handle.releaseLocks();
}
}
}
+
+ private long elapsedSince(long start) {
+ return (System.currentTimeMillis() - start) / 1000;
+ }
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index 7c39375..cf41ef8 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -3594,15 +3594,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
* Retry-by-caller note: this is only idempotent assuming it's only called by dropTable/Db/etc
* operations.
*
- * HIVE_LOCKS is (presumably) expected to be removed by AcidHouseKeeperServices
- * WS_SET is (presumably) expected to be removed by AcidWriteSetService
+ * HIVE_LOCKS and WS_SET are cleaned up by {@link AcidHouseKeeperService}, if turned on
*/
@Override
@RetrySemantics.Idempotent
public void cleanupRecords(HiveObjectType type, Database db, Table table,
Iterator<Partition> partitionIterator) throws MetaException {
- // cleanup should be done only for objecdts belonging to default catalog
+ // cleanup should be done only for objects belonging to default catalog
final String defaultCatalog = getDefaultCatalog(conf);
try {
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 28f22e6..1e177f4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -45,8 +45,8 @@ public interface TxnStore extends Configurable {
String TXN_KEY_START = "_meta";
enum MUTEX_KEY {
- Initiator, Cleaner, HouseKeeper, CompactionHistory, CheckLock,
- WriteSetCleaner, CompactionScheduler, WriteIdAllocator, MaterializationRebuild
+ Initiator, Cleaner, HouseKeeper, CheckLock, TxnCleaner,
+ CompactionScheduler, WriteIdAllocator, MaterializationRebuild
}
// Compactor states (Should really be enum)
String INITIATED_RESPONSE = "initiated";
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java
index 9905a14..c73de77 100644
--- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/conf/TestMetastoreConf.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.metastore.conf;
import org.apache.hadoop.hive.metastore.annotation.MetastoreUnitTest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.txn.AcidTxnCleanerService;
import org.hamcrest.CoreMatchers;
import org.hamcrest.core.StringContains;
import org.hamcrest.core.StringEndsWith;
@@ -47,10 +48,8 @@ import org.apache.hadoop.hive.metastore.MetastoreTaskThread;
import org.apache.hadoop.hive.metastore.RuntimeStatsCleanerTask;
import org.apache.hadoop.hive.metastore.events.EventCleanerTask;
import org.apache.hadoop.hive.metastore.security.MetastoreDelegationTokenManager;
-import org.apache.hadoop.hive.metastore.txn.AcidCompactionHistoryService;
import org.apache.hadoop.hive.metastore.txn.AcidHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.AcidOpenTxnsCounterService;
-import org.apache.hadoop.hive.metastore.txn.AcidWriteSetService;
@Category(MetastoreUnitTest.class)
public class TestMetastoreConf {
@@ -465,13 +464,11 @@ public class TestMetastoreConf {
EventCleanerTask.class.getName());
Assert.assertEquals(MetastoreConf.METASTORE_DELEGATION_MANAGER_CLASS,
MetastoreDelegationTokenManager.class.getName());
- Assert.assertEquals(MetastoreConf.ACID_COMPACTION_HISTORY_SERVICE_CLASS,
- AcidCompactionHistoryService.class.getName());
Assert.assertEquals(MetastoreConf.ACID_HOUSE_KEEPER_SERVICE_CLASS,
AcidHouseKeeperService.class.getName());
+ Assert.assertEquals(MetastoreConf.ACID_TXN_CLEANER_SERVICE_CLASS,
+ AcidTxnCleanerService.class.getName());
Assert.assertEquals(MetastoreConf.ACID_OPEN_TXNS_COUNTER_SERVICE_CLASS,
AcidOpenTxnsCounterService.class.getName());
- Assert.assertEquals(MetastoreConf.ACID_WRITE_SET_SERVICE_CLASS,
- AcidWriteSetService.class.getName());
}
}
diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestAcidTxnCleanerService.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestAcidTxnCleanerService.java
new file mode 100644
index 0000000..ba8ba73
--- /dev/null
+++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/txn/TestAcidTxnCleanerService.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.txn;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.LockComponent;
+import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
+import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse;
+import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+
+import static java.util.Collections.singletonList;
+
+/**
+ * Testing whether AcidTxnCleanerService removes the correct records
+ * from the TXNS table (via TxnStore).
+ */
+public class TestAcidTxnCleanerService {
+
+ private AcidTxnCleanerService underTest;
+ private TxnStore txnHandler;
+ private Configuration conf;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = MetastoreConf.newMetastoreConf();
+ underTest = new AcidTxnCleanerService();
+ underTest.setConf(conf);
+ txnHandler = TxnUtils.getTxnStore(conf);
+ txnHandler.setOpenTxnTimeOutMillis(100);
+ TxnDbUtil.prepDb(conf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TxnDbUtil.cleanDb(conf);
+ }
+
+ @Test
+ public void cleansEmptyAbortedTxns() throws Exception {
+ for (int i = 0; i < 5; ++i) {
+ long txnid = openTxn();
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+ }
+ // +1 represents the initial TXNS record (txnid=0)
+ Assert.assertEquals(5 + 1, getTxnCount());
+ Thread.sleep(txnHandler.getOpenTxnTimeOutMillis() * 2);
+
+ underTest.run();
+
+ // always leaves the MAX(TXN_ID) in the TXNS table
+ Assert.assertEquals(1, getTxnCount());
+ Assert.assertEquals(5, getMaxTxnId());
+ }
+
+ @Test
+ public void doesNotCleanAbortedTxnsThatAreNonEmpty() throws Exception {
+ for (int i = 0; i < 5; ++i) {
+ openNonEmptyThenAbort();
+ }
+ Assert.assertEquals(5 + 1, getTxnCount());
+ Thread.sleep(txnHandler.getOpenTxnTimeOutMillis() * 2);
+
+ underTest.run();
+
+ // deletes only the initial (committed) TXNS record
+ Assert.assertEquals(5, getTxnCount());
+ Assert.assertEquals(5, getMaxTxnId());
+ }
+
+ @Test
+ public void cleansAllCommittedTxns() throws Exception {
+ for (int i = 0; i < 5; ++i) {
+ long txnid = openTxn();
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+ }
+ Assert.assertEquals(5 + 1, getTxnCount());
+ Thread.sleep(txnHandler.getOpenTxnTimeOutMillis() * 2);
+
+ underTest.run();
+
+ // always leaves the MAX(TXN_ID) in the TXNS table
+ Assert.assertEquals(1, getTxnCount());
+ Assert.assertEquals(5, getMaxTxnId());
+ }
+
+ @Test
+ public void cleansCommittedAndEmptyAbortedOnly() throws Exception {
+ for (int i = 0; i < 5; ++i) {
+ // commit one
+ long txnid = openTxn();
+ txnHandler.commitTxn(new CommitTxnRequest(txnid));
+ // abort one empty
+ txnid = openTxn();
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+ // abort one non-empty
+ openNonEmptyThenAbort();
+ }
+ Assert.assertEquals(15 + 1, getTxnCount());
+ Thread.sleep(txnHandler.getOpenTxnTimeOutMillis() * 2);
+
+ underTest.run();
+
+ // kept only the 5 non-empty aborted ones
+ Assert.assertEquals(5, getTxnCount());
+ Assert.assertEquals(15, getMaxTxnId());
+ }
+
+ @Test
+ public void cleansEmptyAbortedBatchTxns() throws Exception {
+ // add one non-empty aborted txn
+ openNonEmptyThenAbort();
+ // add a batch of empty, aborted txns
+ txnHandler.setOpenTxnTimeOutMillis(30000);
+ MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.TXN_MAX_OPEN_BATCH,
+ TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50);
+ OpenTxnsResponse resp = txnHandler.openTxns(new OpenTxnRequest(
+ TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50, "user", "hostname"));
+ txnHandler.abortTxns(new AbortTxnsRequest(resp.getTxn_ids()));
+ GetOpenTxnsResponse openTxns = txnHandler.getOpenTxns();
+ Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, openTxns.getOpen_txnsSize());
+ txnHandler.setOpenTxnTimeOutMillis(1);
+
+ underTest.run();
+
+ openTxns = txnHandler.getOpenTxns();
+ Assert.assertEquals(2, openTxns.getOpen_txnsSize());
+ Assert.assertEquals(TxnStore.TIMED_OUT_TXN_ABORT_BATCH_SIZE + 50 + 1, getMaxTxnId());
+ }
+
+ private void openNonEmptyThenAbort() throws MetaException, NoSuchTxnException, TxnAbortedException {
+ long txnid = openTxn();
+ LockRequest req = getLockRequest();
+ req.setTxnid(txnid);
+ txnHandler.lock(req);
+ txnHandler.abortTxn(new AbortTxnRequest(txnid));
+ }
+
+ private long openTxn() throws MetaException {
+ return txnHandler
+ .openTxns(new OpenTxnRequest(1, "me", "localhost"))
+ .getTxn_ids()
+ .get(0);
+ }
+
+ private LockRequest getLockRequest() {
+ LockComponent comp = new LockComponentBuilder()
+ .setDbName("default")
+ .setTableName("ceat")
+ .setOperationType(DataOperationType.UPDATE)
+ .setSharedWrite()
+ .build();
+ return new LockRequest(singletonList(comp), "me", "localhost");
+ }
+
+ private long getTxnCount() throws Exception {
+ return TxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"TXNS\"");
+ }
+
+ private long getMaxTxnId() throws Exception {
+ return TxnDbUtil.countQueryAgent(conf, "SELECT MAX(\"TXN_ID\") FROM \"TXNS\"");
+ }
+}