Posted to commits@hive.apache.org by ha...@apache.org on 2020/02/24 22:08:13 UTC
[hive] branch master updated: HIVE-22825 : Reduce directory lookup cost for acid tables (Rajesh Balamohan via Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0767c5d HIVE-22825 : Reduce directory lookup cost for acid tables (Rajesh Balamohan via Ashutosh Chauhan)
0767c5d is described below
commit 0767c5d549b2618f3449fc14a0214dc34a5c8a42
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Mon Feb 24 14:07:28 2020 -0800
HIVE-22825 : Reduce directory lookup cost for acid tables (Rajesh Balamohan via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 3 +
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 168 +++++++++++++++++++++
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 75 +++++----
.../hive/ql/io/orc/TestInputOutputFormat.java | 74 ++++-----
.../orc/TestVectorizedOrcAcidRowBatchReader.java | 2 +-
5 files changed, 246 insertions(+), 76 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 583603c..e419dc5 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2689,6 +2689,9 @@ public class HiveConf extends Configuration {
"are not hidden by the INSERT OVERWRITE."),
HIVE_TXN_STATS_ENABLED("hive.txn.stats.enabled", true,
"Whether Hive supports transactional stats (accurate stats for transactional tables)"),
+ HIVE_TXN_ACID_DIR_CACHE_DURATION("hive.txn.acid.dir.cache.duration",
+ 120, "Enable dir cache for ACID tables specified in minutes."
+ + "0 indicates cache is disabled. "),
HIVE_TXN_READONLY_ENABLED("hive.txn.readonly.enabled", false,
"Enables read-only transaction classification and related optimizations"),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 5d57509..76ea6c9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -35,11 +35,16 @@ import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.google.common.base.Strings;
import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -164,6 +169,9 @@ public class AcidUtils {
public static final Pattern LEGACY_BUCKET_DIGIT_PATTERN = Pattern.compile("^[0-9]{6}");
public static final Pattern BUCKET_PATTERN = Pattern.compile("bucket_([0-9]+)(_[0-9]+)?$");
+ private static Cache<String, DirInfoValue> dirCache;
+ private static AtomicBoolean dirCacheInited = new AtomicBoolean();
+
/**
* A write into a non-acid table produces files like 0000_0 or 0000_0_copy_1
* (Unless via Load Data statement)
@@ -548,6 +556,7 @@ public class AcidUtils {
private final List<Path> obsolete;
private final List<ParsedDelta> deltas;
private final Path base;
+ private List<HdfsFileStatusWithId> baseFiles;
public DirectoryImpl(List<Path> abortedDirectories,
boolean isBaseInRawFormat, List<HdfsFileStatusWithId> original,
@@ -566,6 +575,14 @@ public class AcidUtils {
return base;
}
+ public List<HdfsFileStatusWithId> getBaseFiles() {
+ return baseFiles;
+ }
+
+ void setBaseFiles(List<HdfsFileStatusWithId> baseFiles) {
+ this.baseFiles = baseFiles;
+ }
+
@Override
public boolean isBaseInRawFormat() {
return isBaseInRawFormat;
@@ -845,6 +862,9 @@ public class AcidUtils {
* @return the base directory to read
*/
Path getBaseDirectory();
+
+ List<HdfsFileStatusWithId> getBaseFiles();
+
boolean isBaseInRawFormat();
/**
@@ -3105,4 +3125,152 @@ public class AcidUtils {
astSearcher.simpleBreadthFirstSearch(tree, pattern) != null)) ?
TxnType.READ_ONLY : TxnType.DEFAULT;
}
+
+ public static List<HdfsFileStatusWithId> findBaseFiles(
+ Path base, Ref<Boolean> useFileIds, Supplier<FileSystem> fs) throws IOException {
+ Boolean val = useFileIds.value;
+ if (val == null || val) {
+ try {
+ List<HdfsFileStatusWithId> result = SHIMS.listLocatedHdfsStatus(
+ fs.get(), base, AcidUtils.hiddenFileFilter);
+ if (val == null) {
+ useFileIds.value = true; // The call succeeded, so presumably the API is there.
+ }
+ return result;
+ } catch (Throwable t) {
+ LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
+ if (val == null && t instanceof UnsupportedOperationException) {
+ useFileIds.value = false;
+ }
+ }
+ }
+
+ // Fall back to regular API and create statuses without ID.
+ List<FileStatus> children = HdfsUtils.listLocatedStatus(fs.get(), base, AcidUtils.hiddenFileFilter);
+ List<HdfsFileStatusWithId> result = new ArrayList<>(children.size());
+ for (FileStatus child : children) {
+ result.add(AcidUtils.createOriginalObj(null, child));
+ }
+ return result;
+ }
+
+ private static void initDirCache(int durationInMts) {
+ if (dirCacheInited.get()) {
+ LOG.debug("DirCache got initialized already");
+ return;
+ }
+ dirCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(durationInMts, TimeUnit.MINUTES)
+ .softValues()
+ .build();
+ dirCacheInited.set(true);
+ }
+
+ /**
+ * Tries to get directory details from cache. For now, cache is valid only
+ * when base directory is available and no deltas are present. This should
+ * be used only in BI strategy and for ACID tables.
+ *
+ * @param fileSystem file system supplier
+ * @param candidateDirectory the partition directory to analyze
+ * @param conf the configuration
+ * @param writeIdList the list of write ids that we are reading
+ * @param useFileIds hint on whether the file ID API can be used (tri-state)
+ * @param ignoreEmptyFiles whether to skip zero-length files
+ * @param tblproperties table properties, if any
+ * @param generateDirSnapshots whether to generate directory snapshots
+ * @return directory state
+ * @throws IOException on errors
+ */
+ public static Directory getAcidStateFromCache(Supplier<FileSystem> fileSystem,
+ Path candidateDirectory, Configuration conf,
+ ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles,
+ Map<String, String> tblproperties, boolean generateDirSnapshots) throws IOException {
+
+ int dirCacheDuration = HiveConf.getIntVar(conf,
+ ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION);
+
+ if (dirCacheDuration <= 0) {
+ LOG.debug("dirCache is not enabled");
+ return getAcidState(fileSystem.get(), candidateDirectory, conf, writeIdList,
+ useFileIds, ignoreEmptyFiles, tblproperties, generateDirSnapshots);
+ } else {
+ initDirCache(dirCacheDuration);
+ }
+
+ /*
+ * Cache only the simple case, where a base directory exists without
+ * deltas. In case of changes, the entry gets invalidated based on the
+ * open/aborted write id list.
+ */
+ // key: dbName.tableName + dir
+ String key = writeIdList.getTableName() + "_" + candidateDirectory.toString();
+ DirInfoValue value = dirCache.getIfPresent(key);
+
+ // in case of open/aborted txns, recompute dirInfo
+ long[] exceptions = writeIdList.getInvalidWriteIds();
+ boolean recompute = (exceptions != null && exceptions.length > 0);
+
+ if (recompute) {
+ LOG.info("invalidating cache entry for key: {}", key);
+ dirCache.invalidate(key);
+ value = null;
+ }
+
+ if (value != null) {
+ // double check writeIds
+ if (!value.getTxnString().equalsIgnoreCase(writeIdList.writeToString())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.info("writeIdList: {} from cache: {} is not matching "
+ + "for key: {}", writeIdList.writeToString(),
+ value.getTxnString(), key);
+ }
+ recompute = true;
+ }
+ }
+
+ // compute and add to cache
+ if (recompute || (value == null)) {
+ Directory dirInfo = getAcidState(fileSystem.get(), candidateDirectory, conf,
+ writeIdList, useFileIds, ignoreEmptyFiles, tblproperties,
+ generateDirSnapshots);
+ value = new DirInfoValue(writeIdList.writeToString(), dirInfo);
+
+ if (value.dirInfo != null && value.dirInfo.getBaseDirectory() != null
+ && value.dirInfo.getCurrentDirectories().isEmpty()) {
+ populateBaseFiles(dirInfo, useFileIds, fileSystem);
+ dirCache.put(key, value);
+ }
+ } else {
+ LOG.info("Got {} from cache, cache size: {}", key, dirCache.size());
+ }
+ return value.getDirInfo();
+ }
+
+ private static void populateBaseFiles(Directory dirInfo,
+ Ref<Boolean> useFileIds, Supplier<FileSystem> fileSystem) throws IOException {
+ if (dirInfo.getBaseDirectory() != null) {
+ // Cache base directory contents
+ List<HdfsFileStatusWithId> children = findBaseFiles(dirInfo.getBaseDirectory(), useFileIds, fileSystem);
+ ((DirectoryImpl) dirInfo).setBaseFiles(children);
+ }
+ }
+
+ static class DirInfoValue {
+ private String txnString;
+ private AcidUtils.Directory dirInfo;
+
+ DirInfoValue(String txnString, AcidUtils.Directory dirInfo) {
+ this.txnString = txnString;
+ this.dirInfo = dirInfo;
+ }
+
+ String getTxnString() {
+ return txnString;
+ }
+
+ AcidUtils.Directory getDirInfo() {
+ return dirInfo;
+ }
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 03f7086..7c8f479 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -44,6 +44,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
@@ -1210,7 +1211,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
*/
static final class FileGenerator implements Callable<AcidDirInfo> {
private final Context context;
- private final FileSystem fs;
+ private final Supplier<FileSystem> fs;
/**
* For plain or acid tables this is the root of the partition (or table if not partitioned).
* For MM table this is delta/ or base/ dir. In MM case applying of the ValidTxnList that
@@ -1222,12 +1223,12 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private final UserGroupInformation ugi;
@VisibleForTesting
- FileGenerator(Context context, FileSystem fs, Path dir, boolean useFileIds,
+ FileGenerator(Context context, Supplier<FileSystem> fs, Path dir, boolean useFileIds,
UserGroupInformation ugi) {
this(context, fs, dir, Ref.from(useFileIds), ugi);
}
- FileGenerator(Context context, FileSystem fs, Path dir, Ref<Boolean> useFileIds,
+ FileGenerator(Context context, Supplier<FileSystem> fs, Path dir, Ref<Boolean> useFileIds,
UserGroupInformation ugi) {
this.context = context;
this.fs = fs;
@@ -1253,6 +1254,17 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
+ private Directory getAcidState() throws IOException {
+ if (context.isAcid && context.splitStrategyKind == SplitStrategyKind.BI) {
+ return AcidUtils.getAcidStateFromCache(fs, dir, context.conf,
+ context.writeIdList, useFileIds, true, null, true);
+ } else {
+ return AcidUtils.getAcidState(fs.get(), dir, context.conf, context.writeIdList,
+ useFileIds, true, null, true);
+ }
+ }
+
+
private AcidDirInfo callInternal() throws IOException {
if (context.acidOperationalProperties != null
&& context.acidOperationalProperties.isInsertOnly()) {
@@ -1265,16 +1277,15 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
context.conf.getBoolean("mapred.input.dir.recursive", false));
List<HdfsFileStatusWithId> originals = new ArrayList<>();
List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
- AcidUtils.findOriginals(fs, dir, originals, useFileIds, true, isRecursive);
+ AcidUtils.findOriginals(fs.get(), dir, originals, useFileIds, true, isRecursive);
for (HdfsFileStatusWithId fileId : originals) {
baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
}
- return new AcidDirInfo(fs, dir, new AcidUtils.DirectoryImpl(Lists.newArrayList(), true, originals,
+ return new AcidDirInfo(fs.get(), dir, new AcidUtils.DirectoryImpl(Lists.newArrayList(), true, originals,
Lists.newArrayList(), Lists.newArrayList(), null), baseFiles, new ArrayList<>());
}
//todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
- AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
- fs, dir, context.conf, context.writeIdList, useFileIds, true, null, true);
+ AcidUtils.Directory dirInfo = getAcidState();
// find the base files (original or new style)
List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
if (dirInfo.getBaseDirectory() == null) {
@@ -1283,7 +1294,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
baseFiles.add(new AcidBaseFileInfo(fileId, AcidUtils.AcidBaseFileType.ORIGINAL_BASE));
}
} else {
- List<HdfsFileStatusWithId> compactedBaseFiles = findBaseFiles(dirInfo.getBaseDirectory(), useFileIds);
+ List<HdfsFileStatusWithId> compactedBaseFiles = dirInfo.getBaseFiles();
+ if (compactedBaseFiles == null) {
+ compactedBaseFiles = AcidUtils.findBaseFiles(dirInfo.getBaseDirectory(), useFileIds, fs);
+ }
for (HdfsFileStatusWithId fileId : compactedBaseFiles) {
baseFiles.add(new AcidBaseFileInfo(fileId, dirInfo.isBaseInRawFormat() ?
AcidUtils.AcidBaseFileType.ORIGINAL_BASE : AcidUtils.AcidBaseFileType.ACID_SCHEMA));
@@ -1324,7 +1338,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
if (val == null || val) {
try {
List<HdfsFileStatusWithId> insertDeltaFiles =
- SHIMS.listLocatedHdfsStatus(fs, parsedDelta.getPath(), bucketFilter);
+ SHIMS.listLocatedHdfsStatus(fs.get(), parsedDelta.getPath(), bucketFilter);
for (HdfsFileStatusWithId fileId : insertDeltaFiles) {
baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
}
@@ -1340,7 +1354,8 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
}
// Fall back to regular API and create statuses without ID.
- List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, parsedDelta.getPath(), bucketFilter);
+ List<FileStatus> children = HdfsUtils.listLocatedStatus(fs.get(),
+ parsedDelta.getPath(), bucketFilter);
for (FileStatus child : children) {
HdfsFileStatusWithId fileId = AcidUtils.createOriginalObj(null, child);
baseFiles.add(new AcidBaseFileInfo(fileId, deltaType));
@@ -1359,35 +1374,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
// should be considered as usual.
parsedDeltas.addAll(dirInfo.getCurrentDirectories());
}
- return new AcidDirInfo(fs, dir, dirInfo, baseFiles, parsedDeltas);
- }
-
- private List<HdfsFileStatusWithId> findBaseFiles(
- Path base, Ref<Boolean> useFileIds) throws IOException {
- Boolean val = useFileIds.value;
- if (val == null || val) {
- try {
- List<HdfsFileStatusWithId> result = SHIMS.listLocatedHdfsStatus(
- fs, base, AcidUtils.hiddenFileFilter);
- if (val == null) {
- useFileIds.value = true; // The call succeeded, so presumably the API is there.
- }
- return result;
- } catch (Throwable t) {
- LOG.error("Failed to get files with ID; using regular API: " + t.getMessage());
- if (val == null && t instanceof UnsupportedOperationException) {
- useFileIds.value = false;
- }
- }
- }
-
- // Fall back to regular API and create states without ID.
- List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, base, AcidUtils.hiddenFileFilter);
- List<HdfsFileStatusWithId> result = new ArrayList<>(children.size());
- for (FileStatus child : children) {
- result.add(AcidUtils.createOriginalObj(null, child));
- }
- return result;
+ return new AcidDirInfo(fs.get(), dir, dirInfo, baseFiles, parsedDeltas);
}
}
@@ -1837,8 +1824,14 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
Path[] paths = getInputPaths(conf);
CompletionService<AcidDirInfo> ecs = new ExecutorCompletionService<>(Context.threadPool);
for (Path dir : paths) {
- FileSystem fs = dir.getFileSystem(conf);
- FileGenerator fileGenerator = new FileGenerator(context, fs, dir, useFileIds, ugi);
+ Supplier<FileSystem> fsSupplier = () -> {
+ try {
+ return dir.getFileSystem(conf);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ FileGenerator fileGenerator = new FileGenerator(context, fsSupplier, dir, useFileIds, ugi);
pathFutures.add(ecs.submit(fileGenerator));
}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index 5f5ea4e..3884a10 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -526,7 +526,7 @@ public class TestInputOutputFormat {
final OrcInputFormat.Context context = new OrcInputFormat.Context(
conf, n);
OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
- context, fs, new MockPath(fs, "mock:/a/b"), false, null);
+ context, () -> fs, new MockPath(fs, "mock:/a/b"), false, null);
List<SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
assertEquals(1, splitStrategies.size());
final SplitStrategy splitStrategy = splitStrategies.get(0);
@@ -549,7 +549,7 @@ public class TestInputOutputFormat {
final OrcInputFormat.Context context = new OrcInputFormat.Context(
conf, n);
OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
- context, fs, new MockPath(fs, "mock:/a/b"), false, null);
+ context, () -> fs, new MockPath(fs, "mock:/a/b"), false, null);
List<SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
assertEquals(1, splitStrategies.size());
final SplitStrategy splitStrategy = splitStrategies.get(0);
@@ -567,14 +567,14 @@ public class TestInputOutputFormat {
@Test
public void testFileGenerator() throws Exception {
OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
- MockFileSystem fs = new MockFileSystem(conf,
+ final MockFileSystem fs = new MockFileSystem(conf,
new MockFile("mock:/a/b/part-00", 1000, new byte[1]),
new MockFile("mock:/a/b/part-01", 1000, new byte[1]),
new MockFile("mock:/a/b/_part-02", 1000, new byte[1]),
new MockFile("mock:/a/b/.part-03", 1000, new byte[1]),
new MockFile("mock:/a/b/part-04", 1000, new byte[1]));
OrcInputFormat.FileGenerator gen =
- new OrcInputFormat.FileGenerator(context, fs,
+ new OrcInputFormat.FileGenerator(context, () -> fs,
new MockPath(fs, "mock:/a/b"), false, null);
List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
assertEquals(1, splitStrategies.size());
@@ -582,13 +582,13 @@ public class TestInputOutputFormat {
conf.set("mapreduce.input.fileinputformat.split.maxsize", "500");
context = new OrcInputFormat.Context(conf);
- fs = new MockFileSystem(conf,
+ final MockFileSystem fs1 = new MockFileSystem(conf,
new MockFile("mock:/a/b/part-00", 1000, new byte[1000]),
new MockFile("mock:/a/b/part-01", 1000, new byte[1000]),
new MockFile("mock:/a/b/_part-02", 1000, new byte[1000]),
new MockFile("mock:/a/b/.part-03", 1000, new byte[1000]),
new MockFile("mock:/a/b/part-04", 1000, new byte[1000]));
- gen = new OrcInputFormat.FileGenerator(context, fs,
+ gen = new OrcInputFormat.FileGenerator(context, () -> fs1,
new MockPath(fs, "mock:/a/b"), false, null);
splitStrategies = createSplitStrategies(context, gen);
assertEquals(1, splitStrategies.size());
@@ -608,7 +608,7 @@ public class TestInputOutputFormat {
new MockFile("mock:/a/delta_001_002/bucket_000000", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/delta_001_002/bucket_000001", 1000, new byte[1], new MockBlock("host1")));
OrcInputFormat.FileGenerator gen =
- new OrcInputFormat.FileGenerator(context, fs,
+ new OrcInputFormat.FileGenerator(context, () -> fs,
new MockPath(fs, "mock:/a"), false, null);
List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -630,11 +630,11 @@ public class TestInputOutputFormat {
OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
// Case 1: Test with just originals => Single split strategy with two splits.
- MockFileSystem fs = new MockFileSystem(conf,
+ final MockFileSystem fs = new MockFileSystem(conf,
new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")));
OrcInputFormat.FileGenerator gen =
- new OrcInputFormat.FileGenerator(context, fs,
+ new OrcInputFormat.FileGenerator(context, () -> fs,
new MockPath(fs, "mock:/a"), false, null);
List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
assertEquals(1, splitStrategies.size());
@@ -648,12 +648,13 @@ public class TestInputOutputFormat {
// Case 2: Test with originals and base => Single split strategy with two splits on compacted
// base since the presence of a base will make the originals obsolete.
- fs = new MockFileSystem(conf,
+ final MockFileSystem fs1 = new MockFileSystem(conf,
new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/base_0000001/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
- gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+ gen = new OrcInputFormat.FileGenerator(context, () -> fs1, new MockPath(fs1,
+ "mock:/a"), false, null);
splitStrategies = createSplitStrategies(context, gen);
assertEquals(1, splitStrategies.size());
assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -665,14 +666,15 @@ public class TestInputOutputFormat {
assertFalse(splits.get(1).isOriginal());
// Case 3: Test with originals and deltas => Two split strategies with two splits for each.
- fs = new MockFileSystem(conf,
+ final MockFileSystem fs3 = new MockFileSystem(conf,
new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
- gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+ gen = new OrcInputFormat.FileGenerator(context, () -> fs3,
+ new MockPath(fs3, "mock:/a"), false, null);
splitStrategies = createSplitStrategies(context, gen);
assertEquals(2, splitStrategies.size());
assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -697,12 +699,13 @@ public class TestInputOutputFormat {
// The reason why we are able to do so is because the valid user data has already been considered
// as base for the covered buckets. Hence, the uncovered buckets do not have any relevant
// data and we can just ignore them.
- fs = new MockFileSystem(conf,
+ final MockFileSystem fs4 = new MockFileSystem(conf,
new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/delete_delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
- gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+ gen = new OrcInputFormat.FileGenerator(context, () -> fs4, new MockPath(fs4,
+ "mock:/a"), false, null);
splitStrategies = createSplitStrategies(context, gen);
assertEquals(2, splitStrategies.size());
assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -718,7 +721,7 @@ public class TestInputOutputFormat {
// Case 5: Test with originals, compacted_base, insert_deltas, delete_deltas (exhaustive test)
// This should just generate one strategy with splits for base and insert_deltas.
- fs = new MockFileSystem(conf,
+ final MockFileSystem fs5 = new MockFileSystem(conf,
new MockFile("mock:/a/b/000000_0", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/b/000000_1", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
@@ -727,7 +730,8 @@ public class TestInputOutputFormat {
new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/delete_delta_0000002_0000002_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")),
new MockFile("mock:/a/delete_delta_0000002_0000002_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
- gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"), false, null);
+ gen = new OrcInputFormat.FileGenerator(context, () -> fs5, new MockPath(fs5,
+ "mock:/a"), false, null);
splitStrategies = createSplitStrategies(context, gen);
assertEquals(1, splitStrategies.size());
assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.ACIDSplitStrategy);
@@ -789,7 +793,9 @@ public class TestInputOutputFormat {
conf.set("fs.mock.impl", MockFileSystem.class.getName());
OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
- OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(context, fs, new MockPath(fs, "mock:/a"),
+ OrcInputFormat.FileGenerator gen =
+ new OrcInputFormat.FileGenerator(context, () -> fs, new MockPath(fs,
+ "mock:/a"),
false, null);
List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
assertEquals(1, splitStrategies.size());
@@ -821,14 +827,14 @@ public class TestInputOutputFormat {
public void testBIStrategySplitBlockBoundary() throws Exception {
conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
- MockFileSystem fs = new MockFileSystem(conf,
+ final MockFileSystem fs = new MockFileSystem(conf,
new MockFile("mock:/a/b/part-00", 1000, new byte[1], new MockBlock("host1", "host2")),
new MockFile("mock:/a/b/part-01", 1000, new byte[1], new MockBlock("host1", "host2")),
new MockFile("mock:/a/b/part-02", 1000, new byte[1], new MockBlock("host1", "host2")),
new MockFile("mock:/a/b/part-03", 1000, new byte[1], new MockBlock("host1", "host2")),
new MockFile("mock:/a/b/part-04", 1000, new byte[1], new MockBlock("host1", "host2")));
OrcInputFormat.FileGenerator gen =
- new OrcInputFormat.FileGenerator(context, fs,
+ new OrcInputFormat.FileGenerator(context, () -> fs,
new MockPath(fs, "mock:/a/b"), false, null);
List<OrcInputFormat.SplitStrategy<?>> splitStrategies = createSplitStrategies(context, gen);
assertEquals(1, splitStrategies.size());
@@ -838,14 +844,14 @@ public class TestInputOutputFormat {
assertEquals(5, numSplits);
context = new OrcInputFormat.Context(conf);
- fs = new MockFileSystem(conf,
+ final MockFileSystem fs0 = new MockFileSystem(conf,
new MockFile("mock:/a/b/part-00", 1000, new byte[1000], new MockBlock("host1", "host2")),
new MockFile("mock:/a/b/part-01", 1000, new byte[1000], new MockBlock("host1", "host2")),
new MockFile("mock:/a/b/part-02", 1000, new byte[1000], new MockBlock("host1", "host2")),
new MockFile("mock:/a/b/part-03", 1000, new byte[1000], new MockBlock("host1", "host2")),
new MockFile("mock:/a/b/part-04", 1000, new byte[1000], new MockBlock("host1", "host2")));
- gen = new OrcInputFormat.FileGenerator(context, fs,
- new MockPath(fs, "mock:/a/b"), false, null);
+ gen = new OrcInputFormat.FileGenerator(context, () -> fs0,
+ new MockPath(fs0, "mock:/a/b"), false, null);
splitStrategies = createSplitStrategies(context, gen);
assertEquals(1, splitStrategies.size());
assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
@@ -854,7 +860,7 @@ public class TestInputOutputFormat {
assertEquals(5, numSplits);
context = new OrcInputFormat.Context(conf);
- fs = new MockFileSystem(conf,
+ final MockFileSystem fs1 = new MockFileSystem(conf,
new MockFile("mock:/a/b/part-00", 1000, new byte[1100], new MockBlock("host1", "host2"),
new MockBlock("host1", "host2")),
new MockFile("mock:/a/b/part-01", 1000, new byte[1100], new MockBlock("host1", "host2"),
@@ -865,8 +871,8 @@ public class TestInputOutputFormat {
new MockBlock("host1", "host2")),
new MockFile("mock:/a/b/part-04", 1000, new byte[1100], new MockBlock("host1", "host2"),
new MockBlock("host1", "host2")));
- gen = new OrcInputFormat.FileGenerator(context, fs,
- new MockPath(fs, "mock:/a/b"), false, null);
+ gen = new OrcInputFormat.FileGenerator(context, () -> fs1,
+ new MockPath(fs1, "mock:/a/b"), false, null);
splitStrategies = createSplitStrategies(context, gen);
assertEquals(1, splitStrategies.size());
assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
@@ -875,7 +881,7 @@ public class TestInputOutputFormat {
assertEquals(10, numSplits);
context = new OrcInputFormat.Context(conf);
- fs = new MockFileSystem(conf,
+ final MockFileSystem fs2 = new MockFileSystem(conf,
new MockFile("mock:/a/b/part-00", 1000, new byte[2000], new MockBlock("host1", "host2"),
new MockBlock("host1", "host2")),
new MockFile("mock:/a/b/part-01", 1000, new byte[2000], new MockBlock("host1", "host2"),
@@ -886,8 +892,8 @@ public class TestInputOutputFormat {
new MockBlock("host1", "host2")),
new MockFile("mock:/a/b/part-04", 1000, new byte[2000], new MockBlock("host1", "host2"),
new MockBlock("host1", "host2")));
- gen = new OrcInputFormat.FileGenerator(context, fs,
- new MockPath(fs, "mock:/a/b"), false, null);
+ gen = new OrcInputFormat.FileGenerator(context, () -> fs2,
+ new MockPath(fs2, "mock:/a/b"), false, null);
splitStrategies = createSplitStrategies(context, gen);
assertEquals(1, splitStrategies.size());
assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
@@ -896,7 +902,7 @@ public class TestInputOutputFormat {
assertEquals(10, numSplits);
context = new OrcInputFormat.Context(conf);
- fs = new MockFileSystem(conf,
+ final MockFileSystem fs3 = new MockFileSystem(conf,
new MockFile("mock:/a/b/part-00", 1000, new byte[2200], new MockBlock("host1", "host2"),
new MockBlock("host1", "host2"), new MockBlock("host1", "host2")),
new MockFile("mock:/a/b/part-01", 1000, new byte[2200], new MockBlock("host1", "host2"),
@@ -907,8 +913,8 @@ public class TestInputOutputFormat {
new MockBlock("host1", "host2"), new MockBlock("host1", "host2")),
new MockFile("mock:/a/b/part-04", 1000, new byte[2200], new MockBlock("host1", "host2"),
new MockBlock("host1", "host2"), new MockBlock("host1", "host2")));
- gen = new OrcInputFormat.FileGenerator(context, fs,
- new MockPath(fs, "mock:/a/b"), false, null);
+ gen = new OrcInputFormat.FileGenerator(context, () -> fs3,
+ new MockPath(fs3, "mock:/a/b"), false, null);
splitStrategies = createSplitStrategies(context, gen);
assertEquals(1, splitStrategies.size());
assertEquals(true, splitStrategies.get(0) instanceof OrcInputFormat.BISplitStrategy);
@@ -1009,9 +1015,9 @@ public class TestInputOutputFormat {
}
public OrcInputFormat.AcidDirInfo createAdi(
- OrcInputFormat.Context context, MockFileSystem fs, String path) throws IOException {
+ OrcInputFormat.Context context, final MockFileSystem fs, String path) throws IOException {
return new OrcInputFormat.FileGenerator(
- context, fs, new MockPath(fs, path), false, null).call();
+ context, () -> fs, new MockPath(fs, path), false, null).call();
}
private List<OrcInputFormat.SplitStrategy<?>> createSplitStrategies(
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index a8f18d1..6fe47d5 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -1086,7 +1086,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
AcidUtils.AcidOperationalProperties.getDefault().toInt());
OrcInputFormat.Context context = new OrcInputFormat.Context(conf);
OrcInputFormat.FileGenerator gen = new OrcInputFormat.FileGenerator(
- context, fs, root, false, null);
+ context, () -> fs, root, false, null);
OrcInputFormat.AcidDirInfo adi = gen.call();
return OrcInputFormat.determineSplitStrategies(
null, context, adi.fs, adi.splitPath, adi.baseFiles, adi.deleteEvents,