Posted to commits@hive.apache.org by dk...@apache.org on 2022/03/28 12:40:04 UTC
[hive] branch master updated: HIVE-25977: Addendum (Denys Kuzmenko, reviewed by Karen Coppage)
This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 969b450 HIVE-25977: Addendum (Denys Kuzmenko, reviewed by Karen Coppage)
969b450 is described below
commit 969b450fdf5c0f2c00f67355bf26ee0f22cdede8
Author: Denys Kuzmenko <dk...@cloudera.com>
AuthorDate: Mon Mar 28 14:39:44 2022 +0200
HIVE-25977: Addendum (Denys Kuzmenko, reviewed by Karen Coppage)
Closes #3102
---
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 5 +-
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 151 +++++++++------------
.../hive/ql/txn/compactor/CompactorTest.java | 4 +-
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 19 +--
4 files changed, 78 insertions(+), 101 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 018c3e8..a03ef01 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1323,7 +1323,7 @@ public class AcidUtils {
* @return the state of the directory
* @throws IOException on filesystem errors
*/
- private static AcidDirectory getAcidState(FileSystem fileSystem, Path candidateDirectory, Configuration conf,
+ public static AcidDirectory getAcidState(FileSystem fileSystem, Path candidateDirectory, Configuration conf,
ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles, Map<Path,
HdfsDirSnapshot> dirSnapshots) throws IOException {
ValidTxnList validTxnList = getValidTxnList(conf);
@@ -2529,6 +2529,9 @@ public class AcidUtils {
LOG.debug("isRawFormat() called on " + dataFile + " which is not an ORC file: " +
ex.getMessage());
return true;
+ } catch (FileNotFoundException ex) {
+ // Fallback in case the file was already removed and the snapshot used is outdated
+ return false;
}
}
}
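Note on the AcidUtils changes above: getAcidState is made public so the Cleaner can pass in a pre-collected HdfsDirSnapshot map, and isRawFormat now tolerates files that disappear after the snapshot was taken. Below is a minimal sketch of that fallback pattern with illustrative names (probeIsRawFormat is not the Hive API, and the real isRawFormat inspects the ORC footer in more depth):

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class StaleSnapshotProbe {
      // A cached directory snapshot can list a file that a concurrent cleaner
      // already deleted. Catching FileNotFoundException at the point of use and
      // returning a safe default keeps one missing file from failing the scan.
      static boolean probeIsRawFormat(FileSystem fs, Path dataFile) throws IOException {
        try (FSDataInputStream in = fs.open(dataFile)) {
          byte[] magic = new byte[3];
          in.readFully(0, magic);
          // ORC files begin with the "ORC" magic; treat anything else as raw.
          return !"ORC".equals(new String(magic, StandardCharsets.UTF_8));
        } catch (FileNotFoundException ex) {
          // Fallback: the file was already removed and the snapshot is outdated.
          return false;
        }
      }
    }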
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index faef9b5..11652d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -32,7 +33,6 @@ import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
import org.apache.hadoop.hive.metastore.api.TxnOpenException;
@@ -51,7 +51,6 @@ import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -59,9 +58,6 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedBaseLight;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.Ref;
@@ -69,18 +65,19 @@ import org.apache.hive.common.util.Ref;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.BitSet;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
+import static org.apache.commons.collections4.ListUtils.subtract;
import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_CLEANER_THREAD_NAME_FORMAT;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
@@ -158,14 +155,12 @@ public class Cleaner extends MetaStoreCompactorThread {
// when min_history_level is finally dropped, then every HMS will commit compaction the new way
// and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
for (CompactionInfo compactionInfo : readyToClean) {
- String tableName = compactionInfo.getFullTableName();
- String partition = compactionInfo.getFullPartitionName();
CompletableFuture<Void> asyncJob =
CompletableFuture.runAsync(
ThrowingRunnable.unchecked(() -> clean(compactionInfo, cleanerWaterMark, metricsEnabled)),
cleanerExecutor)
.exceptionally(t -> {
- LOG.error("Error during the cleaning the table {} / partition {}", tableName, partition, t);
+ LOG.error("Error clearing {}", compactionInfo.getFullPartitionName(), t);
return null;
});
cleanerList.add(asyncJob);
@@ -423,7 +418,40 @@ public class Cleaner extends MetaStoreCompactorThread {
throws IOException, NoSuchObjectException, MetaException {
Path path = new Path(location);
FileSystem fs = path.getFileSystem(conf);
- AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList, Ref.from(false), false);
+
+ // Collect all of the files/dirs
+ Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshots(fs, path);
+ AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList, Ref.from(false), false,
+ dirSnapshots);
+ Table table = resolveTable(ci);
+ boolean isDynPartAbort = isDynPartAbort(table, ci);
+
+ List<Path> obsoleteDirs = getObsoleteDirs(dir, isDynPartAbort);
+ if (isDynPartAbort || dir.hasUncompactedAborts()) {
+ ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
+ }
+ List<Path> deleted = remove(location, ci, obsoleteDirs, true, fs);
+ if (dir.getObsolete().size() > 0) {
+ AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
+ txnHandler);
+ }
+ // Make sure there are no leftovers below the compacted watermark
+ conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+ dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
+ ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE),
+ Ref.from(false), false, dirSnapshots);
+
+ List<Path> remained = subtract(getObsoleteDirs(dir, isDynPartAbort), deleted);
+ if (!remained.isEmpty()) {
+ LOG.warn(idWatermark(ci) + " Remained " + remained.size() +
+ " obsolete directories from " + location + ". " + getDebugInfo(remained));
+ return false;
+ }
+ LOG.debug(idWatermark(ci) + " All cleared below the watermark: " + ci.highestWriteId + " from " + location);
+ return true;
+ }
+
+ private List<Path> getObsoleteDirs(AcidDirectory dir, boolean isDynPartAbort) {
List<Path> obsoleteDirs = dir.getObsolete();
/**
* add anything in 'dir' that only has data from aborted transactions - no one should be
@@ -434,105 +462,50 @@ public class Cleaner extends MetaStoreCompactorThread {
* txns with write IDs > {@link CompactionInfo#highestWriteId}.
* See {@link TxnStore#markCleaned(CompactionInfo)}
*/
- Table table = getMSForConf(conf).getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName);
- if (isDynPartAbort(table, ci) || dir.hasUncompactedAborts()) {
- ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
- }
obsoleteDirs.addAll(dir.getAbortedDirectories());
- if (isDynPartAbort(table, ci)) {
+ if (isDynPartAbort) {
// In the event of an aborted DP operation, we should only consider the aborted directories for cleanup.
// Including obsolete directories for partitioned tables can result in data loss.
obsoleteDirs = dir.getAbortedDirectories();
}
-
- if (obsoleteDirs.isEmpty()
- && !hasDataBelowWatermark(dir, fs, path, ci.highestWriteId, writeIdList.getHighWatermark())) {
- LOG.info(idWatermark(ci) + " nothing to remove below watermark " + ci.highestWriteId + ", ");
- return true;
- }
- StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
- .map(Path::getName).collect(Collectors.joining(",")));
- boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
- if (dir.getObsolete().size() > 0) {
- AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
- txnHandler);
- }
- return success;
- }
-
- private boolean hasDataBelowWatermark(AcidDirectory acidDir, FileSystem fs, Path path, long highWatermark,
- long minOpenTxn)
- throws IOException {
- Set<Path> acidPaths = new HashSet<>();
- for (ParsedDelta delta : acidDir.getCurrentDirectories()) {
- acidPaths.add(delta.getPath());
- }
- if (acidDir.getBaseDirectory() != null) {
- acidPaths.add(acidDir.getBaseDirectory());
- }
- FileStatus[] children = fs.listStatus(path, p -> {
- return !acidPaths.contains(p);
- });
- for (FileStatus child : children) {
- if (isFileBelowWatermark(child, highWatermark, minOpenTxn)) {
- return true;
- }
- }
- return false;
- }
-
- private boolean isFileBelowWatermark(FileStatus child, long highWatermark, long minOpenTxn) {
- Path p = child.getPath();
- String fn = p.getName();
- if (!child.isDirectory()) {
- return true;
- }
- if (fn.startsWith(AcidUtils.BASE_PREFIX)) {
- ParsedBaseLight b = ParsedBaseLight.parseBase(p);
- return b.getWriteId() <= highWatermark;
- }
- if (fn.startsWith(AcidUtils.DELTA_PREFIX) || fn.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
- ParsedDeltaLight d = ParsedDeltaLight.parse(p);
- return d.getMaxWriteId() <= highWatermark;
- }
- return false;
+ return obsoleteDirs;
}
private boolean removeFiles(String location, CompactionInfo ci)
- throws NoSuchObjectException, IOException, MetaException {
- Path path = new Path(location);
- StringBuilder extraDebugInfo = new StringBuilder("[").append(path.getName()).append(",");
-
+ throws NoSuchObjectException, IOException, MetaException {
String strIfPurge = ci.getProperty("ifPurge");
boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
-
- return remove(location, ci, Collections.singletonList(path), ifPurge,
- path.getFileSystem(conf), extraDebugInfo);
+
+ Path path = new Path(location);
+ return !remove(location, ci, Collections.singletonList(path), ifPurge,
+ path.getFileSystem(conf)).isEmpty();
}
- private boolean remove(String location, CompactionInfo ci, List<Path> filesToDelete, boolean ifPurge,
- FileSystem fs, StringBuilder extraDebugInfo)
+ private List<Path> remove(String location, CompactionInfo ci, List<Path> paths, boolean ifPurge, FileSystem fs)
throws NoSuchObjectException, MetaException, IOException {
-
- extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']');
- LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() +
- " obsolete directories from " + location + ". " + extraDebugInfo.toString());
- if (filesToDelete.size() < 1) {
- LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location +
- ", that hardly seems right.");
- return false;
+ List<Path> deleted = new ArrayList<>();
+ if (paths.size() < 1) {
+ return deleted;
}
+ LOG.info(idWatermark(ci) + " About to remove " + paths.size() +
+ " obsolete directories from " + location + ". " + getDebugInfo(paths));
Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname);
boolean needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
-
- for (Path dead : filesToDelete) {
+
+ for (Path dead : paths) {
LOG.debug("Going to delete path " + dead.toString());
if (needCmRecycle) {
replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, ifPurge);
}
- FileUtils.moveToTrash(fs, dead, conf, ifPurge);
+ if (FileUtils.moveToTrash(fs, dead, conf, ifPurge)) {
+ deleted.add(dead);
+ }
}
- return true;
+ return deleted;
+ }
+
+ private String getDebugInfo(List<Path> paths) {
+ return "[" + paths.stream().map(Path::getName).collect(Collectors.joining(",")) + ']';
}
private static class CleanerCycleUpdater implements Runnable {
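The net effect of the Cleaner rewrite above: remove() now returns the paths it actually moved to trash instead of a boolean, and the cleaning path rescans the directory below the compacted watermark and subtracts the deleted set to decide whether the attempt succeeded. A condensed sketch of that delete-then-verify contract, using the same FileUtils.moveToTrash and ListUtils.subtract calls as the diff (class and method names here are illustrative):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.FileUtils;

    import static org.apache.commons.collections4.ListUtils.subtract;

    final class DeleteThenVerifySketch {
      // Move each candidate to trash, collecting only the paths that were
      // actually removed so the caller can check completeness afterwards.
      static List<Path> removeAll(FileSystem fs, Configuration conf,
          List<Path> candidates, boolean ifPurge) throws IOException {
        List<Path> deleted = new ArrayList<>();
        for (Path dead : candidates) {
          if (FileUtils.moveToTrash(fs, dead, conf, ifPurge)) {
            deleted.add(dead);
          }
        }
        return deleted;
      }

      // After deleting, rescan and subtract the deleted set from what is still
      // obsolete below the watermark; an empty remainder means cleanup is done,
      // otherwise the compaction stays queued for another cleaning cycle.
      static boolean verified(List<Path> obsoleteAfterRescan, List<Path> deleted) {
        return subtract(obsoleteAfterRescan, deleted).isEmpty();
      }
    }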
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 98a042a..d353f20 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -400,9 +400,9 @@ public abstract class CompactorTest {
Path location = new Path(getLocation(t.getTableName(), partValue));
String filename = null;
switch (type) {
- case BASE: filename = AcidUtils.BASE_PREFIX + maxTxn + (visibilityId > 0 ? AcidUtils.VISIBILITY_PREFIX + visibilityId : ""); break;
+ case BASE: filename = AcidUtils.addVisibilitySuffix(AcidUtils.BASE_PREFIX + maxTxn, visibilityId); break;
case LENGTH_FILE: // Fall through to delta
- case DELTA: filename = makeDeltaDirName(minTxn, maxTxn); break;
+ case DELTA: filename = AcidUtils.addVisibilitySuffix(makeDeltaDirName(minTxn, maxTxn), visibilityId); break;
case LEGACY: break; // handled below
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 79b8348..8b31f06 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -58,6 +58,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEAN
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
+import static org.apache.hadoop.hive.ql.io.AcidUtils.addVisibilitySuffix;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
@@ -233,7 +234,7 @@ public class TestCleaner extends CompactorTest {
// Check that the files are removed
List<Path> paths = getDirectories(conf, t, null);
Assert.assertEquals(1, paths.size());
- Assert.assertEquals("base_25_v26", paths.get(0).getName());
+ Assert.assertEquals(addVisibilitySuffix("base_25", 26), paths.get(0).getName());
}
@Test
@@ -261,7 +262,7 @@ public class TestCleaner extends CompactorTest {
// Check that the files are removed
List<Path> paths = getDirectories(conf, t, null);
Assert.assertEquals(1, paths.size());
- Assert.assertEquals("base_25_v26", paths.get(0).getName());
+ Assert.assertEquals(addVisibilitySuffix("base_25", 26), paths.get(0).getName());
}
@Test
@@ -323,7 +324,7 @@ public class TestCleaner extends CompactorTest {
// Check that the files are removed
paths = getDirectories(conf, t, null);
Assert.assertEquals(1, paths.size());
- Assert.assertEquals("base_25_v26", paths.get(0).getName());
+ Assert.assertEquals(addVisibilitySuffix("base_25", 26), paths.get(0).getName());
}
@Test
@@ -714,7 +715,7 @@ public class TestCleaner extends CompactorTest {
// Check that the files are removed
paths = getDirectories(conf, t, p);
Assert.assertEquals(1, paths.size());
- Assert.assertEquals("base_23_v25", paths.get(0).getName());
+ Assert.assertEquals(addVisibilitySuffix("base_23", 25), paths.get(0).getName());
}
@Test
@@ -735,7 +736,7 @@ public class TestCleaner extends CompactorTest {
burnThroughTransactions(dbName, tblName, 22);
CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
rqst.setPartitionname(partName);
- compactInTxn(rqst);
+ long compactTxn = compactInTxn(rqst);
addDeltaFile(t, p, 21, 22, 2);
startCleaner();
@@ -751,10 +752,10 @@ public class TestCleaner extends CompactorTest {
// major compaction
addDeltaFile(t, p, 23L, 23L, 1);
addDeltaFile(t, p, 24L, 24L, 1);
- burnThroughTransactions(dbName, tblName, 2);
+ burnThroughTransactions(dbName, tblName, 2, null, new HashSet<>(Collections.singletonList(compactTxn + 1)));
rqst = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
rqst.setPartitionname(partName);
- long compactTxn = compactInTxn(rqst);
+ compactTxn = compactInTxn(rqst);
addBaseFile(t, p, 24, 24, compactTxn);
startCleaner();
@@ -1071,8 +1072,8 @@ public class TestCleaner extends CompactorTest {
burnThroughTransactions(dbName, tblName, 22);
// block cleaner with an open txn
- long blockingTxn = openTxn();
-
+ openTxn();
+
CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
rqst.setPartitionname(partName);
long ctxnid = compactInTxn(rqst);
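As background for the Cleaner loop touched earlier in this commit (the CompletableFuture.runAsync block): each ready-to-clean compaction runs as its own async job with a per-job exceptionally handler, so a failing cleanup is logged without aborting the rest of the cycle. A self-contained sketch of that dispatch pattern, with illustrative targets and a simulated failure:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class AsyncCleanSketch {
      public static void main(String[] args) {
        ExecutorService cleanerExecutor = Executors.newFixedThreadPool(2);
        List<CompletableFuture<Void>> cleanerList = new ArrayList<>();
        for (String target : List.of("db.t1/p=1", "db.t1/p=2")) {
          CompletableFuture<Void> asyncJob = CompletableFuture
              .runAsync(() -> clean(target), cleanerExecutor)
              .exceptionally(t -> {
                // Mirror the Cleaner: log and swallow so sibling jobs proceed.
                System.err.println("Error clearing " + target + ": " + t);
                return null;
              });
          cleanerList.add(asyncJob);
        }
        // Wait for the whole batch before the next cleaning cycle would start.
        CompletableFuture.allOf(cleanerList.toArray(new CompletableFuture[0])).join();
        cleanerExecutor.shutdown();
      }

      static void clean(String target) {
        if (target.endsWith("p=2")) {
          throw new IllegalStateException("simulated cleanup failure");
        }
        System.out.println("cleaned " + target);
      }
    }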