Posted to commits@hive.apache.org by dk...@apache.org on 2022/03/28 12:40:04 UTC

[hive] branch master updated: HIVE-25977: Addendum (Denys Kuzmenko, reviewed by Karen Coppage)

This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 969b450  HIVE-25977: Addendum (Denys Kuzmenko, reviewed by Karen Coppage)
969b450 is described below

commit 969b450fdf5c0f2c00f67355bf26ee0f22cdede8
Author: Denys Kuzmenko <dk...@cloudera.com>
AuthorDate: Mon Mar 28 14:39:44 2022 +0200

    HIVE-25977: Addendum (Denys Kuzmenko, reviewed by Karen Coppage)
    
    Closes #3102
---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |   5 +-
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      | 151 +++++++++------------
 .../hive/ql/txn/compactor/CompactorTest.java       |   4 +-
 .../hadoop/hive/ql/txn/compactor/TestCleaner.java  |  19 +--
 4 files changed, 78 insertions(+), 101 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 018c3e8..a03ef01 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1323,7 +1323,7 @@ public class AcidUtils {
    * @return the state of the directory
    * @throws IOException on filesystem errors
    */
-  private static AcidDirectory getAcidState(FileSystem fileSystem, Path candidateDirectory, Configuration conf,
+  public static AcidDirectory getAcidState(FileSystem fileSystem, Path candidateDirectory, Configuration conf,
       ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles, Map<Path,
       HdfsDirSnapshot> dirSnapshots) throws IOException {
     ValidTxnList validTxnList = getValidTxnList(conf);
@@ -2529,6 +2529,9 @@ public class AcidUtils {
         LOG.debug("isRawFormat() called on " + dataFile + " which is not an ORC file: " +
             ex.getMessage());
         return true;
+      } catch (FileNotFoundException ex) {
+        // Fallback in case the file was already removed and the snapshot used is outdated
+        return false;
       }
     }
   }
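
For readers skimming the AcidUtils.java hunk above: the new catch clause makes the raw-format probe tolerate a file that disappeared after the directory snapshot was taken, instead of failing the whole cleaner pass. A minimal standalone sketch of that pattern follows; it uses plain java.io and a placeholder header check, so everything except the FileNotFoundException handling is illustrative rather than Hive's actual implementation.

    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;

    final class RawFormatProbeSketch {
      /**
       * Probe a data file's format, but if the file vanished between listing and
       * probing (stale directory snapshot), report "not raw" instead of throwing.
       */
      static boolean isRawFormat(String dataFile) throws IOException {
        try (FileInputStream in = new FileInputStream(dataFile)) {
          byte[] header = new byte[3];
          int read = in.read(header);
          // stand-in for the real ORC footer/schema inspection
          return read < 3 || header[0] != 'O' || header[1] != 'R' || header[2] != 'C';
        } catch (FileNotFoundException ex) {
          // file was already removed and the cached snapshot is outdated
          return false;
        }
      }
    }
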
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index faef9b5..11652d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
+import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -32,7 +33,6 @@ import org.apache.hadoop.hive.metastore.api.NoSuchLockException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.TxnAbortedException;
 import org.apache.hadoop.hive.metastore.api.TxnOpenException;
@@ -51,7 +51,6 @@ import org.apache.hadoop.hive.ql.io.AcidDirectory;
 import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -59,9 +58,6 @@ import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedBaseLight;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.Ref;
@@ -69,18 +65,19 @@ import org.apache.hive.common.util.Ref;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.BitSet;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.Callable;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
+import static org.apache.commons.collections4.ListUtils.subtract;
 import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_CLEANER_THREAD_NAME_FORMAT;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
@@ -158,14 +155,12 @@ public class Cleaner extends MetaStoreCompactorThread {
             // when min_history_level is finally dropped, then every HMS will commit compaction the new way
             // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
             for (CompactionInfo compactionInfo : readyToClean) {
-              String tableName = compactionInfo.getFullTableName();
-              String partition = compactionInfo.getFullPartitionName();
               CompletableFuture<Void> asyncJob =
                   CompletableFuture.runAsync(
                           ThrowingRunnable.unchecked(() -> clean(compactionInfo, cleanerWaterMark, metricsEnabled)),
                           cleanerExecutor)
                       .exceptionally(t -> {
-                        LOG.error("Error during the cleaning the table {} / partition {}", tableName, partition, t);
+                        LOG.error("Error clearing {}", compactionInfo.getFullPartitionName(), t);
                         return null;
                       });
               cleanerList.add(asyncJob);
@@ -423,7 +418,40 @@ public class Cleaner extends MetaStoreCompactorThread {
       throws IOException, NoSuchObjectException, MetaException {
     Path path = new Path(location);
     FileSystem fs = path.getFileSystem(conf);
-    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList, Ref.from(false), false);
+    
+    // Collect all of the files/dirs
+    Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshots(fs, path);
+    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList, Ref.from(false), false, 
+        dirSnapshots);
+    Table table = resolveTable(ci);
+    boolean isDynPartAbort = isDynPartAbort(table, ci);
+    
+    List<Path> obsoleteDirs = getObsoleteDirs(dir, isDynPartAbort);
+    if (isDynPartAbort || dir.hasUncompactedAborts()) {
+      ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
+    }
+    List<Path> deleted = remove(location, ci, obsoleteDirs, true, fs);
+    if (dir.getObsolete().size() > 0) {
+      AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
+          txnHandler);
+    }
+    // Make sure there are no leftovers below the compacted watermark
+    conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+    dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
+        ci.getFullTableName(), new long[0], new BitSet(), ci.highestWriteId, Long.MAX_VALUE),
+      Ref.from(false), false, dirSnapshots);
+    
+    List<Path> remained = subtract(getObsoleteDirs(dir, isDynPartAbort), deleted);
+    if (!remained.isEmpty()) {
+      LOG.warn(idWatermark(ci) + " Remained " + remained.size() +
+        " obsolete directories from " + location + ". " + getDebugInfo(remained));
+      return false;
+    }
+    LOG.debug(idWatermark(ci) + " All cleared below the watermark: " + ci.highestWriteId + " from " + location);
+    return true;
+  }
+  
+  private List<Path> getObsoleteDirs(AcidDirectory dir, boolean isDynPartAbort) {
     List<Path> obsoleteDirs = dir.getObsolete();
     /**
      * add anything in 'dir'  that only has data from aborted transactions - no one should be
@@ -434,105 +462,50 @@ public class Cleaner extends MetaStoreCompactorThread {
      * txns with write IDs > {@link CompactionInfo#highestWriteId}.
      * See {@link TxnStore#markCleaned(CompactionInfo)}
      */
-    Table table = getMSForConf(conf).getTable(getDefaultCatalog(conf), ci.dbname, ci.tableName);
-    if (isDynPartAbort(table, ci) || dir.hasUncompactedAborts()) {
-      ci.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
-    }
     obsoleteDirs.addAll(dir.getAbortedDirectories());
-    if (isDynPartAbort(table, ci)) {
+    if (isDynPartAbort) {
       // In the event of an aborted DP operation, we should only consider the aborted directories for cleanup.
       // Including obsolete directories for partitioned tables can result in data loss.
       obsoleteDirs = dir.getAbortedDirectories();
     }
-
-    if (obsoleteDirs.isEmpty()
-        && !hasDataBelowWatermark(dir, fs, path, ci.highestWriteId, writeIdList.getHighWatermark())) {
-      LOG.info(idWatermark(ci) + " nothing to remove below watermark " + ci.highestWriteId + ", ");
-      return true;
-    }
-    StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
-        .map(Path::getName).collect(Collectors.joining(",")));
-    boolean success = remove(location, ci, obsoleteDirs, true, fs, extraDebugInfo);
-    if (dir.getObsolete().size() > 0) {
-      AcidMetricService.updateMetricsFromCleaner(ci.dbname, ci.tableName, ci.partName, dir.getObsolete(), conf,
-          txnHandler);
-    }
-    return success;
-  }
-
-  private boolean hasDataBelowWatermark(AcidDirectory acidDir, FileSystem fs, Path path, long highWatermark,
-      long minOpenTxn)
-      throws IOException {
-    Set<Path> acidPaths = new HashSet<>();
-    for (ParsedDelta delta : acidDir.getCurrentDirectories()) {
-      acidPaths.add(delta.getPath());
-    }
-    if (acidDir.getBaseDirectory() != null) {
-      acidPaths.add(acidDir.getBaseDirectory());
-    }
-    FileStatus[] children = fs.listStatus(path, p -> {
-      return !acidPaths.contains(p);
-    });
-    for (FileStatus child : children) {
-      if (isFileBelowWatermark(child, highWatermark, minOpenTxn)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean isFileBelowWatermark(FileStatus child, long highWatermark, long minOpenTxn) {
-    Path p = child.getPath();
-    String fn = p.getName();
-    if (!child.isDirectory()) {
-      return true;
-    }
-    if (fn.startsWith(AcidUtils.BASE_PREFIX)) {
-      ParsedBaseLight b = ParsedBaseLight.parseBase(p);
-      return b.getWriteId() <= highWatermark;
-    }
-    if (fn.startsWith(AcidUtils.DELTA_PREFIX) || fn.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
-      ParsedDeltaLight d = ParsedDeltaLight.parse(p);
-      return d.getMaxWriteId() <= highWatermark;
-    }
-    return false;
+    return obsoleteDirs;
   }
 
   private boolean removeFiles(String location, CompactionInfo ci)
-    throws NoSuchObjectException, IOException, MetaException {
-    Path path = new Path(location);
-    StringBuilder extraDebugInfo = new StringBuilder("[").append(path.getName()).append(",");
-
+      throws NoSuchObjectException, IOException, MetaException {
     String strIfPurge = ci.getProperty("ifPurge");
     boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
-
-    return remove(location, ci, Collections.singletonList(path), ifPurge,
-      path.getFileSystem(conf), extraDebugInfo);
+    
+    Path path = new Path(location);
+    return !remove(location, ci, Collections.singletonList(path), ifPurge,
+      path.getFileSystem(conf)).isEmpty();
   }
 
-  private boolean remove(String location, CompactionInfo ci, List<Path> filesToDelete, boolean ifPurge,
-      FileSystem fs, StringBuilder extraDebugInfo)
+  private List<Path> remove(String location, CompactionInfo ci, List<Path> paths, boolean ifPurge, FileSystem fs)
       throws NoSuchObjectException, MetaException, IOException {
-
-    extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']');
-    LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() +
-         " obsolete directories from " + location + ". " + extraDebugInfo.toString());
-    if (filesToDelete.size() < 1) {
-      LOG.warn("Hmm, nothing to delete in the cleaner for directory " + location +
-          ", that hardly seems right.");
-      return false;
+    List<Path> deleted = new ArrayList<>();
+    if (paths.size() < 1) {
+      return deleted;
     }
+    LOG.info(idWatermark(ci) + " About to remove " + paths.size() +
+      " obsolete directories from " + location + ". " + getDebugInfo(paths));
     Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf), ci.dbname);
     boolean needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
-
-    for (Path dead : filesToDelete) {
+    
+    for (Path dead : paths) {
       LOG.debug("Going to delete path " + dead.toString());
       if (needCmRecycle) {
         replChangeManager.recycle(dead, ReplChangeManager.RecycleType.MOVE, ifPurge);
       }
-      FileUtils.moveToTrash(fs, dead, conf, ifPurge);
+      if (FileUtils.moveToTrash(fs, dead, conf, ifPurge)) {
+        deleted.add(dead);
+      }
     }
-    return true;
+    return deleted;
+  }
+  
+  private String getDebugInfo(List<Path> paths) {
+    return "[" + paths.stream().map(Path::getName).collect(Collectors.joining(",")) + ']';
   }
 
   private static class CleanerCycleUpdater implements Runnable {
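
Taken together, the Cleaner.java changes above boil down to: scan the directory once, delete the obsolete and aborted directories that can be removed, then re-evaluate the same snapshot capped at the compaction's highest write id and only report success when nothing obsolete remains. A simplified control-flow sketch follows, with plain collections standing in for AcidDirectory, CompactionInfo and FileSystem; the interface and method names here are illustrative, not Hive's API.

    import java.util.ArrayList;
    import java.util.List;

    final class CleanerPassSketch {

      /** One directory scan, evaluated under a given write-id high watermark. */
      interface AcidScan {
        List<String> obsoleteDirs(long writeIdHighWatermark);
      }

      /** Stand-in for FileUtils.moveToTrash: true when the path was actually removed. */
      interface Remover {
        boolean moveToTrash(String dir);
      }

      /**
       * Returns true when nothing obsolete is left below the compaction's highest
       * write id, i.e. the queue entry can be marked cleaned; false keeps it queued.
       */
      static boolean clean(AcidScan scan, Remover remover, long highestWriteId) {
        List<String> obsolete = scan.obsoleteDirs(Long.MAX_VALUE);   // pass 1: current view
        List<String> deleted = new ArrayList<>();
        for (String dir : obsolete) {
          if (remover.moveToTrash(dir)) {                            // count only real deletions
            deleted.add(dir);
          }
        }
        // pass 2: same snapshot, capped at the compaction watermark, to catch leftovers
        List<String> remained = new ArrayList<>(scan.obsoleteDirs(highestWriteId));
        remained.removeAll(deleted);
        return remained.isEmpty();
      }
    }

The second pass only subtracts directories that were actually moved to trash, so a failed delete leaves the entry in the compaction queue for another attempt rather than being reported as cleaned.
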
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 98a042a..d353f20 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -400,9 +400,9 @@ public abstract class CompactorTest {
     Path location = new Path(getLocation(t.getTableName(), partValue));
     String filename = null;
     switch (type) {
-      case BASE: filename = AcidUtils.BASE_PREFIX + maxTxn + (visibilityId > 0 ? AcidUtils.VISIBILITY_PREFIX + visibilityId : ""); break;
+      case BASE: filename = AcidUtils.addVisibilitySuffix(AcidUtils.BASE_PREFIX + maxTxn, visibilityId); break;
       case LENGTH_FILE: // Fall through to delta
-      case DELTA: filename = makeDeltaDirName(minTxn, maxTxn); break;
+      case DELTA: filename = AcidUtils.addVisibilitySuffix(makeDeltaDirName(minTxn, maxTxn),visibilityId); break;
       case LEGACY: break; // handled below
     }
 
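
The test changes stop hand-building the visibility suffix and call AcidUtils.addVisibilitySuffix instead. Judging only from the expression it replaces (BASE_PREFIX + maxTxn plus VISIBILITY_PREFIX + visibilityId when visibilityId > 0) and the assertions further below, the helper presumably behaves along these lines; this sketch is an inference, not Hive's source.

    final class VisibilitySuffixSketch {
      // value inferred from directory names like "base_25_v26" asserted in TestCleaner
      static final String VISIBILITY_PREFIX = "_v";

      /** Append "_v<txnId>" only when the compactor visibility txn id is positive. */
      static String addVisibilitySuffix(String dirName, long visibilityTxnId) {
        return visibilityTxnId > 0 ? dirName + VISIBILITY_PREFIX + visibilityTxnId : dirName;
      }

      public static void main(String[] args) {
        System.out.println(addVisibilitySuffix("base_25", 26));     // base_25_v26
        System.out.println(addVisibilitySuffix("delta_21_22", 0));  // delta_21_22 (no suffix)
      }
    }
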
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 79b8348..8b31f06 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -58,6 +58,7 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEAN
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
 import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME;
 import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.getTimeVar;
+import static org.apache.hadoop.hive.ql.io.AcidUtils.addVisibilitySuffix;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.nullable;
 import static org.mockito.Mockito.doAnswer;
@@ -233,7 +234,7 @@ public class TestCleaner extends CompactorTest {
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, null);
     Assert.assertEquals(1, paths.size());
-    Assert.assertEquals("base_25_v26", paths.get(0).getName());
+    Assert.assertEquals(addVisibilitySuffix("base_25", 26), paths.get(0).getName());
   }
 
   @Test
@@ -261,7 +262,7 @@ public class TestCleaner extends CompactorTest {
     // Check that the files are removed
     List<Path> paths = getDirectories(conf, t, null);
     Assert.assertEquals(1, paths.size());
-    Assert.assertEquals("base_25_v26", paths.get(0).getName());
+    Assert.assertEquals(addVisibilitySuffix("base_25", 26), paths.get(0).getName());
   }
   
   @Test
@@ -323,7 +324,7 @@ public class TestCleaner extends CompactorTest {
     // Check that the files are removed
     paths = getDirectories(conf, t, null);
     Assert.assertEquals(1, paths.size());
-    Assert.assertEquals("base_25_v26", paths.get(0).getName());
+    Assert.assertEquals(addVisibilitySuffix("base_25", 26), paths.get(0).getName());
   }
 
   @Test
@@ -714,7 +715,7 @@ public class TestCleaner extends CompactorTest {
     // Check that the files are removed
     paths = getDirectories(conf, t, p);
     Assert.assertEquals(1, paths.size());
-    Assert.assertEquals("base_23_v25", paths.get(0).getName());
+    Assert.assertEquals(addVisibilitySuffix("base_23", 25), paths.get(0).getName());
   }
 
   @Test
@@ -735,7 +736,7 @@ public class TestCleaner extends CompactorTest {
     burnThroughTransactions(dbName, tblName, 22);
     CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
     rqst.setPartitionname(partName);
-    compactInTxn(rqst);
+    long compactTxn = compactInTxn(rqst);
     addDeltaFile(t, p, 21, 22, 2);
     startCleaner();
 
@@ -751,10 +752,10 @@ public class TestCleaner extends CompactorTest {
     // major compaction
     addDeltaFile(t, p, 23L, 23L, 1);
     addDeltaFile(t, p, 24L, 24L, 1);
-    burnThroughTransactions(dbName, tblName, 2);
+    burnThroughTransactions(dbName, tblName, 2, null, new HashSet<>(Collections.singletonList(compactTxn + 1)));
     rqst = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
     rqst.setPartitionname(partName);
-    long compactTxn = compactInTxn(rqst);
+    compactTxn = compactInTxn(rqst);
     addBaseFile(t, p, 24, 24, compactTxn);
     startCleaner();
 
@@ -1071,8 +1072,8 @@ public class TestCleaner extends CompactorTest {
     burnThroughTransactions(dbName, tblName, 22);
 
     // block cleaner with an open txn
-    long blockingTxn = openTxn();
-
+    openTxn();
+    
     CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
     rqst.setPartitionname(partName);
     long ctxnid = compactInTxn(rqst);