Posted to commits@hive.apache.org by vg...@apache.org on 2019/07/24 02:56:00 UTC
[hive] branch master updated: HIVE-21225: ACID: getAcidState() should cache a recursive dir listing locally (Vaibhav Gumashta reviewed by Vineet Garg)
This is an automated email from the ASF dual-hosted git repository.
vgumashta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 959ebeb HIVE-21225: ACID: getAcidState() should cache a recursive dir listing locally (Vaibhav Gumashta reviewed by Vineet Garg)
959ebeb is described below
commit 959ebeb680b07d59f4f55939862ebbc2d7f16a92
Author: Vaibhav Gumashta <vg...@apache.org>
AuthorDate: Tue Jul 23 19:55:30 2019 -0700
HIVE-21225: ACID: getAcidState() should cache a recursive dir listing locally (Vaibhav Gumashta reviewed by Vineet Garg)
---
.../hive/hcatalog/streaming/TestStreaming.java | 12 +-
.../hcatalog/streaming/mutate/StreamingAssert.java | 2 +-
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 486 ++++++++++++++++++---
.../org/apache/hadoop/hive/ql/io/HdfsUtils.java | 3 +-
.../apache/hadoop/hive/ql/io/HiveInputFormat.java | 6 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 4 +-
.../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java | 10 +-
.../ql/io/orc/VectorizedOrcAcidRowBatchReader.java | 5 +-
.../hadoop/hive/ql/session/SessionState.java | 2 +-
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 4 +-
.../hadoop/hive/ql/txn/compactor/CompactorMR.java | 17 +-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 3 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 1 +
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 111 ++---
.../hive/ql/io/orc/TestInputOutputFormat.java | 50 ++-
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 5 +-
.../org/apache/hive/streaming/TestStreaming.java | 10 +-
17 files changed, 550 insertions(+), 181 deletions(-)
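
The patch collapses the previous getAcidState() overloads into a single entry point that takes an explicit FileSystem, a Ref<Boolean> for file-id usage, and a new generateDirSnapshots flag. The removed overloads map onto the new form roughly as:

    getAcidState(dir, conf, ids)                     -> getAcidState(null, dir, conf, ids, null, false, null, false)
    getAcidState(dir, conf, ids, useFileIds, ignore) -> getAcidState(null, dir, conf, ids, Ref.from(useFileIds), ignore, null, false)

A minimal sketch of the new call shape (the partition path and write-id string below are hypothetical, not taken from this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
    import org.apache.hadoop.hive.ql.io.AcidUtils;
    import org.apache.hive.common.util.Ref;

    public class GetAcidStateSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path partition = new Path("/warehouse/tbl/part1");
        FileSystem fs = partition.getFileSystem(conf);
        // The trailing 'true' asks for one recursive listing (HdfsDirSnapshots)
        // instead of a listStatus() round trip per base_*/delta_* directory.
        AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partition, conf,
            new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"),
            Ref.from(false), /* ignoreEmptyFiles */ true, /* tblproperties */ null,
            /* generateDirSnapshots */ true);
        System.out.println("base = " + dir.getBaseDirectory()
            + ", current deltas = " + dir.getCurrentDirectories().size());
      }
    }
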
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
index 4dc04f4..bc67d03 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/TestStreaming.java
@@ -564,7 +564,7 @@ public class TestStreaming {
private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
String... records) throws Exception {
ValidWriteIdList writeIds = getTransactionContext(conf);
- AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false, null, false);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -617,7 +617,8 @@ public class TestStreaming {
*/
private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
String validationQuery, boolean vectorize, String... records) throws Exception {
- AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null,
+ false, null, false);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -667,7 +668,8 @@ public class TestStreaming {
return TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
}
private void checkNothingWritten(Path partitionPath) throws Exception {
- AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null,
+ false, null, false);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -1250,7 +1252,7 @@ public class TestStreaming {
/*now both batches have committed (but not closed), so for each primary file we expect a side
file to exist and indicate the true length of the primary file*/
FileSystem fs = partLoc.getFileSystem(conf);
- AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
@@ -1275,7 +1277,7 @@ public class TestStreaming {
//so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0
//has now received more data (logically, it's buffered) but it is not yet committed.
//let's check that side files exist, etc
- dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf));
+ dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
for(AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
for(FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
index 78cae72..86f762e 100644
--- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
+++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/StreamingAssert.java
@@ -96,7 +96,7 @@ public class StreamingAssert {
writeIds = TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
partitionLocation = getPartitionLocation();
- dir = AcidUtils.getAcidState(partitionLocation, conf, writeIds);
+ dir = AcidUtils.getAcidState(null, partitionLocation, conf, writeIds, null, false, null, true);
assertEquals(0, dir.getObsolete().size());
assertEquals(0, dir.getOriginalFiles().size());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 295fe7c..f207bf2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.io;
import static org.apache.hadoop.hive.ql.exec.Utilities.COPY_KEYWORD;
+import static org.apache.hadoop.hive.ql.exec.AbstractFileMergeOperator.UNION_SUDBIR_PREFIX;
import java.io.IOException;
import java.io.Serializable;
@@ -27,6 +28,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -40,8 +42,10 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.hive.common.*;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -183,6 +187,39 @@ public class AcidUtils {
return !name.startsWith("_") && !name.startsWith(".");
}
};
+
+ public static final PathFilter acidHiddenFileFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path p) {
+ String name = p.getName();
+ // Don't filter out MetaDataFile.METADATA_FILE
+ if (name.startsWith(MetaDataFile.METADATA_FILE)) {
+ return true;
+ }
+ // Don't filter out OrcAcidVersion.ACID_FORMAT
+ if (name.startsWith(OrcAcidVersion.ACID_FORMAT)) {
+ return true;
+ }
+ return !name.startsWith("_") && !name.startsWith(".");
+ }
+ };
+
+ public static final PathFilter acidTempDirFilter = new PathFilter() {
+ @Override
+ public boolean accept(Path dirPath) {
+ String dirPathStr = dirPath.toString();
+ // We don't want to filter out temp tables
+ if (dirPathStr.contains(SessionState.TMP_PREFIX)) {
+ return true;
+ }
+ if ((dirPathStr.contains("/.")) || (dirPathStr.contains("/_"))) {
+ return false;
+ } else {
+ return true;
+ }
+ }
+ };
+
public static final String VISIBILITY_PREFIX = "_v";
public static final Pattern VISIBILITY_PATTERN = Pattern.compile(VISIBILITY_PREFIX + "\\d+");
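
As an illustration of what the two new filters above accept (a sketch, not code from this commit; the "_metadata_acid" and "_tmp_space.db" literals are the values of MetaDataFile.METADATA_FILE and SessionState.TMP_PREFIX, both made public later in this patch):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class AcidFilterSketch {
      public static void main(String[] args) {
        // _metadata_acid survives acidHiddenFileFilter despite its '_' prefix
        System.out.println(AcidUtils.acidHiddenFileFilter.accept(
            new Path("/tbl/part1/base_5/_metadata_acid")));      // true
        // ordinary hidden files are still dropped
        System.out.println(AcidUtils.acidHiddenFileFilter.accept(
            new Path("/tbl/part1/.hive-staging")));              // false
        // '_'-prefixed directories are rejected by acidTempDirFilter...
        System.out.println(AcidUtils.acidTempDirFilter.accept(
            new Path("/tbl/part1/_tmp/delta_1_1")));             // false
        // ...unless the path is the session's temp table space
        System.out.println(AcidUtils.acidTempDirFilter.accept(
            new Path("/user/hive/_tmp_space.db/t1")));           // true
      }
    }
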
@@ -422,7 +459,7 @@ public class AcidUtils {
.writingBase(true);
} else if (bucketFile.getParent().getName().startsWith(DELTA_PREFIX)) {
ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELTA_PREFIX,
- bucketFile.getFileSystem(conf));
+ bucketFile.getFileSystem(conf), null);
result
.setOldStyle(false)
.minimumWriteId(parsedDelta.minWriteId)
@@ -430,7 +467,7 @@ public class AcidUtils {
.bucket(bucket);
} else if (bucketFile.getParent().getName().startsWith(DELETE_DELTA_PREFIX)) {
ParsedDelta parsedDelta = parsedDelta(bucketFile.getParent(), DELETE_DELTA_PREFIX,
- bucketFile.getFileSystem(conf));
+ bucketFile.getFileSystem(conf), null);
result
.setOldStyle(false)
.minimumWriteId(parsedDelta.minWriteId)
@@ -493,6 +530,12 @@ public class AcidUtils {
public List<Path> getAbortedDirectories() {
return abortedDirectories;
}
+
+ @Override
+ public String toString() {
+ return "Aborted Directories: " + abortedDirectories + "; isBaseInRawFormat: " + isBaseInRawFormat + "; original: "
+ + original + "; obsolete: " + obsolete + "; deltas: " + deltas + "; base: " + base;
+ }
}
//This is used for (full) Acid tables. InsertOnly use NOT_ACID
@@ -1029,26 +1072,26 @@ public class AcidUtils {
public static ParsedDelta parsedDelta(Path deltaDir, FileSystem fs) throws IOException {
String deltaDirName = deltaDir.getName();
if (deltaDirName.startsWith(DELETE_DELTA_PREFIX)) {
- return parsedDelta(deltaDir, DELETE_DELTA_PREFIX, fs);
+ return parsedDelta(deltaDir, DELETE_DELTA_PREFIX, fs, null);
}
- return parsedDelta(deltaDir, DELTA_PREFIX, fs); // default prefix is delta_prefix
+ return parsedDelta(deltaDir, DELTA_PREFIX, fs, null); // default prefix is delta_prefix
}
- private static ParsedDelta parseDelta(Path path, String deltaPrefix, FileSystem fs)
+ private static ParsedDelta parseDelta(Path path, String deltaPrefix, FileSystem fs, HdfsDirSnapshot dirSnapshot)
throws IOException {
- ParsedDelta p = parsedDelta(path, deltaPrefix, fs);
+ ParsedDelta p = parsedDelta(path, deltaPrefix, fs, dirSnapshot);
boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
return new ParsedDelta(p.getMinWriteId(),
p.getMaxWriteId(), path, p.statementId, isDeleteDelta, p.isRawFormat(), p.visibilityTxnId);
}
- public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs)
+ public static ParsedDelta parsedDelta(Path deltaDir, String deltaPrefix, FileSystem fs, HdfsDirSnapshot dirSnapshot)
throws IOException {
String filename = deltaDir.getName();
boolean isDeleteDelta = deltaPrefix.equals(DELETE_DELTA_PREFIX);
if (filename.startsWith(deltaPrefix)) {
//small optimization - delete delta can't be in raw format
- boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs);
+ boolean isRawFormat = !isDeleteDelta && MetaDataFile.isRawFormat(deltaDir, fs, dirSnapshot);
return parsedDelta(deltaDir, isRawFormat);
}
throw new IllegalArgumentException(deltaDir + " does not start with " +
@@ -1117,19 +1160,13 @@ public class AcidUtils {
return false;
}
- @VisibleForTesting
- public static Directory getAcidState(Path directory,
- Configuration conf,
- ValidWriteIdList writeIdList) throws IOException {
- return getAcidState(directory, conf, writeIdList, false, false);
- }
-
/** State class for getChildState; cannot modify 2 things in a method. */
private static class TxnBase {
private FileStatus status;
private long writeId = 0;
private long oldestBaseWriteId = Long.MAX_VALUE;
private Path oldestBase = null;
+ private HdfsDirSnapshot dirSnapShot;
}
/**
@@ -1143,33 +1180,10 @@ public class AcidUtils {
* @return the state of the directory
* @throws IOException
*/
- public static Directory getAcidState(Path directory, Configuration conf,
- ValidWriteIdList writeIdList,
- boolean useFileIds,
- boolean ignoreEmptyFiles) throws IOException {
- return getAcidState(directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null);
- }
-
- public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf,
- ValidWriteIdList writeIdList,
- boolean useFileIds,
- boolean ignoreEmptyFiles) throws IOException {
- return getAcidState(fileSystem, directory, conf, writeIdList, Ref.from(useFileIds), ignoreEmptyFiles, null);
- }
-
- public static Directory getAcidState(Path directory, Configuration conf,
- ValidWriteIdList writeIdList,
- Ref<Boolean> useFileIds,
- boolean ignoreEmptyFiles,
- Map<String, String> tblproperties) throws IOException {
- return getAcidState(null, directory, conf, writeIdList, useFileIds, ignoreEmptyFiles, tblproperties);
- }
-
- public static Directory getAcidState(FileSystem fileSystem, Path directory, Configuration conf,
- ValidWriteIdList writeIdList,
- Ref<Boolean> useFileIds,
- boolean ignoreEmptyFiles,
- Map<String, String> tblproperties) throws IOException {
+ @VisibleForTesting
+ public static Directory getAcidState(FileSystem fileSystem, Path candidateDirectory, Configuration conf,
+ ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles,
+ Map<String, String> tblproperties, boolean generateDirSnapshots) throws IOException {
ValidTxnList validTxnList = null;
String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
if(!Strings.isNullOrEmpty(s)) {
@@ -1187,30 +1201,36 @@ public class AcidUtils {
validTxnList.readFromString(s);
}
- FileSystem fs = fileSystem == null ? directory.getFileSystem(conf) : fileSystem;
+ FileSystem fs = fileSystem == null ? candidateDirectory.getFileSystem(conf) : fileSystem;
// The following 'deltas' includes all kinds of delta files including insert & delete deltas.
final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
List<ParsedDelta> working = new ArrayList<ParsedDelta>();
List<Path> originalDirectories = new ArrayList<>();
final List<Path> obsolete = new ArrayList<>();
final List<Path> abortedDirectories = new ArrayList<>();
- List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, directory);
+ List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, candidateDirectory);
TxnBase bestBase = new TxnBase();
final List<HdfsFileStatusWithId> original = new ArrayList<>();
+ List<HdfsDirSnapshot> dirSnapshots = null;
if (childrenWithId != null) {
for (HdfsFileStatusWithId child : childrenWithId) {
- getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original,
- obsolete, bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
+ getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, obsolete,
+ bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
}
} else {
- List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, directory, hiddenFileFilter);
- for (FileStatus child : children) {
- getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete,
+ if (generateDirSnapshots) {
+ dirSnapshots = getHdfsDirSnapshots(fs, candidateDirectory);
+ getChildState(candidateDirectory, dirSnapshots, writeIdList, working, originalDirectories, original, obsolete,
bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
+ } else {
+ List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, candidateDirectory, hiddenFileFilter);
+ for (FileStatus child : children) {
+ getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete, bestBase,
+ ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
+ }
}
}
-
// If we have a base, the original files are obsolete.
if (bestBase.status != null) {
// Add original files to obsolete list if any
@@ -1224,13 +1244,16 @@ public class AcidUtils {
original.clear();
originalDirectories.clear();
} else {
- // Okay, we're going to need these originals. Recurse through them and figure out what we
- // really need.
- for (Path origDir : originalDirectories) {
- findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true);
+ // Okay, we're going to need these originals.
+ // Recurse through them and figure out what we really need.
+ // If we already have the original list, do nothing
+ // If dirSnapshots != null, we would have already populated "original"
+ if (dirSnapshots == null) {
+ for (Path origDir : originalDirectories) {
+ findOriginals(fs, origDir, original, useFileIds, ignoreEmptyFiles, true);
+ }
}
}
-
Collections.sort(working);
//so now, 'working' should be sorted like delta_5_20 delta_5_10 delta_11_20 delta_51_60 for example
//and we want to end up with the best set containing all relevant data: delta_5_20 delta_51_60,
@@ -1299,8 +1322,20 @@ public class AcidUtils {
minOpenWriteId, bestBase.oldestBase.toString()));
}
- final Path base = bestBase.status == null ? null : bestBase.status.getPath();
- LOG.debug("in directory " + directory.toUri().toString() + " base = " + base + " deltas = " +
+ Path base = null;
+ boolean isBaseInRawFormat = false;
+ if (bestBase.status != null) {
+ base = bestBase.status.getPath();
+ isBaseInRawFormat = MetaDataFile.isRawFormat(base, fs, null);
+ if (isBaseInRawFormat && (bestBase.dirSnapShot != null)) {
+ for (FileStatus stat : bestBase.dirSnapShot.getFiles()) {
+ if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
+ original.add(createOriginalObj(null, stat));
+ }
+ }
+ }
+ }
+ LOG.debug("in directory " + candidateDirectory.toUri().toString() + " base = " + base + " deltas = " +
deltas.size());
/**
* If this sort order is changed and there are tables that have been converted to transactional
@@ -1313,12 +1348,228 @@ public class AcidUtils {
//this does "Path.uri.compareTo(that.uri)"
return o1.getFileStatus().compareTo(o2.getFileStatus());
});
-
- // Note: isRawFormat is invalid for non-ORC tables. It will always return true, so we're good.
- final boolean isBaseInRawFormat = base != null && MetaDataFile.isRawFormat(base, fs);
return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original,
obsolete, deltas, base);
}
+
+ public static List<HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs, final Path path) throws IOException {
+ try {
+ Map<Path, HdfsDirSnapshot> dirToSnapshots = new HashMap<Path, HdfsDirSnapshot>();
+ RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, true);
+ while (itr.hasNext()) {
+ FileStatus fStatus = itr.next();
+ Path fPath = fStatus.getPath();
+ if (acidHiddenFileFilter.accept(fPath)) {
+ if (fStatus.isDirectory() && acidTempDirFilter.accept(fPath)) {
+ HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath);
+ if (dirSnapshot == null) {
+ dirSnapshot = new HdfsDirSnapshotImpl(fPath, fStatus);
+ dirToSnapshots.put(fPath, dirSnapshot);
+ }
+ } else {
+ Path parentDirPath = fPath.getParent();
+ if (acidTempDirFilter.accept(parentDirPath)) {
+ FileStatus parentDirFStatus = fs.getFileStatus(parentDirPath);
+ HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath);
+ if (dirSnapshot == null) {
+ dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath, parentDirFStatus);
+ dirToSnapshots.put(parentDirPath, dirSnapshot);
+ }
+ // We're not filtering out the metadata file and acid format file, as they represent parts of a valid snapshot
+ // We're not using the cached values downstream, but we can potentially optimize more in a follow-up task
+ if (fStatus.getPath().toString().contains(MetaDataFile.METADATA_FILE)) {
+ dirSnapshot.addMetadataFile(fStatus);
+ } else if (fStatus.getPath().toString().contains(OrcAcidVersion.ACID_FORMAT)) {
+ dirSnapshot.addOrcAcidFormatFile(fStatus);
+ } else {
+ dirSnapshot.addFile(fStatus);
+ }
+ }
+ }
+ }
+ }
+ return new ArrayList<HdfsDirSnapshot>(dirToSnapshots.values());
+ } catch (IOException e) {
+ e.printStackTrace();
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * DFS dir listing.
+ * Captures a dir and the corresponding list of files it contains,
+ * with additional properties about the dir (like isBase etc)
+ *
+ */
+ public static interface HdfsDirSnapshot {
+ public Path getPath();
+
+ public void addOrcAcidFormatFile(FileStatus fStatus);
+
+ public FileStatus getOrcAcidFormatFile();
+
+ public void addMetadataFile(FileStatus fStatus);
+
+ public FileStatus getMetadataFile(FileStatus fStatus);
+
+ // FileStatus of this HDFS directory
+ public FileStatus getFileStatus();
+
+ // Get the list of files if any within this directory
+ public List<FileStatus> getFiles();
+
+ public void setFileStatus(FileStatus fStatus);
+
+ public void addFile(FileStatus file);
+
+ // File id or null
+ public Long getFileId();
+
+ public Boolean isRawFormat();
+
+ public void setIsRawFormat(boolean isRawFormat);
+
+ public Boolean isBase();
+
+ public void setIsBase(boolean isBase);
+
+ public Boolean isValidBase();
+
+ public void setIsValidBase(boolean isValidBase);
+
+ public Boolean isCompactedBase();
+
+ public void setIsCompactedBase(boolean isCompactedBase);
+ }
+
+ public static class HdfsDirSnapshotImpl implements HdfsDirSnapshot {
+ private Path dirPath;
+ private FileStatus fStatus;
+ private FileStatus metadataFStatus = null;
+ private FileStatus orcAcidFormatFStatus = null;
+ private List<FileStatus> files = new ArrayList<FileStatus>();
+ private Long fileId = null;
+ private Boolean isRawFormat = null;
+ private Boolean isBase = null;
+ private Boolean isValidBase = null;
+ private Boolean isCompactedBase = null;
+
+ public HdfsDirSnapshotImpl(Path path, FileStatus fStatus, List<FileStatus> files) {
+ this.dirPath = path;
+ this.fStatus = fStatus;
+ this.files = files;
+ }
+
+ public HdfsDirSnapshotImpl(Path path, FileStatus fStatus) {
+ this.dirPath = path;
+ this.fStatus = fStatus;
+ }
+
+ @Override
+ public Path getPath() {
+ return dirPath;
+ }
+
+ @Override
+ public FileStatus getFileStatus() {
+ return fStatus;
+ }
+
+ @Override
+ public void setFileStatus(FileStatus fStatus) {
+ this.fStatus = fStatus;
+ }
+
+ @Override
+ public List<FileStatus> getFiles() {
+ return files;
+ }
+
+ @Override
+ public void addFile(FileStatus file) {
+ files.add(file);
+ }
+
+ @Override
+ public Long getFileId() {
+ return fileId;
+ }
+
+ @Override
+ public Boolean isRawFormat() {
+ return isRawFormat;
+ }
+
+ @Override
+ public void setIsRawFormat(boolean isRawFormat) {
+ this.isRawFormat = isRawFormat;
+ }
+
+ @Override
+ public Boolean isBase() {
+ return isBase;
+ }
+
+ @Override
+ public Boolean isValidBase() {
+ return isValidBase;
+ }
+
+ @Override
+ public Boolean isCompactedBase() {
+ return isCompactedBase;
+ }
+
+ @Override
+ public void setIsBase(boolean isBase) {
+ this.isBase = isBase;
+ }
+
+ @Override
+ public void setIsValidBase(boolean isValidBase) {
+ this.isValidBase = isValidBase;
+ }
+
+ @Override
+ public void setIsCompactedBase(boolean isCompactedBase) {
+ this.isCompactedBase = isCompactedBase;
+ }
+
+ @Override
+ public void addOrcAcidFormatFile(FileStatus fStatus) {
+ this.orcAcidFormatFStatus = fStatus;
+ }
+
+ @Override
+ public FileStatus getOrcAcidFormatFile() {
+ return orcAcidFormatFStatus;
+ }
+
+ @Override
+ public void addMetadataFile(FileStatus fStatus) {
+ this.metadataFStatus = fStatus;
+ }
+
+ @Override
+ public FileStatus getMetadataFile(FileStatus fStatus) {
+ return metadataFStatus;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Path: " + dirPath);
+ sb.append("; ");
+ sb.append("Files: { ");
+ for (FileStatus fstatus : files) {
+ sb.append(fstatus);
+ sb.append(", ");
+ }
+ sb.append(" }");
+ return sb.toString();
+ }
+ }
+
/**
* We can only use a 'base' if it doesn't have an open txn (from specific reader's point of view)
* A 'base' with open txn in its range doesn't have 'enough history' info to produce a correct
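
getHdfsDirSnapshots() above is the heart of the change: it replaces a listStatus() round trip per base_*/delta_* directory with a single recursive listFiles() call, caching each directory's contents in an HdfsDirSnapshot. A hedged usage sketch (the table path is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class SnapshotSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path table = new Path("/warehouse/acid_tbl");
        FileSystem fs = table.getFileSystem(conf);
        // One listFiles(path, recursive=true) call captures every directory
        // and its files; each snapshot then serves lookups locally.
        for (AcidUtils.HdfsDirSnapshot snap : AcidUtils.getHdfsDirSnapshots(fs, table)) {
          System.out.println(snap.getPath());
          for (FileStatus f : snap.getFiles()) {
            System.out.println("  " + f.getPath().getName() + " (" + f.getLen() + " bytes)");
          }
        }
      }
    }
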
@@ -1342,6 +1593,19 @@ public class AcidUtils {
//if here, it's a result of IOW
return writeIdList.isWriteIdValid(parsedBase.getWriteId());
}
+
+ private static boolean isValidBase(HdfsDirSnapshot dirSnapshot, ParsedBase parsedBase, ValidWriteIdList writeIdList,
+ FileSystem fs) throws IOException {
+ boolean isValidBase;
+ if (dirSnapshot.isValidBase() != null) {
+ isValidBase = dirSnapshot.isValidBase();
+ } else {
+ isValidBase = isValidBase(parsedBase, writeIdList, fs);
+ dirSnapshot.setIsValidBase(isValidBase);
+ }
+ return isValidBase;
+ }
+
/**
* Returns {@code true} if {@code parsedBase} was created by compaction.
* As of Hive 4.0 we can tell if a directory is a result of compaction based on the
@@ -1349,10 +1613,10 @@ public class AcidUtils {
* that, have to rely on the {@link MetaDataFile} in the directory. So look at the filename first
* since that is the cheaper test.*/
private static boolean isCompactedBase(ParsedBase parsedBase, FileSystem fs) throws IOException {
- return parsedBase.getVisibilityTxnId() > 0 ||
- MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs);
+ return parsedBase.getVisibilityTxnId() > 0 || MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs);
}
+
private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
ValidWriteIdList writeIdList, List<ParsedDelta> working, List<Path> originalDirectories,
List<HdfsFileStatusWithId> original, List<Path> obsolete, TxnBase bestBase,
@@ -1393,7 +1657,7 @@ public class AcidUtils {
}
} else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) {
String deltaPrefix = fn.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
- ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs);
+ ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs, null);
if(!isDirUsable(child.getPath(), delta.getVisibilityTxnId(), aborted, validTxnList)) {
return;
}
@@ -1413,6 +1677,75 @@ public class AcidUtils {
originalDirectories.add(child.getPath());
}
}
+
+ private static void getChildState(Path candidateDirectory, List<HdfsDirSnapshot> dirSnapshots, ValidWriteIdList writeIdList,
+ List<ParsedDelta> working, List<Path> originalDirectories, List<HdfsFileStatusWithId> original,
+ List<Path> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List<Path> aborted,
+ Map<String, String> tblproperties, FileSystem fs, ValidTxnList validTxnList) throws IOException {
+ for (HdfsDirSnapshot dirSnapshot : dirSnapshots) {
+ FileStatus fStat = dirSnapshot.getFileStatus();
+ Path dirPath = dirSnapshot.getPath();
+ String dirName = dirPath.getName();
+ if (dirName.startsWith(BASE_PREFIX)) {
+ bestBase.dirSnapShot = dirSnapshot;
+ ParsedBase parsedBase = ParsedBase.parseBase(dirPath);
+ if (!isDirUsable(dirPath, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
+ continue;
+ }
+ final long writeId = parsedBase.getWriteId();
+ if (bestBase.oldestBaseWriteId > writeId) {
+ //keep track for error reporting
+ bestBase.oldestBase = dirPath;
+ bestBase.oldestBaseWriteId = writeId;
+ }
+ if (bestBase.status == null) {
+ if (isValidBase(dirSnapshot, parsedBase, writeIdList, fs)) {
+ bestBase.status = fStat;
+ bestBase.writeId = writeId;
+ }
+ } else if (bestBase.writeId < writeId) {
+ if (isValidBase(dirSnapshot, parsedBase, writeIdList, fs)) {
+ obsolete.add(bestBase.status.getPath());
+ bestBase.status = fStat;
+ bestBase.writeId = writeId;
+ }
+ } else {
+ obsolete.add(dirPath);
+ }
+ } else if (dirName.startsWith(DELTA_PREFIX) || dirName.startsWith(DELETE_DELTA_PREFIX)) {
+ String deltaPrefix = dirName.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
+ ParsedDelta delta = parseDelta(dirPath, deltaPrefix, fs, dirSnapshot);
+ if (!isDirUsable(dirPath, delta.getVisibilityTxnId(), aborted, validTxnList)) {
+ continue;
+ }
+ if (ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(delta.minWriteId,
+ delta.maxWriteId)) {
+ aborted.add(dirPath);
+ } else if (writeIdList.isWriteIdRangeValid(delta.minWriteId,
+ delta.maxWriteId) != ValidWriteIdList.RangeResponse.NONE) {
+ if (delta.isRawFormat) {
+ for (FileStatus stat : dirSnapshot.getFiles()) {
+ if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
+ original.add(createOriginalObj(null, stat));
+ }
+ }
+ } else {
+ working.add(delta);
+ }
+ }
+ } else {
+ if (!candidateDirectory.equals(dirPath)) {
+ originalDirectories.add(dirPath);
+ }
+ for (FileStatus stat : dirSnapshot.getFiles()) {
+ if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
+ original.add(createOriginalObj(null, stat));
+ }
+ }
+ }
+ }
+ }
+
/**
* checks {@code visibilityTxnId} to see if {@code child} is committed in current snapshot
*/
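
Read against the snapshot-based getChildState() above, a hypothetical partition layout would be bucketed roughly as follows (derived from the logic above, not output shipped with the patch):

    /tbl/p1/base_10/      -> best base   (write ID 10 valid in writeIdList)
    /tbl/p1/base_5/       -> obsolete    (older than the chosen base)
    /tbl/p1/delta_11_11/  -> working     (write ID range still valid)
    /tbl/p1/delta_12_12/  -> aborted     (entire write ID range aborted)
    /tbl/p1/000000_0      -> original    (file in a non-base/delta location)
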
@@ -1494,10 +1827,13 @@ public class AcidUtils {
}
}
- private static List<HdfsFileStatusWithId> tryListLocatedHdfsStatus(Ref<Boolean> useFileIds,
- FileSystem fs, Path directory) {
- Boolean val = useFileIds.value;
+ private static List<HdfsFileStatusWithId> tryListLocatedHdfsStatus(Ref<Boolean> useFileIds, FileSystem fs,
+ Path directory) {
List<HdfsFileStatusWithId> childrenWithId = null;
+ if (useFileIds == null) {
+ return childrenWithId;
+ }
+ Boolean val = useFileIds.value;
if (val == null || val) {
try {
childrenWithId = SHIMS.listLocatedHdfsStatus(fs, directory, hiddenFileFilter);
@@ -2012,7 +2348,7 @@ public class AcidUtils {
*/
public static class MetaDataFile {
//export command uses _metadata....
- private static final String METADATA_FILE = "_metadata_acid";
+ public static final String METADATA_FILE = "_metadata_acid";
private static final String CURRENT_VERSION = "0";
//todo: enums? that have both field name and value list
private interface Field {
@@ -2077,8 +2413,9 @@ public class AcidUtils {
* This is only meaningful for full CRUD tables - Insert-only tables have all their data
* in raw format by definition.
* @param baseOrDeltaDir base or delta file.
+ * @param dirSnapshot
*/
- public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs) throws IOException {
+ public static boolean isRawFormat(Path baseOrDeltaDir, FileSystem fs, HdfsDirSnapshot dirSnapshot) throws IOException {
//todo: this could be optimized - for a full CRUD table only base_x and delta_x_x could have
// files in raw format; delta_x_y (x != y), whether from streaming ingest or compaction,
// must be native Acid format by definition
@@ -2099,13 +2436,19 @@ public class AcidUtils {
}
}
//if here, have to check the files
- Path dataFile = chooseFile(baseOrDeltaDir, fs);
+ Path dataFile;
+ if ((dirSnapshot != null) && (dirSnapshot.getFiles() != null) && (dirSnapshot.getFiles().size() > 0)) {
+ dataFile = dirSnapshot.getFiles().get(0).getPath();
+ } else {
+ dataFile = chooseFile(baseOrDeltaDir, fs);
+ }
if (dataFile == null) {
//directory is empty or doesn't have any that could have been produced by load data
return false;
}
return isRawFormatFile(dataFile, fs);
}
+
public static boolean isRawFormatFile(Path dataFile, FileSystem fs) throws IOException {
try {
Reader reader = OrcFile.createReader(dataFile, OrcFile.readerOptions(fs.getConf()));
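
When a snapshot is supplied, isRawFormat() above can probe the first file already captured in the snapshot instead of issuing another listing via chooseFile(). A sketch (assuming snap describes a base_*/delta_* directory):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class RawFormatSketch {
      static boolean isDirRaw(FileSystem fs, Path baseOrDeltaDir,
          AcidUtils.HdfsDirSnapshot snap) throws IOException {
        // With snap non-null, the cached file list is reused; with null, the
        // method falls back to chooseFile()'s extra directory listing.
        return AcidUtils.MetaDataFile.isRawFormat(baseOrDeltaDir, fs, snap);
      }
    }
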
@@ -2227,7 +2570,7 @@ public class AcidUtils {
+ " from " + jc.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
return null;
}
- Directory acidInfo = AcidUtils.getAcidState(dir, jc, idList);
+ Directory acidInfo = AcidUtils.getAcidState(fs, dir, jc, idList, null, false, null, true);
// Assume that for an MM table, or if there's only the base directory, we are good.
if (!acidInfo.getCurrentDirectories().isEmpty() && AcidUtils.isFullAcidTable(table)) {
Utilities.FILE_OP_LOGGER.warn(
@@ -2264,7 +2607,8 @@ public class AcidUtils {
// If ACID/MM tables, then need to find the valid state wrt to given ValidWriteIdList.
ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(validWriteIdStr);
- Directory acidInfo = AcidUtils.getAcidState(dataPath, conf, validWriteIdList);
+ Directory acidInfo = AcidUtils.getAcidState(dataPath.getFileSystem(conf), dataPath, conf, validWriteIdList, null,
+ false, null, false);
for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) {
pathList.add(hfs.getFileStatus().getPath());
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
index 9d5ba3d..4de5c8c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HdfsUtils.java
@@ -26,8 +26,9 @@ import java.net.URI;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
-
+import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index cff7e04..7e71c77 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -568,12 +568,12 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
boolean allowOriginals = HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_ALLOW_ORIGINALS);
for (Path dir : dirs) {
- processForWriteIds(
+ processForWriteIdsForMmRead(
dir, conf, validWriteIdList, allowOriginals, finalPaths, pathsWithFileOriginals);
}
}
- private static void processForWriteIds(Path dir, Configuration conf,
+ private static void processForWriteIdsForMmRead(Path dir, Configuration conf,
ValidWriteIdList validWriteIdList, boolean allowOriginals, List<Path> finalPaths,
List<Path> pathsWithFileOriginals) throws IOException {
FileSystem fs = dir.getFileSystem(conf);
@@ -605,7 +605,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
if (hasAcidDirs) {
AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
- fs, dir, conf, validWriteIdList, Ref.from(false), true, null);
+ fs, dir, conf, validWriteIdList, Ref.from(false), true, null, false);
// Find the base, created for IOW.
Path base = dirInfo.getBaseDirectory();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 707e38c..ec4994d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1273,7 +1273,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
}
//todo: shouldn't ignoreEmptyFiles be set based on ExecutionEngine?
AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
- fs, dir, context.conf, context.writeIdList, useFileIds, true, null);
+ fs, dir, context.conf, context.writeIdList, useFileIds, true, null, false);
// find the base files (original or new style)
List<AcidBaseFileInfo> baseFiles = new ArrayList<>();
if (dirInfo.getBaseDirectory() == null) {
@@ -2463,7 +2463,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
//it may also be null if there is no base - only deltas
mergerOptions.baseDir(baseDirectory);
if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
- isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf));
+ isOriginal = AcidUtils.MetaDataFile.isRawFormat(baseDirectory, baseDirectory.getFileSystem(conf), null);
mergerOptions.rootPath(baseDirectory.getParent());
} else {
isOriginal = true;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index b1ede05..2ac6232 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hive.common.util.Ref;
import com.google.common.annotations.VisibleForTesting;
@@ -462,9 +463,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
* contents to be in {@link org.apache.hadoop.hive.ql.io.AcidUtils.Directory#getOriginalFiles()}
*/
//the split is from something other than the 1st file of the logical bucket - compute offset
- AcidUtils.Directory directoryState
- = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false,
- true);
+ AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf,
+ validWriteIdList, Ref.from(false), true, null, true);
for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
if (bucketIdFromPath != bucketId) {
@@ -577,8 +577,8 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
//when compacting each split needs to process the whole logical bucket
assert options.getOffset() == 0;
assert options.getMaxOffset() == Long.MAX_VALUE;
- AcidUtils.Directory directoryState
- = AcidUtils.getAcidState(mergerOptions.getRootPath(), conf, validWriteIdList, false, true);
+ AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf,
+ validWriteIdList, Ref.from(false), true, null, true);
/**
* Note that for reading base_x/ or delta_x_x/ with non-acid schema,
* {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all its
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 15f1f94..374b105 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hive.shims.HadoopShims;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hive.common.util.Ref;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.IntegerColumnStatistics;
import org.apache.orc.OrcConf;
@@ -722,8 +723,8 @@ public class VectorizedOrcAcidRowBatchReader
int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(conf)
//statementId is from directory name (or 0 if there is none)
.statementId(syntheticTxnInfo.statementId).bucket(bucketId));
- AcidUtils.Directory directoryState = AcidUtils.getAcidState(syntheticTxnInfo.folder, conf,
- validWriteIdList, false, true);
+ AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, syntheticTxnInfo.folder, conf,
+ validWriteIdList, Ref.from(false), true, null, true);
for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
if (bucketIdFromPath != bucketId) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index 9d631ed..4632361 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -123,7 +123,7 @@ import com.google.common.collect.Maps;
public class SessionState {
private static final Logger LOG = LoggerFactory.getLogger(SessionState.class);
- private static final String TMP_PREFIX = "_tmp_space.db";
+ public static final String TMP_PREFIX = "_tmp_space.db";
private static final String LOCAL_SESSION_PATH_KEY = "_hive.local.session.path";
private static final String HDFS_SESSION_PATH_KEY = "_hive.hdfs.session.path";
private static final String TMP_TABLE_SPACE_KEY = "_hive.tmp_table_space";
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 57eb506..5dfa7ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.Ref;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
@@ -225,7 +226,8 @@ public class Cleaner extends MetaStoreCompactorThread {
private void removeFiles(String location, ValidWriteIdList writeIdList, CompactionInfo ci)
throws IOException, NoSuchObjectException {
Path locPath = new Path(location);
- AcidUtils.Directory dir = AcidUtils.getAcidState(locPath, conf, writeIdList);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(locPath.getFileSystem(conf), locPath, conf, writeIdList, Ref.from(
+ false), false, null, false);
List<Path> obsoleteDirs = dir.getObsolete();
/**
* add anything in 'dir' that only has data from aborted transactions - no one should be
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 67a5e6d..7c79d7b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -267,8 +267,8 @@ public class CompactorMR {
// and discovering that in getSplits is too late as we then have no way to pass it to our
// mapper.
- AcidUtils.Directory dir = AcidUtils.getAcidState(
- new Path(sd.getLocation()), conf, writeIds, false, true);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), true,
+ null, false);
if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
return;
@@ -298,7 +298,7 @@ public class CompactorMR {
maxDeltasToHandle, -1, conf, msc, ci.id, jobName);
}
//now recompute state since we've done minor compactions and have different 'best' set of deltas
- dir = AcidUtils.getAcidState(new Path(sd.getLocation()), conf, writeIds);
+ dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), false, null, false);
}
StringableList dirsToSearch = new StringableList();
@@ -341,9 +341,8 @@ public class CompactorMR {
private void runCrudCompaction(HiveConf hiveConf, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
CompactionInfo ci) throws IOException {
AcidUtils.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(t.getParameters()));
- AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()), hiveConf, writeIds,
- Ref.from(false), false,
- t.getParameters());
+ AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), hiveConf, writeIds, Ref.from(
+ false), false, t.getParameters(), false);
if (!isEnoughToCompact(dir, sd)) {
return;
@@ -446,8 +445,8 @@ public class CompactorMR {
StorageDescriptor sd, ValidWriteIdList writeIds, CompactionInfo ci) throws IOException {
LOG.debug("Going to delete directories for aborted transactions for MM table "
+ t.getDbName() + "." + t.getTableName());
- AcidUtils.Directory dir = AcidUtils.getAcidState(new Path(sd.getLocation()),
- conf, writeIds, Ref.from(false), false, t.getParameters());
+ AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false),
+ false, t.getParameters(), false);
removeFilesForMmTable(conf, dir);
// Then, actually do the compaction.
@@ -1036,7 +1035,7 @@ public class CompactorMR {
dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
boolean sawBase = dir.getName().startsWith(AcidUtils.BASE_PREFIX);
boolean isRawFormat = !dir.getName().startsWith(AcidUtils.DELETE_DELTA_PREFIX)
- && AcidUtils.MetaDataFile.isRawFormat(dir, fs);//deltes can't be raw format
+ && AcidUtils.MetaDataFile.isRawFormat(dir, fs, null);//deletes can't be raw format
FileStatus[] files = fs.listStatus(dir, isRawFormat ? AcidUtils.originalBucketFilter
: AcidUtils.bucketFileFilter);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 6168fc0..5fb2552 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -291,7 +292,7 @@ public class Initiator extends MetaStoreCompactorThread {
boolean noBase = false;
Path location = new Path(sd.getLocation());
FileSystem fs = location.getFileSystem(conf);
- AcidUtils.Directory dir = AcidUtils.getAcidState(fs, location, conf, writeIds, false, false);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false, tblproperties, false);
Path base = dir.getBaseDirectory();
long baseSize = 0;
FileStatus stat = null;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index d4abf42..dddf063 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -394,6 +394,7 @@ public class TestTxnCommands2 {
//run Compaction
runStatementOnDriver("alter table "+ TestTxnCommands2.Table.NONACIDORCTBL +" compact 'major'");
TestTxnCommands2.runWorker(hiveConf);
+ //TestTxnCommands2.runCleaner(hiveConf);
rs = runStatementOnDriver("select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.NONACIDORCTBL + " order by a,b");
LOG.warn("after compact");
for(String s : rs) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index ea31557..e4f0529 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -18,14 +18,16 @@
package org.apache.hadoop.hive.ql.io;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import java.util.BitSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidReadTxnList;
@@ -161,9 +163,8 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/random", 500, new byte[0]),
new MockFile("mock:/tbl/part1/_done", 0, new byte[0]),
new MockFile("mock:/tbl/part1/subdir/000000_0", 0, new byte[0]));
- AcidUtils.Directory dir =
- AcidUtils.getAcidState(new MockPath(fs, "/tbl/part1"), conf,
- new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "/tbl/part1"), conf,
+ new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
assertEquals(null, dir.getBaseDirectory());
assertEquals(0, dir.getCurrentDirectories().size());
assertEquals(0, dir.getObsolete().size());
@@ -198,9 +199,8 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/delta_101_101/bucket_0", 0, new byte[0]));
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
- AcidUtils.Directory dir =
- AcidUtils.getAcidState(new MockPath(fs,
- "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
+ new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
assertEquals(null, dir.getBaseDirectory());
List<Path> obsolete = dir.getObsolete();
assertEquals(2, obsolete.size());
@@ -242,17 +242,20 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/delta_90_120/bucket_0", 0, new byte[0]));
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
- AcidUtils.Directory dir =
- AcidUtils.getAcidState(new MockPath(fs,
- "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
+ new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
- List<Path> obsolete = dir.getObsolete();
- assertEquals(5, obsolete.size());
- assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).toString());
- assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).toString());
- assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(2).toString());
- assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(3).toString());
- assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(4).toString());
+ List<Path> obsoletes = dir.getObsolete();
+ assertEquals(5, obsoletes.size());
+ Set<String> obsoletePathNames = new HashSet<String>();
+ for (Path obsolete : obsoletes) {
+ obsoletePathNames.add(obsolete.toString());
+ }
+ assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_5"));
+ assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_10"));
+ assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_030"));
+ assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_025"));
+ assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_029_029"));
assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
assertEquals(1, deltas.size());
@@ -273,12 +276,12 @@ public class TestAcidUtils {
Path part = new MockPath(fs, "/tbl/part1");
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
- AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:150:" + Long.MAX_VALUE + ":"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:150:"
+ + Long.MAX_VALUE + ":"), null, false, null, false);
// Obsolete list should include the two original bucket files, and the old base dir
- List<Path> obsolete = dir.getObsolete();
- assertEquals(3, obsolete.size());
- assertEquals("mock:/tbl/part1/base_5", obsolete.get(0).toString());
+ List<Path> obsoletes = dir.getObsolete();
+ assertEquals(3, obsoletes.size());
+ assertEquals("mock:/tbl/part1/base_5", obsoletes.get(0).toString());
assertEquals("mock:/tbl/part1/base_10", dir.getBaseDirectory().toString());
}
@@ -296,8 +299,8 @@ public class TestAcidUtils {
Path part = new MockPath(fs, "mock:/tbl/part1");
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
- AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
+ + Long.MAX_VALUE + ":"), null, false, null, false);
assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
List<Path> obsolete = dir.getObsolete();
assertEquals(2, obsolete.size());
@@ -333,8 +336,8 @@ public class TestAcidUtils {
Path part = new MockPath(fs, "mock:/tbl/part1");
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
- AcidUtils.Directory dir
- = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
+ + Long.MAX_VALUE + ":"), null, false, null, false);
assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
List<Path> obsolete = dir.getObsolete();
assertEquals(5, obsolete.size());
@@ -362,7 +365,8 @@ public class TestAcidUtils {
//hypothetically, txn 50 is open and writing write ID 4
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString());
- AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:4:4"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null,
+ false, null, false);
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(2, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -386,7 +390,8 @@ public class TestAcidUtils {
//hypothetically, txn 50 is open and writing write ID 4
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString());
- AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:4:4"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null,
+ false, null, false);
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(2, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -402,8 +407,8 @@ public class TestAcidUtils {
Path part = new MockPath(fs, "mock:/tbl/part1");
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
- AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidCompactorWriteIdList("tbl:4:" + Long.MAX_VALUE));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:4:"
+ + Long.MAX_VALUE), null, false, null, false);
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(1, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -421,8 +426,8 @@ public class TestAcidUtils {
Path part = new MockPath(fs, "mock:/tbl/part1");
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
- AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidCompactorWriteIdList("tbl:3:" + Long.MAX_VALUE));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:3:"
+ + Long.MAX_VALUE), null, false, null, false);
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(1, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -447,19 +452,22 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/delete_delta_110_110/bucket_0", 0, new byte[0]));
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
- AcidUtils.Directory dir =
- AcidUtils.getAcidState(new MockPath(fs,
- "mock:/tbl/part1"), conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
+ new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
- List<Path> obsolete = dir.getObsolete();
- assertEquals(7, obsolete.size());
- assertEquals("mock:/tbl/part1/base_10", obsolete.get(0).toString());
- assertEquals("mock:/tbl/part1/base_5", obsolete.get(1).toString());
- assertEquals("mock:/tbl/part1/delete_delta_025_030", obsolete.get(2).toString());
- assertEquals("mock:/tbl/part1/delta_025_030", obsolete.get(3).toString());
- assertEquals("mock:/tbl/part1/delta_025_025", obsolete.get(4).toString());
- assertEquals("mock:/tbl/part1/delete_delta_029_029", obsolete.get(5).toString());
- assertEquals("mock:/tbl/part1/delta_029_029", obsolete.get(6).toString());
+ List<Path> obsoletes = dir.getObsolete();
+ assertEquals(7, obsoletes.size());
+ Set<String> obsoletePathNames = new HashSet<>();
+ for (Path obsolete : obsoletes) {
+ obsoletePathNames.add(obsolete.toString());
+ }
+ assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_5"));
+ assertTrue(obsoletePathNames.contains("mock:/tbl/part1/base_10"));
+ assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delete_delta_025_030"));
+ assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_030"));
+ assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_025_025"));
+ assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delete_delta_029_029"));
+ assertTrue(obsoletePathNames.contains("mock:/tbl/part1/delta_029_029"));
assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
assertEquals(2, deltas.size());
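Note that the hunk above also relaxes the obsolete-path checks from index-based asserts to set containment: with the listing now served from a cached snapshot, the old enumeration order is no longer something the test should depend on. If hamcrest-library happens to be on the test classpath (an assumption, not something this patch adds), the same relaxation can be written order-insensitively in one assertion:

  import static org.hamcrest.MatcherAssert.assertThat;
  import static org.hamcrest.Matchers.containsInAnyOrder;

  // Order-insensitive equivalent of the HashSet-based checks above.
  assertThat(obsoletePathNames, containsInAnyOrder(
      "mock:/tbl/part1/base_5",
      "mock:/tbl/part1/base_10",
      "mock:/tbl/part1/delete_delta_025_030",
      "mock:/tbl/part1/delta_025_030",
      "mock:/tbl/part1/delta_025_025",
      "mock:/tbl/part1/delete_delta_029_029",
      "mock:/tbl/part1/delta_029_029"));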
@@ -487,8 +495,8 @@ public class TestAcidUtils {
Path part = new MockPath(fs, "mock:/tbl/part1");
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
- AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
+ + Long.MAX_VALUE + ":"), null, false, null, false);
assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
List<Path> obsolete = dir.getObsolete();
assertEquals(3, obsolete.size());
@@ -518,8 +526,8 @@ public class TestAcidUtils {
Path part = new MockPath(fs, "mock:/tbl/part1");
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
- AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
+ + Long.MAX_VALUE + ":"), null, false, null, false);
List<Path> obsolete = dir.getObsolete();
assertEquals(1, obsolete.size());
assertEquals("mock:/tbl/part1/delete_delta_50_50", obsolete.get(0).toString());
@@ -547,8 +555,8 @@ public class TestAcidUtils {
Path part = new MockPath(fs, "mock:/tbl/part1");
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
- AcidUtils.Directory dir =
- AcidUtils.getAcidState(part, conf, new ValidCompactorWriteIdList("tbl:4:" + Long.MAX_VALUE + ":"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:4:"
+ + Long.MAX_VALUE + ":"), null, false, null, false);
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(2, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -572,7 +580,8 @@ public class TestAcidUtils {
//hypothetically, txn 50 is open and writing write ID 4
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString());
- AcidUtils.Directory dir = AcidUtils.getAcidState(part, conf, new ValidReaderWriteIdList("tbl:100:4:4"));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null,
+ false, null, false);
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(3, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index b5958fa..97a18d0 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -748,13 +748,13 @@ public class TestInputOutputFormat {
try {
MockFileSystem fs = new MockFileSystem(conf);
MockFileSystem.addGlobalFile(
- new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
+ new MockFile("mock:/a/base_0000001/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
MockFileSystem.addGlobalFile(
- new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+ new MockFile("mock:/a/base_0000001/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
MockFileSystem.addGlobalFile(
- new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
+ new MockFile("mock:/a/base_0000001/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
MockFileSystem.addGlobalFile(
- new MockFile("mock:/a/delta_0000001_0000001_0000/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
+ new MockFile("mock:/a/base_0000001/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
MockFileSystem.addGlobalFile(
new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
MockFileSystem.addGlobalFile(
@@ -763,6 +763,14 @@ public class TestInputOutputFormat {
new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
MockFileSystem.addGlobalFile(
new MockFile("mock:/a/delta_0000002_0000002_0000/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
+ MockFileSystem.addGlobalFile(
+ new MockFile("mock:/a/delta_0000003_0000003_0000/bucket_00000", 1000, new byte[1], new MockBlock("host1")));
+ MockFileSystem.addGlobalFile(
+ new MockFile("mock:/a/delta_0000003_0000003_0000/bucket_00001", 1000, new byte[1], new MockBlock("host1")));
+ MockFileSystem.addGlobalFile(
+ new MockFile("mock:/a/delta_0000003_0000003_0000/bucket_00002", 1000, new byte[1], new MockBlock("host1")));
+ MockFileSystem.addGlobalFile(
+ new MockFile("mock:/a/delta_0000003_0000003_0000/bucket_00003", 1000, new byte[1], new MockBlock("host1")));
conf.set("bucket_count", "4");
//set up props for read
@@ -803,7 +811,7 @@ public class TestInputOutputFormat {
System.out.println("STATS TRACE END - " + testCaseName.getMethodName());
int delta = readsAfter - readsBefore;
//HIVE-16812 adds 1 read of the footer of each file (only if delete delta exists)
- assertEquals(8, delta);
+ assertEquals(12, delta);
} finally {
MockFileSystem.clearGlobalFiles();
}
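The bump from 8 to 12 expected reads tracks the mock file count one for one: the hunks above rename the first directory to base_0000001 and add a third delta directory with four more bucket files, and per the in-test HIVE-16812 note each bucket file costs one footer read when a delete delta exists. As back-of-envelope arithmetic (the breakdown is inferred from the mock layout above, not stated in the patch):

  // 4 buckets in base_0000001 + 4 in delta_0000002 + 4 in delta_0000003
  int expectedFooterReads = 4 + 4 + 4; // = 12, matching assertEquals(12, delta)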
@@ -2812,7 +2820,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- // call-1: listLocatedStatus - mock:/mocktable
+ // call-1: getAcidState - mock:/mocktable
// call-2: open - mock:/mocktable/0_0
// call-3: open - mock:/mocktable/0_1
assertEquals(3, readOpsDelta);
@@ -2870,7 +2878,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- // call-1: listLocatedStatus - mock:/mocktbl
+ // call-1: getAcidState - mock:/mocktbl
// call-2: open - mock:/mocktbl/0_0
// call-3: open - mock:/mocktbl/0_1
assertEquals(3, readOpsDelta);
@@ -2890,7 +2898,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- // call-1: listLocatedStatus - mock:/mocktbl
+ // call-1: getAcidState - mock:/mocktbl
assertEquals(1, readOpsDelta);
// enable cache and use default strategy
@@ -2909,7 +2917,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- // call-1: listLocatedStatus - mock:/mocktbl
+ // call-1: getAcidState - mock:/mocktbl
// call-2: open - mock:/mocktbl/0_0
// call-3: open - mock:/mocktbl/0_1
assertEquals(3, readOpsDelta);
@@ -2927,7 +2935,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- // call-1: listLocatedStatus - mock:/mocktbl
+ // call-1: getAcidState - mock:/mocktbl
assertEquals(1, readOpsDelta);
// revert back to local fs
@@ -2981,7 +2989,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- // call-1: listLocatedStatus - mock:/mocktable
+ // call-1: getAcidState - mock:/mocktbl1
// call-2: open - mock:/mocktbl1/0_0
// call-3: open - mock:/mocktbl1/0_1
assertEquals(3, readOpsDelta);
@@ -3020,7 +3028,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- // call-1: listLocatedStatus - mock:/mocktable
+ // call-1: getAcidState - mock:/mocktbl1
// call-2: open - mock:/mocktbl1/0_0
// call-3: open - mock:/mocktbl1/0_1
assertEquals(3, readOpsDelta);
@@ -3038,7 +3046,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- // call-1: listLocatedStatus - mock:/mocktbl1
+ // call-1: getAcidState - mock:/mocktbl1
assertEquals(1, readOpsDelta);
// revert back to local fs
@@ -3093,7 +3101,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- // call-1: listLocatedStatus - mock:/mocktbl2
+ // call-1: getAcidState - mock:/mocktbl2
// call-2: open - mock:/mocktbl2/0_0
// call-3: open - mock:/mocktbl2/0_1
assertEquals(3, readOpsDelta);
@@ -3115,7 +3123,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- // call-1: listLocatedStatus - mock:/mocktbl2
+ // call-1: getAcidState - mock:/mocktbl2
// call-2: open - mock:/mocktbl2/0_1
assertEquals(2, readOpsDelta);
@@ -3136,7 +3144,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- // call-1: listLocatedStatus - mock:/mocktbl2
+ // call-1: getAcidState - mock:/mocktbl2
// call-2: open - mock:/mocktbl2/0_0
assertEquals(2, readOpsDelta);
@@ -3153,7 +3161,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- // call-1: listLocatedStatus - mock:/mocktbl2
+ // call-1: getAcidState - mock:/mocktbl2
assertEquals(1, readOpsDelta);
// revert back to local fs
@@ -3525,7 +3533,7 @@ public class TestInputOutputFormat {
// call-6: getAcidState - split 2 => mock:/mocktable5 (to compute offset for original read)
// call-7: open to read footer - split 2 => mock:/mocktable5/0_0 (to get row count)
// call-8: file status - split 2 => mock:/mocktable5/0_0
- assertEquals(8, readOpsDelta);
+ assertEquals(12, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");
@@ -3604,7 +3612,7 @@ public class TestInputOutputFormat {
// call-4: AcidUtils.getAcidState - split 2 => ls mock:/mocktable6
// call-5: read footer - split 2 => mock:/mocktable6/0_0 (to get offset since it's original file)
// call-6: file stat - split 2 => mock:/mocktable6/0_0
- assertEquals(6, readOpsDelta);
+ assertEquals(10, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");
@@ -3679,7 +3687,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- assertEquals(7, readOpsDelta);
+ assertEquals(12, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");
@@ -3753,7 +3761,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- assertEquals(5, readOpsDelta);
+ assertEquals(10, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 8451462..d4fd773 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -595,7 +595,7 @@ public class TestOrcRawRecordMerger {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
ValidWriteIdList writeIdList = new ValidReaderWriteIdList("testEmpty:200:" + Long.MAX_VALUE);
- AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, writeIdList);
+ AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, false, null, false);
Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
BUCKET);
@@ -666,7 +666,8 @@ public class TestOrcRawRecordMerger {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
ValidWriteIdList writeIdList = new ValidReaderWriteIdList("testNewBaseAndDelta:200:" + Long.MAX_VALUE);
- AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, writeIdList);
+ AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, use130Format, null,
+ use130Format);
assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
assertEquals(new Path(root, use130Format ?
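testNewBaseAndDelta is the one caller above that does not pass false/false for the trailing booleans; it threads use130Format through both, so the assertions exercise the new path against both delta-naming formats (assuming the parameter order inferred earlier holds). The base-path check in the previous hunk leans on AcidUtils.createBucketFile, which, judging from the bucket paths used throughout these tests, composes paths like so:

  // Illustration only; the expected result is inferred from the mock paths above.
  Path base = new Path("mock:/tbl/part1/base_0000100");
  Path bucket0 = AcidUtils.createBucketFile(base, 0);
  // bucket0 should resolve to mock:/tbl/part1/base_0000100/bucket_00000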
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index c6d7e7f..dbff263 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -945,7 +945,7 @@ public class TestStreaming {
private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
String... records) throws Exception {
ValidWriteIdList writeIds = getTransactionContext(conf);
- AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, writeIds);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false, null, false);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -999,7 +999,7 @@ public class TestStreaming {
*/
private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
String validationQuery, boolean vectorize, String... records) throws Exception {
- AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false, null, false);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -1050,7 +1050,7 @@ public class TestStreaming {
return TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
}
private void checkNothingWritten(Path partitionPath) throws Exception {
- AcidUtils.Directory dir = AcidUtils.getAcidState(partitionPath, conf, getTransactionContext(conf));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false, null, false);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -1947,7 +1947,7 @@ public class TestStreaming {
/*now both batches have committed (but not closed), so for each primary file we expect a side
file to exist that indicates the true length of the primary file*/
FileSystem fs = partLoc.getFileSystem(conf);
- AcidUtils.Directory dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf));
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
@@ -1972,7 +1972,7 @@ public class TestStreaming {
//so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0
//has now received more data (logically - it's buffered) but it is not yet committed.
//let's check that side files exist, etc
- dir = AcidUtils.getAcidState(partLoc, conf, getTransactionContext(conf));
+ dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
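Taken together, the test churn is fallout from a single idea: getAcidState() should issue one recursive listing of the candidate directory and answer every base_N / delta_X_Y / delete_delta_X_Y question from that local snapshot, rather than one round trip per subdirectory. A minimal sketch of that shape using stock Hadoop APIs; this illustrates the idea, not the patch's actual code:

  import java.io.IOException;
  import java.util.ArrayList;
  import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import org.apache.hadoop.fs.FileStatus;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.LocatedFileStatus;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.fs.RemoteIterator;

  public class DirSnapshotSketch {
    /** One recursive scan up front; later per-directory queries are map hits. */
    public static Map<Path, List<FileStatus>> snapshot(FileSystem fs, Path root)
        throws IOException {
      Map<Path, List<FileStatus>> byDir = new HashMap<>();
      RemoteIterator<LocatedFileStatus> files = fs.listFiles(root, true);
      while (files.hasNext()) {
        FileStatus file = files.next();
        byDir.computeIfAbsent(file.getPath().getParent(), d -> new ArrayList<>())
            .add(file);
      }
      return byDir;
    }
  }

Base/delta parsing can then walk byDir.keySet() without touching the NameNode again, which is exactly where the readOps counts asserted above come down.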