Posted to commits@hive.apache.org by ve...@apache.org on 2023/04/24 07:22:53 UTC
[hive] branch master updated: HIVE-27020: Implement a separate handler to handle aborted transaction cleanup (Sourabh Badhya, reviewed by Laszlo Vegh, Denys Kuzmenko)
This is an automated email from the ASF dual-hosted git repository.
veghlaci05 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 45cafcaea79 HIVE-27020: Implement a separate handler to handle aborted transaction cleanup (Sourabh Badhya, reviewed by Laszlo Vegh, Denys Kuzmenko)
45cafcaea79 is described below
commit 45cafcaea7907c831a5e024726e65e04f9b32216
Author: Sourabh Badhya <42...@users.noreply.github.com>
AuthorDate: Mon Apr 24 12:52:41 2023 +0530
HIVE-27020: Implement a separate handler to handle aborted transaction cleanup (Sourabh Badhya, reviewed by Laszlo Vegh, Denys Kuzmenko)
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 4 +-
.../hive/ql/txn/compactor/TestCompactor.java | 115 +++++++-
.../hive/ql/txn/compactor/TestCompactorBase.java | 1 +
...pactorWithAbortCleanupUsingCompactionCycle.java | 31 ++
.../hive/ql/txn/compactor/TestInitiator2.java | 4 +-
...iator2WithAbortCleanupUsingCompactionCycle.java | 30 ++
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 12 +-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 5 +-
.../ql/txn/compactor/MetaStoreCompactorThread.java | 2 +-
.../txn/compactor/handler/AbortedTxnCleaner.java | 158 ++++++++++
.../txn/compactor/handler/CompactionCleaner.java | 90 +-----
.../hive/ql/txn/compactor/handler/TaskHandler.java | 80 +++++
.../txn/compactor/handler/TaskHandlerFactory.java | 12 +-
.../metastore/txn/TestCompactionTxnHandler.java | 10 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 26 +-
...mands2WithAbortCleanupUsingCompactionCycle.java | 37 +++
.../hive/ql/txn/compactor/CompactorTest.java | 41 ++-
.../TestAbortCleanupUsingCompactionCycle.java | 30 ++
...pUsingCompactionCycleWithMinHistoryWriteId.java | 35 +++
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 19 +-
.../hive/ql/txn/compactor/TestInitiator.java | 69 +++--
...tiatorWithAbortCleanupUsingCompactionCycle.java | 28 ++
.../compactor/handler/TestAbortedTxnCleaner.java | 323 ++++++++++++++++++++
.../hive/ql/txn/compactor/handler/TestHandler.java | 4 +-
.../hadoop/hive/metastore/conf/MetastoreConf.java | 4 +
.../apache/hadoop/hive/metastore/HMSHandler.java | 2 +-
.../hadoop/hive/metastore/txn/CompactionInfo.java | 1 -
.../hive/metastore/txn/CompactionTxnHandler.java | 328 +++++++++++++--------
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 18 +-
.../apache/hadoop/hive/metastore/txn/TxnUtils.java | 47 ++-
30 files changed, 1268 insertions(+), 298 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 7e6903a39d6..b6835928011 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3272,11 +3272,11 @@ public class HiveConf extends Configuration {
HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD("hive.compactor.abortedtxn.threshold", 1000,
"Number of aborted transactions involving a given table or partition that will trigger\n" +
- "a major compaction."),
+ "a major compaction / cleanup of aborted directories."),
HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD("hive.compactor.aborted.txn.time.threshold", "12h",
new TimeValidator(TimeUnit.HOURS),
- "Age of table/partition's oldest aborted transaction when compaction will be triggered. " +
+      "Age of table/partition's oldest aborted transaction at which compaction / cleanup of aborted directories will be triggered. " +
"Default time unit is: hours. Set to a negative number to disable."),
HIVE_COMPACTOR_ACTIVE_DELTA_DIR_THRESHOLD("hive.compactor.active.delta.dir.threshold", 200,
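
[Note: as a rough illustration — not part of this commit — the two thresholds above can also be tuned programmatically via the standard HiveConf setters; the values below are arbitrary examples, not defaults:

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class AbortThresholdTuning {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Trigger compaction / aborted-directory cleanup once 500 aborted txns
        // accumulate against a single table or partition (default: 1000).
        conf.setIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 500);
        // ...or once the oldest aborted txn on the table/partition is older than
        // 6 hours (default: 12h; a negative value disables the time-based trigger).
        conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, 6, TimeUnit.HOURS);
      }
    }
]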
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
index d0f3ebd19d6..dead15e1799 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -50,6 +52,11 @@ import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
+import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandler;
+import org.apache.hadoop.hive.ql.txn.compactor.handler.TaskHandlerFactory;
import org.apache.hive.streaming.HiveStreamingConnection;
import org.apache.hive.streaming.StreamingConnection;
import org.apache.hive.streaming.StreamingException;
@@ -75,7 +82,9 @@ import static org.apache.hadoop.hive.ql.TestTxnCommands2.runCleaner;
import static org.apache.hadoop.hive.ql.TestTxnCommands2.runInitiator;
import static org.apache.hadoop.hive.ql.TestTxnCommands2.runWorker;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
@@ -1064,7 +1073,78 @@ public class TestCompactor extends TestCompactorBase {
connection2.close();
}
+ @Test
+ public void testAbortAfterMarkCleaned() throws Exception {
+ assumeTrue(MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER));
+ String dbName = "default";
+ String tableName = "cws";
+
+ String agentInfo = "UT_" + Thread.currentThread().getName();
+
+ executeStatementOnDriver("drop table if exists " + tableName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(a STRING, b STRING) " +
+ " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+ executeStatementOnDriver("insert into table " + tableName + " values ('1', '2'), ('3', '4') ", driver);
+ executeStatementOnDriver("insert into table " + tableName + " values ('1', '2'), ('3', '4') ", driver);
+
+
+ StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+ .withFieldDelimiter(',')
+ .build();
+
+ // Create three folders with two different transactions
+ HiveStreamingConnection connection1 = HiveStreamingConnection.newBuilder()
+ .withDatabase(dbName)
+ .withTable(tableName)
+ .withAgentInfo(agentInfo)
+ .withHiveConf(conf)
+ .withRecordWriter(writer)
+ .withStreamingOptimizations(true)
+ .withTransactionBatchSize(1)
+ .connect();
+
+ HiveStreamingConnection connection2 = HiveStreamingConnection.newBuilder()
+ .withDatabase(dbName)
+ .withTable(tableName)
+ .withAgentInfo(agentInfo)
+ .withHiveConf(conf)
+ .withRecordWriter(writer)
+ .withStreamingOptimizations(true)
+ .withTransactionBatchSize(1)
+ .connect();
+
+ // Abort a transaction which writes data.
+ connection1.beginTransaction();
+ connection1.write("1,1".getBytes());
+ connection1.write("2,1".getBytes());
+ connection1.abortTransaction();
+
+    // Open a txn that stays open and long-running.
+ connection2.beginTransaction();
+ connection2.write("3,1".getBytes());
+
+ Cleaner cleaner = new Cleaner();
+ TxnStore mockedTxnHandler = Mockito.spy(TxnUtils.getTxnStore(conf));
+ doAnswer(invocationOnMock -> {
+ connection2.abortTransaction();
+ return invocationOnMock.callRealMethod();
+ }).when(mockedTxnHandler).markCleaned(any(), eq(false));
+
+ MetadataCache metadataCache = new MetadataCache(false);
+ FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
+ cleaner.setConf(conf);
+ List<TaskHandler> cleanupHandlers = TaskHandlerFactory.getInstance()
+ .getHandlers(conf, mockedTxnHandler, metadataCache, false, fsRemover);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(cleanupHandlers);
+ cleaner.run();
+
+ int count = TestTxnDbUtil.countQueryAgent(conf, "select count(*) from TXN_COMPONENTS");
+ Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from TXN_COMPONENTS"), 1, count);
+ }
+
private void assertAndCompactCleanAbort(String dbName, String tblName, boolean partialAbort, boolean singleSession) throws Exception {
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
IMetaStoreClient msClient = new HiveMetaStoreClient(conf);
TxnStore txnHandler = TxnUtils.getTxnStore(conf);
Table table = msClient.getTable(dbName, tblName);
@@ -1083,14 +1163,17 @@ public class TestCompactor extends TestCompactorBase {
count = TestTxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE");
// Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
// with rows in TXN_COMPONENTS
- Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 1, count);
+ Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"),
+ useCleanerForAbortCleanup ? 0 : 1, count);
runWorker(conf);
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- Assert.assertEquals(1, rsp.getCompacts().size());
- Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
- Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
- Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+ Assert.assertEquals(useCleanerForAbortCleanup ? 0 : 1, rsp.getCompacts().size());
+ if (!useCleanerForAbortCleanup) {
+ Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+ Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
+ Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+ }
runCleaner(conf);
@@ -1108,10 +1191,12 @@ public class TestCompactor extends TestCompactorBase {
}
rsp = txnHandler.showCompact(new ShowCompactRequest());
- Assert.assertEquals(1, rsp.getCompacts().size());
- Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
- Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
- Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+ Assert.assertEquals(useCleanerForAbortCleanup ? 0 : 1, rsp.getCompacts().size());
+ if (!useCleanerForAbortCleanup) {
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+ Assert.assertEquals("cws", rsp.getCompacts().get(0).getTablename());
+ Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+ }
}
@Test
@@ -1119,6 +1204,7 @@ public class TestCompactor extends TestCompactorBase {
String dbName = "default";
String tblName1 = "cws1";
String tblName2 = "cws2";
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
HiveStreamingConnection connection1 = prepareTableAndConnection(dbName, tblName1, 1);
HiveStreamingConnection connection2 = prepareTableAndConnection(dbName, tblName2, 1);
@@ -1155,7 +1241,8 @@ public class TestCompactor extends TestCompactorBase {
count = TestTxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE");
// Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
// with rows in TXN_COMPONENTS
- Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 2, count);
+ Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"),
+ useCleanerForAbortCleanup ? 0 : 2, count);
runWorker(conf);
runWorker(conf);
@@ -1203,6 +1290,7 @@ public class TestCompactor extends TestCompactorBase {
@Test
public void testCleanDynPartAbortNoDataLoss() throws Exception {
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
String dbName = "default";
String tblName = "cws";
@@ -1226,7 +1314,8 @@ public class TestCompactor extends TestCompactorBase {
runInitiator(conf);
int count = TestTxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE");
- Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 4, count);
+ Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"),
+ useCleanerForAbortCleanup ? 3 : 4, count);
runWorker(conf);
runWorker(conf);
@@ -1258,6 +1347,7 @@ public class TestCompactor extends TestCompactorBase {
public void testCleanAbortAndMinorCompact() throws Exception {
String dbName = "default";
String tblName = "cws";
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
HiveStreamingConnection connection = prepareTableAndConnection(dbName, tblName, 1);
@@ -1273,7 +1363,8 @@ public class TestCompactor extends TestCompactorBase {
runInitiator(conf);
int count = TestTxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE");
- Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"), 2, count);
+ Assert.assertEquals(TestTxnDbUtil.queryToString(conf, "select * from COMPACTION_QUEUE"),
+ useCleanerForAbortCleanup ? 1 : 2, count);
runWorker(conf);
runWorker(conf);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java
index 8b6e57c8d0f..a57a817e161 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java
@@ -89,6 +89,7 @@ class TestCompactorBase {
hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, false);
MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
+ MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
TestTxnDbUtil.setConfValues(hiveConf);
TestTxnDbUtil.cleanDb(hiveConf);
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorWithAbortCleanupUsingCompactionCycle.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorWithAbortCleanupUsingCompactionCycle.java
new file mode 100644
index 00000000000..beb80d18e6c
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorWithAbortCleanupUsingCompactionCycle.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.txn.compactor.TestCompactor;
+import org.junit.Before;
+
+public class TestCompactorWithAbortCleanupUsingCompactionCycle extends TestCompactor {
+ @Override
+ @Before
+ public void setup() throws Exception {
+ super.setup();
+ MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
+ }
+}
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator2.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator2.java
index 18fd05b9fde..84fc4cfdaf6 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator2.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.txn.compactor;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.junit.Assert;
import org.junit.Test;
@@ -62,6 +63,7 @@ public class TestInitiator2 extends CompactorTest {
@Test
public void dbNoAutoCompactSetFalseUpperCase() throws Exception {
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
String dbName = "test1";
Map<String, String> params = new HashMap<String, String>(1);
params.put("NO_AUTO_COMPACTION", "false");
@@ -89,7 +91,7 @@ public class TestInitiator2 extends CompactorTest {
startInitiator();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
- Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertEquals(useCleanerForAbortCleanup ? 0 : 1, rsp.getCompactsSize());
}
@Override
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator2WithAbortCleanupUsingCompactionCycle.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator2WithAbortCleanupUsingCompactionCycle.java
new file mode 100644
index 00000000000..9a47ba59464
--- /dev/null
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator2WithAbortCleanupUsingCompactionCycle.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.txn.compactor.TestInitiator2;
+
+public class TestInitiator2WithAbortCleanupUsingCompactionCycle extends TestInitiator2 {
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 2325c1527d7..ce822effe7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.txn.compactor;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
@@ -61,12 +60,9 @@ public class Cleaner extends MetaStoreCompactorThread {
cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
COMPACTOR_CLEANER_THREAD_NAME_FORMAT);
- if (CollectionUtils.isEmpty(cleanupHandlers)) {
- FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
- cleanupHandlers = TaskHandlerFactory.getInstance()
- .getHandlers(conf, txnHandler, metadataCache,
- metricsEnabled, fsRemover);
- }
+ FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
+ cleanupHandlers = TaskHandlerFactory.getInstance()
+ .getHandlers(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
}
@Override
@@ -150,7 +146,7 @@ public class Cleaner extends MetaStoreCompactorThread {
}
@Override
- public boolean isCacheEnabled() {
+ protected boolean isCacheEnabled() {
return MetastoreConf.getBoolVar(conf,
MetastoreConf.ConfVars.COMPACTOR_CLEANER_TABLECACHE_ON);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 9398258a64b..a86c18baad9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -157,8 +157,7 @@ public class Initiator extends MetaStoreCompactorThread {
.parallelStream()
.filter(ci -> isEligibleForCompaction(ci, currentCompactions, skipDBs, skipTables))
.collect(Collectors.toSet())).get();
- LOG.debug("Found " + potentials.size() + " potential compactions, " +
- "checking to see if we should compact any of them");
+ LOG.debug("Found {} potential compactions, checking to see if we should compact any of them", potentials.size());
checkInterrupt();
@@ -246,7 +245,7 @@ public class Initiator extends MetaStoreCompactorThread {
}
@Override
- public boolean isCacheEnabled() {
+ protected boolean isCacheEnabled() {
return MetastoreConf.getBoolVar(conf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
index 7cda085ff6b..39d0e10f589 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetaStoreCompactorThread.java
@@ -88,7 +88,7 @@ public abstract class MetaStoreCompactorThread extends CompactorThread implement
return CompactorUtil.getPartitionsByNames(conf, ci.dbname, ci.tableName, ci.partName);
}
- public abstract boolean isCacheEnabled();
+ protected abstract boolean isCacheEnabled();
protected void startCycleUpdater(long updateInterval, Runnable taskToRun) {
if (cycleUpdaterExecutorService == null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
new file mode 100644
index 00000000000..06dc02942d9
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
+import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static java.util.Objects.isNull;
+
+/**
+ * Abort-cleanup based implementation of TaskHandler.
+ * Provides implementation of creation of abort clean tasks.
+ */
+class AbortedTxnCleaner extends TaskHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbortedTxnCleaner.class.getName());
+
+ public AbortedTxnCleaner(HiveConf conf, TxnStore txnHandler,
+ MetadataCache metadataCache, boolean metricsEnabled,
+ FSRemover fsRemover) {
+ super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
+ }
+
+ /**
+   The cleanup is based on the following idea - <br>
+   1. Aborted-txn cleanup is independent of compaction. This is because directories written by
+   aborted txns are not visible to any open txns. They are only visible while determining the AcidState (which
+   only sees the aborted deltas and does not read the files).<br><br>
+
+ The following algorithm is used to clean the set of aborted directories - <br>
+ a. Find the list of entries which are suitable for cleanup (This is done in {@link TxnStore#findReadyToCleanAborts(long, int)}).<br>
+ b. If the table/partition does not exist, then remove the associated aborted entry in TXN_COMPONENTS table. <br>
+ c. Get the AcidState of the table by using the min open txnID, database name, tableName, partition name, highest write ID <br>
+   d. Fetch the aborted directories and delete them. <br>
+   e. Fetch the aborted write IDs from the AcidState and use them to delete the associated metadata in the TXN_COMPONENTS table.
+ **/
+ @Override
+ public List<Runnable> getTasks() throws MetaException {
+ int abortedThreshold = HiveConf.getIntVar(conf,
+ HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+ long abortedTimeThreshold = HiveConf
+ .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
+ TimeUnit.MILLISECONDS);
+ List<CompactionInfo> readyToCleanAborts = txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
+
+ if (!readyToCleanAborts.isEmpty()) {
+ return readyToCleanAborts.stream().map(ci -> ThrowingRunnable.unchecked(() ->
+ clean(ci, ci.txnId > 0 ? ci.txnId : Long.MAX_VALUE, metricsEnabled)))
+ .collect(Collectors.toList());
+ }
+ return Collections.emptyList();
+ }
+
+ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEnabled) throws MetaException, InterruptedException {
+ LOG.info("Starting cleaning for {}", info);
+ PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
+ String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_";
+ try {
+ if (metricsEnabled) {
+ perfLogger.perfLogBegin(AbortedTxnCleaner.class.getName(), cleanerMetric);
+ }
+ Partition p = null;
+ Table t = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
+ if (isNull(t)) {
+ // The table was dropped before we got around to cleaning it.
+ LOG.info("Unable to find table {}, assuming it was dropped.", info.getFullTableName());
+ txnHandler.markCleaned(info, true);
+ return;
+ }
+ if (!isNull(info.partName)) {
+ p = resolvePartition(info.dbname, info.tableName, info.partName);
+ if (isNull(p)) {
+ // The partition was dropped before we got around to cleaning it.
+ LOG.info("Unable to find partition {}, assuming it was dropped.",
+ info.getFullPartitionName());
+ txnHandler.markCleaned(info, true);
+ return;
+ }
+ }
+
+ String location = CompactorUtil.resolveStorageDescriptor(t,p).getLocation();
+ info.runAs = TxnUtils.findUserToRunAs(location, t, conf);
+ abortCleanUsingAcidDir(info, location, minOpenWriteTxn);
+
+ } catch (InterruptedException e) {
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Caught exception when cleaning, unable to complete cleaning of {} due to {}", info,
+ e.getMessage());
+ throw new MetaException(e.getMessage());
+ } finally {
+ if (metricsEnabled) {
+ perfLogger.perfLogEnd(AbortedTxnCleaner.class.getName(), cleanerMetric);
+ }
+ }
+ }
+
+ private void abortCleanUsingAcidDir(CompactionInfo info, String location, long minOpenWriteTxn) throws Exception {
+ ValidTxnList validTxnList =
+ TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenWriteTxn, true);
+ //save it so that getAcidState() sees it
+ conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+
+ ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(info, validTxnList);
+
+    // Set the highestWriteId of the cleanup equal to min(minOpenWriteId - 1, highWatermark).
+    // This is necessary for looking at the complete state of the table up to the min open write ID
+    // (if there is an open txn on the table) or the high watermark.
+    // This is used later on while deleting the records in the TXN_COMPONENTS table.
+ info.highestWriteId = Math.min(isNull(validWriteIdList.getMinOpenWriteId()) ?
+ Long.MAX_VALUE : validWriteIdList.getMinOpenWriteId() - 1, validWriteIdList.getHighWatermark());
+ Table table = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
+ LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+
+ boolean success = cleanAndVerifyObsoleteDirectories(info, location, validWriteIdList, table);
+ if (success || CompactorUtil.isDynPartAbort(table, info.partName)) {
+ txnHandler.markCleaned(info, false);
+ } else {
+ LOG.warn("Leaving aborted entry {} in TXN_COMPONENTS table.", info);
+ }
+
+ }
+}
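
[Note: for orientation, a minimal sketch of how this handler gets wired in — it is package-private and never instantiated directly, but produced by TaskHandlerFactory and driven by the Cleaner. This mirrors the testAbortAfterMarkCleaned setup added to TestCompactor above (metrics disabled, error handling omitted, usual imports assumed):

    HiveConf conf = new HiveConf();
    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
    MetadataCache metadataCache = new MetadataCache(false);
    FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
    // The factory returns [AbortedTxnCleaner, CompactionCleaner] when
    // COMPACTOR_CLEAN_ABORTS_USING_CLEANER is true, else just [CompactionCleaner].
    List<TaskHandler> handlers = TaskHandlerFactory.getInstance()
        .getHandlers(conf, txnHandler, metadataCache, /* metricsEnabled */ false, fsRemover);
    Cleaner cleaner = new Cleaner();
    cleaner.setConf(conf);
    cleaner.init(new AtomicBoolean(true));
    cleaner.setCleanupHandlers(handlers);
    cleaner.run();

Because init() now unconditionally builds the default handlers (see the Cleaner hunk below), tests inject custom or mocked handlers via setCleanupHandlers() after init(), as the test above does.]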
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
index a352dca4dc5..453a1616c33 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
@@ -17,17 +17,13 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor.handler;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.LockComponentBuilder;
import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockType;
@@ -39,35 +35,27 @@ import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnOpenException;
import org.apache.hadoop.hive.metastore.api.UnlockRequest;
-import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.io.AcidDirectory;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;
import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest.CleanupRequestBuilder;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
-import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.BitSet;
import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static org.apache.commons.collections4.ListUtils.subtract;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS;
@@ -77,7 +65,7 @@ import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
import static java.util.Objects.isNull;
/**
- * A compaction based implementation of RequestHandler.
+ * A compaction based implementation of TaskHandler.
* Provides implementation of creation of compaction clean tasks.
*/
class CompactionCleaner extends TaskHandler {
@@ -114,7 +102,7 @@ class CompactionCleaner extends TaskHandler {
return Collections.emptyList();
}
- private void clean(CompactionInfo ci, long minOpenTxnGLB, boolean metricsEnabled) throws MetaException {
+ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) throws MetaException {
LOG.info("Starting cleaning for {}", ci);
PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
@@ -134,13 +122,13 @@ class CompactionCleaner extends TaskHandler {
// The table was dropped before we got around to cleaning it.
LOG.info("Unable to find table {}, assuming it was dropped. {}", ci.getFullTableName(),
idWatermark(ci));
- txnHandler.markCleaned(ci);
+ txnHandler.markCleaned(ci, false);
return;
}
if (MetaStoreUtils.isNoCleanUpSet(t.getParameters())) {
// The table was marked no clean up true.
LOG.info("Skipping table {} clean up, as NO_CLEANUP set to true", ci.getFullTableName());
- txnHandler.markCleaned(ci);
+ txnHandler.markRefused(ci);
return;
}
if (!isNull(ci.partName)) {
@@ -149,13 +137,13 @@ class CompactionCleaner extends TaskHandler {
// The partition was dropped before we got around to cleaning it.
LOG.info("Unable to find partition {}, assuming it was dropped. {}",
ci.getFullPartitionName(), idWatermark(ci));
- txnHandler.markCleaned(ci);
+ txnHandler.markCleaned(ci, false);
return;
}
if (MetaStoreUtils.isNoCleanUpSet(p.getParameters())) {
// The partition was marked no clean up true.
LOG.info("Skipping partition {} clean up, as NO_CLEANUP set to true", ci.getFullPartitionName());
- txnHandler.markCleaned(ci);
+ txnHandler.markRefused(ci);
return;
}
}
@@ -172,7 +160,7 @@ class CompactionCleaner extends TaskHandler {
if (dropPartition && isNull(resolvePartition(ci.dbname, ci.tableName, ci.partName))) {
cleanUsingLocation(ci, path, true);
} else {
- cleanUsingAcidDir(ci, path, minOpenTxnGLB);
+ cleanUsingAcidDir(ci, path, minOpenTxn);
}
} else {
cleanUsingLocation(ci, location, false);
@@ -216,15 +204,15 @@ class CompactionCleaner extends TaskHandler {
deleted = fsRemover.clean(getCleaningRequestBasedOnLocation(ci, path));
}
if (!deleted.isEmpty()) {
- txnHandler.markCleaned(ci);
+ txnHandler.markCleaned(ci, false);
} else {
txnHandler.clearCleanerStart(ci);
}
}
- private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxnGLB) throws Exception {
+ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxn) throws Exception {
ValidTxnList validTxnList =
- TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxnGLB);
+ TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxn, false);
//save it so that getAcidState() sees it
conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
/*
@@ -260,50 +248,12 @@ class CompactionCleaner extends TaskHandler {
// Creating 'reader' list since we are interested in the set of 'obsolete' files
ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
- LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
-
- Path path = new Path(location);
- FileSystem fs = path.getFileSystem(conf);
-
- // Collect all the files/dirs
- Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
- AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, validWriteIdList, Ref.from(false), false,
- dirSnapshots);
Table table = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
- boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, ci.partName);
-
- List<Path> obsoleteDirs = CompactorUtil.getObsoleteDirs(dir, isDynPartAbort);
- if (isDynPartAbort || dir.hasUncompactedAborts()) {
- ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
- }
-
- List<Path> deleted = fsRemover.clean(new CleanupRequestBuilder().setLocation(location)
- .setDbName(ci.dbname).setFullPartitionName(ci.getFullPartitionName())
- .setRunAs(ci.runAs).setObsoleteDirs(obsoleteDirs).setPurge(true)
- .build());
-
- if (!deleted.isEmpty()) {
- AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
- txnHandler);
- }
-
- // Make sure there are no leftovers below the compacted watermark
- boolean success = false;
- conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
- dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
- ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE),
- Ref.from(false), false, dirSnapshots);
+ LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
- List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir, isDynPartAbort), deleted);
- if (!remained.isEmpty()) {
- LOG.warn("{} Remained {} obsolete directories from {}. {}",
- idWatermark(ci), remained.size(), location, CompactorUtil.getDebugInfo(remained));
- } else {
- LOG.debug("{} All cleared below the watermark: {} from {}", idWatermark(ci), ci.highestWriteId, location);
- success = true;
- }
+ boolean success = cleanAndVerifyObsoleteDirectories(ci, location, validWriteIdList, table);
if (success || CompactorUtil.isDynPartAbort(table, ci.partName)) {
- txnHandler.markCleaned(ci);
+ txnHandler.markCleaned(ci, false);
} else {
txnHandler.clearCleanerStart(ci);
LOG.warn("No files were removed. Leaving queue entry {} in ready for cleaning state.", ci);
@@ -337,18 +287,10 @@ class CompactionCleaner extends TaskHandler {
return " id=" + ci.id;
}
- private ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
+ @Override
+ protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
throws NoSuchTxnException, MetaException {
- List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(ci.dbname, ci.tableName));
- GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
- request.setValidTxnList(validTxnList.writeToString());
- GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
- // we could have no write IDs for a table if it was never written to but
- // since we are in the Cleaner phase of compactions, there must have
- // been some delta/base dirs
- assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
- ValidReaderWriteIdList validWriteIdList =
- TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));
+ ValidReaderWriteIdList validWriteIdList = super.getValidCleanerWriteIdList(ci, validTxnList);
/*
* We need to filter the obsoletes dir list, to only remove directories that were made obsolete by this compaction
* If we have a higher retentionTime it is possible for a second compaction to run on the same partition. Cleaning up the first compaction
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
index 327b9238486..ef95a100c1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
@@ -17,19 +17,40 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor.handler;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
+import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+
+import static org.apache.commons.collections.ListUtils.subtract;
/**
* An abstract class which defines the list of utility methods for performing cleanup activities.
@@ -81,4 +102,63 @@ public abstract class TaskHandler {
return null;
}
}
+
+ protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo info, ValidTxnList validTxnList)
+ throws NoSuchTxnException, MetaException {
+ List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(info.dbname, info.tableName));
+ GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
+ request.setValidTxnList(validTxnList.writeToString());
+ GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
+ // we could have no write IDs for a table if it was never written to but
+ // since we are in the Cleaner phase of compactions, there must have
+ // been some delta/base dirs
+ assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
+
+ return TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));
+ }
+
+ protected boolean cleanAndVerifyObsoleteDirectories(CompactionInfo info, String location,
+ ValidReaderWriteIdList validWriteIdList, Table table) throws MetaException, IOException {
+ Path path = new Path(location);
+ FileSystem fs = path.getFileSystem(conf);
+
+ // Collect all the files/dirs
+ Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
+ AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, validWriteIdList, Ref.from(false), false,
+ dirSnapshots);
+ boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, info.partName);
+
+ List<Path> obsoleteDirs = CompactorUtil.getObsoleteDirs(dir, isDynPartAbort);
+ if (isDynPartAbort || dir.hasUncompactedAborts()) {
+ info.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
+ }
+
+ List<Path> deleted = fsRemover.clean(new CleanupRequest.CleanupRequestBuilder().setLocation(location)
+ .setDbName(info.dbname).setFullPartitionName(info.getFullPartitionName())
+ .setRunAs(info.runAs).setObsoleteDirs(obsoleteDirs).setPurge(true)
+ .build());
+
+ if (!deleted.isEmpty()) {
+ AcidMetricService.updateMetricsFromCleaner(info.dbname, info.tableName, info.partName, dir.getObsolete(), conf,
+ txnHandler);
+ }
+
+ // Make sure there are no leftovers below the compacted watermark
+ boolean success = false;
+ conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+ dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
+ info.getFullTableName(), new long[0], new BitSet(), info.highestWriteId, Long.MAX_VALUE),
+ Ref.from(false), false, dirSnapshots);
+
+ List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir, isDynPartAbort), deleted);
+ if (!remained.isEmpty()) {
+ LOG.warn("Remained {} obsolete directories from {}. {}",
+ remained.size(), location, CompactorUtil.getDebugInfo(remained));
+ } else {
+ LOG.debug("All cleared below the watermark: {} from {}", info.highestWriteId, location);
+ success = true;
+ }
+
+ return success;
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java
index 79293a22c26..57a4dc625c8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandlerFactory.java
@@ -18,11 +18,12 @@
package org.apache.hadoop.hive.ql.txn.compactor.handler;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -43,7 +44,14 @@ public class TaskHandlerFactory {
public List<TaskHandler> getHandlers(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache,
boolean metricsEnabled, FSRemover fsRemover) {
- return Arrays.asList(new CompactionCleaner(conf, txnHandler, metadataCache,
+ boolean useAbortHandler = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
+ List<TaskHandler> taskHandlers = new ArrayList<>();
+ if (useAbortHandler) {
+ taskHandlers.add(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+ metricsEnabled, fsRemover));
+ }
+ taskHandlers.add(new CompactionCleaner(conf, txnHandler, metadataCache,
metricsEnabled, fsRemover));
+ return taskHandlers;
}
}
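
[Note: the new behavior is gated on a single metastore flag; the tests in this commit toggle it both ways, roughly:

    // Route aborted-txn cleanup through the Cleaner's dedicated handler
    // (as TestCompactorBase and TestTxnCommands2 now do by default):
    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);

    // Fall back to abort cleanup via the compaction cycle
    // (as the *WithAbortCleanupUsingCompactionCycle test variants do):
    MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
]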
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index 5b334838ac8..03a7326710e 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -238,7 +238,7 @@ public class TestCompactionTxnHandler {
List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0);
assertEquals(1, toClean.size());
assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
- txnHandler.markCleaned(ci);
+ txnHandler.markCleaned(ci, false);
assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)));
assertEquals(0, txnHandler.findReadyToClean(0, 0).size());
@@ -529,7 +529,7 @@ public class TestCompactionTxnHandler {
txnHandler.compact(rqst);
ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
assertNotNull(ci);
- txnHandler.markCleaned(ci);
+ txnHandler.markCleaned(ci, false);
}
private void addWaitingForCleaningCompaction(String dbName, String tableName, CompactionType type,
@@ -866,7 +866,7 @@ public class TestCompactionTxnHandler {
Thread.sleep(txnHandler.getOpenTxnTimeOutMillis());
List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0);
assertEquals(1, toClean.size());
- txnHandler.markCleaned(ci);
+ txnHandler.markCleaned(ci, false);
// Check that we are cleaning up the empty aborted transactions
GetOpenTxnsResponse txnList = txnHandler.getOpenTxns();
@@ -892,7 +892,7 @@ public class TestCompactionTxnHandler {
toClean = txnHandler.findReadyToClean(0, 0);
assertEquals(1, toClean.size());
- txnHandler.markCleaned(ci);
+ txnHandler.markCleaned(ci, false);
txnHandler.openTxns(new OpenTxnRequest(1, "me", "localhost"));
// The open txn will became the low water mark
@@ -999,7 +999,7 @@ public class TestCompactionTxnHandler {
txnHandler.markCompacted(ci);
checkEnqueueTime(enqueueTime);
- txnHandler.markCleaned(ci);
+ txnHandler.markCleaned(ci, false);
checkEnqueueTime(enqueueTime);
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 2a9ae73ca26..566b1251427 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -156,6 +156,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
//TestTxnCommands2WithSplitUpdateAndVectorization has the vectorized version
//of these tests.
HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+ //TestTxnCommands2WithAbortCleanupUsingCompactionCycle has the tests with abort cleanup in compaction cycle
+ MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES, false);
HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_ACID_TRUNCATE_USE_BASE, false);
}
@@ -2620,6 +2622,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
@Test
public void testDynPartInsertWithAborts() throws Exception {
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
int[][] resultData = new int[][]{{1, 1}, {2, 2}};
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1')");
verifyDeltaDirAndResult(1, Table.ACIDTBLPART.toString(), "p=p1", resultData);
@@ -2641,7 +2644,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
count = TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPACTION_QUEUE");
// Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
// with rows in TXN_COMPONENTS
- Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"), 1, count);
+ Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"),
+ useCleanerForAbortCleanup ? 0 : 1, count);
runWorker(hiveConf);
runCleaner(hiveConf);
@@ -2651,6 +2655,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
@Test
public void testDynPartInsertWithMultiPartitionAborts() throws Exception {
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
int [][] resultData = new int[][] {{1,1}, {2,2}};
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1')");
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p2'),(2,2,'p2')");
@@ -2680,7 +2685,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
count = TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPACTION_QUEUE");
// Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
// with rows in TXN_COMPONENTS
- Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"), 1, count);
+ Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"),
+ useCleanerForAbortCleanup ? 0 : 1, count);
r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
Assert.assertEquals("4", r1.get(0));
@@ -2696,6 +2702,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
@Test
public void testDynPartIOWWithAborts() throws Exception {
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
int [][] resultData = new int[][] {{1,1}, {2,2}};
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1')");
verifyDeltaDirAndResult(1, Table.ACIDTBLPART.toString(), "p=p1", resultData);
@@ -2718,7 +2725,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
count = TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPACTION_QUEUE");
// Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
// with rows in TXN_COMPONENTS
- Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"), 1, count);
+ Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"),
+ useCleanerForAbortCleanup ? 0 : 1, count);
runWorker(hiveConf);
runCleaner(hiveConf);
@@ -2729,6 +2737,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
@Test
public void testDynPartIOWWithMultiPartitionAborts() throws Exception {
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
int [][] resultData = new int[][] {{1,1}, {2,2}};
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1')");
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p2'),(2,2,'p2')");
@@ -2760,7 +2769,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
count = TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPACTION_QUEUE");
// Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
// with rows in TXN_COMPONENTS
- Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"), 1, count);
+ Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"),
+ useCleanerForAbortCleanup ? 0 : 1, count);
r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
Assert.assertEquals("4", r1.get(0));
@@ -2778,6 +2788,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
@Test
public void testDynPartUpdateWithAborts() throws Exception {
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
int[][] resultData1 = new int[][]{{1, 2}, {3, 4}};
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values (1,2,'p1')");
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values (3,4,'p1')");
@@ -2801,7 +2812,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
count = TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPACTION_QUEUE");
// Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
// with rows in TXN_COMPONENTS
- Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"), 1, count);
+ Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"),
+ useCleanerForAbortCleanup ? 0 : 1, count);
runWorker(hiveConf);
runCleaner(hiveConf);
@@ -2812,6 +2824,7 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
@Test
public void testDynPartMergeWithAborts() throws Exception {
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
int [][] resultData = new int[][] {{1,1}, {2,2}};
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p) values(1,1,'p1'),(2,2,'p1')");
verifyDeltaDirAndResult(1, Table.ACIDTBLPART.toString(), "p=p1", resultData);
@@ -2845,7 +2858,8 @@ public class TestTxnCommands2 extends TxnCommandsBaseForTests {
count = TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from COMPACTION_QUEUE");
// Only one job is added to the queue per table. This job corresponds to all the entries for a particular table
// with rows in TXN_COMPONENTS
- Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"), 1, count);
+ Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from COMPACTION_QUEUE"),
+ useCleanerForAbortCleanup ? 0 : 1, count);
r1 = runStatementOnDriver("select count(*) from " + Table.ACIDTBLPART);
Assert.assertEquals("2", r1.get(0));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithAbortCleanupUsingCompactionCycle.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithAbortCleanupUsingCompactionCycle.java
new file mode 100644
index 00000000000..628c0f979aa
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2WithAbortCleanupUsingCompactionCycle.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+
+/**
+ * Same as TestTxnCommands2, but abort cleanup for ACID tables is performed through the
+ * compaction cycle, with 'transactional_properties' set to 'default'. This specifically
+ * exercises abort cleanup done exclusively via the compaction cycle.
+ */
+public class TestTxnCommands2WithAbortCleanupUsingCompactionCycle extends TestTxnCommands2 {
+ public TestTxnCommands2WithAbortCleanupUsingCompactionCycle() {
+ super();
+ }
+
+ @Override
+ void initHiveConf() {
+ super.initHiveConf();
+ MetastoreConf.setBoolVar(hiveConf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index a556c7fdf3b..113173acb37 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -32,15 +32,20 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
+import org.apache.hadoop.hive.metastore.LockComponentBuilder;
+import org.apache.hadoop.hive.metastore.LockRequestBuilder;
import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.DataOperationType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.LockRequest;
+import org.apache.hadoop.hive.metastore.api.LockType;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.OpenTxnRequest;
@@ -135,6 +140,8 @@ public abstract class CompactorTest {
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, useMinHistoryWriteId());
+ // Set this config to true in the base class; extended test classes set it to false.
+ MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
TestTxnDbUtil.setConfValues(conf);
TestTxnDbUtil.cleanDb(conf);
TestTxnDbUtil.prepDb(conf);
@@ -260,6 +267,36 @@ public abstract class CompactorTest {
return awiResp.getTxnToWriteIds().get(0).getWriteId();
}
+ protected void addDeltaFileWithTxnComponents(Table t, Partition p, int numRecords, boolean abort)
+ throws Exception {
+ long txnId = openTxn();
+ long writeId = ms.allocateTableWriteId(txnId, t.getDbName(), t.getTableName());
+ acquireLock(t, p, txnId);
+ addDeltaFile(t, p, writeId, writeId, numRecords);
+ if (abort) {
+ txnHandler.abortTxns(new AbortTxnsRequest(Collections.singletonList(txnId)));
+ } else {
+ txnHandler.commitTxn(new CommitTxnRequest(txnId));
+ }
+ }
+
+ protected void acquireLock(Table t, Partition p, long txnId) throws Exception {
+ LockComponentBuilder lockCompBuilder = new LockComponentBuilder()
+ .setLock(LockType.SHARED_WRITE)
+ .setOperationType(DataOperationType.INSERT)
+ .setDbName(t.getDbName())
+ .setTableName(t.getTableName())
+ .setIsTransactional(true);
+ if (p != null) {
+ lockCompBuilder.setPartitionName(t.getPartitionKeys().get(0).getName() + "=" + p.getValues().get(0));
+ }
+ LockRequestBuilder requestBuilder = new LockRequestBuilder().setUser(null)
+ .setTransactionId(txnId).addLockComponent(lockCompBuilder.build());
+ requestBuilder.setZeroWaitReadEnabled(!conf.getBoolVar(HiveConf.ConfVars.TXN_OVERWRITE_X_LOCK) ||
+ !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
+ ms.lock(requestBuilder.build());
+ }
+
protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords)
throws Exception {
addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true);
@@ -347,7 +384,7 @@ public abstract class CompactorTest {
} else if (open == null || !open.contains(tid)) {
txnHandler.commitTxn(new CommitTxnRequest(tid));
} else if (open.contains(tid) && useMinHistoryWriteId()){
- txnHandler.addWriteIdsToMinHistory(tid,
+ txnHandler.addWriteIdsToMinHistory(tid,
Collections.singletonMap(dbName + "." + tblName, minOpenWriteId));
}
}
@@ -664,7 +701,7 @@ public abstract class CompactorTest {
* are used since new (1.3) code has to be able to read old files.
*/
abstract boolean useHive130DeltaDirName();
-
+
protected boolean useMinHistoryWriteId() {
return false;
}
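
The new addDeltaFileWithTxnComponents helper above folds the open-txn, write-id allocation, lock acquisition and abort/commit steps into one call. A minimal usage sketch, assuming the helper and newTable from this patch (the table name is hypothetical):

    Table t = newTable("default", "abort_cleanup_demo", false);
    addDeltaFileWithTxnComponents(t, null, 2, true);   // delta left behind by an aborted txn
    addDeltaFileWithTxnComponents(t, null, 2, false);  // delta from a committed txn

This is the seeding pattern the handler tests below build on.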
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycle.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycle.java
new file mode 100644
index 00000000000..b57599113e6
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycle.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.junit.Before;
+
+public class TestAbortCleanupUsingCompactionCycle extends TestCleaner {
+ @Override
+ @Before
+ public void setup() throws Exception {
+ super.setup();
+ MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId.java
new file mode 100644
index 00000000000..bbc72661abf
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.junit.Before;
+
+public class TestAbortCleanupUsingCompactionCycleWithMinHistoryWriteId extends TestCleaner {
+ @Override
+ @Before
+ public void setup() throws Exception {
+ super.setup();
+ MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
+ }
+
+ @Override
+ protected boolean useMinHistoryWriteId() {
+ return true;
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 82f052fe362..df4a786a184 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -67,6 +67,7 @@ import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
import static org.apache.hadoop.hive.ql.io.AcidUtils.addVisibilitySuffix;
import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
@@ -107,7 +108,7 @@ public class TestCleaner extends CompactorTest {
FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
List<TaskHandler> taskHandlers = TaskHandlerFactory.getInstance()
.getHandlers(conf, mockedHandler, metadataCache, false, fsRemover);
- doThrow(new RuntimeException(errorMessage)).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));
+ doThrow(new RuntimeException(errorMessage)).when(mockedHandler).markCleaned(nullable(CompactionInfo.class), eq(false));
Table t = newTable("default", "retry_test", false);
@@ -130,8 +131,8 @@ public class TestCleaner extends CompactorTest {
for (int i = 1; i < 4; i++) {
Cleaner cleaner = new Cleaner();
cleaner.setConf(conf);
- cleaner.setCleanupHandlers(taskHandlers);
cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(taskHandlers);
FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
cleaner.run();
@@ -157,8 +158,8 @@ public class TestCleaner extends CompactorTest {
//Do a final run to reach the maximum retry attempts, so the state finally should be set to failed
Cleaner cleaner = new Cleaner();
cleaner.setConf(conf);
- cleaner.setCleanupHandlers(taskHandlers);
cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(taskHandlers);
FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
cleaner.run();
@@ -195,13 +196,13 @@ public class TestCleaner extends CompactorTest {
FSRemover fsRemover = new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache);
List<TaskHandler> taskHandlers = TaskHandlerFactory.getInstance()
.getHandlers(conf, mockedHandler, metadataCache, false, fsRemover);
- doThrow(new RuntimeException()).when(mockedHandler).markCleaned(nullable(CompactionInfo.class));
+ doThrow(new RuntimeException()).when(mockedHandler).markCleaned(nullable(CompactionInfo.class), eq(false));
//Do a run to fail the clean and set the retention time
Cleaner cleaner = new Cleaner();
cleaner.setConf(conf);
- cleaner.setCleanupHandlers(taskHandlers);
cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(taskHandlers);
FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
cleaner.run();
@@ -216,8 +217,8 @@ public class TestCleaner extends CompactorTest {
//Do a final run and check if the compaction is not picked up again
cleaner = new Cleaner();
cleaner.setConf(conf);
- cleaner.setCleanupHandlers(taskHandlers);
cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(taskHandlers);
FieldSetter.setField(cleaner, MetaStoreCompactorThread.class.getDeclaredField("txnHandler"), mockedHandler);
cleaner.run();
@@ -815,7 +816,7 @@ public class TestCleaner extends CompactorTest {
}
@Test
- public void NoCleanupAfterMajorCompaction() throws Exception {
+ public void noCleanupAfterMajorCompaction() throws Exception {
Map<String, String> parameters = new HashMap<>();
//With no cleanup true
@@ -836,7 +837,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+ Assert.assertEquals(TxnStore.REFUSED_RESPONSE, rsp.getCompacts().get(0).getState());
// Check that the files are not removed
List<Path> paths = getDirectories(conf, t, null);
@@ -884,7 +885,7 @@ public class TestCleaner extends CompactorTest {
// Check there are no compactions requests left.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals(1, rsp.getCompactsSize());
- Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+ Assert.assertEquals(TxnStore.REFUSED_RESPONSE, rsp.getCompacts().get(0).getState());
// Check that the files are not removed
List<Path> paths = getDirectories(conf, t, p);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
index 6a9a37dfe3f..c98b310ca25 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java
@@ -123,6 +123,7 @@ public class TestInitiator extends CompactorTest {
@Test
public void majorCompactOnTableTooManyAborts() throws Exception {
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
Table t = newTable("default", "mcottma", false);
HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 10);
@@ -144,14 +145,19 @@ public class TestInitiator extends CompactorTest {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("initiated", compacts.get(0).getState());
- Assert.assertEquals("mcottma", compacts.get(0).getTablename());
- Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+ if (useCleanerForAbortCleanup) {
+ Assert.assertEquals(0, compacts.size());
+ } else {
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("initiated", compacts.get(0).getState());
+ Assert.assertEquals("mcottma", compacts.get(0).getTablename());
+ Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+ }
}
@Test
public void majorCompactOnPartitionTooManyAborts() throws Exception {
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
Table t = newTable("default", "mcoptma", true);
Partition p = newPartition(t, "today");
@@ -175,11 +181,15 @@ public class TestInitiator extends CompactorTest {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("initiated", compacts.get(0).getState());
- Assert.assertEquals("mcoptma", compacts.get(0).getTablename());
- Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
- Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+ if (useCleanerForAbortCleanup) {
+ Assert.assertEquals(0, compacts.size());
+ } else {
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("initiated", compacts.get(0).getState());
+ Assert.assertEquals("mcoptma", compacts.get(0).getTablename());
+ Assert.assertEquals("ds=today", compacts.get(0).getPartitionname());
+ Assert.assertEquals(CompactionType.MAJOR, compacts.get(0).getType());
+ }
}
@Test
@@ -218,6 +228,7 @@ public class TestInitiator extends CompactorTest {
*/
@Test
public void compactExpiredAbortedTxns() throws Exception {
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
Table t = newTable("default", "expiredAbortedTxns", false);
// abort a txn
long txnid = openTxn();
@@ -243,8 +254,12 @@ public class TestInitiator extends CompactorTest {
// set to 1 ms, wait 1 ms, and check that minor compaction is queued
conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD, 1, TimeUnit.MILLISECONDS);
Thread.sleep(1L);
- ShowCompactResponse rsp = initiateAndVerifyCompactionQueueLength(1);
- Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+ if (useCleanerForAbortCleanup) {
+ initiateAndVerifyCompactionQueueLength(0);
+ } else {
+ ShowCompactResponse rsp = initiateAndVerifyCompactionQueueLength(1);
+ Assert.assertEquals(CompactionType.MINOR, rsp.getCompacts().get(0).getType());
+ }
}
private ShowCompactResponse initiateAndVerifyCompactionQueueLength(int expectedLength)
@@ -961,6 +976,7 @@ public class TestInitiator extends CompactorTest {
}
@Test public void testInitiatorFailure() throws Exception {
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
String tableName = "my_table";
Table t = newTable("default", tableName, false);
@@ -991,9 +1007,13 @@ public class TestInitiator extends CompactorTest {
// verify status of table compaction
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("did not initiate", compacts.get(0).getState());
- Assert.assertEquals(tableName, compacts.get(0).getTablename());
+ if (useCleanerForAbortCleanup) {
+ Assert.assertEquals(0, compacts.size());
+ } else {
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("did not initiate", compacts.get(0).getState());
+ Assert.assertEquals(tableName, compacts.get(0).getTablename());
+ }
}
@Test
@@ -1029,6 +1049,7 @@ public class TestInitiator extends CompactorTest {
@Test
public void testInitiatorHostAndVersion() throws Exception {
+ boolean useCleanerForAbortCleanup = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER);
String tableName = "my_table";
Table t = newTable("default", tableName, false);
@@ -1059,14 +1080,18 @@ public class TestInitiator extends CompactorTest {
// verify status of table compaction
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
- Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("initiated", compacts.get(0).getState());
- Assert.assertEquals(tableName, compacts.get(0).getTablename());
- Assert.assertEquals(runtimeVersion, compacts.get(0).getInitiatorVersion());
- // split the threadid
- String[] parts = compacts.get(0).getInitiatorId().split("-");
- Assert.assertTrue(parts.length > 1);
- Assert.assertEquals(ServerUtils.hostname(), String.join("-", Arrays.copyOfRange(parts, 0, parts.length - 1)));
+ if (useCleanerForAbortCleanup) {
+ Assert.assertEquals(0, compacts.size());
+ } else {
+ Assert.assertEquals(1, compacts.size());
+ Assert.assertEquals("initiated", compacts.get(0).getState());
+ Assert.assertEquals(tableName, compacts.get(0).getTablename());
+ Assert.assertEquals(runtimeVersion, compacts.get(0).getInitiatorVersion());
+ // split the threadid
+ String[] parts = compacts.get(0).getInitiatorId().split("-");
+ Assert.assertTrue(parts.length > 1);
+ Assert.assertEquals(ServerUtils.hostname(), String.join("-", Arrays.copyOfRange(parts, 0, parts.length - 1)));
+ }
}
@Test
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiatorWithAbortCleanupUsingCompactionCycle.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiatorWithAbortCleanupUsingCompactionCycle.java
new file mode 100644
index 00000000000..5fdf9fc72a9
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiatorWithAbortCleanupUsingCompactionCycle.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+
+public class TestInitiatorWithAbortCleanupUsingCompactionCycle extends TestInitiator {
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, false);
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
new file mode 100644
index 00000000000..3b40e2f7594
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor.handler;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.txn.compactor.Cleaner;
+import org.apache.hadoop.hive.ql.txn.compactor.CleanupRequest;
+import org.apache.hadoop.hive.ql.txn.compactor.FSRemover;
+import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.ArgumentMatchers.any;
+
+public class TestAbortedTxnCleaner extends TestHandler {
+
+ @Test
+ public void testCleaningOfAbortedDirectoriesForUnpartitionedTables() throws Exception {
+ String dbName = "default", tableName = "handler_unpart_test";
+ Table t = newTable(dbName, tableName, false);
+
+ // 3 aborted deltas & 1 committed delta
+ addDeltaFileWithTxnComponents(t, null, 2, true);
+ addDeltaFileWithTxnComponents(t, null, 2, true);
+ addDeltaFileWithTxnComponents(t, null, 2, false);
+ addDeltaFileWithTxnComponents(t, null, 2, true);
+
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+ false, mockedFSRemover));
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+ cleaner.run();
+
+ Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+
+ List<Path> directories = getDirectories(conf, t, null);
+ // All aborted directories removed, hence 1 committed delta directory must be present
+ Assert.assertEquals(1, directories.size());
+ }
+
+ @Test
+ public void testCleaningOfAbortedDirectoriesForSinglePartition() throws Exception {
+ String dbName = "default", tableName = "handler_part_single_test", partName = "today";
+ Table t = newTable(dbName, tableName, true);
+ Partition p = newPartition(t, partName);
+
+ // 3 aborted deltas & 1 committed delta
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+ addDeltaFileWithTxnComponents(t, p, 2, false);
+ addDeltaFileWithTxnComponents(t, p, 2, true);
+
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+ false, mockedFSRemover));
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+ cleaner.run();
+
+ Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+
+ List<Path> directories = getDirectories(conf, t, p);
+ // All aborted directories removed, hence 1 committed delta directory must be present
+ Assert.assertEquals(1, directories.size());
+ }
+
+ @Test
+ public void testCleaningOfAbortedDirectoriesForMultiplePartitions() throws Exception {
+ String dbName = "default", tableName = "handler_part_multiple_test", partName1 = "today1", partName2 = "today2";
+ Table t = newTable(dbName, tableName, true);
+ Partition p1 = newPartition(t, partName1);
+ Partition p2 = newPartition(t, partName2);
+
+ // 3 aborted deltas & 1 committed delta for partition-1
+ addDeltaFileWithTxnComponents(t, p1, 2, true);
+ addDeltaFileWithTxnComponents(t, p1, 2, true);
+ addDeltaFileWithTxnComponents(t, p1, 2, false);
+ addDeltaFileWithTxnComponents(t, p1, 2, true);
+
+ // 3 aborted deltas & 1 committed delta for partition-2
+ addDeltaFileWithTxnComponents(t, p2, 2, true);
+ addDeltaFileWithTxnComponents(t, p2, 2, true);
+ addDeltaFileWithTxnComponents(t, p2, 2, false);
+ addDeltaFileWithTxnComponents(t, p2, 2, true);
+
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+ false, mockedFSRemover));
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+ cleaner.run();
+
+ Mockito.verify(mockedFSRemover, Mockito.times(2)).clean(any(CleanupRequest.class));
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+
+ List<Path> directories = getDirectories(conf, t, p1);
+ // All aborted directories removed, hence 1 committed delta directory must be present
+ Assert.assertEquals(1, directories.size());
+
+ directories = getDirectories(conf, t, p2);
+ // All aborted directories removed, hence 1 committed delta directory must be present
+ Assert.assertEquals(1, directories.size());
+ }
+
+ @Test
+ public void testCleaningOfAbortedDirectoriesWithLongRunningOpenWriteTxn() throws Exception {
+ String dbName = "default", tableName = "handler_unpart_open_test";
+ Table t = newTable(dbName, tableName, false);
+
+ // 3 aborted deltas & 1 committed delta
+ addDeltaFileWithTxnComponents(t, null, 2, true);
+ addDeltaFileWithTxnComponents(t, null, 2, true);
+ addDeltaFileWithTxnComponents(t, null, 2, false);
+ addDeltaFileWithTxnComponents(t, null, 2, true);
+
+ // Open a long-running transaction
+ long openTxnId = openTxn();
+ long writeId = ms.allocateTableWriteId(openTxnId, t.getDbName(), t.getTableName());
+ acquireLock(t, null, openTxnId);
+ addDeltaFile(t, null, writeId, writeId, 2);
+
+ // Add an aborted write after the open txn
+ addDeltaFileWithTxnComponents(t, null, 2, true);
+
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+ false, mockedFSRemover));
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+ cleaner.run();
+
+ Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+
+ List<Path> directories = getDirectories(conf, t, null);
+ // All aborted directories below min open write ID are removed,
+ // hence 1 open, 1 committed, 1 aborted delta directory must be present
+ Assert.assertEquals(3, directories.size());
+
+ // Commit the long open txn
+ txnHandler.commitTxn(new CommitTxnRequest(openTxnId));
+ }
+
+ @Test
+ public void testCleaningOfAbortedDirectoriesOnTopOfBase() throws Exception {
+ String dbName = "default", tableName = "handler_unpart_top_test";
+ Table t = newTable(dbName, tableName, false);
+
+ // Add 4 committed deltas
+ addDeltaFileWithTxnComponents(t, null, 2, false);
+ addDeltaFileWithTxnComponents(t, null, 2, false);
+ addDeltaFileWithTxnComponents(t, null, 2, false);
+ addDeltaFileWithTxnComponents(t, null, 2, false);
+
+ CompactionRequest cr = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+ txnHandler.compact(cr);
+
+ // Run compaction
+ startWorker();
+
+ // Check that there is exactly one base directory
+ List<Path> directories = getDirectories(conf, t, null);
+ // Both base and delta files are present since we haven't cleaned yet.
+ Assert.assertEquals(5, directories.size());
+ Assert.assertEquals(1, directories.stream().filter(dir -> dir.getName().startsWith(AcidUtils.BASE_PREFIX)).count());
+
+ // 3 aborted deltas
+ addDeltaFileWithTxnComponents(t, null, 2, true);
+ addDeltaFileWithTxnComponents(t, null, 2, true);
+ addDeltaFileWithTxnComponents(t, null, 2, true);
+
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+ false, mockedFSRemover));
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+ cleaner.run();
+
+ Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+
+ directories = getDirectories(conf, t, null);
+ Assert.assertEquals(1, directories.size());
+ Assert.assertTrue(directories.get(0).getName().startsWith(AcidUtils.BASE_PREFIX));
+ }
+
+ @Test
+ public void testCleaningOfAbortedDirectoriesBelowBase() throws Exception {
+ String dbName = "default", tableName = "handler_unpart_below_test";
+ Table t = newTable(dbName, tableName, false);
+
+ // Add 2 committed deltas and 2 aborted deltas
+ addDeltaFileWithTxnComponents(t, null, 2, false);
+ addDeltaFileWithTxnComponents(t, null, 2, true);
+ addDeltaFileWithTxnComponents(t, null, 2, true);
+ addDeltaFileWithTxnComponents(t, null, 2, false);
+
+ CompactionRequest cr = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+ txnHandler.compact(cr);
+
+ // Run compaction
+ startWorker();
+
+ // Check that there is exactly one base directory
+ List<Path> directories = getDirectories(conf, t, null);
+ // Both base and delta files are present since we haven't cleaned yet.
+ Assert.assertEquals(5, directories.size());
+ Assert.assertEquals(1, directories.stream().filter(dir -> dir.getName().startsWith(AcidUtils.BASE_PREFIX)).count());
+
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+ false, mockedFSRemover));
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+ cleaner.run();
+
+ Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+
+ directories = getDirectories(conf, t, null);
+ // The table is already compacted, so only 1 base directory must remain
+ Assert.assertEquals(1, directories.size());
+ }
+
+ @Test
+ public void testAbortedCleaningWithThreeTxnsWithDiffWriteIds() throws Exception {
+ String dbName = "default", tableName = "handler_unpart_writeid_test";
+ Table t = newTable(dbName, tableName, false);
+
+ // Add 2 committed deltas and 2 aborted deltas
+ addDeltaFileWithTxnComponents(t, null, 2, false);
+ addDeltaFileWithTxnComponents(t, null, 2, true);
+ addDeltaFileWithTxnComponents(t, null, 2, true);
+ addDeltaFileWithTxnComponents(t, null, 2, false);
+
+ long openTxnId1 = openTxn();
+ long openTxnId2 = openTxn();
+ long openTxnId3 = openTxn();
+ long writeId2 = ms.allocateTableWriteId(openTxnId2, t.getDbName(), t.getTableName());
+ long writeId3 = ms.allocateTableWriteId(openTxnId3, t.getDbName(), t.getTableName());
+ long writeId1 = ms.allocateTableWriteId(openTxnId1, t.getDbName(), t.getTableName());
+ assert writeId2 < writeId1 && writeId2 < writeId3;
+ acquireLock(t, null, openTxnId3);
+ acquireLock(t, null, openTxnId2);
+ acquireLock(t, null, openTxnId1);
+ addDeltaFile(t, null, writeId3, writeId3, 2);
+ addDeltaFile(t, null, writeId1, writeId1, 2);
+ addDeltaFile(t, null, writeId2, writeId2, 2);
+
+ ms.abortTxns(Collections.singletonList(openTxnId2));
+ ms.commitTxn(openTxnId3);
+
+ HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD, 0);
+ MetadataCache metadataCache = new MetadataCache(true);
+ FSRemover mockedFSRemover = Mockito.spy(new FSRemover(conf, ReplChangeManager.getInstance(conf), metadataCache));
+ TaskHandler mockedTaskHandler = Mockito.spy(new AbortedTxnCleaner(conf, txnHandler, metadataCache,
+ false, mockedFSRemover));
+ Cleaner cleaner = new Cleaner();
+ cleaner.setConf(conf);
+ cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
+ cleaner.run();
+
+ List<Path> directories = getDirectories(conf, t, null);
+ Assert.assertEquals(5, directories.size());
+ }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
index c14da6950c4..b1d60b8a851 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
@@ -67,8 +67,8 @@ public class TestHandler extends TestCleaner {
AtomicBoolean stop = new AtomicBoolean(true);
Cleaner cleaner = new Cleaner();
cleaner.setConf(conf);
- cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
cleaner.init(stop);
+ cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
cleaner.run();
Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
@@ -98,8 +98,8 @@ public class TestHandler extends TestCleaner {
false, fsRemover));
Cleaner cleaner = new Cleaner();
cleaner.setConf(conf);
- cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
cleaner.init(new AtomicBoolean(true));
+ cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
cleaner.run();
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index 570f4f94dbe..8d17c0a23cf 100644
--- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -649,6 +649,10 @@ public class MetastoreConf {
COMPACTOR_CLEANER_TABLECACHE_ON("metastore.compactor.cleaner.tablecache.on",
"hive.compactor.cleaner.tablecache.on", true,
"Enable table caching in the cleaner. Currently the cache is cleaned after each cycle."),
+ COMPACTOR_CLEAN_ABORTS_USING_CLEANER("metastore.compactor.clean.aborts.using.cleaner", "hive.compactor.clean.aborts.using.cleaner", false,
+ "Whether to use cleaner for cleaning aborted directories or not.\n" +
+ "Set to true when cleaner is expected to clean delta/delete-delta directories from aborted transactions.\n" +
+ "Otherwise the cleanup of such directories will take place within the compaction cycle."),
HIVE_COMPACTOR_CONNECTION_POOLING_MAX_CONNECTIONS("metastore.compactor.connectionPool.maxPoolSize",
"hive.compactor.connectionPool.maxPoolSize", 5,
"Specify the maximum number of connections in the connection pool used by the compactor."),
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
index 5c091836d81..5357e394506 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java
@@ -8815,7 +8815,7 @@ public class HMSHandler extends FacebookBase implements IHMSHandler {
@Override
public void mark_cleaned(CompactionInfoStruct cr) throws MetaException {
- getTxnHandler().markCleaned(CompactionInfo.compactionStructToInfo(cr));
+ getTxnHandler().markCleaned(CompactionInfo.compactionStructToInfo(cr), false);
}
@Override
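
The extra boolean lets markCleaned distinguish a full compaction cleanup from an abort-only one; the RPC path above passes false to preserve the old behaviour. A hedged sketch of the two call shapes (the abort-only call site lives in AbortedTxnCleaner, outside this excerpt):

    txnHandler.markCleaned(info, false); // compaction cleanup: archive the COMPACTION_QUEUE entry, then purge metadata
    txnHandler.markCleaned(info, true);  // abort-only cleanup: skip COMPLETED_COMPACTIONS and only drop TXN_COMPONENTS rows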
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
index 5e5db301d63..dda975f17b4 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.metastore.txn;
-import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.metastore.api.CompactionInfoStruct;
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 12851547693..5e3e31c1bc5 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -162,31 +162,33 @@ class CompactionTxnHandler extends TxnHandler {
}
rs.close();
- // Check for aborted txns: number of aborted txns past threshold and age of aborted txns
- // past time threshold
- boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0;
- String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", " +
- "MIN(\"TXN_STARTED\"), COUNT(*) FROM \"TXNS\", \"TXN_COMPONENTS\" " +
- " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.ABORTED + " " +
- "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
- (checkAbortedTimeThreshold ? "" : " HAVING COUNT(*) > " + abortedThreshold);
-
- LOG.debug("Going to execute query <{}>", sCheckAborted);
- rs = stmt.executeQuery(sCheckAborted);
- long systemTime = System.currentTimeMillis();
- while (rs.next()) {
- boolean pastTimeThreshold =
- checkAbortedTimeThreshold && rs.getLong(4) + abortedTimeThreshold < systemTime;
- int numAbortedTxns = rs.getInt(5);
- if (numAbortedTxns > abortedThreshold || pastTimeThreshold) {
- CompactionInfo info = new CompactionInfo();
- info.dbname = rs.getString(1);
- info.tableName = rs.getString(2);
- info.partName = rs.getString(3);
- info.tooManyAborts = numAbortedTxns > abortedThreshold;
- info.hasOldAbort = pastTimeThreshold;
- LOG.debug("Found potential compaction: {}", info);
- response.add(info);
+ if (!MetastoreConf.getBoolVar(conf, ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER)) {
+ // Check for aborted txns: number of aborted txns past threshold and age of aborted txns
+ // past time threshold
+ boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0;
+ String sCheckAborted = "SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", " +
+ "MIN(\"TXN_STARTED\"), COUNT(*) FROM \"TXNS\", \"TXN_COMPONENTS\" " +
+ " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.ABORTED + " " +
+ "GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
+ (checkAbortedTimeThreshold ? "" : " HAVING COUNT(*) > " + abortedThreshold);
+
+ LOG.debug("Going to execute query <{}>", sCheckAborted);
+ rs = stmt.executeQuery(sCheckAborted);
+ long systemTime = System.currentTimeMillis();
+ while (rs.next()) {
+ boolean pastTimeThreshold =
+ checkAbortedTimeThreshold && rs.getLong(4) + abortedTimeThreshold < systemTime;
+ int numAbortedTxns = rs.getInt(5);
+ if (numAbortedTxns > abortedThreshold || pastTimeThreshold) {
+ CompactionInfo info = new CompactionInfo();
+ info.dbname = rs.getString(1);
+ info.tableName = rs.getString(2);
+ info.partName = rs.getString(3);
+ info.tooManyAborts = numAbortedTxns > abortedThreshold;
+ info.hasOldAbort = pastTimeThreshold;
+ LOG.debug("Found potential compaction: {}", info);
+ response.add(info);
+ }
}
}
} catch (SQLException e) {
@@ -464,6 +466,55 @@ class CompactionTxnHandler extends TxnHandler {
}
}
+ @Override
+ @RetrySemantics.ReadOnly
+ public List<CompactionInfo> findReadyToCleanAborts(long abortedTimeThreshold, int abortedThreshold) throws MetaException {
+ try {
+ List<CompactionInfo> readyToCleanAborts = new ArrayList<>();
+ try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
+ Statement stmt = dbConn.createStatement()) {
+ boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0;
+ String sCheckAborted = "SELECT \"tc\".\"TC_DATABASE\", \"tc\".\"TC_TABLE\", \"tc\".\"TC_PARTITION\", " +
+ " \"tc\".\"MIN_TXN_START_TIME\", \"tc\".\"ABORTED_TXN_COUNT\", \"minOpenWriteTxnId\".\"MIN_OPEN_WRITE_TXNID\" FROM " +
+ " ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\", " +
+ " MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
+ " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = " + TxnStatus.ABORTED +
+ " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" " +
+ (checkAbortedTimeThreshold ? "" : " HAVING COUNT(*) > " + abortedThreshold) + " ) \"tc\" " +
+ " LEFT JOIN ( SELECT MIN(\"TC_TXNID\") AS \"MIN_OPEN_WRITE_TXNID\", \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
+ " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\"=" + TxnStatus.OPEN + " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" ) \"minOpenWriteTxnId\" " +
+ " ON \"tc\".\"TC_DATABASE\" = \"minOpenWriteTxnId\".\"TC_DATABASE\" AND \"tc\".\"TC_TABLE\" = \"minOpenWriteTxnId\".\"TC_TABLE\"" +
+ " AND (\"tc\".\"TC_PARTITION\" = \"minOpenWriteTxnId\".\"TC_PARTITION\" OR (\"tc\".\"TC_PARTITION\" IS NULL AND \"minOpenWriteTxnId\".\"TC_PARTITION\" IS NULL))";
+
+ LOG.debug("Going to execute query <{}>", sCheckAborted);
+ try (ResultSet rs = stmt.executeQuery(sCheckAborted)) {
+ long systemTime = System.currentTimeMillis();
+ while (rs.next()) {
+ boolean pastTimeThreshold =
+ checkAbortedTimeThreshold && rs.getLong(4) + abortedTimeThreshold < systemTime;
+ int numAbortedTxns = rs.getInt(5);
+ if (numAbortedTxns > abortedThreshold || pastTimeThreshold) {
+ CompactionInfo info = new CompactionInfo();
+ info.dbname = rs.getString(1);
+ info.tableName = rs.getString(2);
+ info.partName = rs.getString(3);
+ // In this case, the txnId field carries the min open write txn ID.
+ info.txnId = rs.getLong(6);
+ readyToCleanAborts.add(info);
+ }
+ }
+ }
+ return readyToCleanAborts;
+ } catch (SQLException e) {
+ LOG.error("Unable to select next element for cleaning, " + e.getMessage());
+ checkRetryable(e, "findReadyToCleanAborts");
+ throw new MetaException("Unable to connect to transaction database " +
+ e.getMessage());
+ }
+ } catch (RetryException e) {
+ return findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
+ }
+ }
/**
* Mark the cleaning start time for a particular compaction
@@ -565,7 +616,7 @@ class CompactionTxnHandler extends TxnHandler {
*/
@Override
@RetrySemantics.CannotRetry
- public void markCleaned(CompactionInfo info) throws MetaException {
+ public void markCleaned(CompactionInfo info, boolean isAbortOnly) throws MetaException {
LOG.debug("Running markCleaned with CompactionInfo: {}", info);
try {
Connection dbConn = null;
@@ -573,132 +624,151 @@ class CompactionTxnHandler extends TxnHandler {
ResultSet rs = null;
try {
dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
- String s = "INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\", \"CC_DATABASE\", "
- + "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", "
- + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", "
- + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\", \"CC_ENQUEUE_TIME\", "
- + "\"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", "
- + "\"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\", \"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\","
- + "\"CC_ORDER_BY\") "
- + "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
- + quoteChar(SUCCEEDED_STATE) + ", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", "
- + getEpochFn(dbProduct) + ", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", "
- + "\"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", \"CQ_ENQUEUE_TIME\", "
- + "\"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", "
- + "\"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", "
- + "\"CQ_ORDER_BY\" "
- + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";
- pStmt = dbConn.prepareStatement(s);
- pStmt.setLong(1, info.id);
- LOG.debug("Going to execute update <{}> for CQ_ID={}", s, info.id);
- pStmt.executeUpdate();
-
- s = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";
- pStmt = dbConn.prepareStatement(s);
- pStmt.setLong(1, info.id);
- LOG.debug("Going to execute update <{}>", s);
- int updCount = pStmt.executeUpdate();
- if (updCount != 1) {
- LOG.error("Unable to delete compaction record: {}. Update count={}", info, updCount);
- LOG.debug("Going to rollback");
- dbConn.rollback();
- }
- // Remove entries from completed_txn_components as well, so we don't start looking there
- // again but only up to the highest write ID include in this compaction job.
- //highestWriteId will be NULL in upgrade scenarios
- s = "DELETE FROM \"COMPLETED_TXN_COMPONENTS\" WHERE \"CTC_DATABASE\" = ? AND \"CTC_TABLE\" = ?";
- if (info.partName != null) {
- s += " AND \"CTC_PARTITION\" = ?";
- }
- if(info.highestWriteId != 0) {
- s += " AND \"CTC_WRITEID\" <= ?";
- }
- pStmt = dbConn.prepareStatement(s);
- int paramCount = 1;
- pStmt.setString(paramCount++, info.dbname);
- pStmt.setString(paramCount++, info.tableName);
- if (info.partName != null) {
- pStmt.setString(paramCount++, info.partName);
- }
- if(info.highestWriteId != 0) {
- pStmt.setLong(paramCount, info.highestWriteId);
- }
- LOG.debug("Going to execute update <{}>", s);
- if ((updCount = pStmt.executeUpdate()) < 1) {
- LOG.warn("Expected to remove at least one row from completed_txn_components when " +
- "marking compaction entry as clean!");
- }
- LOG.debug("Removed {} records from completed_txn_components", updCount);
- /*
- * compaction may remove data from aborted txns above tc_writeid bit it only guarantees to
- * remove it up to (inclusive) tc_writeid, so it's critical to not remove metadata about
- * aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted txns).
- * See {@link ql.txn.compactor.Cleaner.removeFiles()}
- */
- s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" IN ( "
- + "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED + ") "
- + "AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ? "
- + "AND \"TC_PARTITION\" "+ (info.partName != null ? "= ?" : "IS NULL");
-
- List<String> queries = new ArrayList<>();
- Iterator<Long> writeIdsIter = null;
- List<Integer> counts = null;
-
- if (info.writeIds != null && !info.writeIds.isEmpty()) {
- StringBuilder prefix = new StringBuilder(s).append(" AND ");
- List<String> questions = Collections.nCopies(info.writeIds.size(), "?");
+ if (!isAbortOnly) {
+ String s = "INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\", \"CC_DATABASE\", "
+ + "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\", \"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", "
+ + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", "
+ + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\", \"CC_ENQUEUE_TIME\", "
+ + "\"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", "
+ + "\"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\", \"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\","
+ + "\"CC_ORDER_BY\") "
+ + "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
+ + quoteChar(SUCCEEDED_STATE) + ", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", "
+ + getEpochFn(dbProduct) + ", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", "
+ + "\"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", \"CQ_ENQUEUE_TIME\", "
+ + "\"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", "
+ + "\"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", "
+ + "\"CQ_ORDER_BY\" "
+ + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";
+ pStmt = dbConn.prepareStatement(s);
+ pStmt.setLong(1, info.id);
+ LOG.debug("Going to execute update <{}> for CQ_ID={}", s, info.id);
+ pStmt.executeUpdate();
- counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix,
- new StringBuilder(), questions, "\"TC_WRITEID\"", false, false);
- writeIdsIter = info.writeIds.iterator();
- } else if (!info.hasUncompactedAborts){
+ s = "DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";
+ pStmt = dbConn.prepareStatement(s);
+ pStmt.setLong(1, info.id);
+ LOG.debug("Going to execute update <{}>", s);
+ int updCount = pStmt.executeUpdate();
+ if (updCount != 1) {
+ LOG.error("Unable to delete compaction record: {}. Update count={}", info, updCount);
+ LOG.debug("Going to rollback");
+ dbConn.rollback();
+ }
+ // Remove entries from completed_txn_components as well, so we don't start looking there
+ // again but only up to the highest write ID included in this compaction job.
+ //highestWriteId will be NULL in upgrade scenarios
+ s = "DELETE FROM \"COMPLETED_TXN_COMPONENTS\" WHERE \"CTC_DATABASE\" = ? AND \"CTC_TABLE\" = ?";
+ if (info.partName != null) {
+ s += " AND \"CTC_PARTITION\" = ?";
+ }
if (info.highestWriteId != 0) {
- s += " AND \"TC_WRITEID\" <= ?";
+ s += " AND \"CTC_WRITEID\" <= ?";
}
- queries.add(s);
- }
-
- for (int i = 0; i < queries.size(); i++) {
- String query = queries.get(i);
- int writeIdCount = (counts != null) ? counts.get(i) : 0;
-
- LOG.debug("Going to execute update <{}>", query);
- pStmt = dbConn.prepareStatement(query);
- paramCount = 1;
-
+ pStmt = dbConn.prepareStatement(s);
+ int paramCount = 1;
pStmt.setString(paramCount++, info.dbname);
pStmt.setString(paramCount++, info.tableName);
if (info.partName != null) {
pStmt.setString(paramCount++, info.partName);
}
- if (info.highestWriteId != 0 && writeIdCount == 0) {
+ if (info.highestWriteId != 0) {
pStmt.setLong(paramCount, info.highestWriteId);
}
- for (int j = 0; j < writeIdCount; j++) {
- if (writeIdsIter.hasNext()) {
- pStmt.setLong(paramCount + j, writeIdsIter.next());
- }
+ LOG.debug("Going to execute update <{}>", s);
+ if ((updCount = pStmt.executeUpdate()) < 1) {
+ LOG.warn("Expected to remove at least one row from completed_txn_components when " +
+ "marking compaction entry as clean!");
}
- int rc = pStmt.executeUpdate();
- LOG.debug("Removed {} records from txn_components", rc);
- // Don't bother cleaning from the txns table. A separate call will do that. We don't
- // know here which txns still have components from other tables or partitions in the
- // table, so we don't know which ones we can and cannot clean.
+ LOG.debug("Removed {} records from completed_txn_components", updCount);
}
+
+ // Clean up metadata of aborted txns in the TXN_COMPONENTS table.
+ removeTxnComponents(dbConn, info);
LOG.debug("Going to commit");
dbConn.commit();
} catch (SQLException e) {
LOG.error("Unable to delete from compaction queue " + e.getMessage());
LOG.debug("Going to rollback");
rollbackDBConn(dbConn);
- checkRetryable(e, "markCleaned(" + info + ")");
+ checkRetryable(e, "markCleaned(" + info + "," + isAbortOnly + ")");
throw new MetaException("Unable to connect to transaction database " +
e.getMessage());
} finally {
close(rs, pStmt, dbConn);
}
} catch (RetryException e) {
- markCleaned(info);
+ markCleaned(info, isAbortOnly);
+ }
+ }
+
+ private void removeTxnComponents(Connection dbConn, CompactionInfo info) throws MetaException, RetryException {
+ PreparedStatement pStmt = null;
+ ResultSet rs = null;
+ try {
+ /*
+ * compaction may remove data from aborted txns above tc_writeid, but it only guarantees to
+ * remove it up to (inclusive) tc_writeid, so it's critical to not remove metadata about
+ * aborted TXN_COMPONENTS above tc_writeid (and consequently about aborted txns).
+ * See {@link ql.txn.compactor.Cleaner.removeFiles()}
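+ * e.g. (made-up values): with tc_writeid = 7 and aborted writes at 5 and 9, the
+ * cleaner has only guaranteed removal of data up to write ID 7, so the
+ * TXN_COMPONENTS row for write ID 9 must survive until a later cleanup pass.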
+ */
+ String s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" IN ( "
+ + "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED + ") "
+ + "AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ? "
+ + "AND \"TC_PARTITION\" " + (info.partName != null ? "= ?" : "IS NULL");
+
+ List<String> queries = new ArrayList<>();
+ Iterator<Long> writeIdsIter = null;
+ List<Integer> counts = null;
+
+ if (info.writeIds != null && !info.writeIds.isEmpty()) {
+ StringBuilder prefix = new StringBuilder(s).append(" AND ");
+ List<String> questions = Collections.nCopies(info.writeIds.size(), "?");
+
+ counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix,
+ new StringBuilder(), questions, "\"TC_WRITEID\"", false, false);
+ writeIdsIter = info.writeIds.iterator();
+ } else if (!info.hasUncompactedAborts) {
+ if (info.highestWriteId != 0) {
+ s += " AND \"TC_WRITEID\" <= ?";
+ }
+ queries.add(s);
+ }
+
+ for (int i = 0; i < queries.size(); i++) {
+ String query = queries.get(i);
+ int writeIdCount = (counts != null) ? counts.get(i) : 0;
+
+ LOG.debug("Going to execute update <{}>", query);
+ pStmt = dbConn.prepareStatement(query);
+ int paramCount = 1;
+
+ pStmt.setString(paramCount++, info.dbname);
+ pStmt.setString(paramCount++, info.tableName);
+ if (info.partName != null) {
+ pStmt.setString(paramCount++, info.partName);
+ }
+ if (info.highestWriteId != 0 && writeIdCount == 0) {
+ pStmt.setLong(paramCount, info.highestWriteId);
+ }
+ for (int j = 0; j < writeIdCount; j++) {
+ if (writeIdsIter.hasNext()) {
+ pStmt.setLong(paramCount + j, writeIdsIter.next());
+ }
+ }
+ int rc = pStmt.executeUpdate();
+ LOG.debug("Removed {} records from txn_components", rc);
+ }
+ } catch (SQLException e) {
+ LOG.error("Unable to delete from txn components due to {}", e.getMessage());
+ LOG.debug("Going to rollback");
+ rollbackDBConn(dbConn);
+ checkRetryable(e, "removeTxnComponents(" + info + ")");
+ throw new MetaException("Unable to connect to transaction database " +
+ e.getMessage());
+ } finally {
+ close(rs);
+ closeStmt(pStmt);
}
}
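
The write-ID batching in removeTxnComponents() comes from TxnUtils.buildQueryWithINClauseStrings, which splits a potentially huge ID list into several bounded IN clauses. As a point of reference, a minimal standalone sketch of that batching idea, assuming a hypothetical fixed limit MAX_IN_LIST (the real helper sizes its batches from metastore configuration):

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // Sketch only: MAX_IN_LIST is a hypothetical stand-in for the configured
    // limit consulted by TxnUtils.buildQueryWithINClauseStrings.
    public class InClauseBatchingSketch {
      private static final int MAX_IN_LIST = 1000;

      static List<String> buildDeletes(String baseDelete, List<Long> writeIds) {
        List<String> queries = new ArrayList<>();
        for (int from = 0; from < writeIds.size(); from += MAX_IN_LIST) {
          int count = Math.min(MAX_IN_LIST, writeIds.size() - from);
          // One "?" placeholder per write ID in this batch; the caller binds
          // db/table/partition first, then the write IDs, as removeTxnComponents() does.
          String placeholders = String.join(",", Collections.nCopies(count, "?"));
          queries.add(baseDelete + " AND \"TC_WRITEID\" IN (" + placeholders + ")");
        }
        return queries;
      }
    }

Binding the IDs as PreparedStatement parameters, instead of inlining literals, keeps the statements cacheable and sidesteps quoting issues.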
@@ -866,7 +936,7 @@ class CompactionTxnHandler extends TxnHandler {
* The committed txns are left there for TXN_OPENTXN_TIMEOUT window period intentionally.
* The reason such aborted txns exist can be that no work was done in this txn
* (e.g. Streaming opened TransactionBatch and abandoned it w/o doing any work)
- * or due to {@link #markCleaned(CompactionInfo)} being called.
+ * or due to {@link #markCleaned(CompactionInfo, boolean)} being called.
*/
@Override
@RetrySemantics.SafeToRetry
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index 525747716c3..cc8f9d94a26 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -516,6 +516,19 @@ public interface TxnStore extends Configurable {
@RetrySemantics.ReadOnly
List<CompactionInfo> findReadyToClean(long minOpenTxnWaterMark, long retentionTime) throws MetaException;
+ /**
+ * Find the aborted entries in TXN_COMPONENTS which can be used to
+ * clean directories belonging to transactions in aborted state.
+ * @param abortedTimeThreshold Age of the oldest aborted transaction involving a given table
+ * or partition that will trigger cleanup.
+ * @param abortedThreshold Number of aborted transactions involving a given table or partition
+ * that will trigger cleanup.
+ * @return Information about the aborted entries that need to be cleaned.
+ * @throws MetaException
+ */
+ @RetrySemantics.ReadOnly
+ List<CompactionInfo> findReadyToCleanAborts(long abortedTimeThreshold, int abortedThreshold) throws MetaException;
+
/**
* Sets the cleaning start time for a particular compaction
*
@@ -537,9 +550,10 @@ public interface TxnStore extends Configurable {
* it has been compacted.
*
* @param info info on the compaction entry to remove
+ * @param isAbortOnly whether to clean up only the abort-related metadata
*/
@RetrySemantics.CannotRetry
- void markCleaned(CompactionInfo info) throws MetaException;
+ void markCleaned(CompactionInfo info, boolean isAbortOnly) throws MetaException;
/**
* Mark a compaction entry as failed. This will move it to the compaction history queue with a
@@ -584,7 +598,7 @@ public interface TxnStore extends Configurable {
/**
* Clean up aborted or committed transactions from txns that have no components in txn_components. The reason such
* txns exist can be that no work was done in this txn (e.g. Streaming opened TransactionBatch and
- * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo)} being called,
+ * abandoned it w/o doing any work) or due to {@link #markCleaned(CompactionInfo, boolean)} being called,
* or the delete from the txns was delayed because of TXN_OPENTXN_TIMEOUT window.
*/
@RetrySemantics.SafeToRetry
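
For orientation, the two TxnStore additions above compose into one cleanup pass on the handler side: find candidates, remove the aborted directories, then mark the metadata clean. A hedged sketch of that loop, where removeAbortedDirs() is a hypothetical stand-in for the filesystem work the real AbortedTxnCleaner performs:

    import java.util.List;
    import org.apache.hadoop.hive.metastore.api.MetaException;
    import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
    import org.apache.hadoop.hive.metastore.txn.TxnStore;

    // Sketch of one aborted-txn cleanup pass over the new TxnStore API.
    class AbortCleanupPassSketch {
      void runOnePass(TxnStore txnHandler, long abortedTimeThreshold, int abortedThreshold)
          throws MetaException {
        List<CompactionInfo> ready =
            txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
        for (CompactionInfo ci : ready) {
          removeAbortedDirs(ci);            // hypothetical: delete the aborted delta dirs
          txnHandler.markCleaned(ci, true); // isAbortOnly = true: abort-metadata cleanup only
        }
      }

      private void removeAbortedDirs(CompactionInfo ci) {
        // Filesystem work elided; the real handler resolves the table/partition
        // location and deletes directories written only by aborted txns.
      }
    }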
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index 31e5b712d21..704956a6f46 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -60,30 +60,45 @@ import static org.apache.hadoop.hive.metastore.TransactionalValidationListener.D
public class TxnUtils {
private static final Logger LOG = LoggerFactory.getLogger(TxnUtils.class);
- public static ValidTxnList createValidTxnListForCleaner(GetOpenTxnsResponse txns, long minOpenTxnGLB) {
- long highWaterMark = minOpenTxnGLB - 1;
- long[] abortedTxns = new long[txns.getOpen_txnsSize()];
+ /**
+ * Returns a valid txn list for the cleaner.
+ * @param txns Response containing the open txns list.
+ * @param minOpenTxn Minimum open txn ID; in the case of abort cleanup, the minimum open write txn on the table.
+ * @param isAbortCleanup Whether the request is for abort cleanup.
+ * @return a valid txn list
+ */
+ public static ValidTxnList createValidTxnListForCleaner(GetOpenTxnsResponse txns, long minOpenTxn, boolean isAbortCleanup) {
+ long highWatermark = minOpenTxn - 1;
+ long[] exceptions = new long[txns.getOpen_txnsSize()];
BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits());
int i = 0;
- for(long txnId : txns.getOpen_txns()) {
- if(txnId > highWaterMark) {
+ for (long txnId : txns.getOpen_txns()) {
+ if (txnId > highWatermark) {
break;
}
- if(abortedBits.get(i)) {
- abortedTxns[i] = txnId;
- }
- else {
- assert false : JavaUtils.txnIdToString(txnId) + " is open and <= hwm:" + highWaterMark;
+ if (abortedBits.get(i) || isAbortCleanup) {
+ exceptions[i] = txnId;
+ } else {
+ assert false : JavaUtils.txnIdToString(txnId) + " is open and <= hwm:" + highWatermark;
+ }
++i;
}
- abortedTxns = Arrays.copyOf(abortedTxns, i);
- BitSet bitSet = new BitSet(abortedTxns.length);
- bitSet.set(0, abortedTxns.length);
- //add ValidCleanerTxnList? - could be problematic for all the places that read it from
- // string as they'd have to know which object to instantiate
- return new ValidReadTxnList(abortedTxns, bitSet, highWaterMark, Long.MAX_VALUE);
+ exceptions = Arrays.copyOf(exceptions, i);
+ if (!isAbortCleanup) {
+ BitSet bitSet = new BitSet(exceptions.length);
+ bitSet.set(0, exceptions.length);
+ //add ValidCleanerTxnList? - could be problematic for all the places that read it from
+ // string as they'd have to know which object to instantiate
+ return new ValidReadTxnList(exceptions, bitSet, highWatermark, Long.MAX_VALUE);
+ } else {
+ return new ValidReadTxnList(exceptions, abortedBits, highWatermark, Long.MAX_VALUE);
+ }
}
+
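
To see what the new isAbortCleanup branch changes, a standalone sketch of the selection loop with made-up values (plain arrays stand in for GetOpenTxnsResponse):

    import java.util.ArrayList;
    import java.util.List;

    // Made-up values: 5 and 8 are aborted; 6 is open but never wrote to this table,
    // so the table's min open write txn (10) can sit above it.
    public class CleanerTxnListSketch {
      public static void main(String[] args) {
        long minOpenTxn = 10;                       // -> highWatermark = 9
        long[] openTxns = {5, 6, 8};                // ascending open-or-aborted txn IDs
        boolean[] abortedBit = {true, false, true};
        boolean isAbortCleanup = true;

        List<Long> exceptions = new ArrayList<>();
        for (int i = 0; i < openTxns.length && openTxns[i] <= minOpenTxn - 1; i++) {
          // Compaction cleanup keeps only aborted txns (an open txn below its global
          // watermark would trip the assert above); abort cleanup keeps open ones too,
          // and the aborted bits let readers tell 6 (still open) apart from 5 and 8.
          if (abortedBit[i] || isAbortCleanup) {
            exceptions.add(openTxns[i]);
          }
        }
        System.out.println(exceptions); // prints [5, 6, 8]
      }
    }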
/**
* Transform a {@link org.apache.hadoop.hive.metastore.api.TableValidWriteIds} to a
* {@link org.apache.hadoop.hive.common.ValidCompactorWriteIdList}. This assumes that the caller intends to