Posted to commits@hive.apache.org by vg...@apache.org on 2019/07/24 02:56:00 UTC

[hive] branch master updated: HIVE-21225: ACID: getAcidState() should cache a recursive dir listing locally (Vaibhav Gumashta reviewed by Vineet Garg)

This is an automated email from the ASF dual-hosted git repository.

vgumashta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 959ebeb  HIVE-21225: ACID: getAcidState() should cache a recursive dir listing locally (Vaibhav Gumashta reviewed by Vineet Garg)
959ebeb is described below

commit 959ebeb680b07d59f4f55939862ebbc2d7f16a92
Author: Vaibhav Gumashta <vg...@apache.org>
AuthorDate: Tue Jul 23 19:55:30 2019 -0700

    HIVE-21225: ACID: getAcidState() should cache a recursive dir listing locally (Vaibhav Gumashta reviewed by Vineet Garg)
---
 .../hive/hcatalog/streaming/TestStreaming.java     |  12 +-
 .../hcatalog/streaming/mutate/StreamingAssert.java |   2 +-
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    | 486 ++++++++++++++++++---
 .../org/apache/hadoop/hive/ql/io/HdfsUtils.java    |   3 +-
 .../apache/hadoop/hive/ql/io/HiveInputFormat.java  |   6 +-
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java      |   4 +-
 .../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java  |  10 +-
 .../ql/io/orc/VectorizedOrcAcidRowBatchReader.java |   5 +-
 .../hadoop/hive/ql/session/SessionState.java       |   2 +-
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |   4 +-
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  |  17 +-
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |   3 +-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java    |   1 +
 .../apache/hadoop/hive/ql/io/TestAcidUtils.java    | 111 ++---
 .../hive/ql/io/orc/TestInputOutputFormat.java      |  50 ++-
 .../hive/ql/io/orc/TestOrcRawRecordMerger.java     |   5 +-
 .../org/apache/hive/streaming/TestStreaming.java   |  10 +-
 17 files changed, 550 insertions(+), 181 deletions(-)
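
For reference, this change removes the old convenience overloads of getAcidState(); callers now use the single signature that takes a FileSystem, the candidate directory, and a trailing generateDirSnapshots flag that enables the cached recursive listing. Below is a minimal sketch of the new call shape, assuming a Hive 4.x classpath and a hypothetical partition path; the writeIdList string and the VALID_TXNS_KEY setup follow the pattern used in the TestAcidUtils changes in this commit.

    import java.util.BitSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.ValidReadTxnList;
    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.common.ValidTxnList;
    import org.apache.hadoop.hive.common.ValidWriteIdList;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class GetAcidStateSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical partition directory of a transactional table.
        Path partition = new Path("hdfs:///warehouse/tbl/part1");
        FileSystem fs = partition.getFileSystem(conf);
        // The tests in this commit seed the txn list the same way.
        conf.set(ValidTxnList.VALID_TXNS_KEY,
            new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
        ValidWriteIdList writeIds = new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":");
        // Args: fileSystem, candidateDirectory, conf, writeIdList, useFileIds,
        // ignoreEmptyFiles, tblproperties, generateDirSnapshots.
        AcidUtils.Directory dir =
            AcidUtils.getAcidState(fs, partition, conf, writeIds, null, false, null, true);
        System.out.println("base = " + dir.getBaseDirectory()
            + ", current deltas = " + dir.getCurrentDirectories().size());
      }
    }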

diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 4dc04f4..bc67d03 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -564,7 +564,7 @@ public class TestStreaming {
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
                                 String... records) throws Exception {
     ValidWriteIdList writeIds = getTransactionContext(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false, null, false);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -617,7 +617,8 @@ public class TestStreaming {
    */
   private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
                                 String validationQuery, boolean vectorize, String... records) throws Exception {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null,
+        false, null, false);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -667,7 +668,8 @@ public class TestStreaming {
     return TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
   }
   private void checkNothingWritten(Path partitionPath) throws Exception {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null,
+        false, null, false);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -1250,7 +1252,7 @@ public class TestStreaming {
     /*now both batches have committed (but not closed), so for each primary file we expect a side
     file to exist and indicate the true length of the primary file*/
     FileSystem fs = partLoc.getFileSystem(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
     for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
@@ -1275,7 +1277,7 @@ public class TestStreaming {
     //so each of the 2 deltas has 1 bucket0 and 1 bucket0_flush_length.  Furthermore, each bucket0
     //has now received more data (logically - it's buffered) but it is not yet committed.
     //let's check that side files exist, etc
-    dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf));
+    dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
     for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
index 78cae72..86f762e 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -96,7 +96,7 @@ public class StreamingAssert {
     writeIds = TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
 
     partitionLocation = getPartitionLocation();
-    dir = AcidUtils.getAcidState(partitionLocation, conf, writeIds);
+    dir = AcidUtils.getAcidState(null, partitionLocation, conf, writeIds, null, false, null, true);
     assertEquals(0, dir.getObsolete().size());
     assertEquals(0, dir.getOriginalFiles().size());
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 295fe7c..f207bf2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.io;
 
 import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
+import static org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator.UNION_SUDBIR_PREFIX;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -27,6 +28,7 @@ import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -40,8 +42,10 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.hive.common.*;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -183,6 +187,39 @@ public class AcidUtils {
       return !name.startsWith("_") && !name.startsWith(".");
     }
   };
+  
+  public static final PathFilter acidHiddenFileFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path p) {
+      String name = p.getName();
+      // Don't filter out MetaDataFile.METADATA_FILE
+      if (name.startsWith(MetaDataFile.METADATA_FILE)) {
+        return true;
+      }
+      // Don't filter out OrcAcidVersion.ACID_FORMAT
+      if (name.startsWith(OrcAcidVersion.ACID_FORMAT)) {
+        return true;
+      }
+      return !name.startsWith("_") && !name.startsWith(".");
+    }
+  };
+  
+  public static final PathFilter acidTempDirFilter = new PathFilter() {
+    @Override
+    public boolean accept(Path dirPath) {
+      String dirPathStr = dirPath.toString();
+      // We don't want to filter out temp tables
+      if (dirPathStr.contains(SessionState.TMP_PREFIX)) {
+        return true;
+      }
+      if ((dirPathStr.contains("/.")) || (dirPathStr.contains("/_"))) {
+        return false;
+      } else {
+        return true;
+      }
+    }
+  };
+  
   public static final String VISIBILITY_PREFIX = "_v";
   public static final Pattern VISIBILITY_PATTERN = Pattern.compile(VISIBILITY_PREFIX + "\\d+");
 
@@ -422,7 +459,7 @@ public class AcidUtils {
             .writingBase(true);
       } else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
         ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX,
-          bucketFile.getFileSystem(conf));
+          bucketFile.getFileSystem(conf), null);
         result
             .setOldStyle(false)
             .minimumWriteId(parsedDelta.minWriteId)
@@ -430,7 +467,7 @@ public class AcidUtils {
             .bucket(bucket);
       } else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
         ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX,
-          bucketFile.getFileSystem(conf));
+          bucketFile.getFileSystem(conf), null);
         result
             .setOldStyle(false)
             .minimumWriteId(parsedDelta.minWriteId)
@@ -493,6 +530,12 @@ public class AcidUtils {
     public List<Path> getAbortedDirectories() {
       return abortedDirectories;
     }
+    
+    @Override
+    public String toString() {
+      return "Aborted Directories: " + abortedDirectories + "; isBaseInRawFormat: " + isBaseInRawFormat + "; original: "
+          + original + "; obsolete: " + obsolete + "; deltas: " + deltas + "; base: " + base;
+    }
   }
 
   //This is used for (full) Acid tables.  InsertOnly use NOT_ACID
@@ -1029,26 +1072,26 @@ public class AcidUtils {
   public static ParsedDelta parsedDelta(Path deltaDir, FileSystem fs) throws IOException {
     String deltaDirName = deltaDir.getName();
     if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) {
-      return parsedDelta(deltaDir, DELETE_DELTA_PREFIX, fs);
+      return parsedDelta(deltaDir, DELETE_DELTA_PREFIX, fs, null);
     }
-    return parsedDelta(deltaDir, DELTA_PREFIX, fs); // default prefix is delta_prefix
+    return parsedDelta(deltaDir, DELTA_PREFIX, fs, null); // default prefix is delta_prefix
   }
 
-  private static ParsedDelta parseDelta(Path path, String deltaPrefix, FileSystem fs)
+  private static ParsedDelta parseDelta(Path path, String deltaPrefix, FileSystem fs, HdfsDirSnapshot dirSnapshot)
     throws IOException {
-    ParsedDelta p = parsedDelta(path, deltaPrefix, fs);
+    ParsedDelta p = parsedDelta(path, deltaPrefix, fs, dirSnapshot);
     boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
     return new ParsedDelta(p.getMinWriteId(),
         p.getMaxWriteId(), path, p.statementId, isDeleteDelta, p.isRawFormat(), p.visibilityTxnId);
   }
 
-  public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs)
+  public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs, HdfsDirSnapshot dirSnapshot)
     throws IOException {
     String filename = deltaDir.getName();
     boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
     if (filename.startsWith(deltaPrefix)) {
       //small optimization - delete delta can't be in raw format
-      boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs);
+      boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs, dirSnapshot);
       return parsedDelta(deltaDir, isRawFormat);
     }
     throw new IllegalArgumentException(deltaDir + " does not start with " +
@@ -1117,19 +1160,13 @@ public class AcidUtils {
     return false;
   }
 
-  @VisibleForTesting
-  public static Directory getAcidState(Path directory,
-      Configuration conf,
-      ValidWriteIdList writeIdList) throws IOException {
-    return getAcidState(directory, conf, writeIdList, false, false);
-  }
-
   /** State class for getChildState; cannot modify 2 things in a method. */
   private static class TxnBase {
     private FileStatus status;
     private long writeId = 0;
     private long oldestBaseWriteId = Long.MAX_VALUE;
     private Path oldestBase = null;
+    private HdfsDirSnapshot dirSnapShot;
   }
 
   /**
@@ -1143,33 +1180,10 @@ public class AcidUtils {
    * @return the state of the directory
    * @throws IOException
    */
-  public static Directory getAcidState(Path directory, Configuration conf,
-                                       ValidWriteIdList writeIdList,
-                                       boolean useFileIds,
-                                       boolean ignoreEmptyFiles) throws IOException {
-    return getAcidState(directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null);
-  }
-
-  public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf,
-                                       ValidWriteIdList writeIdList,
-                                       boolean useFileIds,
-                                       boolean ignoreEmptyFiles) throws IOException {
-    return getAcidState(fileSystem, directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null);
-  }
-
-  public static Directory getAcidState(Path directory, Configuration conf,
-                                       ValidWriteIdList writeIdList,
-                                       Ref<Boolean> useFileIds,
-                                       boolean ignoreEmptyFiles,
-                                       Map<String, String> tblproperties) throws IOException {
-    return getAcidState(null, directory, conf, writeIdList, useFileIds, ignoreEmptyFiles, tblproperties);
-  }
-
-  public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf,
-                                       ValidWriteIdList writeIdList,
-                                       Ref<Boolean> useFileIds,
-                                       boolean ignoreEmptyFiles,
-                                       Map<String, String> tblproperties) throws IOException {
+  @VisibleForTesting
+  public static Directory getAcidState(FileSystem fileSystem, Path candidateDirectory, Configuration conf,
+      ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles,
+      Map<String, String> tblproperties, boolean generateDirSnapshots) throws IOException {
     ValidTxnList validTxnList = null;
     String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
     if(!Strings.isNullOrEmpty(s)) {
@@ -1187,30 +1201,36 @@ public class AcidUtils {
       validTxnList.readFromString(s);
     }
 
-    FileSystem fs = fileSystem == null ? directory.getFileSystem(conf) : fileSystem;
+    FileSystem fs = fileSystem == null ? candidateDirectory.getFileSystem(conf) : fileSystem;
     // The following 'deltas' includes all kinds of delta files including insert & delete deltas.
     final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
     List<ParsedDelta> working = new ArrayList<ParsedDelta>();
     List<Path> originalDirectories = new ArrayList<>();
     final List<Path> obsolete = new ArrayList<>();
     final List<Path> abortedDirectories = new ArrayList<>();
-    List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, directory);
+    List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, candidateDirectory);
 
     TxnBase bestBase = new TxnBase();
     final List<HdfsFileStatusWithId> original = new ArrayList<>();
+    List<HdfsDirSnapshot> dirSnapshots = null;
     if (childrenWithId != null) {
       for (HdfsFileStatusWithId child : childrenWithId) {
-        getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original,
-            obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
+        getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, obsolete,
+            bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
       }
     } else {
-      List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter);
-      for (FileStatus child : children) {
-        getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete,
+      if (generateDirSnapshots) {
+        dirSnapshots = getHdfsDirSnapshots(fs, candidateDirectory);
+        getChildState(candidateDirectory, dirSnapshots, writeIdList, working, originalDirectories, original, obsolete,
             bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
+      } else {
+        List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, candidateDirectory, hiddenFileFilter);
+        for (FileStatus child : children) {
+          getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete, bestBase,
+              ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
+        }
       }
     }
-
     // If we have a base, the original files are obsolete.
     if (bestBase.status != null) {
       // Add original files to obsolete list if any
@@ -1224,13 +1244,16 @@ public class AcidUtils {
       original.clear();
       originalDirectories.clear();
     } else {
-      // Okay, we're going to need these originals.  Recurse through them and figure out what we
-      // really need.
-      for (Path origDir : originalDirectories) {
-        findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true);
+      // Okay, we're going to need these originals.  
+      // Recurse through them and figure out what we really need.
+      // If we already have the original list, do nothing
+      // If dirSnapshots != null, we would have already populated "original"
+      if (dirSnapshots == null) {
+        for (Path origDir : originalDirectories) {
+          findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true);
+        }
       }
     }
-
     Collections.sort(working);
     //so now, 'working' should be sorted like delta_5_20 delta_5_10 delta_11_20 delta_51_60 for example
     //and we want to end up with the best set containing all relevant data: delta_5_20 delta_51_60,
@@ -1299,8 +1322,20 @@ public class AcidUtils {
               minOpenWriteId, bestBase.oldestBase.toString()));
     }
 
-    final Path base = bestBase.status == null ? null : bestBase.status.getPath();
-    LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " +
+    Path base = null;
+    boolean isBaseInRawFormat = false;
+    if (bestBase.status != null) {
+      base = bestBase.status.getPath();
+      isBaseInRawFormat = MetaDataFile.isRawFormat(base, fs, null);
+      if (isBaseInRawFormat && (bestBase.dirSnapShot != null)) {
+        for (FileStatus stat : bestBase.dirSnapShot.getFiles()) {
+          if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
+            original.add(createOriginalObj(null, stat));
+          }
+        }
+      }
+    }
+    LOG.debug("in directory " + candidateDirectory.toUri().toString() + " base = " + base + " deltas = " +
         deltas.size());
     /**
      * If this sort order is changed and there are tables that have been converted to transactional
@@ -1313,12 +1348,228 @@ public class AcidUtils {
       //this does "Path.uri.compareTo(that.uri)"
       return o1.getFileStatus().compareTo(o2.getFileStatus());
     });
-
-    // Note: isRawFormat is invalid for non-ORC tables. It will always return true, so we're good.
-    final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs);
     return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original,
         obsolete, deltas, base);
   }
+  
+  public static List<HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs, final Path path) throws IOException {
+    try {
+      Map<Path, HdfsDirSnapshot> dirToSnapshots = new HashMap<Path, HdfsDirSnapshot>();
+      RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, true);
+      while (itr.hasNext()) {
+        FileStatus fStatus = itr.next();
+        Path fPath = fStatus.getPath();
+        if (acidHiddenFileFilter.accept(fPath)) {
+          if (fStatus.isDirectory() && acidTempDirFilter.accept(fPath)) {
+            HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath);
+            if (dirSnapshot == null) {
+              dirSnapshot = new HdfsDirSnapshotImpl(fPath, fStatus);
+              dirToSnapshots.put(fPath, dirSnapshot);
+            }
+          } else {
+            Path parentDirPath = fPath.getParent();
+            if (acidTempDirFilter.accept(parentDirPath)) {
+              FileStatus parentDirFStatus = fs.getFileStatus(parentDirPath);
+              HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath);
+              if (dirSnapshot == null) {
+                dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath, parentDirFStatus);
+                dirToSnapshots.put(parentDirPath, dirSnapshot);
+              }
+              // We're not filtering out the metadata file and acid format file, as they represent parts of a valid snapshot
+              // We're not using the cached values downstream, but we can potentially optimize more in a follow-up task
+              if (fStatus.getPath().toString().contains(MetaDataFile.METADATA_FILE)) {
+                dirSnapshot.addMetadataFile(fStatus);
+              } else if (fStatus.getPath().toString().contains(OrcAcidVersion.ACID_FORMAT)) {
+                dirSnapshot.addOrcAcidFormatFile(fStatus);
+              } else {
+                dirSnapshot.addFile(fStatus);
+              }
+            }
+          }
+        }
+      }
+      return new ArrayList<HdfsDirSnapshot>(dirToSnapshots.values());
+    } catch (IOException e) {
+      e.printStackTrace();
+      throw new IOException(e);
+    }
+  }
+  
+  /**
+   * DFS dir listing. 
+   * Captures a dir and the corresponding list of files it contains,
+   * with additional properties about the dir (like isBase etc)
+   *
+   */
+  public static interface HdfsDirSnapshot {
+    public Path getPath();
+
+    public void addOrcAcidFormatFile(FileStatus fStatus);
+    
+    public FileStatus getOrcAcidFormatFile();
+
+    public void addMetadataFile(FileStatus fStatus);
+    
+    public FileStatus getMetadataFile(FileStatus fStatus);
+
+    // FileStatus of this HDFS directory
+    public FileStatus getFileStatus();
+    
+    // Get the list of files if any within this directory
+    public List<FileStatus> getFiles();
+    
+    public void setFileStatus(FileStatus fStatus);
+    
+    public void addFile(FileStatus file);
+    
+    // File id or null
+    public Long getFileId();
+    
+    public Boolean isRawFormat();
+    
+    public void setIsRawFormat(boolean isRawFormat);
+    
+    public Boolean isBase();
+    
+    public void setIsBase(boolean isBase);
+    
+    public Boolean isValidBase();    
+    
+    public void setIsValidBase(boolean isValidBase);
+    
+    public Boolean isCompactedBase();    
+    
+    public void setIsCompactedBase(boolean isCompactedBase);
+  }
+  
+  public static class HdfsDirSnapshotImpl implements HdfsDirSnapshot {
+    private Path dirPath;
+    private FileStatus fStatus;
+    private FileStatus metadataFStatus = null;
+    private FileStatus orcAcidFormatFStatus = null;
+    private List<FileStatus> files = new ArrayList<FileStatus>();
+    private Long fileId = null;
+    private Boolean isRawFormat = null;
+    private Boolean isBase = null;
+    private Boolean isValidBase = null;
+    private Boolean isCompactedBase = null;
+
+    public HdfsDirSnapshotImpl(Path path, FileStatus fStatus, List<FileStatus> files) {
+      this.dirPath = path;
+      this.fStatus = fStatus;
+      this.files = files;
+    }
+    
+    public HdfsDirSnapshotImpl(Path path, FileStatus fStatus) {
+      this.dirPath = path;
+      this.fStatus = fStatus;
+    }
+    
+    @Override
+    public Path getPath() {
+      return dirPath;
+    }
+    
+    @Override
+    public FileStatus getFileStatus() {
+      return fStatus;
+    }
+    
+    @Override
+    public void setFileStatus(FileStatus fStatus) {
+      this.fStatus = fStatus;
+    }
+
+    @Override
+    public List<FileStatus> getFiles() {
+      return files;
+    }
+    
+    @Override
+    public void addFile(FileStatus file) {
+      files.add(file);
+    }
+
+    @Override
+    public Long getFileId() {
+      return fileId;
+    }
+
+    @Override
+    public Boolean isRawFormat() {
+      return isRawFormat;
+    }
+    
+    @Override
+    public void setIsRawFormat(boolean isRawFormat) {
+       this.isRawFormat = isRawFormat;
+    }
+
+    @Override
+    public Boolean isBase() {
+      return isBase;
+    }
+
+    @Override
+    public Boolean isValidBase() {
+      return isValidBase;
+    }
+
+    @Override
+    public Boolean isCompactedBase() {
+      return isCompactedBase;
+    }
+
+    @Override
+    public void setIsBase(boolean isBase) {
+      this.isBase = isBase;  
+    }
+
+    @Override
+    public void setIsValidBase(boolean isValidBase) {
+      this.isValidBase = isValidBase;
+    }
+
+    @Override
+    public void setIsCompactedBase(boolean isCompactedBase) {
+      this.isCompactedBase = isCompactedBase;     
+    }
+
+    @Override
+    public void addOrcAcidFormatFile(FileStatus fStatus) {
+      this.orcAcidFormatFStatus = fStatus;
+    }
+    
+    @Override
+    public FileStatus getOrcAcidFormatFile() {
+      return orcAcidFormatFStatus;
+    }
+
+    @Override
+    public void addMetadataFile(FileStatus fStatus) {
+      this.metadataFStatus = fStatus;
+    }
+
+    @Override
+    public FileStatus getMetadataFile(FileStatus fStatus) {
+      return metadataFStatus;
+    }
+    
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("Path: " + dirPath);
+      sb.append("; ");
+      sb.append("Files: { ");
+      for (FileStatus fstatus : files) {
+        sb.append(fstatus);
+        sb.append(", ");
+      }
+      sb.append(" }");
+      return sb.toString();
+    }
+  }
+  
   /**
    * We can only use a 'base' if it doesn't have an open txn (from specific reader's point of view)
    * A 'base' with open txn in its range doesn't have 'enough history' info to produce a correct
@@ -1342,6 +1593,19 @@ public class AcidUtils {
     //if here, it's a result of IOW
     return writeIdList.isWriteIdValid(parsedBase.getWriteId());
   }
+  
+  private static boolean isValidBase(HdfsDirSnapshot dirSnapshot, ParsedBase parsedBase, ValidWriteIdList writeIdList,
+      FileSystem fs) throws IOException {
+    boolean isValidBase;
+    if (dirSnapshot.isValidBase() != null) {
+      isValidBase = dirSnapshot.isValidBase();
+    } else {
+      isValidBase = isValidBase(parsedBase, writeIdList, fs);
+      dirSnapshot.setIsValidBase(isValidBase);
+    }
+    return isValidBase;
+  }
+  
   /**
    * Returns {@code true} if {@code parsedBase} was created by compaction.
    * As of Hive 4.0 we can tell if a directory is a result of compaction based on the
@@ -1349,10 +1613,10 @@ public class AcidUtils {
    * that, have to rely on the {@link MetaDataFile} in the directory. So look at the filename first
    * since that is the cheaper test.*/
   private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs) throws IOException {
-    return parsedBase.getVisibilityTxnId() > 0 ||
-        MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs);
+    return parsedBase.getVisibilityTxnId() > 0 || MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs);
 
   }
+  
   private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
       ValidWriteIdList writeIdList, List<ParsedDelta> working, List<Path> originalDirectories,
       List<HdfsFileStatusWithId> original, List<Path> obsolete, TxnBase bestBase,
@@ -1393,7 +1657,7 @@ public class AcidUtils {
       }
     } else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) {
       String deltaPrefix = fn.startsWith(DELTA_PREFIX)  ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
-      ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs);
+      ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs, null);
       if(!isDirUsable(child.getPath(), delta.getVisibilityTxnId(), aborted, validTxnList)) {
         return;
       }
@@ -1413,6 +1677,75 @@ public class AcidUtils {
       originalDirectories.add(child.getPath());
     }
   }
+  
+  private static void getChildState(Path candidateDirectory, List<HdfsDirSnapshot> dirSnapshots, ValidWriteIdList writeIdList,
+      List<ParsedDelta> working, List<Path> originalDirectories, List<HdfsFileStatusWithId> original,
+      List<Path> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List<Path> aborted,
+      Map<String, String> tblproperties, FileSystem fs, ValidTxnList validTxnList) throws IOException {
+    for (HdfsDirSnapshot dirSnapshot : dirSnapshots) {
+      FileStatus fStat = dirSnapshot.getFileStatus();
+      Path dirPath = dirSnapshot.getPath();
+      String dirName = dirPath.getName();
+      if (dirName.startsWith(BASE_PREFIX)) {
+        bestBase.dirSnapShot = dirSnapshot;
+        ParsedBase parsedBase = ParsedBase.parseBase(dirPath);
+        if (!isDirUsable(dirPath, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
+          continue;
+        }
+        final long writeId = parsedBase.getWriteId();
+        if (bestBase.oldestBaseWriteId > writeId) {
+          //keep track for error reporting
+          bestBase.oldestBase = dirPath;
+          bestBase.oldestBaseWriteId = writeId;
+        }
+        if (bestBase.status == null) {
+          if (isValidBase(dirSnapshot, parsedBase, writeIdList, fs)) {
+            bestBase.status = fStat;
+            bestBase.writeId = writeId;
+          }
+        } else if (bestBase.writeId < writeId) {
+          if (isValidBase(dirSnapshot, parsedBase, writeIdList, fs)) {
+            obsolete.add(bestBase.status.getPath());
+            bestBase.status = fStat;
+            bestBase.writeId = writeId;
+          }
+        } else {
+          obsolete.add(dirPath);
+        }
+      } else if (dirName.startsWith(DELTA_PREFIX) || dirName.startsWith(DELETE_DELTA_PREFIX)) {
+        String deltaPrefix = dirName.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
+        ParsedDelta delta = parseDelta(dirPath, deltaPrefix, fs, dirSnapshot);
+        if (!isDirUsable(dirPath, delta.getVisibilityTxnId(), aborted, validTxnList)) {
+          continue;
+        }
+        if (ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(delta.minWriteId,
+            delta.maxWriteId)) {
+          aborted.add(dirPath);
+        } else if (writeIdList.isWriteIdRangeValid(delta.minWriteId,
+            delta.maxWriteId) != ValidWriteIdList.RangeResponse.NONE) {
+          if (delta.isRawFormat) {
+            for (FileStatus stat : dirSnapshot.getFiles()) {
+              if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
+                original.add(createOriginalObj(null, stat));
+              }
+            }
+          } else {
+            working.add(delta);
+          }
+        }
+      } else {
+        if (!candidateDirectory.equals(dirPath)) {
+          originalDirectories.add(dirPath);
+        }
+        for (FileStatus stat : dirSnapshot.getFiles()) {
+          if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
+            original.add(createOriginalObj(null, stat));
+          }
+        }
+      }
+    }
+  }
+  
   /**
    * checks {@code visibilityTxnId} to see if {@code child} is committed in current snapshot
    */
@@ -1494,10 +1827,13 @@ public class AcidUtils {
     }
   }
 
-  private static List<HdfsFileStatusWithId> tryListLocatedHdfsStatus(Ref<Boolean> useFileIds,
-                                                                     FileSystem fs, Path directory) {
-    Boolean val = useFileIds.value;
+  private static List<HdfsFileStatusWithId> tryListLocatedHdfsStatus(Ref<Boolean> useFileIds, FileSystem fs,
+      Path directory) {
     List<HdfsFileStatusWithId> childrenWithId = null;
+    if (useFileIds == null) {
+      return childrenWithId;
+    }
+    Boolean val = useFileIds.value;
     if (val == null || val) {
       try {
         childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
@@ -2012,7 +2348,7 @@ public class AcidUtils {
    */
   public static class MetaDataFile {
     //export command uses _metadata....
-    private static final String METADATA_FILE = "_metadata_acid";
+    public static final String METADATA_FILE = "_metadata_acid";
     private static final String CURRENT_VERSION = "0";
     //todo: enums? that have both field name and value list
     private interface Field {
@@ -2077,8 +2413,9 @@ public class AcidUtils {
      * This is only meaningful for full CRUD tables - Insert-only tables have all their data
      * in raw format by definition.
      * @param baseOrDeltaDir base or delta file.
+     * @param dirSnapshot 
      */
-    public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException {
+    public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs, HdfsDirSnapshot dirSnapshot) throws IOException {
       //todo: this could be optimized - for full CRUD table only base_x and delta_x_x could have
       // files in raw format delta_x_y (x != y) whether from streaming ingested or compaction
       // must be native Acid format by definition
@@ -2099,13 +2436,19 @@ public class AcidUtils {
         }
       }
       //if here, have to check the files
-      Path dataFile = chooseFile(baseOrDeltaDir, fs);
+      Path dataFile;
+      if ((dirSnapshot != null) && (dirSnapshot.getFiles() != null) && (dirSnapshot.getFiles().size() > 0)) {
+        dataFile = dirSnapshot.getFiles().get(0).getPath();
+      } else {
+        dataFile = chooseFile(baseOrDeltaDir, fs);
+      }
       if (dataFile == null) {
         //directory is empty or doesn't have any that could have been produced by load data
         return false;
       }
       return isRawFormatFile(dataFile, fs);
     }
+    
     public static boolean isRawFormatFile(Path dataFile, FileSystem fs) throws IOException {
       try {
         Reader reader = OrcFile.createReader(dataFile, OrcFile.readerOptions(fs.getConf()));
@@ -2227,7 +2570,7 @@ public class AcidUtils {
           + " from " + jc.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
       return null;
     }
-    Directory acidInfo = AcidUtils.getAcidState(dir, jc, idList);
+    Directory acidInfo = AcidUtils.getAcidState(fs, dir, jc, idList, null, false, null, true);
     // Assume that for an MM table, or if there's only the base directory, we are good.
     if (!acidInfo.getCurrentDirectories().isEmpty() && AcidUtils.isFullAcidTable(table)) {
       Utilities.FILE_OP_LOGGER.warn(
@@ -2264,7 +2607,8 @@ public class AcidUtils {
 
     // If ACID/MM tables, then need to find the valid state wrt to given ValidWriteIdList.
     ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(validWriteIdStr);
-    Directory acidInfo = AcidUtils.getAcidState(dataPath, conf, validWriteIdList);
+    Directory acidInfo = AcidUtils.getAcidState(dataPath.getFileSystem(conf), dataPath, conf, validWriteIdList, null,
+        false, null, false);
 
     for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) {
       pathList.add(hfs.getFileStatus().getPath());
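
The core of this change is getHdfsDirSnapshots() above: a single recursive FileSystem.listFiles() pass builds one HdfsDirSnapshot per directory, and the snapshot-aware getChildState()/isRawFormat() paths then read file lists from those snapshots instead of issuing further listStatus() calls against the NameNode. A sketch of inspecting the snapshots directly through the public API added above, assuming a hypothetical partition path:

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class DirSnapshotSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Hypothetical table or partition directory.
        Path dir = new Path("hdfs:///warehouse/tbl/part1");
        FileSystem fs = dir.getFileSystem(conf);
        // One recursive listing; each snapshot caches the files of one directory.
        List<AcidUtils.HdfsDirSnapshot> snapshots = AcidUtils.getHdfsDirSnapshots(fs, dir);
        for (AcidUtils.HdfsDirSnapshot s : snapshots) {
          System.out.println(s.getPath() + ": " + s.getFiles().size() + " file(s)");
          for (FileStatus f : s.getFiles()) {
            System.out.println("  " + f.getPath().getName() + " (" + f.getLen() + " bytes)");
          }
        }
      }
    }
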
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
index 9d5ba3d..4de5c8c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
@@ -26,8 +26,9 @@ import java.net.URI;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
-
+import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index cff7e04..7e71c77 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -568,12 +568,12 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
     boolean allowOriginals = HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS);
     for (Path dir : dirs) {
-      processForWriteIds(
+      processForWriteIdsForMmRead(
           dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals);
     }
   }
 
-  private static void processForWriteIds(Path dir, Configuration conf,
+  private static void processForWriteIdsForMmRead(Path dir, Configuration conf,
       ValidWriteIdList validWriteIdList, boolean allowOriginals, List<Path> finalPaths,
       List<Path> pathsWithFileOriginals) throws IOException {
     FileSystem fs = dir.getFileSystem(conf);
@@ -605,7 +605,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
     }
     if (hasAcidDirs) {
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
-          fs, dir, conf, validWriteIdList, Ref.from(false), true, null);
+          fs, dir, conf, validWriteIdList, Ref.from(false), true, null, false);
 
       // Find the base, created for IOW.
       Path base = dirInfo.getBaseDirectory();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 707e38c..ec4994d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1273,7 +1273,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
       //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
       AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
-          fs, dir, context.conf, context.writeIdList, useFileIds, true, null);
+          fs, dir, context.conf, context.writeIdList, useFileIds, true, null, false);
       // find the base files (original or new style)
       List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
       if (dirInfo.getBaseDirectory() == null) {
@@ -2463,7 +2463,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       //it may also be null if there is no base - only deltas
       mergerOptions.baseDir(baseDirectory);
       if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
-        isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf));
+        isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf), null);
         mergerOptions.rootPath(baseDirectory.getParent());
       } else {
         isOriginal = true;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index b1ede05..2ac6232 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.io.RecordIdentifier;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
+import org.apache.hive.common.util.Ref;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -462,9 +463,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
          * contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()}
          */
         //the split is from something other than the 1st file of the logical bucket - compute offset
-        AcidUtils.Directory directoryState
-                = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false,
-          true);
+        AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf,
+            validWriteIdList, Ref.from(false), true, null, true);
         for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
           int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
           if (bucketIdFromPath != bucketId) {
@@ -577,8 +577,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
       //when compacting each split needs to process the whole logical bucket
       assert options.getOffset() == 0;
       assert options.getMaxOffset() == Long.MAX_VALUE;
-      AcidUtils.Directory directoryState
-              = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false, true);
+      AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf,
+          validWriteIdList, Ref.from(false), true, null, true);
       /**
        * Note that for reading base_x/ or delta_x_x/ with non-acid schema,
        * {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all it's
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 15f1f94..374b105 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hive.common.util.Ref;
 import org.apache.orc.ColumnStatistics;
 import org.apache.orc.IntegerColumnStatistics;
 import org.apache.orc.OrcConf;
@@ -722,8 +723,8 @@ public class VectorizedOrcAcidRowBatchReader
     int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf)
         //statementId is from directory name (or 0 if there is none)
       .statementId(syntheticTxnInfo.statementId).bucket(bucketId));
-    AcidUtils.Directory directoryState = AcidUtils.getAcidState(syntheticTxnInfo.folder, conf,
-        validWriteIdList, false, true);
+    AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, syntheticTxnInfo.folder, conf,
+        validWriteIdList, Ref.from(false), true, null, true);
     for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
       int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
       if (bucketIdFromPath != bucketId) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 9d631ed..4632361 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -123,7 +123,7 @@ import com.google.common.collect.Maps;
 public class SessionState {
   private static final Logger LOG = LoggerFactory.getLogger(SessionState.class);
 
-  private static final String TMP_PREFIX = "_tmp_space.db";
+  public static final String TMP_PREFIX = "_tmp_space.db";
   private static final String LOCAL_SESSION_PATH_KEY = "_hive.local.session.path";
   private static final String HDFS_SESSION_PATH_KEY = "_hive.hdfs.session.path";
   private static final String TMP_TABLE_SPACE_KEY = "_hive.tmp_table_space";
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 57eb506..5dfa7ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.Ref;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
@@ -225,7 +226,8 @@ public class Cleaner extends MetaStoreCompactorThread {
   private void removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci)
           throws IOException, NoSuchObjectException {
     Path locPath = new Path(location);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(locPath.getFileSystem(conf), locPath, conf, writeIdList, Ref.from(
+        false), false, null, false);
     List<Path> obsoleteDirs = dir.getObsolete();
     /**
      * add anything in 'dir'  that only has data from aborted transactions - no one should be
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 67a5e6d..7c79d7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -267,8 +267,8 @@ public class CompactorMR {
     // and discovering that in getSplits is too late as we then have no way to pass it to our
     // mapper.
 
-    AcidUtils.Directory dir = AcidUtils.getAcidState(
-        new Path(sd.getLocation()), conf, writeIds, false, true);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), true,
+        null, false);
 
     if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
       return;
@@ -298,7 +298,7 @@ public class CompactorMR {
             maxDeltasToHandle, -1, conf, msc, ci.id, jobName);
       }
       //now recompute state since we've done minor compactions and have different 'best' set of deltas
-      dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds);
+      dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), false, null, false);
     }
 
     StringableList dirsToSearch = new StringableList();
@@ -341,9 +341,8 @@ public class CompactorMR {
   private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
       CompactionInfo ci) throws IOException {
     AcidUtils.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(t.getParameters()));
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), hiveConf, writeIds,
-      Ref.from(false), false,
-        t.getParameters());
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), hiveConf, writeIds, Ref.from(
+        false), false, t.getParameters(), false);
 
     if (!isEnoughToCompact(dir, sd)) {
       return;
@@ -446,8 +445,8 @@ public class CompactorMR {
       StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
     LOG.debug("Going to delete directories for aborted transactions for MM table "
         + t.getDbName() + "." + t.getTableName());
-    AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()),
-        conf, writeIds, Ref.from(false), false, t.getParameters());
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false),
+        false, t.getParameters(), false);
     removeFilesForMmTable(conf, dir);
 
     // Then, actually do the compaction.
@@ -1036,7 +1035,7 @@ public class CompactorMR {
             dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
           boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
           boolean isRawFormat = !dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
-            && AcidUtils.MetaDataFile.isRawFormat(dir, fs);//deltes can't be raw format
+            && AcidUtils.MetaDataFile.isRawFormat(dir, fs, null);//deletes can't be raw format
 
           FileStatus[] files = fs.listStatus(dir, isRawFormat ? AcidUtils.originalBucketFilter
             : AcidUtils.bucketFileFilter);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 6168fc0..5fb2552 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.Ref;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -291,7 +292,7 @@ public class Initiator extends MetaStoreCompactorThread {
     boolean noBase = false;
     Path location = new Path(sd.getLocation());
     FileSystem fs = location.getFileSystem(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, location, conf, writeIds, false, false);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false, tblproperties, false);
     Path base = dir.getBaseDirectory();
     long baseSize = 0;
     FileStatus stat = null;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index d4abf42..dddf063 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -394,6 +394,7 @@ public class TestTxnCommands2 {
     //run Compaction
     runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
     TestTxnCommands2.runWorker(hiveConf);
+    //TestTxnCommands2.runCleaner(hiveConf);
     rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b");
     LOG.warn("after compact");
     for(String s : rs) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index ea31557..e4f0529 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -18,14 +18,16 @@
 package org.apache.hadoop.hive.ql.io;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.util.BitSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
@@ -161,9 +163,8 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/random", 500, new byte[0]),
         new MockFile("mock:/tbl/part1/_done", 0, new byte[0]),
         new MockFile("mock:/tbl/part1/subdir/000000_0", 0, new byte[0]));
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(new MockPath(fs, "/tbl/part1"), conf,
-            new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "/tbl/part1"), conf,
+        new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
     assertEquals(null, dir.getBaseDirectory());
     assertEquals(0, dir.getCurrentDirectories().size());
     assertEquals(0, dir.getObsolete().size());
@@ -198,9 +199,8 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/delta_101_101/bucket_0", 0, new byte[0]));
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(new MockPath(fs,
-            "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
+        new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
     assertEquals(null, dir.getBaseDirectory());
     List<Path> obsolete = dir.getObsolete();
     assertEquals(2, obsolete.size());
@@ -242,17 +242,20 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/delta_90_120/bucket_0", 0, new byte[0]));
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(new MockPath(fs,
-            "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
+        new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
     assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
-    List<Path> obsolete = dir.getObsolete();
-    assertEquals(5, obsolete.size());
-    assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).toString());
-    assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).toString());
-    assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(2).toString());
-    assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(3).toString());
-    assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(4).toString());
+    List<Path> obsoletes = dir.getObsolete();
+    assertEquals(5, obsoletes.size());
+    Set<String> obsoletePathNames = new HashSet<String>();
+    for (Path obsolete : obsoletes) {
+      obsoletePathNames.add(obsolete.toString());
+    }
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_5"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_10"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_030"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_025"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_029_029"));
     assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
     assertEquals(1, deltas.size());
@@ -273,12 +276,12 @@ public class TestAcidUtils {
     Path part = new MockPath(fs, "/tbl/part1");
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:150:" + Long.MAX_VALUE + ":"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:150:"
+        + Long.MAX_VALUE + ":"), null, false, null, false);
     // Obsolete list should include the two original bucket files, and the old base dir
-    List<Path> obsolete = dir.getObsolete();
-    assertEquals(3, obsolete.size());
-    assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).toString());
+    List<Path> obsoletes = dir.getObsolete();
+    assertEquals(3, obsoletes.size());
+    assertEquals("mock:/tbl/part1/base_5", obsoletes.get(0).toString());
     assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
   }
 
@@ -296,8 +299,8 @@ public class TestAcidUtils {
     Path part = new MockPath(fs, "mock:/tbl/part1");
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
+        + Long.MAX_VALUE + ":"), null, false, null, false);
     assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
     List<Path> obsolete = dir.getObsolete();
     assertEquals(2, obsolete.size());
@@ -333,8 +336,8 @@ public class TestAcidUtils {
     Path part = new MockPath(fs, "mock:/tbl/part1");
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir
-            = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
+        + Long.MAX_VALUE + ":"), null, false, null, false);
     assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
     List<Path> obsolete = dir.getObsolete();
     assertEquals(5, obsolete.size());
@@ -362,7 +365,8 @@ public class TestAcidUtils {
     //hypothetically, txn 50 is open and writing write ID 4
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString());
-    AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:4:4"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null,
+        false, null, false);
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(2, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -386,7 +390,8 @@ public class TestAcidUtils {
     //hypothetically, txn 50 is open and writing write ID 4
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString());
-    AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:4:4"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null,
+        false, null, false);
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(2, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -402,8 +407,8 @@ public class TestAcidUtils {
     Path part = new MockPath(fs, "mock:/tbl/part1");
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidCompactorWriteIdList("tbl:4:" + Long.MAX_VALUE));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:4:"
+        + Long.MAX_VALUE), null, false, null, false);
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(1, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -421,8 +426,8 @@ public class TestAcidUtils {
     Path part = new MockPath(fs, "mock:/tbl/part1");
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidCompactorWriteIdList("tbl:3:" + Long.MAX_VALUE));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:3:"
+        + Long.MAX_VALUE), null, false, null, false);
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(1, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -447,19 +452,22 @@ public class TestAcidUtils {
         new MockFile("mock:/tbl/part1/delete_delta_110_110/bucket_0", 0, new byte[0]));
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(new MockPath(fs,
-            "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
+        new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
     assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
-    List<Path> obsolete = dir.getObsolete();
-    assertEquals(7, obsolete.size());
-    assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).toString());
-    assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).toString());
-    assertEquals("mock:/tbl/part1/delete_delta_025_030", obsolete.get(2).toString());
-    assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(3).toString());
-    assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(4).toString());
-    assertEquals("mock:/tbl/part1/delete_delta_029_029", obsolete.get(5).toString());
-    assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(6).toString());
+    List<Path> obsoletes = dir.getObsolete();
+    assertEquals(7, obsoletes.size());
+    Set<String> obsoletePathNames = new HashSet<>();
+    for (Path obsolete : obsoletes) {
+      obsoletePathNames.add(obsolete.toString());
+    }
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_5"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_10"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delete_delta_025_030"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_030"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_025"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delete_delta_029_029"));
+    assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_029_029"));
     assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
     assertEquals(2, deltas.size());
@@ -487,8 +495,8 @@ public class TestAcidUtils {
     Path part = new MockPath(fs, "mock:/tbl/part1");
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
+        + Long.MAX_VALUE + ":"), null, false, null, false);
     assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
     List<Path> obsolete = dir.getObsolete();
     assertEquals(3, obsolete.size());
@@ -518,8 +526,8 @@ public class TestAcidUtils {
     Path part = new MockPath(fs, "mock:/tbl/part1");
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
+        + Long.MAX_VALUE + ":"), null, false, null, false);
     List<Path> obsolete = dir.getObsolete();
     assertEquals(1, obsolete.size());
     assertEquals("mock:/tbl/part1/delete_delta_50_50", obsolete.get(0).toString());
@@ -547,8 +555,8 @@ public class TestAcidUtils {
     Path part = new MockPath(fs, "mock:/tbl/part1");
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
-    AcidUtils.Directory dir =
-        AcidUtils.getAcidState(part, conf, new ValidCompactorWriteIdList("tbl:4:" + Long.MAX_VALUE + ":"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:4:"
+        + Long.MAX_VALUE + ":"), null, false, null, false);
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(2, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -572,7 +580,8 @@ public class TestAcidUtils {
     //hypothetically, txn 50 is open and writing write ID 4
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString());
-    AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:4:4"));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null,
+        false, null, false);
     List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
     assertEquals(3, delts.size());
     assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index b5958fa..97a18d0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -748,13 +748,13 @@ public class TestInputOutputFormat {
     try {
       MockFileSystem fs = new MockFileSystem(conf);
       MockFileSystem.addGlobalFile(
-        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
+        new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
       MockFileSystem.addGlobalFile(
-        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+        new MockFile("mock:/a/base_0000001/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
       MockFileSystem.addGlobalFile(
-        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
+        new MockFile("mock:/a/base_0000001/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
       MockFileSystem.addGlobalFile(
-        new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
+        new MockFile("mock:/a/base_0000001/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
       MockFileSystem.addGlobalFile(
         new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
       MockFileSystem.addGlobalFile(
@@ -763,6 +763,14 @@ public class TestInputOutputFormat {
         new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
       MockFileSystem.addGlobalFile(
         new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+        new MockFile("mock:/a/delta_0000003_0000003_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+        new MockFile("mock:/a/delta_0000003_0000003_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+        new MockFile("mock:/a/delta_0000003_0000003_0000/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
+      MockFileSystem.addGlobalFile(
+        new MockFile("mock:/a/delta_0000003_0000003_0000/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
 
       conf.set("bucket_count", "4");
       //set up props for read
@@ -803,7 +811,7 @@ public class TestInputOutputFormat {
       System.out.println("STATS TRACE END - " + testCaseName.getMethodName());
       int delta = readsAfter - readsBefore;
       //HIVE-16812 adds 1 read of the footer of each file (only if delete delta exists)
-      assertEquals(8, delta);
+      assertEquals(12, delta);
     } finally {
       MockFileSystem.clearGlobalFiles();
     }
@@ -2812,7 +2820,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktable
+    // call-1: getAcidState - mock:/mocktable
     // call-2: open - mock:/mocktable/0_0
     // call-3: open - mock:/mocktable/0_1
     assertEquals(3, readOpsDelta);
@@ -2870,7 +2878,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl
+    // call-1: getAcidState - mock:/mocktbl
     // call-2: open - mock:/mocktbl/0_0
     // call-3: open - mock:/mocktbl/0_1
     assertEquals(3, readOpsDelta);
@@ -2890,7 +2898,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl
+    // call-1: getAcidState - mock:/mocktbl
     assertEquals(1, readOpsDelta);
 
     // enable cache and use default strategy
@@ -2909,7 +2917,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl
+    // call-1: getAcidState - mock:/mocktbl
     // call-2: open - mock:/mocktbl/0_0
     // call-3: open - mock:/mocktbl/0_1
     assertEquals(3, readOpsDelta);
@@ -2927,7 +2935,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl
+    // call-1: getAcidState - mock:/mocktbl
     assertEquals(1, readOpsDelta);
 
     // revert back to local fs
@@ -2981,7 +2989,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktable
+    // call-1: getAcidState - mock:/mocktbl1
     // call-2: open - mock:/mocktbl1/0_0
     // call-3: open - mock:/mocktbl1/0_1
     assertEquals(3, readOpsDelta);
@@ -3020,7 +3028,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktable
+    // call-1: getAcidState - mock:/mocktbl1
     // call-2: open - mock:/mocktbl1/0_0
     // call-3: open - mock:/mocktbl1/0_1
     assertEquals(3, readOpsDelta);
@@ -3038,7 +3046,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl1
+    // call-1: getAcidState - mock:/mocktbl1
     assertEquals(1, readOpsDelta);
 
     // revert back to local fs
@@ -3093,7 +3101,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl2
+    // call-1: getAcidState - mock:/mocktbl2
     // call-2: open - mock:/mocktbl2/0_0
     // call-3: open - mock:/mocktbl2/0_1
     assertEquals(3, readOpsDelta);
@@ -3115,7 +3123,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl2
+    // call-1: getAcidState - mock:/mocktbl2
     // call-2: open - mock:/mocktbl2/0_1
     assertEquals(2, readOpsDelta);
 
@@ -3136,7 +3144,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl2
+    // call-1: getAcidState - mock:/mocktbl2
     // call-2: open - mock:/mocktbl2/0_0
     assertEquals(2, readOpsDelta);
 
@@ -3153,7 +3161,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    // call-1: listLocatedStatus - mock:/mocktbl2
+    // call-1: getAcidState - mock:/mocktbl2
     assertEquals(1, readOpsDelta);
 
     // revert back to local fs
@@ -3525,7 +3533,7 @@ public class TestInputOutputFormat {
     // call-6: getAcidState - split 2 => mock:/mocktable5 (to compute offset for original read)
     // call-7: open to read footer - split 2 => mock:/mocktable5/0_0 (to get row count)
     // call-8: file status - split 2 => mock:/mocktable5/0_0
-    assertEquals(8, readOpsDelta);
+    assertEquals(12, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3604,7 +3612,7 @@ public class TestInputOutputFormat {
     // call-4: AcidUtils.getAcidState - split 2 => ls mock:/mocktable6
     // call-5: read footer - split 2 => mock:/mocktable6/0_0 (to get offset since it's original file)
     // call-6: file stat - split 2 => mock:/mocktable6/0_0
-    assertEquals(6, readOpsDelta);
+    assertEquals(10, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3679,7 +3687,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    assertEquals(7, readOpsDelta);
+    assertEquals(12, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
@@ -3753,7 +3761,7 @@ public class TestInputOutputFormat {
         readOpsDelta = statistics.getReadOps() - readOpsBefore;
       }
     }
-    assertEquals(5, readOpsDelta);
+    assertEquals(10, readOpsDelta);
 
     // revert back to local fs
     conf.set("fs.defaultFS", "file:///");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 8451462..d4fd773 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -595,7 +595,7 @@ public class TestOrcRawRecordMerger {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     ValidWriteIdList writeIdList = new ValidReaderWriteIdList("testEmpty:200:" + Long.MAX_VALUE);
-    AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, writeIdList);
+    AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, false, null, false);
 
     Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
         BUCKET);
@@ -666,7 +666,8 @@ public class TestOrcRawRecordMerger {
     conf.set(ValidTxnList.VALID_TXNS_KEY,
         new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
     ValidWriteIdList writeIdList = new ValidReaderWriteIdList("testNewBaseAndDelta:200:" + Long.MAX_VALUE);
-    AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, writeIdList);
+    AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, use130Format, null,
+        use130Format);
 
     assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
     assertEquals(new Path(root, use130Format ?
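
The testNewBaseAndDelta change above is the only call site in this diff that passes
non-default trailing arguments. A hedged reading of it (which slots use130Format
feeds is taken straight from the hunk; what those slots mean is an assumption):

    // Hedged reading: use130Format already selects the Hive 1.3.0-style
    // directory layout elsewhere in this test, and here it is threaded into
    // the sixth and eighth boolean parameters of the new overload.
    AcidUtils.Directory directory = AcidUtils.getAcidState(
        fs, root, conf, writeIdList,
        null,           // file-id hint: none
        use130Format,   // toggled with the 1.3.0-format variant (assumed role)
        null,           // table properties: none
        use130Format);  // same toggle for the final flag (assumed role)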
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index c6d7e7f..dbff263 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -945,7 +945,7 @@ public class TestStreaming {
   private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
     String... records) throws Exception {
     ValidWriteIdList writeIds = getTransactionContext(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false, null, false);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -999,7 +999,7 @@ public class TestStreaming {
    */
   private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
     String validationQuery, boolean vectorize, String... records) throws Exception {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false, null, false);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -1050,7 +1050,7 @@ public class TestStreaming {
     return TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
   }
   private void checkNothingWritten(Path partitionPath) throws Exception {
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false, null, false);
     Assert.assertEquals(0, dir.getObsolete().size());
     Assert.assertEquals(0, dir.getOriginalFiles().size());
     List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
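
Note that checkDataWritten, checkDataWritten2, and checkNothingWritten above all pass
null for the new FileSystem parameter, while the side-file checks below first resolve
fs from the partition path; both forms appear in this file. A hedged sketch of the two
call shapes (the null-tolerance is an assumption implied by these call sites):

    FileSystem fs = partLoc.getFileSystem(conf);  // explicit handle, as in the side-file test below
    AcidUtils.Directory viaFs =
        AcidUtils.getAcidState(fs, partLoc, conf, writeIds, null, false, null, false);
    AcidUtils.Directory viaNull =                 // assumed: AcidUtils resolves fs from the path
        AcidUtils.getAcidState(null, partLoc, conf, writeIds, null, false, null, false);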
@@ -1947,7 +1947,7 @@ public class TestStreaming {
     /*now both batches have committed (but not closed) so we for each primary file we expect a side
     file to exist and indicate the true length of primary file*/
     FileSystem fs = partLoc.getFileSystem(conf);
-    AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf));
+    AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
     for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
@@ -1972,7 +1972,7 @@ public class TestStreaming {
     //so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length.  Furthermore, each bucket0
     //has now received more data(logically - it's buffered) but it is not yet committed.
     //lets check that side files exist, etc
-    dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf));
+    dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
     for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
       for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
         Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());