Posted to commits@hive.apache.org by pv...@apache.org on 2019/12/06 10:11:33 UTC

[hive] branch master updated: HIVE-21266: Don't run cleaner if compaction is skipped (issue with single delta file) (Karen Coppage reviewed by Laszlo Pinter and Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c78dcb  HIVE-21266: Don't run cleaner if compaction is skipped (issue with single delta file) (Karen Coppage reviewed by Laszlo Pinter and Peter Vary)
0c78dcb is described below

commit 0c78dcb772d5fa83ade1d8938753a6206ebbb495
Author: Karen Coppage <ka...@cloudera.com>
AuthorDate: Fri Dec 6 11:10:40 2019 +0100

    HIVE-21266: Don't run cleaner if compaction is skipped (issue with single delta file) (Karen Coppage reviewed by Laszlo Pinter and Peter Vary)
---
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  | 37 ------------
 .../hive/ql/txn/compactor/MajorQueryCompactor.java |  8 ---
 .../ql/txn/compactor/MmMajorQueryCompactor.java    |  4 --
 .../hive/ql/txn/compactor/QueryCompactor.java      | 18 ++++++
 .../hadoop/hive/ql/txn/compactor/Worker.java       | 16 ++++++
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    |  3 +-
 .../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java  |  8 ++-
 .../hadoop/hive/ql/txn/compactor/TestWorker.java   | 66 +++++++++++++++++++++-
 8 files changed, 106 insertions(+), 54 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index ee2c0f3..90dc4a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.regex.Matcher;
-import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -242,10 +241,6 @@ public class CompactorMR {
     AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), true,
         null, false);
 
