You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pv...@apache.org on 2019/12/06 10:11:33 UTC
[hive] branch master updated: HIVE-21266: Don't run cleaner if
compaction is skipped (issue with single delta file) (Karen Coppage
reviewed by Laszlo Pinter and Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0c78dcb HIVE-21266: Don't run cleaner if compaction is skipped (issue with single delta file) (Karen Coppage reviewed by Laszlo Pinter and Peter Vary)
0c78dcb is described below
commit 0c78dcb772d5fa83ade1d8938753a6206ebbb495
Author: Karen Coppage <ka...@cloudera.com>
AuthorDate: Fri Dec 6 11:10:40 2019 +0100
HIVE-21266: Don't run cleaner if compaction is skipped (issue with single delta file) (Karen Coppage reviewed by Laszlo Pinter and Peter Vary)
---
.../hadoop/hive/ql/txn/compactor/CompactorMR.java | 37 ------------
.../hive/ql/txn/compactor/MajorQueryCompactor.java | 8 ---
.../ql/txn/compactor/MmMajorQueryCompactor.java | 4 --
.../hive/ql/txn/compactor/QueryCompactor.java | 18 ++++++
.../hadoop/hive/ql/txn/compactor/Worker.java | 16 ++++++
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 3 +-
.../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 8 ++-
.../hadoop/hive/ql/txn/compactor/TestWorker.java | 66 +++++++++++++++++++++-
8 files changed, 106 insertions(+), 54 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index ee2c0f3..90dc4a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
-import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -242,10 +241,6 @@ public class CompactorMR {
AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), true,
null, false);
- if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
- return;
- }
-
List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
int maxDeltasToHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
if (parsedDeltas.size() > maxDeltasToHandle) {
@@ -305,38 +300,6 @@ public class CompactorMR {
su.gatherStats();
}
- private static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) {
- int deltaCount = dir.getCurrentDirectories().size();
- int origCount = dir.getOriginalFiles().size();
-
- StringBuilder deltaInfo = new StringBuilder().append(deltaCount);
- boolean isEnoughToCompact;
-
- if (isMajorCompaction) {
- isEnoughToCompact = (origCount > 0
- || deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) > 1);
-
- } else {
- isEnoughToCompact = (deltaCount > 1);
-
- if (deltaCount == 2) {
- Map<String, Long> deltaByType = dir.getCurrentDirectories().stream()
- .collect(Collectors.groupingBy(delta ->
- (delta.isDeleteDelta() ? AcidUtils.DELETE_DELTA_PREFIX : AcidUtils.DELTA_PREFIX),
- Collectors.counting()));
-
- isEnoughToCompact = (deltaByType.size() != deltaCount);
- deltaInfo.append(" ").append(deltaByType);
- }
- }
-
- if (!isEnoughToCompact) {
- LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}",
- sd.getLocation(), dir.getBaseDirectory(), deltaInfo, origCount);
- }
- return isEnoughToCompact;
- }
-
private String generateTmpPath(StorageDescriptor sd) {
return sd.getLocation() + "/" + TMPDIR + "_" + UUID.randomUUID().toString();
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index 10681c0..38689ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,13 +54,6 @@ class MajorQueryCompactor extends QueryCompactor {
ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
AcidUtils
.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
- AcidUtils.Directory dir = AcidUtils
- .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
- table.getParameters(), false);
-
- if (!Util.isEnoughToCompact(true, dir, storageDescriptor)) {
- return;
- }
String user = UserGroupInformation.getCurrentUser().getShortUserName();
SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, true);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index 27a3ce8..9b84209 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -82,10 +82,6 @@ class MmMajorQueryCompactor extends QueryCompactor {
return;
}
- if (!Util.isEnoughToCompact(compactionInfo.isMajorCompaction(), dir, storageDescriptor)) {
- return;
- }
-
try {
String tmpLocation = Util.generateTmpPath(storageDescriptor);
Path baseLocation = new Path(tmpLocation, "_base");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 80119de..1eab5b8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -95,6 +95,24 @@ abstract class QueryCompactor {
}
/**
+ * Reports whether the Cleaner still has work to do even when no compaction will run. An
+ * insert overwrite into a table that only has deltas, for instance, writes a new base with the
+ * highest writeId, so every delta becomes obsolete: nothing to compact, but something to clean.
+ * @param dir acid directory state to inspect for obsolete directories
+ * @param sd storage descriptor whose location is only used for debug logging
+ * @return true if {@code dir} reports at least one obsolete directory
+ */
+ public static boolean needsCleaning(AcidUtils.Directory dir, StorageDescriptor sd) {
+ int obsoleteCount = dir.getObsolete().size();
+ if (obsoleteCount == 0) {
+ return false;
+ }
+ LOG.debug("{} obsolete directories in {} found; marked for cleaning.",
+ obsoleteCount, sd.getLocation());
+ return true;
+ }
+
+ /**
* Generate a random tmp path, under the provided storage.
* @param sd storage descriptor, must be not null.
* @return path, always not null
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 749cdb6..b149355 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.txn.compactor;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -30,7 +31,9 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hive.common.util.Ref;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -182,6 +185,19 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
jobName.append("-compactor-");
jobName.append(ci.getFullPartitionName());
+ // Don't start compaction or cleaning if not necessary
+ AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf,
+ tblValidWriteIds, Ref.from(false), true, null, false);
+ if (!QueryCompactor.Util.isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
+ if (QueryCompactor.Util.needsCleaning(dir, sd)) {
+ msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
+ } else {
+ // neither compaction nor cleaning is needed: mark the request cleaned so it is closed out
+ msc.markCleaned(CompactionInfo.compactionInfoToStruct(ci));
+ }
+ continue;
+ }
+
LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + " in " + JavaUtils.txnIdToString(compactorTxnId));
final StatsUpdater su = StatsUpdater.init(ci, msc.findColumnsWithStats(
CompactionInfo.compactionInfoToStruct(ci)), conf,
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index c184ce5..88ca683 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -1323,7 +1323,8 @@ public class TestTxnCommands2 {
ShowCompactResponse resp = txnHandler.showCompact(new ShowCompactRequest());
Assert.assertEquals("Unexpected number of compactions in history", 2, resp.getCompactsSize());
Assert.assertEquals("Unexpected 0 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(0).getState());
- Assert.assertEquals("Unexpected 1 compaction state", TxnStore.CLEANING_RESPONSE, resp.getCompacts().get(1).getState());
+ Assert.assertEquals("Unexpected 1 compaction state", TxnStore.SUCCEEDED_RESPONSE,
+ resp.getCompacts().get(1).getState());
}
/**
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 23d860f..c033a94 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -469,12 +469,13 @@ public class TestDbTxnManager2 {
driver.run("insert into temp.T11 values (4, 4)");
driver.run("insert into temp.T12p partition (ds='today', hour='1') values (5, 5)");
driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (6, 6)");
+ driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (13, 13)");
driver.run("insert into temp.T13p partition (ds='today', hour='1') values (7, 7)");
driver.run("insert into temp.T13p partition (ds='tomorrow', hour='2') values (8, 8)");
int count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t10', 't11')");
Assert.assertEquals(4, count);
count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_TXN_COMPONENTS where CTC_DATABASE='temp' and CTC_TABLE in ('t12p', 't13p')");
- Assert.assertEquals(4, count);
+ Assert.assertEquals(5, count);
// Fail some inserts, so that we have records in TXN_COMPONENTS
conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEROLLBACKTXN, true);
@@ -532,7 +533,10 @@ public class TestDbTxnManager2 {
count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPLETED_COMPACTIONS where CC_DATABASE='temp' and CC_TABLE='t12p' and CC_STATE='s' and CC_TYPE='i'");
Assert.assertEquals(1, count);
- // Fail compaction, so that we have failed records in COMPLETED_COMPACTIONS
+ // Fail compaction, so that we have failed records in COMPLETED_COMPACTIONS.
+ // Tables need at least 2 delta files to compact and minor compaction was just run, so insert one more delta into each table first.
+ driver.run("insert into temp.T11 values (14, 14)");
+ driver.run("insert into temp.T12p partition (ds='tomorrow', hour='2') values (15, 15)");
conf.setBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
driver.run("alter table temp.T11 compact 'major'");
count = TxnDbUtil.countQueryAgent(conf, "select count(*) from COMPACTION_QUEUE where CQ_DATABASE='temp' and CQ_TABLE='t11' and CQ_STATE='i' and CQ_TYPE='a'");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 553addb..70ae85c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.StringableMap;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils;
+import org.apache.hadoop.hive.metastore.TransactionalValidationListener;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.Order;
@@ -32,6 +34,10 @@ import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.TxnInfo;
+import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.junit.After;
import org.junit.Assert;
@@ -47,12 +53,12 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
/**
* Tests for the worker thread and its MR jobs.
@@ -346,10 +352,11 @@ public class TestWorker extends CompactorTest {
startWorker();
+ // since compaction was not run, state should not be "ready for cleaning" but "succeeded"
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
- Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, compacts.get(0).getState());
// There should still be 4 directories in the location
FileSystem fs = FileSystem.get(conf);
@@ -1005,6 +1012,61 @@ public class TestWorker extends CompactorTest {
Assert.assertEquals(0, compacts.size());
}
+ @Test
+ public void oneDeltaWithAbortedTxn() throws Exception {
+ Table t = newTable("default", "delta1", false);
+ addDeltaFile(t, null, 0, 2L, 3);
+ Set<Long> aborted = new HashSet<>();
+ aborted.add(1L);
+ burnThroughTransactions("default", "delta1", 3, null, aborted);
+
+ // MR-based compaction (major, then minor) must skip the single delta and leave txn 1 aborted
+ verifyTxn1IsAborted(0, t, CompactionType.MAJOR);
+ verifyTxn1IsAborted(1, t, CompactionType.MINOR);
+
+ // Query-based compaction must skip it as well; the flag is reset afterwards
+ conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+ verifyTxn1IsAborted(2, t, CompactionType.MAJOR);
+ verifyTxn1IsAborted(3, t, CompactionType.MINOR);
+ conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, false);
+
+ // Insert-only (MM) table. NOTE(review): index restart at 0 assumes newTable dropped "delta1" - confirm
+ Map<String, String> parameters = new HashMap<>();
+ parameters.put(hive_metastoreConstants.TABLE_TRANSACTIONAL_PROPERTIES,
+ TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY);
+ Table mm = newTable("default", "delta1", false, parameters);
+ addDeltaFile(mm, null, 0, 2L, 3);
+ burnThroughTransactions("default", "delta1", 3, null, aborted);
+ verifyTxn1IsAborted(0, mm, CompactionType.MAJOR);
+ verifyTxn1IsAborted(1, mm, CompactionType.MINOR);
+ }
+
+ private void verifyTxn1IsAborted(int compactionNum, Table t, CompactionType type)
+ throws Exception {
+ // Request a compaction of the single-delta table and let one Worker pass handle it.
+ txnHandler.compact(new CompactionRequest("default", t.getTableName(), type));
+ startWorker();
+ // The lone delta directory (writeIds 0-2) must be untouched: compaction was skipped.
+ FileStatus[] children =
+ FileSystem.get(conf).listStatus(new Path(t.getSd().getLocation()));
+ Assert.assertEquals(1, children.length);
+ Assert.assertEquals(makeDeltaDirName(0, 2), children[0].getPath().getName());
+ // Cleaning is skipped too, so the request is expected to be "succeeded" rather than
+ // "ready for cleaning".
+ List<ShowCompactResponseElement> history =
+ txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+ Assert.assertEquals(compactionNum + 1, history.size());
+ ShowCompactResponseElement latest = history.get(compactionNum);
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, latest.getState());
+
+ // Txn 1 must still be reported as aborted once the Cleaner has run.
+ startCleaner();
+ TxnInfo firstTxn =
+ HiveMetaStoreUtils.getHiveMetastoreClient(conf).showTxns().getOpen_txns().get(0);
+ Assert.assertEquals(1, firstTxn.getId());
+ Assert.assertEquals(TxnState.ABORTED, firstTxn.getState());
+ }
+
@After
public void tearDown() throws Exception {
compactorTestCleanup();