Posted to gitbox@hive.apache.org by GitBox <gi...@apache.org> on 2020/12/21 16:46:27 UTC

[GitHub] [hive] klcopp commented on a change in pull request #1779: HIVE-24535: Cleanup AcidUtils.Directory and remove unnecessary filesystem listing

klcopp commented on a change in pull request #1779:
URL: https://github.com/apache/hive/pull/1779#discussion_r546738059



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
##########
@@ -1326,106 +1292,16 @@ private AcidDirInfo callInternal() throws IOException {
         // the originals could still be handled by AcidUtils like a regular non-txn table.
         boolean isRecursive = context.conf.getBoolean(FileInputFormat.INPUT_DIR_RECURSIVE,
             context.conf.getBoolean("mapred.input.dir.recursive", false));
-        List<HdfsFileStatusWithId> originals = new ArrayList<>();
-        List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
-        AcidUtils.findOriginals(fs.get(), dir, originals, useFileIds, true, isRecursive);
-        for (HdfsFileStatusWithId fileId : originals) {
-          baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
-        }
-        return new AcidDirInfo(fs.get(), dir, new AcidUtils.DirectoryImpl(Lists.newArrayList(), Sets.newHashSet(), false, true, originals,
-            Lists.newArrayList(), Lists.newArrayList(), null), baseFiles, new ArrayList<>());
+
+        List<HdfsFileStatusWithId> originals = AcidUtils.findOriginals(fs.get(), dir, useFileIds, true, isRecursive);
+        AcidDirectory directory = new AcidDirectory(dir, fs.get(), useFileIds);
+        directory.getOriginalFiles().addAll(originals);
+        return directory;
       }
       //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
-      AcidUtils.Directory dirInfo  = getAcidState();
-      // find the base files (original or new style)
-      List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
-      if (dirInfo.getBaseDirectory() == null) {
-        // For non-acid tables (or paths), all data files are in getOriginalFiles() list
-        for (HdfsFileStatusWithId fileId : dirInfo.getOriginalFiles()) {
-          baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
-        }
-      } else {
-        List<HdfsFileStatusWithId> compactedBaseFiles = dirInfo.getBaseFiles();
-        if (compactedBaseFiles == null) {
-          compactedBaseFiles = AcidUtils.findBaseFiles(dirInfo.getBaseDirectory(), useFileIds, fs);
-        }
-        for (HdfsFileStatusWithId fileId : compactedBaseFiles) {
-          baseFiles.add(new AcidBaseFileInfo(fileId, dirInfo.isBaseInRawFormat() ?
-            AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA));
-        }
-      }
-
-      // Find the parsed deltas- some of them containing only the insert delta events
-      // may get treated as base if split-update is enabled for ACID. (See HIVE-14035 for details)
-      List<ParsedDelta> parsedDeltas = new ArrayList<>();
-      if (context.acidOperationalProperties != null &&
-          context.acidOperationalProperties.isSplitUpdate()) {
-        // If we have split-update turned on for this table, then the delta events have already been
-        // split into two directories- delta_x_y/ and delete_delta_x_y/.
-        // When you have split-update turned on, the insert events go to delta_x_y/ directory and all
-        // the delete events go to delete_x_y/. An update event will generate two events-
-        // a delete event for the old record that is put into delete_delta_x_y/,
-        // followed by an insert event for the updated record put into the usual delta_x_y/.
-        // Therefore, everything inside delta_x_y/ is an insert event and all the files in delta_x_y/
-        // can be treated like base files. Hence, each of these are added to baseOrOriginalFiles list.
-
-        for (ParsedDelta parsedDelta : dirInfo.getCurrentDirectories()) {
-          if (parsedDelta.isDeleteDelta()) {
-            parsedDeltas.add(parsedDelta);
-          } else {
-            AcidUtils.AcidBaseFileType deltaType = parsedDelta.isRawFormat() ?
-              AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA;
-            PathFilter bucketFilter = parsedDelta.isRawFormat() ?
-              AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter;
-            if (parsedDelta.isRawFormat() && parsedDelta.getMinWriteId() != parsedDelta.getMaxWriteId()) {
-              //delta/ with files in raw format are a result of Load Data (as opposed to compaction
-              //or streaming ingest so must have interval length == 1.
-              throw new IllegalStateException("Delta in " + AcidUtils.AcidBaseFileType.ORIGINAL_BASE
-               + " format but txnIds are out of range: " + parsedDelta.getPath());
-            }
-            // This is a normal insert delta, which only has insert events and hence all the files
-            // in this delta directory can be considered as a base.
-            Boolean val = useFileIds.value;
-            if (val == null || val) {
-              try {
-                List<HdfsFileStatusWithId> insertDeltaFiles =
-                    SHIMS.listLocatedHdfsStatus(fs.get(), parsedDelta.getPath(), bucketFilter);
-                for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
-                  baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
-                }
-                if (val == null) {
-                  useFileIds.value = true; // The call succeeded, so presumably the API is there.
-                }
-                continue; // move on to process to the next parsedDelta.
-              } catch (Throwable t) {
-                LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
-                if (val == null && t instanceof UnsupportedOperationException) {
-                  useFileIds.value = false;
-                }
-              }
-            }
-            // Fall back to regular API and create statuses without ID.
-            List<FileStatus> children = HdfsUtils.listLocatedStatus(fs.get(),
-                parsedDelta.getPath(), bucketFilter);
-            for (FileStatus child : children) {
-              HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
-              baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
-            }
-          }
-        }
+      AcidDirectory dirInfo  = getAcidState();

Review comment:
       Nit: You can just return getAcidState() here instead of assigning it to dirInfo.
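
   Roughly what I mean (just a sketch; the non-acid branch above stays as it is in the PR):

       private AcidDirectory callInternal() throws IOException {
         if (context.acidOperationalProperties != null /* && ... same checks as now */) {
           // ... build and return the AcidDirectory for the original files, as above ...
         }
         //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
         return getAcidState();
       }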

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
##########
@@ -1316,7 +1282,7 @@ private Directory getAcidState() throws IOException {
     }
 
 
-    private AcidDirInfo callInternal() throws IOException {
+    private AcidDirectory callInternal() throws IOException {
       if (context.acidOperationalProperties != null

Review comment:
       Side note: Shouldn't HIVE_MM_ALLOW_ORIGINALS also be part of this condition?
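
   Something like this, maybe (a sketch only; whether the flag should enable or skip
   this branch is exactly the open question):

       // hypothetical: also consult HIVE_MM_ALLOW_ORIGINALS before taking the
       // "handle originals like a non-txn table" shortcut
       if (context.acidOperationalProperties != null /* && ... same checks as now */
           && HiveConf.getBoolVar(context.conf, HiveConf.ConfVars.HIVE_MM_ALLOW_ORIGINALS)) {
         // ... original-file handling as above ...
       }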

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
##########
@@ -1253,19 +1190,12 @@ public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSys
       ParsedDelta p = parsedDelta(deltaDir, isRawFormat);
       List<HdfsFileStatusWithId> files = null;
       if (dirSnapshot != null) {
+        final PathFilter filter = isRawFormat ? AcidUtils.originalBucketFilter : AcidUtils.bucketFileFilter;
+        // If we already know the files, store it for future use
         files = dirSnapshot.getFiles().stream()
-            .filter(fileStatus -> bucketFileFilter.accept(fileStatus.getPath()))
+            .filter(fileStatus -> filter.accept(fileStatus.getPath()))
             .map(HdfsFileStatusWithoutId::new)
             .collect(Collectors.toList());
-      } else if (isDeleteDelta) {
-        // For delete deltas we need the files for AcidState
-        try {
-          files = SHIMS.listLocatedHdfsStatus(fs, deltaDir, bucketFileFilter);
-        } catch (UnsupportedOperationException uoe) {
-          files = Arrays.stream(fs.listStatus(deltaDir, bucketFileFilter))
-              .map(HdfsFileStatusWithoutId::new)
-              .collect(Collectors.toList());
-        }

Review comment:
       Why was this block (1260-1268) needed originally?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
##########
@@ -3360,14 +3257,13 @@ public static Directory getAcidStateFromCache(Supplier<FileSystem> fileSystem,
 
     // compute and add to cache
     if (recompute || (value == null)) {
-      Directory dirInfo = getAcidState(fileSystem.get(), candidateDirectory, conf,
+      AcidDirectory dirInfo = getAcidState(fileSystem.get(), candidateDirectory, conf,
           writeIdList, useFileIds, ignoreEmptyFiles);
       value = new DirInfoValue(writeIdList.writeToString(), dirInfo);
 
       if (value.dirInfo != null && value.dirInfo.getBaseDirectory() != null
           && value.dirInfo.getCurrentDirectories().isEmpty()) {
         if (dirCacheDuration > 0) {
-          populateBaseFiles(dirInfo, useFileIds, fileSystem);

Review comment:
       Why was this here?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidDirectory.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hive.common.util.Ref;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.ql.io.AcidUtils.AcidBaseFileType.ORIGINAL_BASE;
+
+/**
+ * AcidDirectory provides ACID directory layout information: which directories and files to read.
+ * This representation is only valid in the context of a ValidWriteIdList and ValidTxnList.
+ */
+public final class AcidDirectory {
+
+  private final Path path;
+  private final FileSystem fs;
+  private final Ref<Boolean> useFileId;
+
+  private AcidUtils.ParsedBase base;
+  private AcidUtils.ParsedBaseLight oldestBase;
+
+  private final List<Path> abortedDirectories = new ArrayList<>();
+  private final Set<Long> abortedWriteIds = new HashSet<>();
+  private boolean unCompactedAborts;
+  private final List<HadoopShims.HdfsFileStatusWithId> originalFiles = new ArrayList<>();
+  private final List<Path> originalDirectories = new ArrayList<>();
+  private final List<Path> obsolete = new ArrayList<>();
+  private final List<AcidUtils.ParsedDelta> currentDirectories = new ArrayList<>();
+
+  public AcidDirectory(Path path, FileSystem fs, Ref<Boolean> useFileId) {
+    this.path = path;
+    this.fs = fs;
+    this.useFileId = useFileId;
+    if (!(this.fs instanceof DistributedFileSystem) && this.useFileId != null) {
+      this.useFileId.value = false;
+    }
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  /**
+   * Get the base directory path.
+   * @return the base directory to read
+   */
+  public Path getBaseDirectory() {
+    return base == null ? null : base.getBaseDirPath();
+  }
+
+  /**
+   * Get the base directory.
+   * @return the base directory to read
+   */
+  public AcidUtils.ParsedBase getBase() {
+    return base;
+  }
+
+  /**
+   * Oldest base directory in the filesystem, may be shadowed by newer base
+   */
+  public AcidUtils.ParsedBaseLight getOldestBase() {
+    return oldestBase;
+  }
+
+  public void setBase(AcidUtils.ParsedBase base) {
+    this.base = base;
+  }
+
+  public void setOldestBase(AcidUtils.ParsedBaseLight oldestBase) {
+    this.oldestBase = oldestBase;
+  }
+
+  public void setUnCompactedAborts(boolean unCompactedAborts) {
+    this.unCompactedAborts = unCompactedAborts;
+  }
+
+  /**
+   * Is the base directory in raw format or in ACID format?
+   */
+  public boolean isBaseInRawFormat() {
+    return base != null && base.isRawFormat();
+  }
+
+  /**
+   * Get the list of original files.  Not {@code null}.  Must be sorted.
+   * @return the list of original files (eg. 000000_0)
+   */
+  public List<HadoopShims.HdfsFileStatusWithId> getOriginalFiles() {
+    return originalFiles;
+  }
+
+  /**
+   * List of original directories containing files that are not in ACID format
+   */
+  public List<Path> getOriginalDirectories() {
+    return originalDirectories;
+  }
+
+  /**
+   * Get the list of delta directories that are valid and not
+   * obsolete.  Not {@code null}.  List must be sorted in a specific way.
+   * See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight)}
+   * for details.
+   * @return the minimal list of current directories
+   */
+  public List<AcidUtils.ParsedDelta> getCurrentDirectories() {
+    return currentDirectories;
+  }
+
+  /**
+   * Get the list of obsolete directories. After filtering out bases and
+   * deltas that are not selected by the valid transaction/write ids list, return the
+   * list of original files, bases, and deltas that have been replaced by
+   * more up to date ones.  Not {@code null}.
+   */
+  public List<Path> getObsolete() {
+    return obsolete;
+  }
+
+  /**
+   * Get the list of directories that contain nothing but aborted transactions.
+   * @return the list of aborted directories
+   */
+  public List<Path> getAbortedDirectories() {
+    return abortedDirectories;
+  }
+
+  /**
+   * Get the list of writeIds that belong to the aborted transactions.
+   * @return the list of aborted writeIds
+   */
+  public Set<Long> getAbortedWriteIds() {
+    return abortedWriteIds;
+  }
+
+  /**
+   * Does the directory contain writeIds that belong to aborted transactions
+   * but are mixed together with committed writes? These aborted writes cannot be cleaned.
+   * @return true if there are aborted writes that cannot be cleaned
+   */
+  public boolean hasUncompactedAborts() {
+    return unCompactedAborts;
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  /**
+   * Delete deltas that should be read by this reader.
+   */
+  public List<AcidUtils.ParsedDelta> getDeleteDeltas() {
+    return currentDirectories.stream().filter(AcidUtils.ParsedDeltaLight::isDeleteDelta).collect(Collectors.toList());
+  }
+
+  /**
+   * All original, base and delta bucket files that should be read by this reader
+   * @throws IOException ex
+   */
+  public List<AcidUtils.AcidBaseFileInfo> getBaseAndDeltaFiles() throws IOException {

Review comment:
       It looks like this method is only used in the context of acidDir.getBaseAndDeltaFiles().isEmpty().
   What if it returned a boolean (something like isEmpty) instead?
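
   E.g. (just a sketch; hasBaseOrDeltaFiles is a placeholder name):

       /**
        * Whether there is at least one original, base or insert-delta bucket file to read.
        */
       public boolean hasBaseOrDeltaFiles() throws IOException {
         if (!originalFiles.isEmpty()) {
           return true;
         }
         if (base != null && !base.getFiles(fs, useFileId).isEmpty()) {
           return true;
         }
         // insert deltas only; delete deltas are read separately via getDeleteDeltas()
         for (AcidUtils.ParsedDelta delta : currentDirectories) {
           if (!delta.isDeleteDelta() && !delta.getFiles(fs, useFileId).isEmpty()) {
             return true;
           }
         }
         return false;
       }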

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
##########
@@ -1490,49 +1432,31 @@ else if (prev != null && next.maxWriteId == prev.maxWriteId
         prev = next;
       }
       else {
-        obsolete.add(next.path);
+        directory.getObsolete().add(next.path);
       }
     }
+    directory.getCurrentDirectories().clear();
+    directory.getCurrentDirectories().addAll(deltas);
+  }
 
-    if(bestBase.oldestBase != null && bestBase.basePath == null &&
-        isCompactedBase(ParsedBase.parseBase(bestBase.oldestBase), fs, dirSnapshots)) {
+  private static ValidTxnList getValidTxnList(Configuration conf) {
+    ValidTxnList validTxnList = null;
+    String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
+    if(!Strings.isNullOrEmpty(s)) {
       /*
-       * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
-       * {@link writeIdList}.  Note that 'original' files are logically a base_Long.MIN_VALUE and thus
-       * cannot have any data for an open txn.  We could check {@link deltas} has files to cover
-       * [1,n] w/o gaps but this would almost never happen...
+       * getAcidState() is sometimes called on non-transactional tables, e.g.
+       * OrcInputFileFormat.FileGenerator.callInternal().  e.g. orc_merge3.q In that case
+       * writeIdList is bogus - doesn't even have a table name.
+       * see https://issues.apache.org/jira/browse/HIVE-20856.
        *
-       * We only throw for base_x produced by Compactor since that base erases all history and
-       * cannot be used for a client that has a snapshot in which something inside this base is
-       * open.  (Nor can we ignore this base of course)  But base_x which is a result of IOW,
-       * contains all history so we treat it just like delta wrt visibility.  Imagine, IOW which
-       * aborts. It creates a base_x, which can and should just be ignored.*/
-      long[] exceptions = writeIdList.getInvalidWriteIds();
-      String minOpenWriteId = exceptions != null && exceptions.length > 0 ?
-        Long.toString(exceptions[0]) : "x";
-      throw new IOException(ErrorMsg.ACID_NOT_ENOUGH_HISTORY.format(
-        Long.toString(writeIdList.getHighWatermark()),
-              minOpenWriteId, bestBase.oldestBase.toString()));
-    }
-
-    Path base = null;
-    boolean isBaseInRawFormat = false;
-    if (bestBase.basePath != null) {
-      base = bestBase.basePath;
-      isBaseInRawFormat = MetaDataFile.isRawFormat(base, fs, dirSnapshots != null ? dirSnapshots.get(base) : null);
+       * For now, assert that ValidTxnList.VALID_TXNS_KEY is set only if this is really a read
+       * of a transactional table.
+       * see {@link #getChildState(FileStatus, HdfsFileStatusWithId, ValidWriteIdList, List, List, List, List, TxnBase, boolean, List, Map, FileSystem, ValidTxnList)}

Review comment:
       The getChildState parameters have changed, so this {@link} reference is out of date.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidDirectory.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hive.common.util.Ref;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.ql.io.AcidUtils.AcidBaseFileType.ORIGINAL_BASE;
+
+/**
+ * AcidDirectory provides ACID directory layout information: which directories and files to read.
+ * This representation is only valid in the context of a ValidWriteIdList and ValidTxnList.
+ */
+public final class AcidDirectory {
+
+  private final Path path;
+  private final FileSystem fs;
+  private final Ref<Boolean> useFileId;
+
+  private AcidUtils.ParsedBase base;
+  private AcidUtils.ParsedBaseLight oldestBase;
+
+  private final List<Path> abortedDirectories = new ArrayList<>();
+  private final Set<Long> abortedWriteIds = new HashSet<>();
+  private boolean unCompactedAborts;
+  private final List<HadoopShims.HdfsFileStatusWithId> originalFiles = new ArrayList<>();
+  private final List<Path> originalDirectories = new ArrayList<>();
+  private final List<Path> obsolete = new ArrayList<>();
+  private final List<AcidUtils.ParsedDelta> currentDirectories = new ArrayList<>();
+
+  public AcidDirectory(Path path, FileSystem fs, Ref<Boolean> useFileId) {
+    this.path = path;
+    this.fs = fs;
+    this.useFileId = useFileId;
+    if (!(this.fs instanceof DistributedFileSystem) && this.useFileId != null) {
+      this.useFileId.value = false;
+    }
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  /**
+   * Get the base directory path.
+   * @return the base directory to read
+   */
+  public Path getBaseDirectory() {
+    return base == null ? null : base.getBaseDirPath();
+  }
+
+  /**
+   * Get the base directory.
+   * @return the base directory to read
+   */
+  public AcidUtils.ParsedBase getBase() {
+    return base;
+  }
+
+  /**
+   * Oldest base directory in the filesystem, may be shadowed by newer base
+   */
+  public AcidUtils.ParsedBaseLight getOldestBase() {
+    return oldestBase;
+  }
+
+  public void setBase(AcidUtils.ParsedBase base) {
+    this.base = base;
+  }
+
+  public void setOldestBase(AcidUtils.ParsedBaseLight oldestBase) {
+    this.oldestBase = oldestBase;
+  }
+
+  public void setUnCompactedAborts(boolean unCompactedAborts) {
+    this.unCompactedAborts = unCompactedAborts;
+  }
+
+  /**
+   * Is the base directory in raw format or in ACID format?
+   */
+  public boolean isBaseInRawFormat() {
+    return base != null && base.isRawFormat();
+  }
+
+  /**
+   * Get the list of original files.  Not {@code null}.  Must be sorted.
+   * @return the list of original files (eg. 000000_0)
+   */
+  public List<HadoopShims.HdfsFileStatusWithId> getOriginalFiles() {
+    return originalFiles;
+  }
+
+  /**
+   * List of original directories containing files that are not in ACID format
+   */
+  public List<Path> getOriginalDirectories() {
+    return originalDirectories;
+  }
+
+  /**
+   * Get the list of delta directories that are valid and not
+   * obsolete.  Not {@code null}.  List must be sorted in a specific way.
+   * See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight)}
+   * for details.
+   * @return the minimal list of current directories
+   */
+  public List<AcidUtils.ParsedDelta> getCurrentDirectories() {
+    return currentDirectories;
+  }
+
+  /**
+   * Get the list of obsolete directories. After filtering out bases and
+   * deltas that are not selected by the valid transaction/write ids list, return the
+   * list of original files, bases, and deltas that have been replaced by
+   * more up to date ones.  Not {@code null}.
+   */
+  public List<Path> getObsolete() {
+    return obsolete;
+  }
+
+  /**
+   * Get the list of directories that contain nothing but aborted transactions.
+   * @return the list of aborted directories
+   */
+  public List<Path> getAbortedDirectories() {
+    return abortedDirectories;
+  }
+
+  /**
+   * Get the list of writeIds that belong to the aborted transactions.
+   * @return the list of aborted writeIds
+   */
+  public Set<Long> getAbortedWriteIds() {
+    return abortedWriteIds;
+  }
+
+  /**
+   * Does the directory contain writeIds that belong to aborted transactions
+   * but are mixed together with committed writes? These aborted writes cannot be cleaned.
+   * @return true if there are aborted writes that cannot be cleaned
+   */
+  public boolean hasUncompactedAborts() {
+    return unCompactedAborts;
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  /**
+   * Delete deltas that should be read by this reader.
+   */
+  public List<AcidUtils.ParsedDelta> getDeleteDeltas() {
+    return currentDirectories.stream().filter(AcidUtils.ParsedDeltaLight::isDeleteDelta).collect(Collectors.toList());
+  }
+
+  /**
+   * All original, base and delta bucket files that should be read by this reader
+   * @throws IOException ex
+   */
+  public List<AcidUtils.AcidBaseFileInfo> getBaseAndDeltaFiles() throws IOException {
+    List<AcidUtils.AcidBaseFileInfo> baseAndDeltaFiles = new ArrayList<>();
+    if (base == null) {
+      // For non-acid tables (or paths), all data files are in getOriginalFiles() list
+      for (HadoopShims.HdfsFileStatusWithId fileId : originalFiles) {
+        baseAndDeltaFiles.add(new AcidUtils.AcidBaseFileInfo(fileId, ORIGINAL_BASE));
+      }
+    } else {
+      // The base files are either listed in ParsedBase or will be populated now
+      for (HadoopShims.HdfsFileStatusWithId fileId : base.getFiles(fs, useFileId)) {
+        baseAndDeltaFiles.add(new AcidUtils.AcidBaseFileInfo(fileId,
+            isBaseInRawFormat() ? ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA));
+      }
+    }
+    for (AcidUtils.ParsedDelta parsedDelta : currentDirectories) {
+      if (parsedDelta.isDeleteDelta()) {
+        continue;
+      }
+      if (parsedDelta.isRawFormat() && parsedDelta.getMinWriteId() != parsedDelta.getMaxWriteId()) {
+        // delta/ with files in raw format are a result of Load Data (as opposed to compaction
+        // or streaming ingest so must have interval length == 1.
+        throw new IllegalStateException(
+            "Delta in " + ORIGINAL_BASE + " format but txnIds are out of range: " + parsedDelta.getPath());
+      }
+
+      AcidUtils.AcidBaseFileType deltaType =
+          parsedDelta.isRawFormat() ? ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA;
+      // The bucket files are either listed in ParsedDelta or will be populated now
+      for (HadoopShims.HdfsFileStatusWithId deltaFile : parsedDelta.getFiles(fs, useFileId)) {
+        baseAndDeltaFiles.add(new AcidUtils.AcidBaseFileInfo(deltaFile, deltaType));
+      }
+    }
+
+    return baseAndDeltaFiles;
+  }
+
+  @Override
+  public String toString() {
+    return "Aborted Directories: " + abortedDirectories + "; original: " + originalFiles + "; obsolete: " + obsolete
+        + "; currentDirectories: " + currentDirectories + "; base: " + base;

Review comment:
       Consider including isBaseInRawFormat as before; maybe also abortedWriteIds.
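
   Just as a sketch of what I mean (keeping the existing fields):

       @Override
       public String toString() {
         return "Aborted Directories: " + abortedDirectories
             + "; abortedWriteIds: " + abortedWriteIds
             + "; original: " + originalFiles
             + "; obsolete: " + obsolete
             + "; currentDirectories: " + currentDirectories
             + "; base: " + base
             + "; isBaseInRawFormat: " + isBaseInRawFormat();
       }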

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
##########
@@ -994,18 +857,76 @@ public long getVisibilityTxnId() {
     public Path getBaseDirPath() {
       return baseDirPath;
     }
-    public static ParsedBase parseBase(Path path) {
+
+
+
+    public static ParsedBaseLight parseBase(Path path) {
       String filename = path.getName();
       if(!filename.startsWith(BASE_PREFIX)) {
         throw new IllegalArgumentException(filename + " does not start with " + BASE_PREFIX);
       }
       int idxOfv = filename.indexOf(VISIBILITY_PREFIX);
       if(idxOfv < 0) {
-        return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length())), path);
+        return new ParsedBaseLight(Long.parseLong(filename.substring(BASE_PREFIX.length())), path);
       }
-      return new ParsedBase(Long.parseLong(filename.substring(BASE_PREFIX.length(), idxOfv)),
+      return new ParsedBaseLight(Long.parseLong(filename.substring(BASE_PREFIX.length(), idxOfv)),
           Long.parseLong(filename.substring(idxOfv + VISIBILITY_PREFIX.length())), path);
     }
+
+    @Override
+    public String toString() {
+      return "Path: " + baseDirPath + "; writeId: "
+          + writeId + "; visibilityTxnId: " + visibilityTxnId;
+    }
+  }
+  /**
+   * In addition to {@link ParsedBaseLight} this knows if the data is in raw format, i.e. doesn't
+   * have acid metadata columns embedded in the files.  To determine this in some cases
+   * requires looking at the footer of the data file which can be expensive so if this info is
+   * not needed {@link ParsedBaseLight} should be used.
+   */
+  public static final class ParsedBase extends ParsedBaseLight {

Review comment:
       It seems like this might partly duplicate AcidBaseFileInfo? If so, could we get rid of AcidBaseFileInfo?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/io/AcidDirectory.java
##########
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hive.common.util.Ref;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hive.ql.io.AcidUtils.AcidBaseFileType.ORIGINAL_BASE;
+
+/**
+ * AcidDirectory provides ACID directory layout information: which directories and files to read.
+ * This representation is only valid in the context of a ValidWriteIdList and ValidTxnList.
+ */
+public final class AcidDirectory {
+
+  private final Path path;
+  private final FileSystem fs;
+  private final Ref<Boolean> useFileId;
+
+  private AcidUtils.ParsedBase base;
+  private AcidUtils.ParsedBaseLight oldestBase;
+
+  private final List<Path> abortedDirectories = new ArrayList<>();
+  private final Set<Long> abortedWriteIds = new HashSet<>();
+  private boolean unCompactedAborts;
+  private final List<HadoopShims.HdfsFileStatusWithId> originalFiles = new ArrayList<>();
+  private final List<Path> originalDirectories = new ArrayList<>();
+  private final List<Path> obsolete = new ArrayList<>();
+  private final List<AcidUtils.ParsedDelta> currentDirectories = new ArrayList<>();
+
+  public AcidDirectory(Path path, FileSystem fs, Ref<Boolean> useFileId) {
+    this.path = path;
+    this.fs = fs;
+    this.useFileId = useFileId;
+    if (!(this.fs instanceof DistributedFileSystem) && this.useFileId != null) {
+      this.useFileId.value = false;
+    }
+  }
+
+  public Path getPath() {
+    return path;
+  }
+
+  /**
+   * Get the base directory path.
+   * @return the base directory to read
+   */
+  public Path getBaseDirectory() {
+    return base == null ? null : base.getBaseDirPath();
+  }
+
+  /**
+   * Get the base directory.
+   * @return the base directory to read
+   */
+  public AcidUtils.ParsedBase getBase() {
+    return base;
+  }
+
+  /**
+   * Oldest base directory in the filesystem, may be shadowed by newer base
+   */
+  public AcidUtils.ParsedBaseLight getOldestBase() {
+    return oldestBase;
+  }
+
+  public void setBase(AcidUtils.ParsedBase base) {
+    this.base = base;
+  }
+
+  public void setOldestBase(AcidUtils.ParsedBaseLight oldestBase) {
+    this.oldestBase = oldestBase;
+  }
+
+  public void setUnCompactedAborts(boolean unCompactedAborts) {
+    this.unCompactedAborts = unCompactedAborts;
+  }
+
+  /**
+   * Is the base directory in raw format or in ACID format?
+   */
+  public boolean isBaseInRawFormat() {
+    return base != null && base.isRawFormat();
+  }
+
+  /**
+   * Get the list of original files.  Not {@code null}.  Must be sorted.
+   * @return the list of original files (eg. 000000_0)
+   */
+  public List<HadoopShims.HdfsFileStatusWithId> getOriginalFiles() {
+    return originalFiles;
+  }
+
+  /**
+   * List of original directories containing files that are not in ACID format
+   */
+  public List<Path> getOriginalDirectories() {
+    return originalDirectories;
+  }
+
+  /**
+   * Get the list of delta directories that are valid and not
+   * obsolete.  Not {@code null}.  List must be sorted in a specific way.
+   * See {@link org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight#compareTo(org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight)}
+   * for details.
+   * @return the minimal list of current directories
+   */
+  public List<AcidUtils.ParsedDelta> getCurrentDirectories() {
+    return currentDirectories;
+  }
+
+  /**
+   * Get the list of obsolete directories. After filtering out bases and
+   * deltas that are not selected by the valid transaction/write ids list, return the
+   * list of original files, bases, and deltas that have been replaced by
+   * more up to date ones.  Not {@code null}.
+   */
+  public List<Path> getObsolete() {
+    return obsolete;
+  }
+
+  /**
+   * Get the list of directories that contain nothing but aborted transactions.
+   * @return the list of aborted directories
+   */
+  public List<Path> getAbortedDirectories() {
+    return abortedDirectories;
+  }
+
+  /**
+   * Get the list of writeIds that belong to the aborted transactions.
+   * @return the list of aborted writeIds
+   */
+  public Set<Long> getAbortedWriteIds() {
+    return abortedWriteIds;
+  }
+
+  /**
+   * Does the directory contain writeIds that belong to aborted transactions
+   * but are mixed together with committed writes? These aborted writes cannot be cleaned.
+   * @return true if there are aborted writes that cannot be cleaned
+   */
+  public boolean hasUncompactedAborts() {
+    return unCompactedAborts;
+  }
+
+  public FileSystem getFs() {
+    return fs;
+  }
+
+  /**
+   * Delete deltas that should be read by this reader.
+   */
+  public List<AcidUtils.ParsedDelta> getDeleteDeltas() {
+    return currentDirectories.stream().filter(AcidUtils.ParsedDeltaLight::isDeleteDelta).collect(Collectors.toList());
+  }
+
+  /**
+   * All original, base and delta bucket files that should be read by this reader
+   * @throws IOException ex
+   */
+  public List<AcidUtils.AcidBaseFileInfo> getBaseAndDeltaFiles() throws IOException {
+    List<AcidUtils.AcidBaseFileInfo> baseAndDeltaFiles = new ArrayList<>();
+    if (base == null) {

Review comment:
       This is a bit odd: what if this is an ACID table that has never gone through major compaction?



