Posted to commits@hive.apache.org by kl...@apache.org on 2020/06/15 09:07:43 UTC

[hive] branch master updated: HIVE-23495: AcidUtils.getAcidState cleanup (Peter Varga, reviewed by Karen Coppage and Marta Kuczora)

This is an automated email from the ASF dual-hosted git repository.

klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 50f8765  HIVE-23495: AcidUtils.getAcidState cleanup (Peter Varga, reviewed by Karen Coppage and Marta Kuczora)
50f8765 is described below

commit 50f8765658f2733e43c232c9506770b62a980be2
Author: Peter Varga <pv...@cloudera.com>
AuthorDate: Mon Jun 15 11:07:31 2020 +0200

    HIVE-23495: AcidUtils.getAcidState cleanup (Peter Varga, reviewed by Karen Coppage and Marta Kuczora)
    
    Closes #1097
---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    | 421 +++++++++------------
 .../apache/hadoop/hive/ql/io/HiveInputFormat.java  |   2 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java      |   9 +-
 .../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java  |   4 +-
 .../ql/io/orc/VectorizedOrcAcidRowBatchReader.java |   2 +-
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |   2 +-
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  |   2 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |   6 +-
 .../hive/ql/txn/compactor/MinorQueryCompactor.java |   3 +-
 .../ql/txn/compactor/MmMajorQueryCompactor.java    |   3 +-
 .../ql/txn/compactor/MmMinorQueryCompactor.java    |   6 +-
 .../hadoop/hive/ql/txn/compactor/Worker.java       |   2 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    |   6 +-
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java    |  49 +--
 .../hive/ql/io/orc/TestInputOutputFormat.java      |   4 +-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java     |   5 +-
 .../org/apache/hive/streaming/TestStreaming.java   |  12 +-
 17 files changed, 231 insertions(+), 307 deletions(-)
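
The net effect of this cleanup on callers is that getAcidState loses the tblproperties and
generateDirSnapshots parameters; directory snapshots are now always built when listing with
file ids is unavailable. A minimal before/after sketch of a call site, assuming the usual Hive
imports (the wrapper method below is illustrative and not part of this commit):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hive.common.util.Ref;

    class GetAcidStateCallSite {
      static AcidUtils.Directory resolve(FileSystem fs, Path partitionDir, Configuration conf,
          ValidWriteIdList writeIds) throws IOException {
        // Before HIVE-23495 (8 arguments):
        //   AcidUtils.getAcidState(fs, partitionDir, conf, writeIds, Ref.from(false), false, null, false);
        // After HIVE-23495 (6 arguments): tblproperties and generateDirSnapshots are gone.
        return AcidUtils.getAcidState(fs, partitionDir, conf, writeIds, Ref.from(false), false);
      }
    }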

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 4ee8b9d..cd9238a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -29,6 +29,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -1243,11 +1244,10 @@ public class AcidUtils {
 
   /** State class for getChildState; cannot modify 2 things in a method. */
   private static class TxnBase {
-    private FileStatus status;
+    private Path basePath;
     private long writeId = 0;
     private long oldestBaseWriteId = Long.MAX_VALUE;
     private Path oldestBase = null;
-    private HdfsDirSnapshot dirSnapShot;
   }
 
   /**
@@ -1255,20 +1255,22 @@ public class AcidUtils {
    * base and diff directories. Note that because major compactions don't
    * preserve the history, we can't use a base directory that includes a
    * write id that we must exclude.
-   * @param directory the partition directory to analyze
+   * @param fileSystem optional, if it is not provided, it will be derived from the candidateDirectory
+   * @param candidateDirectory the partition directory to analyze
    * @param conf the configuration
    * @param writeIdList the list of write ids that we are reading
+   * @param useFileIds It will be set to true if the FileSystem supports listing with fileIds
+   * @param ignoreEmptyFiles Ignore files with 0 length
    * @return the state of the directory
-   * @throws IOException
+   * @throws IOException on filesystem errors
    */
   @VisibleForTesting
   public static Directory getAcidState(FileSystem fileSystem, Path candidateDirectory, Configuration conf,
-      ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles,
-      Map<String, String> tblproperties, boolean generateDirSnapshots) throws IOException {
+      ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles) throws IOException {
     ValidTxnList validTxnList = null;
     String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
     if(!Strings.isNullOrEmpty(s)) {
-      /**
+      /*
        * getAcidState() is sometimes called on non-transactional tables, e.g.
        * OrcInputFileFormat.FileGenerator.callInternal().  e.g. orc_merge3.q In that case
        * writeIdList is bogus - doesn't even have a table name.
@@ -1284,36 +1286,29 @@ public class AcidUtils {
 
     FileSystem fs = fileSystem == null ? candidateDirectory.getFileSystem(conf) : fileSystem;
     // The following 'deltas' includes all kinds of delta files including insert & delete deltas.
-    final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
-    List<ParsedDelta> working = new ArrayList<ParsedDelta>();
+    final List<ParsedDelta> deltas = new ArrayList<>();
+    List<ParsedDelta> working = new ArrayList<>();
     List<Path> originalDirectories = new ArrayList<>();
     final List<Path> obsolete = new ArrayList<>();
     final List<Path> abortedDirectories = new ArrayList<>();
-    List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, candidateDirectory);
-
     TxnBase bestBase = new TxnBase();
     final List<HdfsFileStatusWithId> original = new ArrayList<>();
     Map<Path, HdfsDirSnapshot> dirSnapshots = null;
+
+    List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, candidateDirectory);
+
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
-        getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, obsolete,
-            bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
+        getChildState(child, writeIdList, working, originalDirectories, original, obsolete,
+            bestBase, ignoreEmptyFiles, abortedDirectories, fs, validTxnList);
       }
     } else {
-      if (generateDirSnapshots) {
-        dirSnapshots = getHdfsDirSnapshots(fs, candidateDirectory);
-        getChildState(candidateDirectory, dirSnapshots, writeIdList, working, originalDirectories, original, obsolete,
-            bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
-      } else {
-        List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, candidateDirectory, hiddenFileFilter);
-        for (FileStatus child : children) {
-          getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete, bestBase,
-              ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
-        }
-      }
+      dirSnapshots = getHdfsDirSnapshots(fs, candidateDirectory);
+      getChildState(candidateDirectory, dirSnapshots, writeIdList, working, originalDirectories, original, obsolete,
+          bestBase, ignoreEmptyFiles, abortedDirectories, fs, validTxnList);
     }
     // If we have a base, the original files are obsolete.
-    if (bestBase.status != null) {
+    if (bestBase.basePath != null) {
       // Add original files to obsolete list if any
       for (HdfsFileStatusWithId fswid : original) {
         obsolete.add(fswid.getFileStatus().getPath());
@@ -1382,9 +1377,9 @@ public class AcidUtils {
       }
     }
 
-    if(bestBase.oldestBase != null && bestBase.status == null &&
+    if(bestBase.oldestBase != null && bestBase.basePath == null &&
         isCompactedBase(ParsedBase.parseBase(bestBase.oldestBase), fs, dirSnapshots)) {
-      /**
+      /*
        * If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
        * {@link writeIdList}.  Note that 'original' files are logically a base_Long.MIN_VALUE and thus
        * cannot have any data for an open txn.  We could check {@link deltas} has files to cover
@@ -1405,78 +1400,80 @@ public class AcidUtils {
 
     Path base = null;
     boolean isBaseInRawFormat = false;
-    if (bestBase.status != null) {
-      base = bestBase.status.getPath();
+    if (bestBase.basePath != null) {
+      base = bestBase.basePath;
       isBaseInRawFormat = MetaDataFile.isRawFormat(base, fs, dirSnapshots != null ? dirSnapshots.get(base) : null);
-      if (isBaseInRawFormat && (bestBase.dirSnapShot != null)) {
-        for (FileStatus stat : bestBase.dirSnapShot.getFiles()) {
-          if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
-            original.add(createOriginalObj(null, stat));
-          }
-        }
-      }
     }
     LOG.debug("in directory " + candidateDirectory.toUri().toString() + " base = " + base + " deltas = " +
         deltas.size());
-    /**
+    /*
      * If this sort order is changed and there are tables that have been converted to transactional
      * and have had any update/delete/merge operations performed but not yet MAJOR compacted, it
      * may result in data loss since it may change how
      * {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns
      * {@link RecordIdentifier#rowId} for read (that have happened) and compaction (yet to happen).
      */
-    Collections.sort(original, (HdfsFileStatusWithId o1, HdfsFileStatusWithId o2) -> {
-      //this does "Path.uri.compareTo(that.uri)"
-      return o1.getFileStatus().compareTo(o2.getFileStatus());
-    });
-    return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original,
-        obsolete, deltas, base);
-  }
-  
-  public static Map<Path, HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs,
-      final Path path) throws IOException {
-    try {
-      Map<Path, HdfsDirSnapshot> dirToSnapshots = new HashMap<Path, HdfsDirSnapshot>();
-      RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, true);
-      while (itr.hasNext()) {
-        FileStatus fStatus = itr.next();
-        Path fPath = fStatus.getPath();
-        if (acidHiddenFileFilter.accept(fPath)) {
-          if (fStatus.isDirectory() && acidTempDirFilter.accept(fPath)) {
-            HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath);
+    // this does "Path.uri.compareTo(that.uri)"
+    original.sort(Comparator.comparing(HdfsFileStatusWithId::getFileStatus));
+    return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original, obsolete, deltas, base);
+  }
+
+  public static Map<Path, HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs, final Path path)
+      throws IOException {
+    Map<Path, HdfsDirSnapshot> dirToSnapshots = new HashMap<>();
+    RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, true);
+    while (itr.hasNext()) {
+      FileStatus fStatus = itr.next();
+      Path fPath = fStatus.getPath();
+      if (acidHiddenFileFilter.accept(fPath)) {
+        if (fStatus.isDirectory() && acidTempDirFilter.accept(fPath)) {
+          HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath);
+          if (dirSnapshot == null) {
+            dirSnapshot = new HdfsDirSnapshotImpl(fPath);
+            dirToSnapshots.put(fPath, dirSnapshot);
+          }
+        } else {
+          Path parentDirPath = fPath.getParent();
+          if (acidTempDirFilter.accept(parentDirPath)) {
+            while (isChildOfDelta(parentDirPath, path)) {
+              // In some cases there are other directory layers between the delta and the data files
+              // (export-import mm table, insert with union all to mm table, skewed tables).
+              // But it does not matter for the AcidState; we just need the deltas and the data files,
+              // so build the snapshot with the files inside the delta directory.
+              parentDirPath = parentDirPath.getParent();
+            }
+            HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath);
             if (dirSnapshot == null) {
-              dirSnapshot = new HdfsDirSnapshotImpl(fPath, fStatus);
-              dirToSnapshots.put(fPath, dirSnapshot);
+              dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath);
+              dirToSnapshots.put(parentDirPath, dirSnapshot);
             }
-          } else {
-            Path parentDirPath = fPath.getParent();
-            if (acidTempDirFilter.accept(parentDirPath)) {
-              HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath);
-              if (dirSnapshot == null) {
-                FileStatus parentDirFStatus = null;
-                if (!parentDirPath.equals(path)) {
-                  parentDirFStatus = fs.getFileStatus(parentDirPath);
-                }
-                dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath, parentDirFStatus);
-                dirToSnapshots.put(parentDirPath, dirSnapshot);
-              }
-              // We're not filtering out the metadata file and acid format file, as they represent parts of a valid snapshot
-              // We're not using the cached values downstream, but we can potentially optimize more in a follow-up task
-              if (fStatus.getPath().toString().contains(MetaDataFile.METADATA_FILE)) {
-                dirSnapshot.addMetadataFile(fStatus);
-              } else if (fStatus.getPath().toString().contains(OrcAcidVersion.ACID_FORMAT)) {
-                dirSnapshot.addOrcAcidFormatFile(fStatus);
-              } else {
-                dirSnapshot.addFile(fStatus);
-              }
+            // We're not filtering out the metadata file and acid format file,
+            // as they represent parts of a valid snapshot
+            // We're not using the cached values downstream, but we can potentially optimize more in a follow-up task
+            if (fStatus.getPath().toString().contains(MetaDataFile.METADATA_FILE)) {
+              dirSnapshot.addMetadataFile(fStatus);
+            } else if (fStatus.getPath().toString().contains(OrcAcidVersion.ACID_FORMAT)) {
+              dirSnapshot.addOrcAcidFormatFile(fStatus);
+            } else {
+              dirSnapshot.addFile(fStatus);
             }
           }
         }
       }
-      return dirToSnapshots;
-    } catch (IOException e) {
-      throw new IOException(e);
     }
+    return dirToSnapshots;
+  }
+
+  private static boolean isChildOfDelta(Path childDir, Path rootPath) {
+    if (childDir.toUri().toString().length() <= rootPath.toUri().toString().length()) {
+      return false;
+    }
+    // We do not want to look outside the original directory
+    String fullName = childDir.toUri().toString().substring(rootPath.toUri().toString().length() + 1);
+    String dirName = childDir.getName();
+    return (fullName.startsWith(BASE_PREFIX) && !dirName.startsWith(BASE_PREFIX)) ||
+        (fullName.startsWith(DELTA_PREFIX) && !dirName.startsWith(DELTA_PREFIX)) ||
+        (fullName.startsWith(DELETE_DELTA_PREFIX) && !dirName.startsWith(DELETE_DELTA_PREFIX));
   }
   
   /**
@@ -1485,7 +1482,7 @@ public class AcidUtils {
    * with additional properties about the dir (like isBase etc)
    *
    */
-  public static interface HdfsDirSnapshot {
+  public interface HdfsDirSnapshot {
     public Path getPath();
 
     public void addOrcAcidFormatFile(FileStatus fStatus);
@@ -1496,14 +1493,9 @@ public class AcidUtils {
 
     public FileStatus getMetadataFile(FileStatus fStatus);
 
-    // FileStatus of this HDFS directory
-    public FileStatus getFileStatus();
-
     // Get the list of files if any within this directory
     public List<FileStatus> getFiles();
 
-    public void setFileStatus(FileStatus fStatus);
-
     public void addFile(FileStatus file);
 
     // File id or null
@@ -1530,7 +1522,6 @@ public class AcidUtils {
   
   public static class HdfsDirSnapshotImpl implements HdfsDirSnapshot {
     private Path dirPath;
-    private FileStatus fStatus;
     private FileStatus metadataFStatus = null;
     private FileStatus orcAcidFormatFStatus = null;
     private List<FileStatus> files = new ArrayList<FileStatus>();
@@ -1540,31 +1531,19 @@ public class AcidUtils {
     private Boolean isValidBase = null;
     private Boolean isCompactedBase = null;
 
-    public HdfsDirSnapshotImpl(Path path, FileStatus fStatus, List<FileStatus> files) {
+    public HdfsDirSnapshotImpl(Path path, List<FileStatus> files) {
       this.dirPath = path;
-      this.fStatus = fStatus;
       this.files = files;
     }
     
-    public HdfsDirSnapshotImpl(Path path, FileStatus fStatus) {
+    public HdfsDirSnapshotImpl(Path path) {
       this.dirPath = path;
-      this.fStatus = fStatus;
     }
     
     @Override
     public Path getPath() {
       return dirPath;
     }
-    
-    @Override
-    public FileStatus getFileStatus() {
-      return fStatus;
-    }
-    
-    @Override
-    public void setFileStatus(FileStatus fStatus) {
-      this.fStatus = fStatus;
-    }
 
     @Override
     public List<FileStatus> getFiles() {
@@ -1676,35 +1655,28 @@ public class AcidUtils {
    * causes anything written previously to be ignored (hence the overwrite).  In this case, base_x
    * is visible if writeid:x is committed for current reader.
    */
-  private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList writeIdList,
-            FileSystem fs) throws IOException {
-    if(parsedBase.getWriteId() == Long.MIN_VALUE) {
-      //such base is created by 1st compaction in case of non-acid to acid table conversion
-      //By definition there are no open txns with id < 1.
-      return true;
-    }
-    if (writeIdList.getMinOpenWriteId() != null && parsedBase.getWriteId() <= writeIdList.getMinOpenWriteId()) {
-      return true;
-    }
-    if(isCompactedBase(parsedBase, fs, (HdfsDirSnapshot) null)) {
-      return writeIdList.isValidBase(parsedBase.getWriteId());
-    }
-    //if here, it's a result of IOW
-    return writeIdList.isWriteIdValid(parsedBase.getWriteId());
-  }
-  
-  private static boolean isValidBase(HdfsDirSnapshot dirSnapshot, ParsedBase parsedBase, ValidWriteIdList writeIdList,
-      FileSystem fs) throws IOException {
+  private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList writeIdList, FileSystem fs,
+      HdfsDirSnapshot dirSnapshot) throws IOException {
     boolean isValidBase;
-    if (dirSnapshot.isValidBase() != null) {
+    if (dirSnapshot != null && dirSnapshot.isValidBase() != null) {
       isValidBase = dirSnapshot.isValidBase();
     } else {
-      if (isCompactedBase(parsedBase, fs, dirSnapshot)) {
+      if (parsedBase.getWriteId() == Long.MIN_VALUE) {
+        //such base is created by 1st compaction in case of non-acid to acid table conversion
+        //By definition there are no open txns with id < 1.
+        isValidBase = true;
+      } else if (writeIdList.getMinOpenWriteId() != null && parsedBase.getWriteId() <= writeIdList
+          .getMinOpenWriteId()) {
+        isValidBase = true;
+      } else if (isCompactedBase(parsedBase, fs, dirSnapshot)) {
         isValidBase = writeIdList.isValidBase(parsedBase.getWriteId());
       } else {
+        // if here, it's a result of IOW
         isValidBase = writeIdList.isWriteIdValid(parsedBase.getWriteId());
       }
-      dirSnapshot.setIsValidBase(isValidBase);
+      if (dirSnapshot != null) {
+        dirSnapshot.setIsValidBase(isValidBase);
+      }
     }
     return isValidBase;
   }
@@ -1724,78 +1696,39 @@ public class AcidUtils {
       HdfsDirSnapshot snapshot) throws IOException {
     return parsedBase.getVisibilityTxnId() > 0 || MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs, snapshot);
   }
-  
-  private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
-      ValidWriteIdList writeIdList, List<ParsedDelta> working, List<Path> originalDirectories,
-      List<HdfsFileStatusWithId> original, List<Path> obsolete, TxnBase bestBase,
-      boolean ignoreEmptyFiles, List<Path> aborted, Map<String, String> tblproperties,
-      FileSystem fs, ValidTxnList validTxnList) throws IOException {
-    Path p = child.getPath();
-    String fn = p.getName();
-    if (!child.isDirectory()) {
-      if (!ignoreEmptyFiles || child.getLen() != 0) {
-        original.add(createOriginalObj(childWithId, child));
-      }
-      return;
-    }
-    if (fn.startsWith(BASE_PREFIX)) {
-      ParsedBase parsedBase = ParsedBase.parseBase(p);
-      if(!isDirUsable(child.getPath(), parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
-        return;
-      }
-      final long writeId = parsedBase.getWriteId();
-      if(bestBase.oldestBaseWriteId > writeId) {
-        //keep track for error reporting
-        bestBase.oldestBase = p;
-        bestBase.oldestBaseWriteId = writeId;
-      }
-      if (bestBase.status == null) {
-        if(isValidBase(parsedBase, writeIdList, fs)) {
-          bestBase.status = child;
-          bestBase.writeId = writeId;
-        }
-      } else if (bestBase.writeId < writeId) {
-        if(isValidBase(parsedBase, writeIdList, fs)) {
-          obsolete.add(bestBase.status.getPath());
-          bestBase.status = child;
-          bestBase.writeId = writeId;
-        }
-      } else {
-        obsolete.add(child.getPath());
+
+  private static void getChildState(HdfsFileStatusWithId childWithId, ValidWriteIdList writeIdList,
+      List<ParsedDelta> working, List<Path> originalDirectories, List<HdfsFileStatusWithId> original,
+      List<Path> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List<Path> aborted, FileSystem fs,
+      ValidTxnList validTxnList) throws IOException {
+    Path childPath = childWithId.getFileStatus().getPath();
+    String fn = childPath.getName();
+    if (!childWithId.getFileStatus().isDirectory()) {
+      if (!ignoreEmptyFiles || childWithId.getFileStatus().getLen() != 0) {
+        original.add(childWithId);
       }
+    } else if (fn.startsWith(BASE_PREFIX)) {
+      processBaseDir(childPath, writeIdList, obsolete, bestBase, aborted, fs, validTxnList, null);
     } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) {
-      String deltaPrefix = fn.startsWith(DELTA_PREFIX)  ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
-      ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs, null);
-      if(!isDirUsable(child.getPath(), delta.getVisibilityTxnId(), aborted, validTxnList)) {
-        return;
-      }
-      if(ValidWriteIdList.RangeResponse.ALL ==
-          writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) {
-        aborted.add(child.getPath());
-      }
-      else if (writeIdList.isWriteIdRangeValid(
-          delta.minWriteId, delta.maxWriteId) != ValidWriteIdList.RangeResponse.NONE) {
-        working.add(delta);
-      }
+      processDeltaDir(childPath, writeIdList, working, aborted, fs, validTxnList, null);
     } else {
       // This is just the directory.  We need to recurse and find the actual files.  But don't
       // do this until we have determined there is no base.  This saves time.  Plus,
       // it is possible that the cleaner is running and removing these original files,
       // in which case recursing through them could cause us to get an error.
-      originalDirectories.add(child.getPath());
+      originalDirectories.add(childPath);
     }
   }
-  
+
   private static void getChildState(Path candidateDirectory, Map<Path, HdfsDirSnapshot> dirSnapshots,
       ValidWriteIdList writeIdList, List<ParsedDelta> working, List<Path> originalDirectories,
-      List<HdfsFileStatusWithId> original,
-      List<Path> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List<Path> aborted,
-      Map<String, String> tblproperties, FileSystem fs, ValidTxnList validTxnList) throws IOException {
+      List<HdfsFileStatusWithId> original, List<Path> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles,
+      List<Path> aborted, FileSystem fs, ValidTxnList validTxnList) throws IOException {
     for (HdfsDirSnapshot dirSnapshot : dirSnapshots.values()) {
-      FileStatus fStat = dirSnapshot.getFileStatus();
       Path dirPath = dirSnapshot.getPath();
       String dirName = dirPath.getName();
-      if (dirPath.equals(candidateDirectory)) {
+      // dirPath may contain the filesystem prefix
+      if (dirPath.toString().endsWith(candidateDirectory.toString())) {
         // if the candidateDirectory is itself a delta directory, we need to add originals in that directory
         // and return. This is the case when compaction thread calls getChildState.
         for (FileStatus fileStatus : dirSnapshot.getFiles()) {
@@ -1804,44 +1737,9 @@ public class AcidUtils {
           }
         }
       } else if (dirName.startsWith(BASE_PREFIX)) {
-        bestBase.dirSnapShot = dirSnapshot;
-        ParsedBase parsedBase = ParsedBase.parseBase(dirPath);
-        if (!isDirUsable(dirPath, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
-          continue;
-        }
-        final long writeId = parsedBase.getWriteId();
-        if (bestBase.oldestBaseWriteId > writeId) {
-          //keep track for error reporting
-          bestBase.oldestBase = dirPath;
-          bestBase.oldestBaseWriteId = writeId;
-        }
-        if (bestBase.status == null) {
-          if (isValidBase(dirSnapshot, parsedBase, writeIdList, fs)) {
-            bestBase.status = fStat;
-            bestBase.writeId = writeId;
-          }
-        } else if (bestBase.writeId < writeId) {
-          if (isValidBase(dirSnapshot, parsedBase, writeIdList, fs)) {
-            obsolete.add(bestBase.status.getPath());
-            bestBase.status = fStat;
-            bestBase.writeId = writeId;
-          }
-        } else {
-          obsolete.add(dirPath);
-        }
+        processBaseDir(dirPath, writeIdList, obsolete, bestBase, aborted, fs, validTxnList, dirSnapshot);
       } else if (dirName.startsWith(DELTA_PREFIX) || dirName.startsWith(DELETE_DELTA_PREFIX)) {
-        String deltaPrefix = dirName.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
-        ParsedDelta delta = parseDelta(dirPath, deltaPrefix, fs, dirSnapshot);
-        if (!isDirUsable(dirPath, delta.getVisibilityTxnId(), aborted, validTxnList)) {
-          continue;
-        }
-        if (ValidWriteIdList.RangeResponse.ALL == writeIdList
-            .isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) {
-          aborted.add(dirPath);
-        } else if (writeIdList.isWriteIdRangeValid(delta.minWriteId, delta.maxWriteId)
-            != ValidWriteIdList.RangeResponse.NONE) {
-          working.add(delta);
-        }
+        processDeltaDir(dirPath, writeIdList, working, aborted, fs, validTxnList, dirSnapshot);
       } else {
         originalDirectories.add(dirPath);
         for (FileStatus stat : dirSnapshot.getFiles()) {
@@ -1852,27 +1750,72 @@ public class AcidUtils {
       }
     }
   }
-  
+
+  private static void processBaseDir(Path baseDir, ValidWriteIdList writeIdList, List<Path> obsolete, TxnBase bestBase,
+      List<Path> aborted, FileSystem fs, ValidTxnList validTxnList, AcidUtils.HdfsDirSnapshot dirSnapshot)
+      throws IOException {
+    ParsedBase parsedBase = ParsedBase.parseBase(baseDir);
+    if (!isDirUsable(baseDir, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
+      return;
+    }
+    final long writeId = parsedBase.getWriteId();
+    if (bestBase.oldestBaseWriteId > writeId) {
+      // keep track for error reporting
+      bestBase.oldestBase = baseDir;
+      bestBase.oldestBaseWriteId = writeId;
+    }
+    if (bestBase.basePath == null) {
+      if (isValidBase(parsedBase, writeIdList, fs, dirSnapshot)) {
+        bestBase.basePath = baseDir;
+        bestBase.writeId = writeId;
+      }
+    } else if (bestBase.writeId < writeId) {
+      if (isValidBase(parsedBase, writeIdList, fs, dirSnapshot)) {
+        obsolete.add(bestBase.basePath);
+        bestBase.basePath = baseDir;
+        bestBase.writeId = writeId;
+      }
+    } else {
+      obsolete.add(baseDir);
+    }
+  }
+
+  private static void processDeltaDir(Path deltadir, ValidWriteIdList writeIdList, List<ParsedDelta> working,
+      List<Path> aborted, FileSystem fs, ValidTxnList validTxnList, AcidUtils.HdfsDirSnapshot dirSnapshot)
+      throws IOException {
+    String dirName = deltadir.getName();
+    String deltaPrefix = dirName.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
+    ParsedDelta delta = parseDelta(deltadir, deltaPrefix, fs, dirSnapshot);
+    if (!isDirUsable(deltadir, delta.getVisibilityTxnId(), aborted, validTxnList)) {
+      return;
+    }
+    if (ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) {
+      aborted.add(deltadir);
+    } else if (writeIdList.isWriteIdRangeValid(delta.minWriteId, delta.maxWriteId)
+        != ValidWriteIdList.RangeResponse.NONE) {
+      working.add(delta);
+    }
+  }
+
   /**
    * checks {@code visibilityTxnId} to see if {@code child} is committed in current snapshot
    */
-  private static boolean isDirUsable(Path child, long visibilityTxnId,
-      List<Path> aborted, ValidTxnList validTxnList) {
-    if(validTxnList == null) {
+  private static boolean isDirUsable(Path child, long visibilityTxnId, List<Path> aborted, ValidTxnList validTxnList) {
+    if (validTxnList == null) {
       throw new IllegalArgumentException("No ValidTxnList for " + child);
     }
-    if(!validTxnList.isTxnValid(visibilityTxnId)) {
+    if (!validTxnList.isTxnValid(visibilityTxnId)) {
       boolean isAborted = validTxnList.isTxnAborted(visibilityTxnId);
-      if(isAborted) {
-        aborted.add(child);//so we can clean it up
+      if (isAborted) {
+        aborted.add(child);// so we can clean it up
       }
       LOG.debug("getChildState() ignoring(" + aborted + ") " + child);
       return false;
     }
     return true;
   }
-  public static HdfsFileStatusWithId createOriginalObj(
-      HdfsFileStatusWithId childWithId, FileStatus child) {
+
+  public static HdfsFileStatusWithId createOriginalObj(HdfsFileStatusWithId childWithId, FileStatus child) {
     return childWithId != null ? childWithId : new HdfsFileStatusWithoutId(child);
   }
 
@@ -2157,7 +2100,7 @@ public class AcidUtils {
    * Returns the logical end of file for an acid data file.
    *
   * This relies on the fact that if delta_x_y has no committed transactions it will be filtered out
-   * by {@link #getAcidState(FileSystem, Path, Configuration, ValidWriteIdList, Ref, boolean, Map, boolean)}
+   * by {@link #getAcidState(FileSystem, Path, Configuration, ValidWriteIdList, Ref, boolean)}
    * and so won't be read at all.
    * @param file - data file to read/compute splits on
    */
@@ -2669,7 +2612,7 @@ public class AcidUtils {
           + " from " + jc.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
       return null;
     }
-    Directory acidInfo = AcidUtils.getAcidState(fs, dir, jc, idList, null, false, null, true);
+    Directory acidInfo = AcidUtils.getAcidState(fs, dir, jc, idList, null, false);
     // Assume that for an MM table, or if there's only the base directory, we are good.
     if (!acidInfo.getCurrentDirectories().isEmpty() && AcidUtils.isFullAcidTable(table)) {
       Utilities.FILE_OP_LOGGER.warn(
@@ -2707,7 +2650,7 @@ public class AcidUtils {
     // If ACID/MM tables, then need to find the valid state wrt to given ValidWriteIdList.
     ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(validWriteIdStr);
     Directory acidInfo = AcidUtils.getAcidState(dataPath.getFileSystem(conf), dataPath, conf, validWriteIdList, null,
-        false, null, false);
+        false);
 
     for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) {
       pathList.add(hfs.getFileStatus().getPath());
@@ -3214,17 +3157,14 @@ public class AcidUtils {
    * @param candidateDirectory the partition directory to analyze
    * @param conf the configuration
    * @param writeIdList the list of write ids that we are reading
-   * @param useFileIds
-   * @param ignoreEmptyFiles
-   * @param tblproperties
-   * @param generateDirSnapshots
+   * @param useFileIds It will be set to true if the FileSystem supports listing with fileIds
+   * @param ignoreEmptyFiles Ignore files with 0 length
    * @return directory state
    * @throws IOException on errors
    */
   public static Directory getAcidStateFromCache(Supplier<FileSystem> fileSystem,
       Path candidateDirectory, Configuration conf,
-      ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles,
-      Map<String, String> tblproperties, boolean generateDirSnapshots) throws IOException {
+      ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles) throws IOException {
 
     int dirCacheDuration = HiveConf.getIntVar(conf,
         ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION);
@@ -3232,7 +3172,7 @@ public class AcidUtils {
     if (dirCacheDuration < 0) {
       LOG.debug("dirCache is not enabled");
       return getAcidState(fileSystem.get(), candidateDirectory, conf, writeIdList,
-          useFileIds, ignoreEmptyFiles, tblproperties, generateDirSnapshots);
+          useFileIds, ignoreEmptyFiles);
     } else {
       initDirCache(dirCacheDuration);
     }
@@ -3270,8 +3210,7 @@ public class AcidUtils {
     // compute and add to cache
     if (recompute || (value == null)) {
       Directory dirInfo = getAcidState(fileSystem.get(), candidateDirectory, conf,
-          writeIdList, useFileIds, ignoreEmptyFiles, tblproperties,
-          generateDirSnapshots);
+          writeIdList, useFileIds, ignoreEmptyFiles);
       value = new DirInfoValue(writeIdList.writeToString(), dirInfo);
 
       if (value.dirInfo != null && value.dirInfo.getBaseDirectory() != null
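
For reference, the reworked getHdfsDirSnapshots above walks the partition with a single recursive
listFiles() call and attributes every data file to its enclosing base/delta directory, even when
extra layers (union subdirectories, export/import or skewed-table layouts) sit between the delta
and the files. A hypothetical usage sketch, not part of this commit; the local path and layout are
invented:

    import java.io.IOException;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    class DirSnapshotSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        Path partition = new Path("file:///tmp/warehouse/tbl/part1"); // invented layout
        FileSystem fs = partition.getFileSystem(conf);
        // One recursive listing; a nested file such as delta_0000001_0000001/subdir/000000_0
        // is recorded under the delta directory itself (see isChildOfDelta above).
        Map<Path, AcidUtils.HdfsDirSnapshot> snapshots = AcidUtils.getHdfsDirSnapshots(fs, partition);
        for (Map.Entry<Path, AcidUtils.HdfsDirSnapshot> e : snapshots.entrySet()) {
          System.out.println(e.getKey() + " -> " + e.getValue().getFiles().size() + " data file(s)");
        }
      }
    }
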
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index ca234cf..073a3cb 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -696,7 +696,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
     if (hasAcidDirs) {
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
-          fs, dir, conf, validWriteIdList, Ref.from(false), true, null, false);
+          fs, dir, conf, validWriteIdList, Ref.from(false), true);
 
       // Find the base, created for IOW.
       Path base = dirInfo.getBaseDirectory();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 1059cb2..bf871f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1215,8 +1215,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     /**
      * For plain or acid tables this is the root of the partition (or table if not partitioned).
      * For MM table this is delta/ or base/ dir.  In MM case applying of the ValidTxnList that
-     * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} normally does has already
-     * been done in {@link HiveInputFormat#processPathsForMmRead(List, JobConf, ValidWriteIdList)}.
+     * {@link AcidUtils#getAcidState(FileSystem, Path, Configuration, ValidWriteIdList, Ref, boolean)}
+     * normally does has already been done in
+     * {@link HiveInputFormat#processPathsForMmRead(List, Configuration, ValidWriteIdList, List, List)}
      */
     private final Path dir;
     private final Ref<Boolean> useFileIds;
@@ -1257,10 +1258,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private Directory getAcidState() throws IOException {
       if (context.isAcid && context.splitStrategyKind == SplitStrategyKind.BI) {
         return AcidUtils.getAcidStateFromCache(fs, dir, context.conf,
-            context.writeIdList, useFileIds, true, null, true);
+            context.writeIdList, useFileIds, true);
       } else {
         return AcidUtils.getAcidState(fs.get(), dir, context.conf, context.writeIdList,
-            useFileIds, true, null, true);
+            useFileIds, true);
       }
     }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 16c9159..6739a2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -463,7 +463,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          */
         //the split is from something other than the 1st file of the logical bucket - compute offset
         AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf,
-            validWriteIdList, Ref.from(false), true, null, true);
+            validWriteIdList, Ref.from(false), true);
         for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
           int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
           if (bucketIdFromPath != bucketId) {
@@ -577,7 +577,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       assert options.getOffset() == 0;
       assert options.getMaxOffset() == Long.MAX_VALUE;
       AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf,
-          validWriteIdList, Ref.from(false), true, null, true);
+          validWriteIdList, Ref.from(false), true);
       /**
        * Note that for reading base_x/ or delta_x_x/ with non-acid schema,
        * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all it's
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 598220b..51c06c7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -725,7 +725,7 @@ public class VectorizedOrcAcidRowBatchReader
         //statementId is from directory name (or 0 if there is none)
       .statementId(syntheticTxnInfo.statementId).bucket(bucketId));
     AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, syntheticTxnInfo.folder, conf,
-        validWriteIdList, Ref.from(false), true, null, true);
+        validWriteIdList, Ref.from(false), true);
     for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
       int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
       if (bucketIdFromPath != bucketId) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 2a15913..abde748 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -221,7 +221,7 @@ public class Cleaner extends MetaStoreCompactorThread {
       throws IOException, NoSuchObjectException, MetaException {
     Path locPath = new Path(location);
     AcidUtils.Directory dir = AcidUtils.getAcidState(locPath.getFileSystem(conf), locPath, conf, writeIdList, Ref.from(
-        false), false, null, false);
+        false), false);
     List<Path> obsoleteDirs = dir.getObsolete();
     /**
      * add anything in 'dir'  that only has data from aborted transactions - no one should be
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 4e5d5b0..4365817 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -264,7 +264,7 @@ public class CompactorMR {
             maxDeltasToHandle, -1, conf, msc, ci.id, jobName);
       }
       //now recompute state since we've done minor compactions and have different 'best' set of deltas
-      dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), false, null, false);
+      dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), false);
     }
 
     StringableList dirsToSearch = new StringableList();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 5b2c937..552e694 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -322,13 +322,13 @@ public class Initiator extends MetaStoreCompactorThread {
     boolean noBase = false;
     Path location = new Path(sd.getLocation());
     FileSystem fs = location.getFileSystem(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false, tblproperties, false);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
     Path base = dir.getBaseDirectory();
     long baseSize = 0;
     FileStatus stat = null;
     if (base != null) {
       stat = fs.getFileStatus(base);
-      if (!stat.isDir()) {
+      if (!stat.isDirectory()) {
         LOG.error("Was assuming base " + base.toString() + " is directory, but it's a file!");
         return null;
       }
@@ -344,7 +344,7 @@ public class Initiator extends MetaStoreCompactorThread {
     List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
     for (AcidUtils.ParsedDelta delta : deltas) {
       stat = fs.getFileStatus(delta.getPath());
-      if (!stat.isDir()) {
+      if (!stat.isDirectory()) {
         LOG.error("Was assuming delta " + delta.getPath().toString() + " is a directory, " +
             "but it's a file!");
         return null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
index d83a50f..9344933 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
@@ -49,8 +49,7 @@ final class MinorQueryCompactor extends QueryCompactor {
     AcidUtils
         .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
     AcidUtils.Directory dir = AcidUtils
-        .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
-            table.getParameters(), false);
+        .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false);
     // Set up the session for driver.
     HiveConf conf = new HiveConf(hiveConf);
     conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index 5e11d8d..bf7309a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -46,8 +46,7 @@ final class MmMajorQueryCompactor extends QueryCompactor {
     LOG.debug("Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table
         .getTableName());
     AcidUtils.Directory dir = AcidUtils
-        .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
-            table.getParameters(), false);
+        .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false);
     QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir);
 
     // Set up the session for driver.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
index 1bdec7d..9f1d379 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
@@ -48,9 +48,9 @@ final class MmMinorQueryCompactor extends QueryCompactor {
         "Going to delete directories for aborted transactions for MM table " + table.getDbName()
             + "." + table.getTableName());
 
-    AcidUtils.Directory dir = AcidUtils.getAcidState(null,
-        new Path(storageDescriptor.getLocation()), hiveConf, writeIds,
-        Ref.from(false), false, table.getParameters(), false);
+    AcidUtils.Directory dir = AcidUtils
+        .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds,
+            Ref.from(false), false);
     QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir);
 
     HiveConf driverConf = setUpDriverSession(hiveConf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 75941b3..784c95b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -497,7 +497,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
 
       // Don't start compaction or cleaning if not necessary
       AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf,
-          tblValidWriteIds, Ref.from(false), true, null, false);
+          tblValidWriteIds, Ref.from(false), true);
       if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
         if (needsCleaning(dir, sd)) {
           msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 337f469..61f2bd2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -932,11 +932,13 @@ public class TestTxnCommands2 {
     int[][] tableData = {{1,2},{3,4},{5,6}};
     runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData));
     runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=2) (a,b) " + makeValuesClause(tableData));
-    txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.name(), CompactionType.MAJOR));
+    CompactionRequest request = new CompactionRequest("default", Table.ACIDTBLPART.name(), CompactionType.MAJOR);
+    request.setPartitionname("p=1");
+    txnHandler.compact(request);
     runWorker(hiveConf);
     runCleaner(hiveConf);
     runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = b + 1 where a = 3");
-    txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.toString(), CompactionType.MAJOR));
+    txnHandler.compact(request);
     runWorker(hiveConf);
     runCleaner(hiveConf);
     List<String> rs = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index f351f04..cadaeeb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -165,7 +165,7 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/_done", 0, new byte[0]),
         new MockFile("mock:/tbl/part1/subdir/000000_0", 0, new byte[0]));
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "/tbl/part1"), conf,
-        new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
+        new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false);
     assertEquals(null, dir.getBaseDirectory());
     assertEquals(0, dir.getCurrentDirectories().size());
     assertEquals(0, dir.getObsolete().size());
@@ -201,7 +201,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
-        new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
+        new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false);
     assertEquals(null, dir.getBaseDirectory());
     List<Path> obsolete = dir.getObsolete();
     assertEquals(2, obsolete.size());
@@ -244,7 +244,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
-        new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
+        new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false);
     assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
     List<Path> obsoletes = dir.getObsolete();
     assertEquals(5, obsoletes.size());
@@ -275,7 +275,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
-        new ValidReaderWriteIdList(), null, false, null, true);
+        new ValidReaderWriteIdList(), null, false);
     assertEquals("mock:/tbl/part1/base_0", dir.getBaseDirectory().toString());
     assertEquals(0, dir.getObsolete().size());
     assertEquals(0, dir.getOriginalFiles().size());
@@ -284,23 +284,6 @@ public class TestAcidUtils {
   }
 
   @Test
-  public void testRecursiveDirListingIsNotReusedWhenSnapshotFalse() throws IOException {
-    Configuration conf = new Configuration();
-    MockFileSystem fs = new MockFileSystem(conf,
-        new MockFile("mock:/tbl/part1/base_0/bucket_0", 500, new byte[0]),
-        new MockFile("mock:/tbl/part1/base_0/_orc_acid_version", 10, new byte[0]));
-    conf.set(ValidTxnList.VALID_TXNS_KEY,
-        new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
-        new ValidReaderWriteIdList(), null, false, null, false);
-    assertEquals("mock:/tbl/part1/base_0", dir.getBaseDirectory().toString());
-    assertEquals(0, dir.getObsolete().size());
-    assertEquals(0, dir.getOriginalFiles().size());
-    assertEquals(0, dir.getCurrentDirectories().size());
-    assertEquals(2, fs.getNumOpenFileCalls());
-  }
-
-  @Test
   public void testObsoleteOriginals() throws Exception {
     Configuration conf = new Configuration();
     MockFileSystem fs = new MockFileSystem(conf,
@@ -312,7 +295,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:150:"
-        + Long.MAX_VALUE + ":"), null, false, null, false);
+        + Long.MAX_VALUE + ":"), null, false);
     // Obsolete list should include the two original bucket files, and the old base dir
     List<Path> obsoletes = dir.getObsolete();
     assertEquals(3, obsoletes.size());
@@ -335,7 +318,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
-        + Long.MAX_VALUE + ":"), null, false, null, false);
+        + Long.MAX_VALUE + ":"), null, false);
     assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
     List<Path> obsolete = dir.getObsolete();
     assertEquals(2, obsolete.size());
@@ -372,7 +355,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
-        + Long.MAX_VALUE + ":"), null, false, null, false);
+        + Long.MAX_VALUE + ":"), null, false);
     assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
     List<Path> obsolete = dir.getObsolete();
     assertEquals(5, obsolete.size());
@@ -401,7 +384,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null,
-        false, null, false);
+        false);
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(2, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -426,7 +409,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null,
-        false, null, false);
+        false);
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(2, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -443,7 +426,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:4:"
-        + Long.MAX_VALUE), null, false, null, false);
+        + Long.MAX_VALUE), null, false);
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(1, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -462,7 +445,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:3:"
-        + Long.MAX_VALUE), null, false, null, false);
+        + Long.MAX_VALUE), null, false);
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(1, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -488,7 +471,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
-        new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
+        new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false);
     assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
     List<Path> obsoletes = dir.getObsolete();
     assertEquals(7, obsoletes.size());
@@ -531,7 +514,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
-        + Long.MAX_VALUE + ":"), null, false, null, false);
+        + Long.MAX_VALUE + ":"), null, false);
     assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
     List<Path> obsolete = dir.getObsolete();
     assertEquals(3, obsolete.size());
@@ -562,7 +545,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
-        + Long.MAX_VALUE + ":"), null, false, null, false);
+        + Long.MAX_VALUE + ":"), null, false);
     List<Path> obsolete = dir.getObsolete();
     assertEquals(1, obsolete.size());
     assertEquals("mock:/tbl/part1/delete_delta_50_50", obsolete.get(0).toString());
@@ -591,7 +574,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:4:"
-        + Long.MAX_VALUE + ":"), null, false, null, false);
+        + Long.MAX_VALUE + ":"), null, false);
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(2, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -616,7 +599,7 @@ public class TestAcidUtils {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString());
     AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null,
-        false, null, false);
+        false);
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(3, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index e4440e9..0d7b699 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -3692,7 +3692,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    assertEquals(6, readOpsDelta);
+    assertEquals(5, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3766,7 +3766,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    assertEquals(6, readOpsDelta);
+    assertEquals(5, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index f63c40a..e6fae44 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -597,7 +597,7 @@ public class TestOrcRawRecordMerger {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     ValidWriteIdList writeIdList = new ValidReaderWriteIdList("testEmpty:200:" + Long.MAX_VALUE);
-    AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, false, null, false);
+    AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, false);
 
     Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
         BUCKET);
@@ -668,8 +668,7 @@ public class TestOrcRawRecordMerger {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     ValidWriteIdList writeIdList = new ValidReaderWriteIdList("testNewBaseAndDelta:200:" + Long.MAX_VALUE);
-    AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, use130Format, null,
-        use130Format);
+    AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, use130Format);
 
     assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
     assertEquals(new Path(root, use130Format ?
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 3a3b267..5e0c386 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -943,7 +943,7 @@ public class TestStreaming {
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
     String... records) throws Exception {
     ValidWriteIdList writeIds = getTransactionContext(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false, null, false);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -997,7 +997,8 @@ public class TestStreaming {
    */
   private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
     String validationQuery, boolean vectorize, String... records) throws Exception {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false, null, false);
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -1048,7 +1049,8 @@ public class TestStreaming {
     return TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
   }
   private void checkNothingWritten(Path partitionPath) throws Exception {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false, null, false);
+    AcidUtils.Directory dir =
+        AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -1995,7 +1997,7 @@ public class TestStreaming {
     /*now both batches have committed (but not closed) so we for each primary file we expect a side
     file to exist and indicate the true length of primary file*/
     FileSystem fs = partLoc.getFileSystem(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false);
     for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
@@ -2020,7 +2022,7 @@ public class TestStreaming {
     //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length.  Furthermore, each bucket0
     //has now received more data(logically - it's buffered) but it is not yet committed.
     //lets check that side files exist, etc
-    dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
+    dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false);
     for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
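
The TestStreaming hunks keep the same verification loop while switching to the shorter getAcidState call: for every current delta directory they list the bucket files and look up the matching flush-length side file. A minimal sketch of that walk; the partition path and write-id string are illustrative, the tests obtain their write-id list from getTransactionContext(conf), and the OrcAcidUtils import reflects where that helper normally lives in the ORC library (it may differ by version):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.orc.impl.OrcAcidUtils;   // side-file helper used by the tests; package may vary by ORC version

    /** Illustrative only: walk current deltas and report ORC flush-length side files. */
    public class SideFileWalkSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path partLoc = new Path(args[0]);                      // ACID partition location (placeholder)
        FileSystem fs = partLoc.getFileSystem(conf);

        // In the tests this comes from the transaction context; hard-coded here.
        ValidWriteIdList writeIds = new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":");

        AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partLoc, conf, writeIds, null, false);
        for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
          for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
            Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
            System.out.println(stat.getPath() + " -> " + lengthFile
                + (fs.exists(lengthFile) ? " (present)" : " (absent)"));
          }
        }
      }
    }
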