You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this text copy.)
Posted to commits@hive.apache.org by lp...@apache.org on 2020/02/25 13:03:17 UTC
[hive] branch master updated: HIVE-22863: Commit compaction txn if it is opened but compaction is skipped (Karen Coppage via Laszlo Pinter)
This is an automated email from the ASF dual-hosted git repository.
lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new bbdf4c3 HIVE-22863: Commit compaction txn if it is opened but compaction is skipped (Karen Coppage via Laszlo Pinter)
bbdf4c3 is described below
commit bbdf4c32a4ce9e084aced0b26be2d843e695bf20
Author: Karen Coppage <ka...@cloudera.com>
AuthorDate: Tue Feb 25 13:56:37 2020 +0100
HIVE-22863: Commit compaction txn if it is opened but compaction is skipped (Karen Coppage via Laszlo Pinter)
---
.../hadoop/hive/ql/txn/compactor/Worker.java | 22 +++++++++--
.../apache/hadoop/hive/ql/TestTxnCommands3.java | 43 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 3 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 90c699a..2023292 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -68,6 +68,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
static final private String CLASS_NAME = Worker.class.getName();
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
static final private long SLEEP_TIME = 10000;
+ private static final int NOT_SET = -1;
private String workerName;
private JobConf mrJob; // the MR job for compaction
@@ -96,6 +97,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
// so wrap it in a big catch Throwable statement.
CompactionHeartbeater heartbeater = null;
CompactionInfo ci = null;
+ long compactorTxnId = NOT_SET;
try {
if (msc == null) {
msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
@@ -165,7 +167,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
* multiple statements in it (for query based compactor) which is not supported (and since
* this case some of the statements are DDL, even in the future will not be allowed in a
* multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */
- long compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
+ compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf);
heartbeater.start();
@@ -229,7 +231,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
}
heartbeater.cancel();
msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
- msc.commitTxn(compactorTxnId);
if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
mrJob = mr.getMrJob();
}
@@ -265,7 +266,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " +
StringUtils.stringifyException(t));
} finally {
- if(heartbeater != null) {
+ commitTxn(compactorTxnId);
+ if (heartbeater != null) {
heartbeater.cancel();
}
}
@@ -282,6 +284,20 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
} while (!stop.get());
}
+ private void commitTxn(long compactorTxnId) {
+ if (compactorTxnId != NOT_SET) {
+ try {
+ if (msc != null) {
+ msc.commitTxn(compactorTxnId);
+ }
+ } catch (TException e) {
+ LOG.error(
+ "Caught an exception while committing compaction in worker " + workerName + ", "
+ + StringUtils.stringifyException(e));
+ }
+ }
+ }
+
@Override
public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
super.init(stop, looped);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index 4f4bca2..51b0fa3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -469,4 +469,47 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
//now the aborted compactor txn is gone
Assert.assertEquals(openResp.toString(), 0, openResp.getOpen_txnsSize());
}
+
+ /**
+ * Not enough deltas to compact, no need to clean: there is absolutely nothing to do.
+ */
+ @Test public void testNotEnoughToCompact() throws Exception {
+ int[][] tableData = {{1, 2}, {3, 4}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+ runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'");
+
+ runWorker(hiveConf);
+ assertTableIsEmpty("TXNS");
+ assertTableIsEmpty("TXN_COMPONENTS");
+
+ runCleaner(hiveConf);
+ assertTableIsEmpty("TXNS");
+ assertTableIsEmpty("TXN_COMPONENTS");
+ }
+
+ /**
+ * There aren't enough deltas to compact, but cleaning is needed because an insert overwrite
+ * was executed.
+ */
+ @Test public void testNotEnoughToCompactNeedsCleaning() throws Exception {
+ int[][] tableData = {{1, 2}, {3, 4}};
+ runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+ runStatementOnDriver(
+ "insert overwrite table " + Table.ACIDTBL + " " + makeValuesClause(tableData));
+
+ runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'");
+
+ runWorker(hiveConf);
+ assertTableIsEmpty("TXNS");
+ assertTableIsEmpty("TXN_COMPONENTS");
+
+ runCleaner(hiveConf);
+ assertTableIsEmpty("TXNS");
+ assertTableIsEmpty("TXN_COMPONENTS");
+ }
+
+ private void assertTableIsEmpty(String table) throws Exception {
+ Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from " + table), 0,
+ TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from " + table));
+ }
}