You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by lp...@apache.org on 2020/02/25 13:03:17 UTC

[hive] branch master updated: HIVE-22863: Commit compaction txn if it is opened but compaction is skipped (Karen Coppage via Laszlo Pinter)

This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new bbdf4c3  HIVE-22863: Commit compaction txn if it is opened but compaction is skipped (Karen Coppage via Laszlo Pinter)
bbdf4c3 is described below

commit bbdf4c32a4ce9e084aced0b26be2d843e695bf20
Author: Karen Coppage <ka...@cloudera.com>
AuthorDate: Tue Feb 25 13:56:37 2020 +0100

    HIVE-22863: Commit compaction txn if it is opened but compaction is skipped (Karen Coppage via Laszlo Pinter)
---
 .../hadoop/hive/ql/txn/compactor/Worker.java       | 22 +++++++++--
 .../apache/hadoop/hive/ql/TestTxnCommands3.java    | 43 ++++++++++++++++++++++
 2 files changed, 62 insertions(+), 3 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 90c699a..2023292 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -68,6 +68,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
   static final private String CLASS_NAME = Worker.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   static final private long SLEEP_TIME = 10000;
+  private static final int NOT_SET = -1;
 
   private String workerName;
   private JobConf mrJob; // the MR job for compaction
@@ -96,6 +97,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
       // so wrap it in a big catch Throwable statement.
       CompactionHeartbeater heartbeater = null;
       CompactionInfo ci = null;
+      long compactorTxnId = NOT_SET;
       try {
         if (msc == null) {
           msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
@@ -165,7 +167,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
          * multiple statements in it (for query based compactor) which is not supported (and since
          * this case some of the statements are DDL, even in the future will not be allowed in a
          * multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, long)} */
-        long compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
+        compactorTxnId = msc.openTxn(ci.runAs, TxnType.COMPACTION);
 
         heartbeater = new CompactionHeartbeater(compactorTxnId, fullTableName, conf);
         heartbeater.start();
@@ -229,7 +231,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
           }
           heartbeater.cancel();
           msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
-          msc.commitTxn(compactorTxnId);
           if (conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
             mrJob = mr.getMrJob();
           }
@@ -265,7 +266,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
         LOG.error("Caught an exception in the main loop of compactor worker " + workerName + ", " +
             StringUtils.stringifyException(t));
       } finally {
-        if(heartbeater != null) {
+        commitTxn(compactorTxnId);
+        if (heartbeater != null) {
           heartbeater.cancel();
         }
       }
@@ -282,6 +284,20 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
     } while (!stop.get());
   }
 
+  private void commitTxn(long compactorTxnId) {
+    // Nothing to commit if no compactor txn id was recorded or there is no metastore client.
+    if (compactorTxnId == NOT_SET || msc == null) {
+      return;
+    }
+    try {
+      msc.commitTxn(compactorTxnId);
+    } catch (TException e) {
+      LOG.error(
+          "Caught an exception while committing compaction in worker " + workerName + ", "
+              + StringUtils.stringifyException(e));
+    }
+  }
+
   @Override
   public void init(AtomicBoolean stop, AtomicBoolean looped) throws Exception {
     super.init(stop, looped);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index 4f4bca2..51b0fa3 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -469,4 +469,47 @@ public class TestTxnCommands3 extends TxnCommandsBaseForTests {
     //now the aborted compactor txn is gone
     Assert.assertEquals(openResp.toString(), 0, openResp.getOpen_txnsSize());
   }
+
+  /**
+   * Too few deltas to compact and nothing to clean: Worker and Cleaner must leave no txn rows.
+   */
+  @Test public void testNotEnoughToCompact() throws Exception {
+    int[][] tableData = {{1, 2}, {3, 4}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+    runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'");
+
+    runWorker(hiveConf); // first check: state left behind by the Worker alone
+    assertTableIsEmpty("TXNS");
+    assertTableIsEmpty("TXN_COMPONENTS");
+
+    runCleaner(hiveConf); // second check: same two tables after the Cleaner has also run
+    assertTableIsEmpty("TXNS");
+    assertTableIsEmpty("TXN_COMPONENTS");
+  }
+
+  /**
+   * Too few deltas to compact, but the insert overwrite leaves something to clean. Worker and
+   * Cleaner must still leave TXNS and TXN_COMPONENTS empty.
+   */
+  @Test public void testNotEnoughToCompactNeedsCleaning() throws Exception {
+    int[][] tableData = {{1, 2}, {3, 4}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+    runStatementOnDriver(
+        "insert overwrite table " + Table.ACIDTBL + " " + makeValuesClause(tableData));
+
+    runStatementOnDriver("alter table " + TestTxnCommands2.Table.ACIDTBL + " compact 'MAJOR'");
+
+    runWorker(hiveConf); // first check: state left behind by the Worker alone
+    assertTableIsEmpty("TXNS");
+    assertTableIsEmpty("TXN_COMPONENTS");
+
+    runCleaner(hiveConf); // second check: same two tables after the Cleaner has also run
+    assertTableIsEmpty("TXNS");
+    assertTableIsEmpty("TXN_COMPONENTS");
+  }
+
+  private void assertTableIsEmpty(String table) throws Exception { // asserts count(*) == 0
+    Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from " + table), 0,
+        TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from " + table));
+  } // the first assertEquals argument (queryToString result) is the failure message
 }