Posted to commits@hive.apache.org by sz...@apache.org on 2019/04/09 08:59:49 UTC

[hive] branch master updated: HIVE-9995: ACID compaction tries to compact a single file (Denys Kuzmenko, reviewed by Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

szita pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new cfe90d5  HIVE-9995: ACID compaction tries to compact a single file (Denys Kuzmenko, reviewed by Peter Vary)
cfe90d5 is described below

commit cfe90d57d9a9c3d3c2f26fcbd7254e040833362e
Author: Denys Kuzmenko <dk...@cloudera.com>
AuthorDate: Tue Apr 9 10:53:12 2019 +0200

    HIVE-9995: ACID compaction tries to compact a single file (Denys Kuzmenko, reviewed by Peter Vary)
---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    | 92 ++++++++++++----------
 .../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java  |  4 +-
 .../ql/io/orc/VectorizedOrcAcidRowBatchReader.java |  2 +-
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  | 88 +++++++++++++--------
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    |  2 +-
 .../hadoop/hive/ql/txn/compactor/TestWorker.java   | 18 ++---
 6 files changed, 116 insertions(+), 90 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index af8743d..24fc0d5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -723,7 +723,10 @@ public class AcidUtils {
     }
   }
 
-  public static interface Directory {
+  /**
+   * Interface used to provide ACID directory information.
+   */
+  public interface Directory {
 
     /**
      * Get the base directory.
@@ -1080,12 +1083,15 @@ public class AcidUtils {
 
   /**
    * Is the given directory in ACID format?
-   * @param fileSystem file system instance
    * @param directory the partition directory to check
    * @param conf the query configuration
    * @return true, if it is an ACID directory
    * @throws IOException
    */
+  public static boolean isAcid(Path directory, Configuration conf) throws IOException {
+    return isAcid(null, directory, conf);
+  }
+
   public static boolean isAcid(FileSystem fileSystem, Path directory,
                                Configuration conf) throws IOException {
     FileSystem fs = fileSystem == null ? directory.getFileSystem(conf) : fileSystem;
@@ -1105,9 +1111,8 @@ public class AcidUtils {
   @VisibleForTesting
   public static Directory getAcidState(Path directory,
       Configuration conf,
-      ValidWriteIdList writeIdList
-      ) throws IOException {
-    return getAcidState(null, directory, conf, writeIdList, false, false);
+      ValidWriteIdList writeIdList) throws IOException {
+    return getAcidState(directory, conf, writeIdList, false, false);
   }
 
   /** State class for getChildState; cannot modify 2 things in a method. */
@@ -1123,24 +1128,35 @@ public class AcidUtils {
    * base and diff directories. Note that because major compactions don't
    * preserve the history, we can't use a base directory that includes a
    * write id that we must exclude.
-   * @param fileSystem file system instance
    * @param directory the partition directory to analyze
    * @param conf the configuration
    * @param writeIdList the list of write ids that we are reading
    * @return the state of the directory
    * @throws IOException
    */
-  public static Directory getAcidState(FileSystem fileSystem, Path directory,
-                                       Configuration conf,
+  public static Directory getAcidState(Path directory, Configuration conf,
+                                       ValidWriteIdList writeIdList,
+                                       boolean useFileIds,
+                                       boolean ignoreEmptyFiles) throws IOException {
+    return getAcidState(directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null);
+  }
+
+  public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf,
                                        ValidWriteIdList writeIdList,
                                        boolean useFileIds,
-                                       boolean ignoreEmptyFiles
-                                       ) throws IOException {
+                                       boolean ignoreEmptyFiles) throws IOException {
     return getAcidState(fileSystem, directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null);
   }
 
-  public static Directory getAcidState(FileSystem fileSystem, Path directory,
-                                       Configuration conf,
+  public static Directory getAcidState(Path directory, Configuration conf,
+                                       ValidWriteIdList writeIdList,
+                                       Ref<Boolean> useFileIds,
+                                       boolean ignoreEmptyFiles,
+                                       Map<String, String> tblproperties) throws IOException {
+    return getAcidState(null, directory, conf, writeIdList, useFileIds, ignoreEmptyFiles, tblproperties);
+  }
+
+  public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf,
                                        ValidWriteIdList writeIdList,
                                        Ref<Boolean> useFileIds,
                                        boolean ignoreEmptyFiles,
@@ -1169,21 +1185,8 @@ public class AcidUtils {
     List<Path> originalDirectories = new ArrayList<>();
     final List<Path> obsolete = new ArrayList<>();
     final List<Path> abortedDirectories = new ArrayList<>();
-    List<HdfsFileStatusWithId> childrenWithId = null;
-    Boolean val = useFileIds.value;
-    if (val == null || val) {
-      try {
-        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
-        if (val == null) {
-          useFileIds.value = true;
-        }
-      } catch (Throwable t) {
-        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
-        if (val == null && t instanceof UnsupportedOperationException) {
-          useFileIds.value = false;
-        }
-      }
-    }
+    List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, directory);
+
     TxnBase bestBase = new TxnBase();
     final List<HdfsFileStatusWithId> original = new ArrayList<>();
     if (childrenWithId != null) {
@@ -1452,21 +1455,7 @@ public class AcidUtils {
   public static void findOriginals(FileSystem fs, Path dir,
       List<HdfsFileStatusWithId> original, Ref<Boolean> useFileIds,
       boolean ignoreEmptyFiles, boolean recursive) throws IOException {
-    List<HdfsFileStatusWithId> childrenWithId = null;
-    Boolean val = useFileIds.value;
-    if (val == null || val) {
-      try {
-        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, dir, hiddenFileFilter);
-        if (val == null) {
-          useFileIds.value = true;
-        }
-      } catch (Throwable t) {
-        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
-        if (val == null && t instanceof UnsupportedOperationException) {
-          useFileIds.value = false;
-        }
-      }
-    }
+    List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, dir);
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
         if (child.getFileStatus().isDirectory()) {
@@ -1496,6 +1485,25 @@ public class AcidUtils {
     }
   }
 
+  private static List<HdfsFileStatusWithId> tryListLocatedHdfsStatus(Ref<Boolean> useFileIds,
+                                                                     FileSystem fs, Path directory) {
+    Boolean val = useFileIds.value;
+    List<HdfsFileStatusWithId> childrenWithId = null;
+    if (val == null || val) {
+      try {
+        childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
+        if (val == null) {
+          useFileIds.value = true;
+        }
+      } catch (Throwable t) {
+        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
+        if (val == null && t instanceof UnsupportedOperationException) {
+          useFileIds.value = false;
+        }
+      }
+    }
+    return childrenWithId;
+  }
 
   public static boolean isTablePropertyTransactional(Properties props) {
     String resultStr = props.getProperty(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL);
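
The AcidUtils change above folds the two duplicated listing blocks into tryListLocatedHdfsStatus, which tries the HDFS file-ID listing only while the shared Ref<Boolean> flag is undetermined or known to work, latching it to true on the first success and to false on the first UnsupportedOperationException. A minimal sketch of that probe-and-latch pattern, assuming simplified stand-ins for Hive's Ref holder and shim listing API (Ref, FileIdLister and tryListWithIds below are illustrative, not the real classes; error logging is omitted):

    import java.util.List;

    public class TriStateProbeSketch {

      /** Simplified stand-in for a mutable boxed value like Hive's Ref<T>. */
      static final class Ref<T> {
        T value;
        static <T> Ref<T> from(T t) { Ref<T> r = new Ref<>(); r.value = t; return r; }
      }

      /** Hypothetical listing call that the underlying file system may not support. */
      interface FileIdLister {
        List<String> listWithIds(String dir);
      }

      /**
       * Attempt the file-ID API only while its availability is unknown (null) or
       * known-good (true). The first UnsupportedOperationException latches the flag
       * to false, so later call sites sharing the same Ref skip the probe entirely.
       */
      static List<String> tryListWithIds(Ref<Boolean> useFileIds, FileIdLister lister, String dir) {
        Boolean val = useFileIds.value;
        List<String> result = null;
        if (val == null || val) {
          try {
            result = lister.listWithIds(dir);
            if (val == null) {
              useFileIds.value = true;   // first success: remember the API works
            }
          } catch (Throwable t) {
            if (val == null && t instanceof UnsupportedOperationException) {
              useFileIds.value = false;  // first unsupported-operation failure: stop trying
            }
          }
        }
        return result;                   // null means "fall back to the regular listing API"
      }
    }
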
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index fbbddba..b1ede05 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -463,7 +463,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          */
         //the split is from something other than the 1st file of the logical bucket - compute offset
         AcidUtils.Directory directoryState
-                = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf, validWriteIdList, false,
+                = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false,
           true);
         for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
           int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
@@ -578,7 +578,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       assert options.getOffset() == 0;
       assert options.getMaxOffset() == Long.MAX_VALUE;
       AcidUtils.Directory directoryState
-              = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf, validWriteIdList, false, true);
+              = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false, true);
       /**
        * Note that for reading base_x/ or delta_x_x/ with non-acid schema,
        * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all it's
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 18c35f2..15f1f94 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -722,7 +722,7 @@ public class VectorizedOrcAcidRowBatchReader
     int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf)
         //statementId is from directory name (or 0 if there is none)
       .statementId(syntheticTxnInfo.statementId).bucket(bucketId));
-    AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, syntheticTxnInfo.folder, conf,
+    AcidUtils.Directory directoryState = AcidUtils.getAcidState(syntheticTxnInfo.folder, conf,
         validWriteIdList, false, true);
     for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
       int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index e5f3047..aa12ddb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
 import java.util.regex.Matcher;
+import java.util.stream.Collectors;
 
 import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -266,10 +267,16 @@ public class CompactorMR {
     // and discovering that in getSplits is too late as we then have no way to pass it to our
     // mapper.
 
-    AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, false, true);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(
+        new Path(sd.getLocation()), conf, writeIds, false, true);
+
+    if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
+      return;
+    }
+
     List<AcidUtils.ParsedDelta> parsedDeltas = dir.getCurrentDirectories();
-    int maxDeltastoHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
-    if(parsedDeltas.size() > maxDeltastoHandle) {
+    int maxDeltasToHandle = conf.getIntVar(HiveConf.ConfVars.COMPACTOR_MAX_NUM_DELTA);
+    if (parsedDeltas.size() > maxDeltasToHandle) {
       /**
        * if here, that means we have very high number of delta files.  This may be sign of a temporary
        * glitch or a real issue.  For example, if transaction batch size or transaction size is set too
@@ -282,13 +289,13 @@ public class CompactorMR {
         + " located at " + sd.getLocation() + "! This is likely a sign of misconfiguration, " +
         "especially if this message repeats.  Check that compaction is running properly.  Check for any " +
         "runaway/mis-configured process writing to ACID tables, especially using Streaming Ingest API.");
-      int numMinorCompactions = parsedDeltas.size() / maxDeltastoHandle;
-      for(int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
+      int numMinorCompactions = parsedDeltas.size() / maxDeltasToHandle;
+      for (int jobSubId = 0; jobSubId < numMinorCompactions; jobSubId++) {
         JobConf jobMinorCompact = createBaseJobConf(conf, jobName + "_" + jobSubId, t, sd, writeIds, ci);
         launchCompactionJob(jobMinorCompact,
           null, CompactionType.MINOR, null,
-          parsedDeltas.subList(jobSubId * maxDeltastoHandle, (jobSubId + 1) * maxDeltastoHandle),
-          maxDeltastoHandle, -1, conf, msc, ci.id, jobName);
+            parsedDeltas.subList(jobSubId * maxDeltasToHandle, (jobSubId + 1) * maxDeltasToHandle),
+            maxDeltasToHandle, -1, conf, msc, ci.id, jobName);
       }
       //now recompute state since we've done minor compactions and have different 'best' set of deltas
       dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds);
@@ -319,17 +326,6 @@ public class CompactorMR {
         dirsToSearch.add(baseDir);
       }
     }
-    if (parsedDeltas.size() == 0 && dir.getOriginalFiles().size() == 0) {
-      // Skip compaction if there's no delta files AND there's no original files
-      String minOpenInfo = ".";
-      if(writeIds.getMinOpenWriteId() != null) {
-        minOpenInfo = " with min Open " + JavaUtils.writeIdToString(writeIds.getMinOpenWriteId()) +
-          ".  Compaction cannot compact above this writeId";
-      }
-      LOG.error("No delta files or original files found to compact in " + sd.getLocation() +
-        " for compactionId=" + ci.id + minOpenInfo);
-      return;
-    }
 
     launchCompactionJob(job, baseDir, ci.type, dirsToSearch, dir.getCurrentDirectories(),
       dir.getCurrentDirectories().size(), dir.getObsolete().size(), conf, msc, ci.id, jobName);
@@ -345,16 +341,14 @@ public class CompactorMR {
   private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
       CompactionInfo ci) throws IOException {
     AcidUtils.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(t.getParameters()));
-    AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), hiveConf, writeIds,
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), hiveConf, writeIds,
       Ref.from(false), false,
         t.getParameters());
-    int deltaCount = dir.getCurrentDirectories().size();
-    int origCount = dir.getOriginalFiles().size();
-    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) {
-      LOG.debug("Not compacting {}; current base is {} and there are {} deltas and {} originals", sd.getLocation(), dir
-          .getBaseDirectory(), deltaCount, origCount);
+
+    if (!isEnoughToCompact(dir, sd)) {
       return;
     }
+
     String user = UserGroupInformation.getCurrentUser().getShortUserName();
     SessionState sessionState = DriverUtils.setUpSessionState(hiveConf, user, true);
     // Set up the session for driver.
@@ -412,11 +406,47 @@ public class CompactorMR {
     }
   }
 
+  private static boolean isEnoughToCompact(AcidUtils.Directory dir, StorageDescriptor sd) {
+    return isEnoughToCompact(true, dir, sd);
+  }
+
+  private static boolean isEnoughToCompact(boolean isMajorCompaction, AcidUtils.Directory dir, StorageDescriptor sd) {
+    int deltaCount = dir.getCurrentDirectories().size();
+    int origCount = dir.getOriginalFiles().size();
+
+    StringBuilder deltaInfo = new StringBuilder().append(deltaCount);
+    boolean isEnoughToCompact;
+
+    if (isMajorCompaction) {
+      isEnoughToCompact = (origCount > 0
+          || deltaCount + (dir.getBaseDirectory() == null ? 0 : 1) > 1);
+
+    } else {
+      isEnoughToCompact = (deltaCount > 1);
+
+      if (deltaCount == 2) {
+        Map<String, Long> deltaByType = dir.getCurrentDirectories().stream()
+            .collect(Collectors.groupingBy(delta ->
+                    (delta.isDeleteDelta() ? AcidUtils.DELETE_DELTA_PREFIX : AcidUtils.DELTA_PREFIX),
+                Collectors.counting()));
+
+        isEnoughToCompact = (deltaByType.size() != deltaCount);
+        deltaInfo.append(" ").append(deltaByType);
+      }
+    }
+
+    if (!isEnoughToCompact) {
+      LOG.debug("Not compacting {}; current base: {}, delta files: {}, originals: {}",
+          sd.getLocation(), dir.getBaseDirectory(), deltaInfo, origCount);
+    }
+    return isEnoughToCompact;
+  }
+
   private void runMmCompaction(HiveConf conf, Table t, Partition p,
       StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
     LOG.debug("Going to delete directories for aborted transactions for MM table "
         + t.getDbName() + "." + t.getTableName());
-    AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()),
+    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()),
         conf, writeIds, Ref.from(false), false, t.getParameters());
     removeFilesForMmTable(conf, dir);
 
@@ -427,14 +457,10 @@ public class CompactorMR {
       return;
     }
 
-    int deltaCount = dir.getCurrentDirectories().size();
-    int origCount = dir.getOriginalFiles().size();
-    if ((deltaCount + (dir.getBaseDirectory() == null ? 0 : 1)) + origCount <= 1) {
-      LOG.debug("Not compacting " + sd.getLocation() + "; current base is "
-        + dir.getBaseDirectory() + " and there are " + deltaCount + " deltas and "
-        + origCount + " originals");
+    if (!isEnoughToCompact(dir, sd)) {
       return;
     }
+
     try {
       String tmpLocation = generateTmpPath(sd);
       Path baseLocation = new Path(tmpLocation, "_base");
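
The new isEnoughToCompact check above carries the HIVE-9995 fix: a major compaction proceeds only if there is at least one original file or more than one base/delta directory, and a minor compaction needs more than one delta, where exactly two deltas qualify only if they are of the same kind (a single delta paired with its matching delete_delta is left alone). A self-contained sketch of that two-delta rule over plain directory names, assuming hypothetical helpers (MinorCompactionRuleSketch, enoughForMinorCompaction and the sample delta names are illustrative, not Hive code):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class MinorCompactionRuleSketch {

      /** Stand-ins for the AcidUtils.DELTA_PREFIX / DELETE_DELTA_PREFIX constants. */
      private static final String DELTA_PREFIX = "delta_";
      private static final String DELETE_DELTA_PREFIX = "delete_delta_";

      /**
       * Mirrors the deltaCount == 2 special case: two deltas of the same kind are worth
       * merging, but one delta plus its matching delete_delta is not.
       */
      static boolean enoughForMinorCompaction(List<String> deltaDirs) {
        int deltaCount = deltaDirs.size();
        if (deltaCount != 2) {
          return deltaCount > 1;
        }
        Map<String, Long> byType = deltaDirs.stream()
            .collect(Collectors.groupingBy(
                d -> d.startsWith(DELETE_DELTA_PREFIX) ? DELETE_DELTA_PREFIX : DELTA_PREFIX,
                Collectors.counting()));
        // Two distinct types => a lone insert/delete pair => nothing useful to merge.
        return byType.size() != deltaCount;
      }

      public static void main(String[] args) {
        // One transaction's delta and its delete_delta: skipped.
        System.out.println(enoughForMinorCompaction(
            Arrays.asList("delta_0000021_0000022", "delete_delta_0000021_0000022"))); // false
        // Two insert deltas: compacted.
        System.out.println(enoughForMinorCompaction(
            Arrays.asList("delta_0000021_0000022", "delta_0000023_0000025")));        // true
      }
    }
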
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 0734ed9..d4abf42 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -953,7 +953,7 @@ public class TestTxnCommands2 {
   public void testEmptyInTblproperties() throws Exception {
     runStatementOnDriver("create table t1 " + "(a int, b int) stored as orc TBLPROPERTIES ('serialization.null.format'='', 'transactional'='true')");
     runStatementOnDriver("insert into t1 " + "(a,b) values(1,7),(3,7)");
-    runStatementOnDriver("update t1" + " set b = -2 where b = 2");
+    runStatementOnDriver("update t1" + " set b = -2 where a = 1");
     runStatementOnDriver("alter table t1 " + " compact 'MAJOR'");
     runWorker(hiveConf);
     TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index d9e4468..553addb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -351,25 +351,17 @@ public class TestWorker extends CompactorTest {
     Assert.assertEquals(1, compacts.size());
     Assert.assertEquals("ready for cleaning", compacts.get(0).getState());
 
-    // There should still now be 5 directories in the location
+    // There should still be 4 directories in the location
     FileSystem fs = FileSystem.get(conf);
     FileStatus[] stat = fs.listStatus(new Path(t.getSd().getLocation()));
-    Assert.assertEquals(toString(stat),6 , stat.length);
+    Assert.assertEquals(toString(stat), 4, stat.length);
 
     // Find the new delta file and make sure it has the right contents
     Arrays.sort(stat);
     Assert.assertEquals("base_20", stat[0].getPath().getName());
-    /**
-     * this may look a bit odd.  Compactor is capped at min open write id which is 23 in this case
-     * so the minor compaction above only 1 dir as input, delta_21_22 and outputs
-     * delta_21_22_v28 (and matching delete_delta)  (HIVE-9995/HIVE-20901)
-     */
-    Assert.assertEquals(makeDeleteDeltaDirNameCompacted(21, 22) + "_v0000028",
-        stat[1].getPath().getName());
-    Assert.assertEquals(makeDeltaDirNameCompacted(21, 22), stat[2].getPath().getName());
-    Assert.assertEquals(makeDeltaDirNameCompacted(21, 22) + "_v0000028", stat[3].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(23, 25), stat[4].getPath().getName());
-    Assert.assertEquals(makeDeltaDirName(26, 27), stat[5].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(21, 22), stat[1].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(23, 25), stat[2].getPath().getName());
+    Assert.assertEquals(makeDeltaDirName(26, 27), stat[3].getPath().getName());
   }
 
   @Test