-    if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
-      return;
-    }
-
     List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
     int maxDeltasToHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
     if (parsedDeltas.size() > maxDeltasToHandle) {
@@ -305,38 +300,6 @@ public class CompactorMR {
     su.gatherStats();
   }
 
-  private static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) {
-    int deltaCount = dir.getCurrentDirectories().size();
-    int origCount = dir.getOriginalFiles().size();
-
-    StringBuilder deltaInfo = new StringBuilder().append(deltaCount);
-    boolean isEnoughToCompact;
-
-    if (isMajorCompaction) {
-      isEnoughToCompact = (origCount > 0
-          || deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) > 1);
-
-    } else {
-      isEnoughToCompact = (deltaCount > 1);
-
-      if (deltaCount == 2) {
-        Map<String, Long> deltaByType = dir.getCurrentDirectories().stream()
-            .collect(Collectors.groupingBy(delta ->
-                    (delta.isDeleteDelta() ? AcidUtils.DELETE_DELTA_PREFIX : AcidUtils.DELTA_PREFIX),
-                Collectors.counting()));
-
-        isEnoughToCompact = (deltaByType.size() != deltaCount);
-        deltaInfo.append(" ").append(deltaByType);
-      }
-    }
-
-    if (!isEnoughToCompact) {
-      LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}",
-          sd.getLocation(), dir.getBaseDirectory(), deltaInfo, origCount);
-    }
-    return isEnoughToCompact;
-  }
-
   private String generateTmpPath(StorageDescriptor sd) {
     return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index 10681c0..38689ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,13 +54,6 @@ class MajorQueryCompactor extends QueryCompactor {
       ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
     AcidUtils
         .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
-    AcidUtils.Directory dir = AcidUtils
-        .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
-            table.getParameters(), false);
-
-    if (!Util.isEnoughToCompact(true, dir, storageDescriptor)) {
-      return;
-    }
 
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index 27a3ce8..9b84209 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -82,10 +82,6 @@ class MmMajorQueryCompactor extends QueryCompactor {
       return;
     }
 
-    if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) {
-      return;
-    }
-
     try {
       String tmpLocation = Util.generateTmpPath(storageDescriptor);
       Path baseLocation = new Path(tmpLocation, "_base");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 80119de..1eab5b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -95,6 +95,24 @@ abstract class QueryCompactor {
     }
 
     /**
+     * Check for obsolete directories, and return true if any exist and Cleaner should be
+     * run. For example if we insert overwrite into a table with only deltas, a new base file with
+     * the highest writeId is created so there will be no live delta directories, only obsolete
+     * ones. Compaction is not needed, but the cleaner should still be run.
+     *
+     * @return true if cleaning is needed
+     */
+    public static boolean needsCleaning(AcidUtils.Directory dir, StorageDescriptor sd) {
+      int numObsoleteDirs = dir.getObsolete().size();
+      boolean needsJustCleaning = numObsoleteDirs > 0;
+      if (needsJustCleaning) {
+        LOG.debug("{} obsolete directories in {} found; marked for cleaning.",
+            numObsoleteDirs, sd.getLocation());
+      }
+      return needsJustCleaning;
+    }
+
+    /**
      * Generate a random tmp path, under the provided storage.
      * @param sd storage descriptor, must be not null.
      * @return path, always not null
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 749cdb6..b149355 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -30,7 +31,9 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnType;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hive.common.util.Ref;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -182,6 +185,19 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
         jobName.append("-compactor-");
         jobName.append(ci.getFullPartitionName());
 
+        // Don't start compaction or cleaning if not necessary
+        AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf,
+            tblValidWriteIds, Ref.from(false), true, null, false);
+        if (!QueryCompactor.Util.isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
+          if (QueryCompactor.Util.needsCleaning(dir, sd)) {
+            msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
+          } else {
+            // do nothing
+            msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+          }
+          continue;
+        }
+
         LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " + JavaUtils.txnIdToString(compactorTxnId));
         final StatsUpdater su = StatsUpdater.init(ci, msc.findColumnsWithStats(
             CompactionInfo.compactionInfoToStruct(ci)), conf,
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index c184ce5..88ca683 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1323,7 +1323,8 @@ public class TestTxnCommands2 {
     ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
     Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize());
     Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
-    Assert.assertEquals("Unexpected 1 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(1).getState());
+    Assert.assertEquals("Unexpected 1 compaction state", TxnStore.SUCCEEDED_RESPONSE,
+        resp.getCompacts().get(1).getState());
   }
 
   /**
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 23d860f..c033a94 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -469,12 +469,13 @@ public class TestDbTxnManager2 {
     driver.run("insert into temp.T11 values (4, 4)");
     driver.run("insert into temp.T12p partition (ds='today', hour='1') values (5, 5)");
     driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (6, 6)");
+    driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (13, 13)");
     driver.run("insert into temp.T13p partition (ds='today', hour='1') values (7, 7)");
     driver.run("insert into temp.T13p partition (ds='tomorrow', hour='2') values (8, 8)");
     int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11')");
     Assert.assertEquals(4, count);
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t12p', 't13p')");
-    Assert.assertEquals(4, count);
+    Assert.assertEquals(5, count);
 
     // Fail some inserts, so that we have records in TXN_COMPONENTS
     conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
@@ -532,7 +533,10 @@ public class TestDbTxnManager2 {
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='s' and CC_TYPE='i'");
     Assert.assertEquals(1, count);
 
-    // Fail compaction, so that we have failed records in COMPLETED_COMPACTIONS
+    // Fail compaction, so that we have failed records in COMPLETED_COMPACTIONS.
+    // Tables need at least 2 delta files to compact, and minor compaction was just run, so insert
+    driver.run("insert into temp.T11 values (14, 14)");
+    driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (15, 15)");
     conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
     driver.run("alter table temp.T11 compact 'major'");
     count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 553addb..70ae85c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.StringableMap;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -32,6 +34,10 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -47,12 +53,12 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.BitSet;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Tests for the worker thread and its MR jobs.
@@ -346,10 +352,11 @@ public class TestWorker extends CompactorTest {
 
     startWorker();
 
+    // since compaction was not run, state should not be "ready for cleaning" but "succeeded"
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
     Assert.assertEquals(1, compacts.size());
-    Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, compacts.get(0).getState());
 
     // There should still be 4 directories in the location
     FileSystem fs = FileSystem.get(conf);
@@ -1005,6 +1012,61 @@ public class TestWorker extends CompactorTest {
     Assert.assertEquals(0, compacts.size());
   }
 
+  @Test
+  public void oneDeltaWithAbortedTxn() throws Exception {
+    Table t = newTable("default", "delta1", false);
+    addDeltaFile(t, null, 0, 2L, 3);
+    Set<Long> aborted = new HashSet<>();
+    aborted.add(1L);
+    burnThroughTransactions("default", "delta1", 3, null, aborted);
+
+    // MR
+    verifyTxn1IsAborted(0, t, CompactionType.MAJOR);
+    verifyTxn1IsAborted(1, t, CompactionType.MINOR);
+
+    // Query-based
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    verifyTxn1IsAborted(2, t, CompactionType.MAJOR);
+    verifyTxn1IsAborted(3, t, CompactionType.MINOR);
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, false);
+
+    // Insert-only
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES,
+        TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY);
+    Table mm = newTable("default", "delta1", false, parameters);
+    addDeltaFile(mm, null, 0, 2L, 3);
+    burnThroughTransactions("default", "delta1", 3, null, aborted);
+    verifyTxn1IsAborted(0, t, CompactionType.MAJOR);
+    verifyTxn1IsAborted(1, t, CompactionType.MINOR);
+  }
+
+  private void verifyTxn1IsAborted(int compactionNum, Table t, CompactionType type)
+      throws Exception {
+    CompactionRequest rqst = new CompactionRequest("default", t.getTableName(), type);
+    txnHandler.compact(rqst);
+    startWorker();
+
+    // Compaction should not have run on a single delta file
+    FileSystem fs = FileSystem.get(conf);
+    FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
+    Assert.assertEquals(1, stat.length);
+    Assert.assertEquals(makeDeltaDirName(0, 2), stat[0].getPath().getName());
+
+    // State should not be "ready for cleaning" because we skip cleaning
+    List<ShowCompactResponseElement> compacts =
+        txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(compactionNum + 1, compacts.size());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, compacts.get(compactionNum).getState());
+
+    // assert transaction with txnId=1 is still aborted after cleaner is run
+    startCleaner();
+    List<TxnInfo> openTxns =
+        HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns();
+    Assert.assertEquals(1, openTxns.get(0).getId());
+    Assert.assertEquals(TxnState.ABORTED, openTxns.get(0).getState());
+  }
+
   @After
   public void tearDown() throws Exception {
     compactorTestCleanup();