Posted to commits@hive.apache.org by ha...@apache.org on 2020/02/24 22:08:13 UTC

[hive] branch master updated: HIVE-22825 : Reduce directory lookup cost for acid tables (Rajesh Balamohan via Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 0767c5d  HIVE-22825 : Reduce directory lookup cost for acid tables (Rajesh Balamohan via Ashutosh Chauhan)
0767c5d is described below

commit 0767c5d549b2618f3449fc14a0214dc34a5c8a42
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Mon Feb 24 14:07:28 2020 -0800

    HIVE-22825 : Reduce directory lookup cost for acid tables (Rajesh Balamohan via Ashutosh Chauhan)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   3 +
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    | 168 +++++++++++++++++++++
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java      |  75 +++++----
 .../hive/ql/io/orc/TestInputOutputFormat.java      |  74 ++++-----
 .../orc/TestVectorizedOrcAcidRowBatchReader.java   |   2 +-
 5 files changed, 246 insertions(+), 76 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 583603c..e419dc5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2689,6 +2689,9 @@ public class HiveConf extends Configuration {
         "are not hidden by the INSERT OVERWRITE."),
     HIVE_TXN_STATS_ENABLED("hive.txn.stats.enabled", true,
         "Whether Hive supports transactional stats (accurate stats for transactional tables)"),
+    HIVE_TXN_ACID_DIR_CACHE_DURATION("hive.txn.acid.dir.cache.duration",
+        120, "Duration in minutes for which the dir cache for ACID tables "
+        + "remains valid. 0 disables the cache."),
 
     HIVE_TXN_READONLY_ENABLED("hive.txn.readonly.enabled", false,
       "Enables read-only transaction classification and related optimizations"),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 5d57509..76ea6c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -35,11 +35,16 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import com.google.common.base.Strings;
 import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -164,6 +169,9 @@ public class AcidUtils {
   public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
   public static final Pattern BUCKET_PATTERN = Pattern.compile("bucket_([0-9]+)(_[0-9]+)?$");
 
+  private static Cache<String, DirInfoValue> dirCache;
+  private static AtomicBoolean dirCacheInited = new AtomicBoolean();
+
   /**
   * A write into a non-acid table produces files like 0000_0 or 0000_0_copy_1
    * (Unless via Load Data statement)
@@ -548,6 +556,7 @@ public class AcidUtils {
     private final List<Path> obsolete;
     private final List<ParsedDelta> deltas;
     private final Path base;
+    private List<HdfsFileStatusWithId> baseFiles;
 
     public DirectoryImpl(List<Path> abortedDirectories,
         boolean isBaseInRawFormat, List<HdfsFileStatusWithId> original,
@@ -566,6 +575,14 @@ public class AcidUtils {
       return base;
     }
 
+    public List<HdfsFileStatusWithId> getBaseFiles() {
+      return baseFiles;
+    }
+
+    void setBaseFiles(List<HdfsFileStatusWithId> baseFiles) {
+      this.baseFiles = baseFiles;
+    }
+
     @Override
     public boolean isBaseInRawFormat() {
       return isBaseInRawFormat;
@@ -845,6 +862,9 @@ public class AcidUtils {
      * @return the base directory to read
      */
     Path getBaseDirectory();
+
+    List<HdfsFileStatusWithId> getBaseFiles();
+
     boolean isBaseInRawFormat();
 
     /**
@@ -3105,4 +3125,152 @@ public class AcidUtils {
             astSearcher.simpleBreadthFirstSearch(tree, pattern) != null)) ?
       TxnType.READ_ONLY : TxnType.DEFAULT;
   }
+
+  public static List<HdfsFileStatusWithId> findBaseFiles(
+      Path base, Ref<Boolean> useFileIds, Supplier<FileSystem> fs) throws IOException {
+    Boolean val = useFileIds.value;
+    if (val == null || val) {
+      try {
+        List<HdfsFileStatusWithId> result = SHIMS.listLocatedHdfsStatus(
+            fs.get(), base, AcidUtils.hiddenFileFilter);
+        if (val == null) {
+          useFileIds.value = true; // The call succeeded, so presumably the API is there.
+        }
+        return result;
+      } catch (Throwable t) {
+        LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
+        if (val == null && t instanceof UnsupportedOperationException) {
+          useFileIds.value = false;
+        }
+      }
+    }
+
+    // Fall back to regular API and create statuses without ID.
+    List<FileStatus> children = HdfsUtils.listLocatedStatus(fs.get(), base, AcidUtils.hiddenFileFilter);
+    List<HdfsFileStatusWithId> result = new ArrayList<>(children.size());
+    for (FileStatus child : children) {
+      result.add(AcidUtils.createOriginalObj(null, child));
+    }
+    return result;
+  }
+
+  private static void initDirCache(int durationInMinutes) {
+    if (dirCacheInited.get()) {
+      LOG.debug("Dir cache is already initialized");
+      return;
+    }
+    dirCache = CacheBuilder.newBuilder()
+        .expireAfterWrite(durationInMinutes, TimeUnit.MINUTES)
+        .softValues()
+        .build();
+    dirCacheInited.set(true);
+  }
+
+  /**
+   * Tries to get directory details from the cache. For now, a cached entry
+   * is valid only when a base directory is present and no deltas exist. This
+   * should be used only with the BI split strategy, and only for ACID tables.
+   *
+   * @param fileSystem file system supplier
+   * @param candidateDirectory the partition directory to analyze
+   * @param conf the configuration
+   * @param writeIdList the list of write ids that we are reading
+   * @param useFileIds whether to use the file ID based listing API
+   * @param ignoreEmptyFiles whether zero-length files should be ignored
+   * @param tblproperties table properties
+   * @param generateDirSnapshots whether to generate directory snapshots
+   * @return directory state
+   * @throws IOException on errors
+   */
+  public static Directory getAcidStateFromCache(Supplier<FileSystem> fileSystem,
+      Path candidateDirectory, Configuration conf,
+      ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles,
+      Map<String, String> tblproperties, boolean generateDirSnapshots) throws IOException {
+
+    int dirCacheDuration = HiveConf.getIntVar(conf,
+        ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION);
+
+    if (dirCacheDuration <= 0) {
+      LOG.debug("dirCache is not enabled");
+      return getAcidState(fileSystem.get(), candidateDirectory, conf, writeIdList,
+          useFileIds, ignoreEmptyFiles, tblproperties, generateDirSnapshots);
+    } else {
+      initDirCache(dirCacheDuration);
+    }
+
+    /*
+     * Cache only the simple case: a base directory exists and there are
+     * no deltas. If open/aborted write ids show up, the cached entry is
+     * invalidated and recomputed.
+     */
+    // key: dbName.tableName + dir
+    String key = writeIdList.getTableName() + "_" + candidateDirectory.toString();
+    DirInfoValue value = dirCache.getIfPresent(key);
+
+    // in case of open/aborted txns, recompute dirInfo
+    long[] exceptions = writeIdList.getInvalidWriteIds();
+    boolean recompute = (exceptions != null && exceptions.length > 0);
+
+    if (recompute) {
+      LOG.info("invalidating cache entry for key: {}", key);
+      dirCache.invalidate(key);
+      value = null;
+    }
+
+    if (value != null) {
+      // double check writeIds
+      if (!value.getTxnString().equalsIgnoreCase(writeIdList.writeToString())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("writeIdList: {} does not match cached value: {} "
+              + "for key: {}", writeIdList.writeToString(),
+              value.getTxnString(), key);
+        }
+        recompute = true;
+      }
+    }
+
+    // compute and add to cache
+    if (recompute || (value == null)) {
+      Directory dirInfo = getAcidState(fileSystem.get(), candidateDirectory, conf,
+          writeIdList, useFileIds, ignoreEmptyFiles, tblproperties,
+          generateDirSnapshots);
+      value = new DirInfoValue(writeIdList.writeToString(), dirInfo);
+
+      if (value.dirInfo != null && value.dirInfo.getBaseDirectory() != null
+          && value.dirInfo.getCurrentDirectories().isEmpty()) {
+        populateBaseFiles(dirInfo, useFileIds, fileSystem);
+        dirCache.put(key, value);
+      }
+    } else {
+      LOG.info("Got {} from cache, cache size: {}", key, dirCache.size());
+    }
+    return value.getDirInfo();
+  }
+
+  private static void populateBaseFiles(Directory dirInfo,
+      Ref<Boolean> useFileIds, Supplier<FileSystem> fileSystem) throws IOException {
+    if (dirInfo.getBaseDirectory() != null) {
+      // Cache base directory contents
+      List<HdfsFileStatusWithId> children = findBaseFiles(dirInfo.getBaseDirectory(), useFileIds, fileSystem);
+      ((DirectoryImpl) dirInfo).setBaseFiles(children);
+    }
+  }
+
+  static class DirInfoValue {
+    private String txnString;
+    private AcidUtils.Directory dirInfo;
+
+    DirInfoValue(String txnString, AcidUtils.Directory dirInfo) {
+      this.txnString = txnString;
+      this.dirInfo = dirInfo;
+    }
+
+    String getTxnString() {
+      return txnString;
+    }
+
+    AcidUtils.Directory getDirInfo() {
+      return dirInfo;
+    }
+  }
 }
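
For illustration (not part of this patch): the cache above is plain Guava, an expiring, soft-valued map keyed by table name plus directory and validated against the serialized write id list. A self-contained sketch of the same pattern, with hypothetical names standing in for the AcidUtils internals:

    import java.util.concurrent.TimeUnit;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;

    public class DirCacheSketch {
      private static final Cache<String, String> CACHE = CacheBuilder.newBuilder()
          .expireAfterWrite(120, TimeUnit.MINUTES) // mirrors hive.txn.acid.dir.cache.duration
          .softValues()                            // entries may be dropped under memory pressure
          .build();

      /** Returns the cached value, recomputing when the write id string changed. */
      static String lookup(String key, String currentTxnString) {
        String cached = CACHE.getIfPresent(key);
        if (cached != null && !cached.equalsIgnoreCase(currentTxnString)) {
          CACHE.invalidate(key); // stale: write ids moved since the entry was stored
          cached = null;
        }
        if (cached == null) {
          cached = currentTxnString; // stand-in for the expensive getAcidState() call
          CACHE.put(key, cached);
        }
        return cached;
      }
    }

Soft values let the JVM reclaim cached listings under memory pressure, bounding the footprint without a fixed maximum size.
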
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 03f7086..7c8f479 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -44,6 +44,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
@@ -1210,7 +1211,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
    */
   static final class FileGenerator implements Callable<AcidDirInfo> {
     private final Context context;
-    private final FileSystem fs;
+    private final Supplier<FileSystem> fs;
     /**
      * For plain or acid tables this is the root of the partition (or table if not partitioned).
      * For MM table this is delta/ or base/ dir.  In MM case applying of the ValidTxnList that
@@ -1222,12 +1223,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     private final UserGroupInformation ugi;
 
     @VisibleForTesting
-    FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds,
+    FileGenerator(Context context, Supplier<FileSystem> fs, Path dir, boolean useFileIds,
         UserGroupInformation ugi) {
       this(context, fs, dir, Ref.from(useFileIds), ugi);
     }
 
-    FileGenerator(Context context, FileSystem fs, Path dir, Ref<Boolean> useFileIds,
+    FileGenerator(Context context, Supplier<FileSystem> fs, Path dir, Ref<Boolean> useFileIds,
         UserGroupInformation ugi) {
       this.context = context;
       this.fs = fs;
@@ -1253,6 +1254,17 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
       }
     }
 
+    private Directory getAcidState() throws IOException {
+      if (context.isAcid && context.splitStrategyKind == SplitStrategyKind.BI) {
+        return AcidUtils.getAcidStateFromCache(fs, dir, context.conf,
+            context.writeIdList, useFileIds, true, null, true);
+      } else {
+        return AcidUtils.getAcidState(fs.get(), dir, context.conf, context.writeIdList,
+            useFileIds, true, null, true);
+      }
+    }
+
+
     private AcidDirInfo callInternal() throws IOException {
       if (context.acidOperationalProperties != null
           && context.acidOperationalProperties.isInsertOnly()) {
@@ -1265,16 +1277,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
             context.conf.getBoolean("mapred.input.dir.recursive", false));
         List<HdfsFileStatusWithId> originals = new ArrayList<>();
         List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
-        AcidUtils.findOriginals(fs, dir, originals, useFileIds, true, isRecursive);
+        AcidUtils.findOriginals(fs.get(), dir, originals, useFileIds, true, isRecursive);
         for (HdfsFileStatusWithId fileId : originals) {
           baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
         }
-        return new AcidDirInfo(fs, dir, new AcidUtils.DirectoryImpl(Lists.newArrayList(), true, originals,
+        return new AcidDirInfo(fs.get(), dir, new AcidUtils.DirectoryImpl(Lists.newArrayList(), true, originals,
             Lists.newArrayList(), Lists.newArrayList(), null), baseFiles, new ArrayList<>());
       }
       //todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
-      AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
-          fs, dir, context.conf, context.writeIdList, useFileIds, true, null, true);
+      AcidUtils.Directory dirInfo = getAcidState();
       // find the base files (original or new style)
       List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
       if (dirInfo.getBaseDirectory() == null) {
@@ -1283,7 +1294,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
           baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
         }
       } else {
-        List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(dirInfo.getBaseDirectory(), useFileIds);
+        List<HdfsFileStatusWithId> compactedBaseFiles = dirInfo.getBaseFiles();
+        if (compactedBaseFiles == null) {
+          compactedBaseFiles = AcidUtils.findBaseFiles(dirInfo.getBaseDirectory(), useFileIds, fs);
+        }
         for (HdfsFileStatusWithId fileId : compactedBaseFiles) {
           baseFiles.add(new AcidBaseFileInfo(fileId, dirInfo.isBaseInRawFormat() ?
             AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA));
@@ -1324,7 +1338,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
             if (val == null || val) {
               try {
                 List<HdfsFileStatusWithId> insertDeltaFiles =
-                    SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), bucketFilter);
+                    SHIMS.listLocatedHdfsStatus(fs.get(), parsedDelta.getPath(), bucketFilter);
                 for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
                   baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
                 }
@@ -1340,7 +1354,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
               }
             }
             // Fall back to regular API and create statuses without ID.
-            List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), bucketFilter);
+            List<FileStatus> children = HdfsUtils.listLocatedStatus(fs.get(),
+                parsedDelta.getPath(), bucketFilter);
             for (FileStatus child : children) {
               HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
               baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
@@ -1359,35 +1374,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
         // should be considered as usual.
         parsedDeltas.addAll(dirInfo.getCurrentDirectories());
       }
-      return new AcidDirInfo(fs, dir, dirInfo, baseFiles, parsedDeltas);
-    }
-
-    private List<HdfsFileStatusWithId> findBaseFiles(
-        Path base, Ref<Boolean> useFileIds) throws IOException {
-      Boolean val = useFileIds.value;
-      if (val == null || val) {
-        try {
-          List<HdfsFileStatusWithId> result = SHIMS.listLocatedHdfsStatus(
-              fs, base, AcidUtils.hiddenFileFilter);
-          if (val == null) {
-            useFileIds.value = true; // The call succeeded, so presumably the API is there.
-          }
-          return result;
-        } catch (Throwable t) {
-          LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
-          if (val == null && t instanceof UnsupportedOperationException) {
-            useFileIds.value = false;
-          }
-        }
-      }
-
-      // Fall back to regular API and create states without ID.
-      List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, base, AcidUtils.hiddenFileFilter);
-      List<HdfsFileStatusWithId> result = new ArrayList<>(children.size());
-      for (FileStatus child : children) {
-        result.add(AcidUtils.createOriginalObj(null, child));
-      }
-      return result;
+      return new AcidDirInfo(fs.get(), dir, dirInfo, baseFiles, parsedDeltas);
     }
   }
 
@@ -1837,8 +1824,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
     Path[] paths = getInputPaths(conf);
     CompletionService<AcidDirInfo> ecs = new ExecutorCompletionService<>(Context.threadPool);
     for (Path dir : paths) {
-      FileSystem fs = dir.getFileSystem(conf);
-      FileGenerator fileGenerator = new FileGenerator(context, fs, dir, useFileIds, ugi);
+      Supplier<FileSystem> fsSupplier = () -> {
+        try {
+          return dir.getFileSystem(conf);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+      };
+      FileGenerator fileGenerator = new FileGenerator(context, fsSupplier, dir, useFileIds, ugi);
       pathFutures.add(ecs.submit(fileGenerator));
     }
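
For illustration (not part of this patch): FileGenerator now takes a Supplier<FileSystem> instead of a resolved FileSystem, moving resolution off the serial split-submission loop and into the worker that actually needs it; the cached BI path may not touch it at all. A sketch of the wrapping idiom above, extracted into a hypothetical helper:

    import java.io.IOException;
    import java.util.function.Supplier;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    class LazyFs {
      // Defer FileSystem resolution until first use; the checked IOException
      // is rethrown unchecked because Supplier.get() cannot declare it.
      static Supplier<FileSystem> fsSupplier(Path dir, Configuration conf) {
        return () -> {
          try {
            return dir.getFileSystem(conf);
          } catch (IOException e) {
            throw new RuntimeException(e);
          }
        };
      }
    }

Hadoop's FileSystem layer caches instances per scheme and authority, so invoking the supplier repeatedly is cheap.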
 
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 5f5ea4e..3884a10 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -526,7 +526,7 @@ public class TestInputOutputFormat {
           final OrcInputFormat.Context context = new OrcInputFormat.Context(
               conf, n);
           OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
-              context, fs, new MockPath(fs, "mock:/a/b"), false, null);
+              context, () -> fs, new MockPath(fs, "mock:/a/b"), false, null);
           List<SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
           assertEquals(1, splitStrategies.size());
           final SplitStrategy splitStrategy = splitStrategies.get(0);
@@ -549,7 +549,7 @@ public class TestInputOutputFormat {
           final OrcInputFormat.Context context = new OrcInputFormat.Context(
               conf, n);
           OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
-              context, fs, new MockPath(fs, "mock:/a/b"), false, null);
+              context, () -> fs, new MockPath(fs, "mock:/a/b"), false, null);
           List<SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
           assertEquals(1, splitStrategies.size());
           final SplitStrategy splitStrategy = splitStrategies.get(0);
@@ -567,14 +567,14 @@ public class TestInputOutputFormat {
   @Test
   public void testFileGenerator() throws Exception {
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
-    MockFileSystem fs = new MockFileSystem(conf,
+    final MockFileSystem fs = new MockFileSystem(conf,
         new MockFile("mock:/a/b/part-00", 1000, new byte[1]),
         new MockFile("mock:/a/b/part-01", 1000, new byte[1]),
         new MockFile("mock:/a/b/_part-02", 1000, new byte[1]),
         new MockFile("mock:/a/b/.part-03", 1000, new byte[1]),
         new MockFile("mock:/a/b/part-04", 1000, new byte[1]));
     OrcInputFormat.FileGenerator gen =
-      new OrcInputFormat.FileGenerator(context, fs,
+      new OrcInputFormat.FileGenerator(context, () -> fs,
           new MockPath(fs, "mock:/a/b"), false, null);
     List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
@@ -582,13 +582,13 @@ public class TestInputOutputFormat {
 
     conf.set("mapreduce.input.fileinputformat.split.maxsize", "500");
     context = new OrcInputFormat.Context(conf);
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs1 = new MockFileSystem(conf,
         new MockFile("mock:/a/b/part-00", 1000, new byte[1000]),
         new MockFile("mock:/a/b/part-01", 1000, new byte[1000]),
         new MockFile("mock:/a/b/_part-02", 1000, new byte[1000]),
         new MockFile("mock:/a/b/.part-03", 1000, new byte[1000]),
         new MockFile("mock:/a/b/part-04", 1000, new byte[1000]));
-    gen = new OrcInputFormat.FileGenerator(context, fs,
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs1,
             new MockPath(fs, "mock:/a/b"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
@@ -608,7 +608,7 @@ public class TestInputOutputFormat {
         new MockFile("mock:/a/delta_001_002/bucket_000000", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/delta_001_002/bucket_000001", 1000, new byte[1], new MockBlock("host1")));
     OrcInputFormat.FileGenerator gen =
-        new OrcInputFormat.FileGenerator(context, fs,
+        new OrcInputFormat.FileGenerator(context, () -> fs,
             new MockPath(fs, "mock:/a"), false, null);
     List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -630,11 +630,11 @@ public class TestInputOutputFormat {
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
 
     // Case 1: Test with just originals => Single split strategy with two splits.
-    MockFileSystem fs = new MockFileSystem(conf,
+    final MockFileSystem fs = new MockFileSystem(conf,
         new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")));
     OrcInputFormat.FileGenerator gen =
-        new OrcInputFormat.FileGenerator(context, fs,
+        new OrcInputFormat.FileGenerator(context, () -> fs,
             new MockPath(fs, "mock:/a"), false, null);
     List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
@@ -648,12 +648,13 @@ public class TestInputOutputFormat {
 
     // Case 2: Test with originals and base => Single split strategy with two splits on compacted
     // base since the presence of a base will make the originals obsolete.
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs1 = new MockFileSystem(conf,
         new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/base_0000001/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
-    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs1, new MockPath(fs1,
+        "mock:/a"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -665,14 +666,15 @@ public class TestInputOutputFormat {
     assertFalse(splits.get(1).isOriginal());
 
     // Case 3: Test with originals and deltas => Two split strategies with two splits for each.
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs3 = new MockFileSystem(conf,
         new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
-    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs3,
+        new MockPath(fs3, "mock:/a"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(2, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -697,12 +699,13 @@ public class TestInputOutputFormat {
     // The reason why we are able to do so is because the valid user data has already been considered
     // as base for the covered buckets. Hence, the uncovered buckets do not have any relevant
     // data and we can just ignore them.
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs4 = new MockFileSystem(conf,
         new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
-    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs4, new MockPath(fs4,
+        "mock:/a"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(2, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -718,7 +721,7 @@ public class TestInputOutputFormat {
 
     // Case 5: Test with originals, compacted_base, insert_deltas, delete_deltas (exhaustive test)
     // This should just generate one strategy with splits for base and insert_deltas.
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs5 = new MockFileSystem(conf,
         new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
@@ -727,7 +730,8 @@ public class TestInputOutputFormat {
         new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/delete_delta_0000002_0000002_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
         new MockFile("mock:/a/delete_delta_0000002_0000002_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
-    gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs5, new MockPath(fs5,
+        "mock:/a"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -789,7 +793,9 @@ public class TestInputOutputFormat {
       conf.set("fs.mock.impl", MockFileSystem.class.getName());
       OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
 
-      OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"),
+      OrcInputFormat.FileGenerator gen =
+          new OrcInputFormat.FileGenerator(context, () -> fs, new MockPath(fs,
+              "mock:/a"),
         false, null);
       List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
       assertEquals(1, splitStrategies.size());
@@ -821,14 +827,14 @@ public class TestInputOutputFormat {
   public void testBIStrategySplitBlockBoundary() throws Exception {
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
-    MockFileSystem fs = new MockFileSystem(conf,
+    final MockFileSystem fs = new MockFileSystem(conf,
         new MockFile("mock:/a/b/part-00", 1000, new byte[1], new MockBlock("host1", "host2")),
         new MockFile("mock:/a/b/part-01", 1000, new byte[1], new MockBlock("host1", "host2")),
         new MockFile("mock:/a/b/part-02", 1000, new byte[1], new MockBlock("host1", "host2")),
         new MockFile("mock:/a/b/part-03", 1000, new byte[1], new MockBlock("host1", "host2")),
         new MockFile("mock:/a/b/part-04", 1000, new byte[1], new MockBlock("host1", "host2")));
     OrcInputFormat.FileGenerator gen =
-        new OrcInputFormat.FileGenerator(context, fs,
+        new OrcInputFormat.FileGenerator(context, () -> fs,
             new MockPath(fs, "mock:/a/b"), false, null);
     List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
@@ -838,14 +844,14 @@ public class TestInputOutputFormat {
     assertEquals(5, numSplits);
 
     context = new OrcInputFormat.Context(conf);
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs0 = new MockFileSystem(conf,
         new MockFile("mock:/a/b/part-00", 1000, new byte[1000], new MockBlock("host1", "host2")),
         new MockFile("mock:/a/b/part-01", 1000, new byte[1000], new MockBlock("host1", "host2")),
         new MockFile("mock:/a/b/part-02", 1000, new byte[1000], new MockBlock("host1", "host2")),
         new MockFile("mock:/a/b/part-03", 1000, new byte[1000], new MockBlock("host1", "host2")),
         new MockFile("mock:/a/b/part-04", 1000, new byte[1000], new MockBlock("host1", "host2")));
-    gen = new OrcInputFormat.FileGenerator(context, fs,
-        new MockPath(fs, "mock:/a/b"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs0,
+        new MockPath(fs0, "mock:/a/b"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
@@ -854,7 +860,7 @@ public class TestInputOutputFormat {
     assertEquals(5, numSplits);
 
     context = new OrcInputFormat.Context(conf);
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs1 = new MockFileSystem(conf,
         new MockFile("mock:/a/b/part-00", 1000, new byte[1100], new MockBlock("host1", "host2"),
             new MockBlock("host1", "host2")),
         new MockFile("mock:/a/b/part-01", 1000, new byte[1100], new MockBlock("host1", "host2"),
@@ -865,8 +871,8 @@ public class TestInputOutputFormat {
             new MockBlock("host1", "host2")),
         new MockFile("mock:/a/b/part-04", 1000, new byte[1100], new MockBlock("host1", "host2"),
             new MockBlock("host1", "host2")));
-    gen = new OrcInputFormat.FileGenerator(context, fs,
-        new MockPath(fs, "mock:/a/b"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs1,
+        new MockPath(fs1, "mock:/a/b"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
@@ -875,7 +881,7 @@ public class TestInputOutputFormat {
     assertEquals(10, numSplits);
 
     context = new OrcInputFormat.Context(conf);
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs2 = new MockFileSystem(conf,
         new MockFile("mock:/a/b/part-00", 1000, new byte[2000], new MockBlock("host1", "host2"),
             new MockBlock("host1", "host2")),
         new MockFile("mock:/a/b/part-01", 1000, new byte[2000], new MockBlock("host1", "host2"),
@@ -886,8 +892,8 @@ public class TestInputOutputFormat {
             new MockBlock("host1", "host2")),
         new MockFile("mock:/a/b/part-04", 1000, new byte[2000], new MockBlock("host1", "host2"),
             new MockBlock("host1", "host2")));
-    gen = new OrcInputFormat.FileGenerator(context, fs,
-        new MockPath(fs, "mock:/a/b"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs2,
+        new MockPath(fs2, "mock:/a/b"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
@@ -896,7 +902,7 @@ public class TestInputOutputFormat {
     assertEquals(10, numSplits);
 
     context = new OrcInputFormat.Context(conf);
-    fs = new MockFileSystem(conf,
+    final MockFileSystem fs3 = new MockFileSystem(conf,
         new MockFile("mock:/a/b/part-00", 1000, new byte[2200], new MockBlock("host1", "host2"),
             new MockBlock("host1", "host2"), new MockBlock("host1", "host2")),
         new MockFile("mock:/a/b/part-01", 1000, new byte[2200], new MockBlock("host1", "host2"),
@@ -907,8 +913,8 @@ public class TestInputOutputFormat {
             new MockBlock("host1", "host2"), new MockBlock("host1", "host2")),
         new MockFile("mock:/a/b/part-04", 1000, new byte[2200], new MockBlock("host1", "host2"),
             new MockBlock("host1", "host2"), new MockBlock("host1", "host2")));
-    gen = new OrcInputFormat.FileGenerator(context, fs,
-        new MockPath(fs, "mock:/a/b"), false, null);
+    gen = new OrcInputFormat.FileGenerator(context, () -> fs3,
+        new MockPath(fs3, "mock:/a/b"), false, null);
     splitStrategies = createSplitStrategies(context, gen);
     assertEquals(1, splitStrategies.size());
     assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
@@ -1009,9 +1015,9 @@ public class TestInputOutputFormat {
   }
 
   public OrcInputFormat.AcidDirInfo createAdi(
-      OrcInputFormat.Context context, MockFileSystem fs, String path) throws IOException {
+      OrcInputFormat.Context context, final MockFileSystem fs, String path) throws IOException {
     return new OrcInputFormat.FileGenerator(
-        context, fs, new MockPath(fs, path), false, null).call();
+        context, () -> fs, new MockPath(fs, path), false, null).call();
   }
 
   private List<OrcInputFormat.SplitStrategy<?>> createSplitStrategies(
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index a8f18d1..6fe47d5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -1086,7 +1086,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
         AcidUtils.AcidOperationalProperties.getDefault().toInt());
     OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
     OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
-        context, fs, root, false, null);
+        context, () -> fs, root, false, null);
     OrcInputFormat.AcidDirInfo adi = gen.call();
     return OrcInputFormat.determineSplitStrategies(
         null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents,