Posted to commits@hive.apache.org by kl...@apache.org on 2020/06/15 09:07:43 UTC
[hive] branch master updated: HIVE-23495: AcidUtils.getAcidState cleanup (Peter Varga, reviewed by Karen Coppage and Marta Kuczora)
This is an automated email from the ASF dual-hosted git repository.
klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 50f8765 HIVE-23495: AcidUtils.getAcidState cleanup (Peter Varga, reviewed by Karen Coppage and Marta Kuczora)
50f8765 is described below
commit 50f8765658f2733e43c232c9506770b62a980be2
Author: Peter Varga <pv...@cloudera.com>
AuthorDate: Mon Jun 15 11:07:31 2020 +0200
HIVE-23495: AcidUtils.getAcidState cleanup (Peter Varga, reviewed by Karen Coppage and Marta Kuczora)
Closes #1097
---
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 421 +++++++++------------
.../apache/hadoop/hive/ql/io/HiveInputFormat.java | 2 +-
.../hadoop/hive/ql/io/orc/OrcInputFormat.java | 9 +-
.../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java | 4 +-
.../ql/io/orc/VectorizedOrcAcidRowBatchReader.java | 2 +-
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 2 +-
.../hadoop/hive/ql/txn/compactor/CompactorMR.java | 2 +-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 6 +-
.../hive/ql/txn/compactor/MinorQueryCompactor.java | 3 +-
.../ql/txn/compactor/MmMajorQueryCompactor.java | 3 +-
.../ql/txn/compactor/MmMinorQueryCompactor.java | 6 +-
.../hadoop/hive/ql/txn/compactor/Worker.java | 2 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 6 +-
.../apache/hadoop/hive/ql/io/TestAcidUtils.java | 49 +--
.../hive/ql/io/orc/TestInputOutputFormat.java | 4 +-
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 5 +-
.../org/apache/hive/streaming/TestStreaming.java | 12 +-
17 files changed, 231 insertions(+), 307 deletions(-)
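As a quick orientation before the diff: the patch removes the tblproperties and generateDirSnapshots parameters from AcidUtils.getAcidState, so callers now pass only the filesystem, the candidate directory, the configuration, the write id list, the useFileIds ref and the ignoreEmptyFiles flag, and directory snapshots are always built when fileId listing is unavailable. Below is a minimal sketch of the new call pattern, pieced together from the call sites and tests in this patch; the import locations, class name, table name and write id string are illustrative assumptions, not part of the commit.

import java.util.BitSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hive.common.util.Ref;

public class GetAcidStateSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // getAcidState() reads the transaction snapshot from the config, as the tests in this patch do.
    conf.set(ValidTxnList.VALID_TXNS_KEY,
        new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());

    Path partition = new Path(args[0]);                // table or partition directory
    FileSystem fs = partition.getFileSystem(conf);
    // Write ids visible to this reader; the string here is illustrative only.
    ValidWriteIdList writeIds = new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":");

    // New 6-argument signature: no tblproperties, no generateDirSnapshots flag.
    AcidUtils.Directory dir =
        AcidUtils.getAcidState(fs, partition, conf, writeIds, Ref.from(false), true);
    System.out.println("base = " + dir.getBaseDirectory()
        + ", deltas = " + dir.getCurrentDirectories().size()
        + ", obsolete = " + dir.getObsolete().size());
  }
}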
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 4ee8b9d..cd9238a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -29,6 +29,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -1243,11 +1244,10 @@ public class AcidUtils {
/** State class for getChildState; cannot modify 2 things in a method. */
private static class TxnBase {
- private FileStatus status;
+ private Path basePath;
private long writeId = 0;
private long oldestBaseWriteId = Long.MAX_VALUE;
private Path oldestBase = null;
- private HdfsDirSnapshot dirSnapShot;
}
/**
@@ -1255,20 +1255,22 @@ public class AcidUtils {
* base and diff directories. Note that because major compactions don't
* preserve the history, we can't use a base directory that includes a
* write id that we must exclude.
- * @param directory the partition directory to analyze
+ * @param fileSystem optional, if it is not provided, it will be derived from the candidateDirectory
+ * @param candidateDirectory the partition directory to analyze
* @param conf the configuration
* @param writeIdList the list of write ids that we are reading
+ * @param useFileIds It will be set to true if the FileSystem supports listing with fileIds
+ * @param ignoreEmptyFiles Ignore files with 0 length
* @return the state of the directory
- * @throws IOException
+ * @throws IOException on filesystem errors
*/
@VisibleForTesting
public static Directory getAcidState(FileSystem fileSystem, Path candidateDirectory, Configuration conf,
- ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles,
- Map<String, String> tblproperties, boolean generateDirSnapshots) throws IOException {
+ ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles) throws IOException {
ValidTxnList validTxnList = null;
String s = conf.get(ValidTxnList.VALID_TXNS_KEY);
if(!Strings.isNullOrEmpty(s)) {
- /**
+ /*
* getAcidState() is sometimes called on non-transactional tables, e.g.
* OrcInputFileFormat.FileGenerator.callInternal(). e.g. orc_merge3.q In that case
* writeIdList is bogus - doesn't even have a table name.
@@ -1284,36 +1286,29 @@ public class AcidUtils {
FileSystem fs = fileSystem == null ? candidateDirectory.getFileSystem(conf) : fileSystem;
// The following 'deltas' includes all kinds of delta files including insert & delete deltas.
- final List<ParsedDelta> deltas = new ArrayList<ParsedDelta>();
- List<ParsedDelta> working = new ArrayList<ParsedDelta>();
+ final List<ParsedDelta> deltas = new ArrayList<>();
+ List<ParsedDelta> working = new ArrayList<>();
List<Path> originalDirectories = new ArrayList<>();
final List<Path> obsolete = new ArrayList<>();
final List<Path> abortedDirectories = new ArrayList<>();
- List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, candidateDirectory);
-
TxnBase bestBase = new TxnBase();
final List<HdfsFileStatusWithId> original = new ArrayList<>();
Map<Path, HdfsDirSnapshot> dirSnapshots = null;
+
+ List<HdfsFileStatusWithId> childrenWithId = tryListLocatedHdfsStatus(useFileIds, fs, candidateDirectory);
+
if (childrenWithId != null) {
for (HdfsFileStatusWithId child : childrenWithId) {
- getChildState(child.getFileStatus(), child, writeIdList, working, originalDirectories, original, obsolete,
- bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
+ getChildState(child, writeIdList, working, originalDirectories, original, obsolete,
+ bestBase, ignoreEmptyFiles, abortedDirectories, fs, validTxnList);
}
} else {
- if (generateDirSnapshots) {
- dirSnapshots = getHdfsDirSnapshots(fs, candidateDirectory);
- getChildState(candidateDirectory, dirSnapshots, writeIdList, working, originalDirectories, original, obsolete,
- bestBase, ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
- } else {
- List<FileStatus> children = HdfsUtils.listLocatedStatus(fs, candidateDirectory, hiddenFileFilter);
- for (FileStatus child : children) {
- getChildState(child, null, writeIdList, working, originalDirectories, original, obsolete, bestBase,
- ignoreEmptyFiles, abortedDirectories, tblproperties, fs, validTxnList);
- }
- }
+ dirSnapshots = getHdfsDirSnapshots(fs, candidateDirectory);
+ getChildState(candidateDirectory, dirSnapshots, writeIdList, working, originalDirectories, original, obsolete,
+ bestBase, ignoreEmptyFiles, abortedDirectories, fs, validTxnList);
}
// If we have a base, the original files are obsolete.
- if (bestBase.status != null) {
+ if (bestBase.basePath != null) {
// Add original files to obsolete list if any
for (HdfsFileStatusWithId fswid : original) {
obsolete.add(fswid.getFileStatus().getPath());
@@ -1382,9 +1377,9 @@ public class AcidUtils {
}
}
- if(bestBase.oldestBase != null && bestBase.status == null &&
+ if(bestBase.oldestBase != null && bestBase.basePath == null &&
isCompactedBase(ParsedBase.parseBase(bestBase.oldestBase), fs, dirSnapshots)) {
- /**
+ /*
* If here, it means there was a base_x (> 1 perhaps) but none were suitable for given
* {@link writeIdList}. Note that 'original' files are logically a base_Long.MIN_VALUE and thus
* cannot have any data for an open txn. We could check {@link deltas} has files to cover
@@ -1405,78 +1400,80 @@ public class AcidUtils {
Path base = null;
boolean isBaseInRawFormat = false;
- if (bestBase.status != null) {
- base = bestBase.status.getPath();
+ if (bestBase.basePath != null) {
+ base = bestBase.basePath;
isBaseInRawFormat = MetaDataFile.isRawFormat(base, fs, dirSnapshots != null ? dirSnapshots.get(base) : null);
- if (isBaseInRawFormat && (bestBase.dirSnapShot != null)) {
- for (FileStatus stat : bestBase.dirSnapShot.getFiles()) {
- if ((!ignoreEmptyFiles) || (stat.getLen() != 0)) {
- original.add(createOriginalObj(null, stat));
- }
- }
- }
}
LOG.debug("in directory " + candidateDirectory.toUri().toString() + " base = " + base + " deltas = " +
deltas.size());
- /**
+ /*
* If this sort order is changed and there are tables that have been converted to transactional
* and have had any update/delete/merge operations performed but not yet MAJOR compacted, it
* may result in data loss since it may change how
* {@link org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair} assigns
* {@link RecordIdentifier#rowId} for read (that have happened) and compaction (yet to happen).
*/
- Collections.sort(original, (HdfsFileStatusWithId o1, HdfsFileStatusWithId o2) -> {
- //this does "Path.uri.compareTo(that.uri)"
- return o1.getFileStatus().compareTo(o2.getFileStatus());
- });
- return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original,
- obsolete, deltas, base);
- }
-
- public static Map<Path, HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs,
- final Path path) throws IOException {
- try {
- Map<Path, HdfsDirSnapshot> dirToSnapshots = new HashMap<Path, HdfsDirSnapshot>();
- RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, true);
- while (itr.hasNext()) {
- FileStatus fStatus = itr.next();
- Path fPath = fStatus.getPath();
- if (acidHiddenFileFilter.accept(fPath)) {
- if (fStatus.isDirectory() && acidTempDirFilter.accept(fPath)) {
- HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath);
+ // this does "Path.uri.compareTo(that.uri)"
+ original.sort(Comparator.comparing(HdfsFileStatusWithId::getFileStatus));
+ return new DirectoryImpl(abortedDirectories, isBaseInRawFormat, original, obsolete, deltas, base);
+ }
+
+ public static Map<Path, HdfsDirSnapshot> getHdfsDirSnapshots(final FileSystem fs, final Path path)
+ throws IOException {
+ Map<Path, HdfsDirSnapshot> dirToSnapshots = new HashMap<>();
+ RemoteIterator<LocatedFileStatus> itr = fs.listFiles(path, true);
+ while (itr.hasNext()) {
+ FileStatus fStatus = itr.next();
+ Path fPath = fStatus.getPath();
+ if (acidHiddenFileFilter.accept(fPath)) {
+ if (fStatus.isDirectory() && acidTempDirFilter.accept(fPath)) {
+ HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(fPath);
+ if (dirSnapshot == null) {
+ dirSnapshot = new HdfsDirSnapshotImpl(fPath);
+ dirToSnapshots.put(fPath, dirSnapshot);
+ }
+ } else {
+ Path parentDirPath = fPath.getParent();
+ if (acidTempDirFilter.accept(parentDirPath)) {
+ while (isChildOfDelta(parentDirPath, path)) {
+ // In some cases there are other directory layers between the delta and the datafiles
+ // (export-import mm table, insert with union all to mm table, skewed tables).
+ // But it does not matter for the AcidState; we just need the deltas and the data files
+ // So build the snapshot with the files inside the delta directory
+ parentDirPath = parentDirPath.getParent();
+ }
+ HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath);
if (dirSnapshot == null) {
- dirSnapshot = new HdfsDirSnapshotImpl(fPath, fStatus);
- dirToSnapshots.put(fPath, dirSnapshot);
+ dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath);
+ dirToSnapshots.put(parentDirPath, dirSnapshot);
}
- } else {
- Path parentDirPath = fPath.getParent();
- if (acidTempDirFilter.accept(parentDirPath)) {
- HdfsDirSnapshot dirSnapshot = dirToSnapshots.get(parentDirPath);
- if (dirSnapshot == null) {
- FileStatus parentDirFStatus = null;
- if (!parentDirPath.equals(path)) {
- parentDirFStatus = fs.getFileStatus(parentDirPath);
- }
- dirSnapshot = new HdfsDirSnapshotImpl(parentDirPath, parentDirFStatus);
- dirToSnapshots.put(parentDirPath, dirSnapshot);
- }
- // We're not filtering out the metadata file and acid format file, as they represent parts of a valid snapshot
- // We're not using the cached values downstream, but we can potentially optimize more in a follow-up task
- if (fStatus.getPath().toString().contains(MetaDataFile.METADATA_FILE)) {
- dirSnapshot.addMetadataFile(fStatus);
- } else if (fStatus.getPath().toString().contains(OrcAcidVersion.ACID_FORMAT)) {
- dirSnapshot.addOrcAcidFormatFile(fStatus);
- } else {
- dirSnapshot.addFile(fStatus);
- }
+ // We're not filtering out the metadata file and acid format file,
+ // as they represent parts of a valid snapshot
+ // We're not using the cached values downstream, but we can potentially optimize more in a follow-up task
+ if (fStatus.getPath().toString().contains(MetaDataFile.METADATA_FILE)) {
+ dirSnapshot.addMetadataFile(fStatus);
+ } else if (fStatus.getPath().toString().contains(OrcAcidVersion.ACID_FORMAT)) {
+ dirSnapshot.addOrcAcidFormatFile(fStatus);
+ } else {
+ dirSnapshot.addFile(fStatus);
}
}
}
}
- return dirToSnapshots;
- } catch (IOException e) {
- throw new IOException(e);
}
+ return dirToSnapshots;
+ }
+
+ private static boolean isChildOfDelta(Path childDir, Path rootPath) {
+ if (childDir.toUri().toString().length() <= rootPath.toUri().toString().length()) {
+ return false;
+ }
+ // We do not want to look outside the original directory
+ String fullName = childDir.toUri().toString().substring(rootPath.toUri().toString().length() + 1);
+ String dirName = childDir.getName();
+ return (fullName.startsWith(BASE_PREFIX) && !dirName.startsWith(BASE_PREFIX)) ||
+ (fullName.startsWith(DELTA_PREFIX) && !dirName.startsWith(DELTA_PREFIX)) ||
+ (fullName.startsWith(DELETE_DELTA_PREFIX) && !dirName.startsWith(DELETE_DELTA_PREFIX));
}
/**
@@ -1485,7 +1482,7 @@ public class AcidUtils {
* with additional properties about the dir (like isBase etc)
*
*/
- public static interface HdfsDirSnapshot {
+ public interface HdfsDirSnapshot {
public Path getPath();
public void addOrcAcidFormatFile(FileStatus fStatus);
@@ -1496,14 +1493,9 @@ public class AcidUtils {
public FileStatus getMetadataFile(FileStatus fStatus);
- // FileStatus of this HDFS directory
- public FileStatus getFileStatus();
-
// Get the list of files if any within this directory
public List<FileStatus> getFiles();
- public void setFileStatus(FileStatus fStatus);
-
public void addFile(FileStatus file);
// File id or null
@@ -1530,7 +1522,6 @@ public class AcidUtils {
public static class HdfsDirSnapshotImpl implements HdfsDirSnapshot {
private Path dirPath;
- private FileStatus fStatus;
private FileStatus metadataFStatus = null;
private FileStatus orcAcidFormatFStatus = null;
private List<FileStatus> files = new ArrayList<FileStatus>();
@@ -1540,31 +1531,19 @@ public class AcidUtils {
private Boolean isValidBase = null;
private Boolean isCompactedBase = null;
- public HdfsDirSnapshotImpl(Path path, FileStatus fStatus, List<FileStatus> files) {
+ public HdfsDirSnapshotImpl(Path path, List<FileStatus> files) {
this.dirPath = path;
- this.fStatus = fStatus;
this.files = files;
}
- public HdfsDirSnapshotImpl(Path path, FileStatus fStatus) {
+ public HdfsDirSnapshotImpl(Path path) {
this.dirPath = path;
- this.fStatus = fStatus;
}
@Override
public Path getPath() {
return dirPath;
}
-
- @Override
- public FileStatus getFileStatus() {
- return fStatus;
- }
-
- @Override
- public void setFileStatus(FileStatus fStatus) {
- this.fStatus = fStatus;
- }
@Override
public List<FileStatus> getFiles() {
@@ -1676,35 +1655,28 @@ public class AcidUtils {
* causes anything written previously to be ignored (hence the overwrite). In this case, base_x
* is visible if writeid:x is committed for current reader.
*/
- private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList writeIdList,
- FileSystem fs) throws IOException {
- if(parsedBase.getWriteId() == Long.MIN_VALUE) {
- //such base is created by 1st compaction in case of non-acid to acid table conversion
- //By definition there are no open txns with id < 1.
- return true;
- }
- if (writeIdList.getMinOpenWriteId() != null && parsedBase.getWriteId() <= writeIdList.getMinOpenWriteId()) {
- return true;
- }
- if(isCompactedBase(parsedBase, fs, (HdfsDirSnapshot) null)) {
- return writeIdList.isValidBase(parsedBase.getWriteId());
- }
- //if here, it's a result of IOW
- return writeIdList.isWriteIdValid(parsedBase.getWriteId());
- }
-
- private static boolean isValidBase(HdfsDirSnapshot dirSnapshot, ParsedBase parsedBase, ValidWriteIdList writeIdList,
- FileSystem fs) throws IOException {
+ private static boolean isValidBase(ParsedBase parsedBase, ValidWriteIdList writeIdList, FileSystem fs,
+ HdfsDirSnapshot dirSnapshot) throws IOException {
boolean isValidBase;
- if (dirSnapshot.isValidBase() != null) {
+ if (dirSnapshot != null && dirSnapshot.isValidBase() != null) {
isValidBase = dirSnapshot.isValidBase();
} else {
- if (isCompactedBase(parsedBase, fs, dirSnapshot)) {
+ if (parsedBase.getWriteId() == Long.MIN_VALUE) {
+ //such base is created by 1st compaction in case of non-acid to acid table conversion
+ //By definition there are no open txns with id < 1.
+ isValidBase = true;
+ } else if (writeIdList.getMinOpenWriteId() != null && parsedBase.getWriteId() <= writeIdList
+ .getMinOpenWriteId()) {
+ isValidBase = true;
+ } else if (isCompactedBase(parsedBase, fs, dirSnapshot)) {
isValidBase = writeIdList.isValidBase(parsedBase.getWriteId());
} else {
+ // if here, it's a result of IOW
isValidBase = writeIdList.isWriteIdValid(parsedBase.getWriteId());
}
- dirSnapshot.setIsValidBase(isValidBase);
+ if (dirSnapshot != null) {
+ dirSnapshot.setIsValidBase(isValidBase);
+ }
}
return isValidBase;
}
@@ -1724,78 +1696,39 @@ public class AcidUtils {
HdfsDirSnapshot snapshot) throws IOException {
return parsedBase.getVisibilityTxnId() > 0 || MetaDataFile.isCompacted(parsedBase.getBaseDirPath(), fs, snapshot);
}
-
- private static void getChildState(FileStatus child, HdfsFileStatusWithId childWithId,
- ValidWriteIdList writeIdList, List<ParsedDelta> working, List<Path> originalDirectories,
- List<HdfsFileStatusWithId> original, List<Path> obsolete, TxnBase bestBase,
- boolean ignoreEmptyFiles, List<Path> aborted, Map<String, String> tblproperties,
- FileSystem fs, ValidTxnList validTxnList) throws IOException {
- Path p = child.getPath();
- String fn = p.getName();
- if (!child.isDirectory()) {
- if (!ignoreEmptyFiles || child.getLen() != 0) {
- original.add(createOriginalObj(childWithId, child));
- }
- return;
- }
- if (fn.startsWith(BASE_PREFIX)) {
- ParsedBase parsedBase = ParsedBase.parseBase(p);
- if(!isDirUsable(child.getPath(), parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
- return;
- }
- final long writeId = parsedBase.getWriteId();
- if(bestBase.oldestBaseWriteId > writeId) {
- //keep track for error reporting
- bestBase.oldestBase = p;
- bestBase.oldestBaseWriteId = writeId;
- }
- if (bestBase.status == null) {
- if(isValidBase(parsedBase, writeIdList, fs)) {
- bestBase.status = child;
- bestBase.writeId = writeId;
- }
- } else if (bestBase.writeId < writeId) {
- if(isValidBase(parsedBase, writeIdList, fs)) {
- obsolete.add(bestBase.status.getPath());
- bestBase.status = child;
- bestBase.writeId = writeId;
- }
- } else {
- obsolete.add(child.getPath());
+
+ private static void getChildState(HdfsFileStatusWithId childWithId, ValidWriteIdList writeIdList,
+ List<ParsedDelta> working, List<Path> originalDirectories, List<HdfsFileStatusWithId> original,
+ List<Path> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List<Path> aborted, FileSystem fs,
+ ValidTxnList validTxnList) throws IOException {
+ Path childPath = childWithId.getFileStatus().getPath();
+ String fn = childPath.getName();
+ if (!childWithId.getFileStatus().isDirectory()) {
+ if (!ignoreEmptyFiles || childWithId.getFileStatus().getLen() != 0) {
+ original.add(childWithId);
}
+ } else if (fn.startsWith(BASE_PREFIX)) {
+ processBaseDir(childPath, writeIdList, obsolete, bestBase, aborted, fs, validTxnList, null);
} else if (fn.startsWith(DELTA_PREFIX) || fn.startsWith(DELETE_DELTA_PREFIX)) {
- String deltaPrefix = fn.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
- ParsedDelta delta = parseDelta(child.getPath(), deltaPrefix, fs, null);
- if(!isDirUsable(child.getPath(), delta.getVisibilityTxnId(), aborted, validTxnList)) {
- return;
- }
- if(ValidWriteIdList.RangeResponse.ALL ==
- writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) {
- aborted.add(child.getPath());
- }
- else if (writeIdList.isWriteIdRangeValid(
- delta.minWriteId, delta.maxWriteId) != ValidWriteIdList.RangeResponse.NONE) {
- working.add(delta);
- }
+ processDeltaDir(childPath, writeIdList, working, aborted, fs, validTxnList, null);
} else {
// This is just the directory. We need to recurse and find the actual files. But don't
// do this until we have determined there is no base. This saves time. Plus,
// it is possible that the cleaner is running and removing these original files,
// in which case recursing through them could cause us to get an error.
- originalDirectories.add(child.getPath());
+ originalDirectories.add(childPath);
}
}
-
+
private static void getChildState(Path candidateDirectory, Map<Path, HdfsDirSnapshot> dirSnapshots,
ValidWriteIdList writeIdList, List<ParsedDelta> working, List<Path> originalDirectories,
- List<HdfsFileStatusWithId> original,
- List<Path> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles, List<Path> aborted,
- Map<String, String> tblproperties, FileSystem fs, ValidTxnList validTxnList) throws IOException {
+ List<HdfsFileStatusWithId> original, List<Path> obsolete, TxnBase bestBase, boolean ignoreEmptyFiles,
+ List<Path> aborted, FileSystem fs, ValidTxnList validTxnList) throws IOException {
for (HdfsDirSnapshot dirSnapshot : dirSnapshots.values()) {
- FileStatus fStat = dirSnapshot.getFileStatus();
Path dirPath = dirSnapshot.getPath();
String dirName = dirPath.getName();
- if (dirPath.equals(candidateDirectory)) {
+ // dirPath may contain the filesystem prefix
+ if (dirPath.toString().endsWith(candidateDirectory.toString())) {
// if the candidateDirectory is itself a delta directory, we need to add originals in that directory
// and return. This is the case when compaction thread calls getChildState.
for (FileStatus fileStatus : dirSnapshot.getFiles()) {
@@ -1804,44 +1737,9 @@ public class AcidUtils {
}
}
} else if (dirName.startsWith(BASE_PREFIX)) {
- bestBase.dirSnapShot = dirSnapshot;
- ParsedBase parsedBase = ParsedBase.parseBase(dirPath);
- if (!isDirUsable(dirPath, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
- continue;
- }
- final long writeId = parsedBase.getWriteId();
- if (bestBase.oldestBaseWriteId > writeId) {
- //keep track for error reporting
- bestBase.oldestBase = dirPath;
- bestBase.oldestBaseWriteId = writeId;
- }
- if (bestBase.status == null) {
- if (isValidBase(dirSnapshot, parsedBase, writeIdList, fs)) {
- bestBase.status = fStat;
- bestBase.writeId = writeId;
- }
- } else if (bestBase.writeId < writeId) {
- if (isValidBase(dirSnapshot, parsedBase, writeIdList, fs)) {
- obsolete.add(bestBase.status.getPath());
- bestBase.status = fStat;
- bestBase.writeId = writeId;
- }
- } else {
- obsolete.add(dirPath);
- }
+ processBaseDir(dirPath, writeIdList, obsolete, bestBase, aborted, fs, validTxnList, dirSnapshot);
} else if (dirName.startsWith(DELTA_PREFIX) || dirName.startsWith(DELETE_DELTA_PREFIX)) {
- String deltaPrefix = dirName.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
- ParsedDelta delta = parseDelta(dirPath, deltaPrefix, fs, dirSnapshot);
- if (!isDirUsable(dirPath, delta.getVisibilityTxnId(), aborted, validTxnList)) {
- continue;
- }
- if (ValidWriteIdList.RangeResponse.ALL == writeIdList
- .isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) {
- aborted.add(dirPath);
- } else if (writeIdList.isWriteIdRangeValid(delta.minWriteId, delta.maxWriteId)
- != ValidWriteIdList.RangeResponse.NONE) {
- working.add(delta);
- }
+ processDeltaDir(dirPath, writeIdList, working, aborted, fs, validTxnList, dirSnapshot);
} else {
originalDirectories.add(dirPath);
for (FileStatus stat : dirSnapshot.getFiles()) {
@@ -1852,27 +1750,72 @@ public class AcidUtils {
}
}
}
-
+
+ private static void processBaseDir(Path baseDir, ValidWriteIdList writeIdList, List<Path> obsolete, TxnBase bestBase,
+ List<Path> aborted, FileSystem fs, ValidTxnList validTxnList, AcidUtils.HdfsDirSnapshot dirSnapshot)
+ throws IOException {
+ ParsedBase parsedBase = ParsedBase.parseBase(baseDir);
+ if (!isDirUsable(baseDir, parsedBase.getVisibilityTxnId(), aborted, validTxnList)) {
+ return;
+ }
+ final long writeId = parsedBase.getWriteId();
+ if (bestBase.oldestBaseWriteId > writeId) {
+ // keep track for error reporting
+ bestBase.oldestBase = baseDir;
+ bestBase.oldestBaseWriteId = writeId;
+ }
+ if (bestBase.basePath == null) {
+ if (isValidBase(parsedBase, writeIdList, fs, dirSnapshot)) {
+ bestBase.basePath = baseDir;
+ bestBase.writeId = writeId;
+ }
+ } else if (bestBase.writeId < writeId) {
+ if (isValidBase(parsedBase, writeIdList, fs, dirSnapshot)) {
+ obsolete.add(bestBase.basePath);
+ bestBase.basePath = baseDir;
+ bestBase.writeId = writeId;
+ }
+ } else {
+ obsolete.add(baseDir);
+ }
+ }
+
+ private static void processDeltaDir(Path deltadir, ValidWriteIdList writeIdList, List<ParsedDelta> working,
+ List<Path> aborted, FileSystem fs, ValidTxnList validTxnList, AcidUtils.HdfsDirSnapshot dirSnapshot)
+ throws IOException {
+ String dirName = deltadir.getName();
+ String deltaPrefix = dirName.startsWith(DELTA_PREFIX) ? DELTA_PREFIX : DELETE_DELTA_PREFIX;
+ ParsedDelta delta = parseDelta(deltadir, deltaPrefix, fs, dirSnapshot);
+ if (!isDirUsable(deltadir, delta.getVisibilityTxnId(), aborted, validTxnList)) {
+ return;
+ }
+ if (ValidWriteIdList.RangeResponse.ALL == writeIdList.isWriteIdRangeAborted(delta.minWriteId, delta.maxWriteId)) {
+ aborted.add(deltadir);
+ } else if (writeIdList.isWriteIdRangeValid(delta.minWriteId, delta.maxWriteId)
+ != ValidWriteIdList.RangeResponse.NONE) {
+ working.add(delta);
+ }
+ }
+
/**
* checks {@code visibilityTxnId} to see if {@code child} is committed in current snapshot
*/
- private static boolean isDirUsable(Path child, long visibilityTxnId,
- List<Path> aborted, ValidTxnList validTxnList) {
- if(validTxnList == null) {
+ private static boolean isDirUsable(Path child, long visibilityTxnId, List<Path> aborted, ValidTxnList validTxnList) {
+ if (validTxnList == null) {
throw new IllegalArgumentException("No ValidTxnList for " + child);
}
- if(!validTxnList.isTxnValid(visibilityTxnId)) {
+ if (!validTxnList.isTxnValid(visibilityTxnId)) {
boolean isAborted = validTxnList.isTxnAborted(visibilityTxnId);
- if(isAborted) {
- aborted.add(child);//so we can clean it up
+ if (isAborted) {
+ aborted.add(child);// so we can clean it up
}
LOG.debug("getChildState() ignoring(" + aborted + ") " + child);
return false;
}
return true;
}
- public static HdfsFileStatusWithId createOriginalObj(
- HdfsFileStatusWithId childWithId, FileStatus child) {
+
+ public static HdfsFileStatusWithId createOriginalObj(HdfsFileStatusWithId childWithId, FileStatus child) {
return childWithId != null ? childWithId : new HdfsFileStatusWithoutId(child);
}
@@ -2157,7 +2100,7 @@ public class AcidUtils {
* Returns the logical end of file for an acid data file.
*
 * This relies on the fact that if delta_x_y has no committed transactions it will be filtered out
- * by {@link #getAcidState(FileSystem, Path, Configuration, ValidWriteIdList, Ref, boolean, Map, boolean)}
+ * by {@link #getAcidState(FileSystem, Path, Configuration, ValidWriteIdList, Ref, boolean)}
* and so won't be read at all.
* @param file - data file to read/compute splits on
*/
@@ -2669,7 +2612,7 @@ public class AcidUtils {
+ " from " + jc.get(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY));
return null;
}
- Directory acidInfo = AcidUtils.getAcidState(fs, dir, jc, idList, null, false, null, true);
+ Directory acidInfo = AcidUtils.getAcidState(fs, dir, jc, idList, null, false);
// Assume that for an MM table, or if there's only the base directory, we are good.
if (!acidInfo.getCurrentDirectories().isEmpty() && AcidUtils.isFullAcidTable(table)) {
Utilities.FILE_OP_LOGGER.warn(
@@ -2707,7 +2650,7 @@ public class AcidUtils {
// If ACID/MM tables, then need to find the valid state wrt to given ValidWriteIdList.
ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(validWriteIdStr);
Directory acidInfo = AcidUtils.getAcidState(dataPath.getFileSystem(conf), dataPath, conf, validWriteIdList, null,
- false, null, false);
+ false);
for (HdfsFileStatusWithId hfs : acidInfo.getOriginalFiles()) {
pathList.add(hfs.getFileStatus().getPath());
@@ -3214,17 +3157,14 @@ public class AcidUtils {
* @param candidateDirectory the partition directory to analyze
* @param conf the configuration
* @param writeIdList the list of write ids that we are reading
- * @param useFileIds
- * @param ignoreEmptyFiles
- * @param tblproperties
- * @param generateDirSnapshots
+ * @param useFileIds It will be set to true if the FileSystem supports listing with fileIds
+ * @param ignoreEmptyFiles Ignore files with 0 length
* @return directory state
* @throws IOException on errors
*/
public static Directory getAcidStateFromCache(Supplier<FileSystem> fileSystem,
Path candidateDirectory, Configuration conf,
- ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles,
- Map<String, String> tblproperties, boolean generateDirSnapshots) throws IOException {
+ ValidWriteIdList writeIdList, Ref<Boolean> useFileIds, boolean ignoreEmptyFiles) throws IOException {
int dirCacheDuration = HiveConf.getIntVar(conf,
ConfVars.HIVE_TXN_ACID_DIR_CACHE_DURATION);
@@ -3232,7 +3172,7 @@ public class AcidUtils {
if (dirCacheDuration < 0) {
LOG.debug("dirCache is not enabled");
return getAcidState(fileSystem.get(), candidateDirectory, conf, writeIdList,
- useFileIds, ignoreEmptyFiles, tblproperties, generateDirSnapshots);
+ useFileIds, ignoreEmptyFiles);
} else {
initDirCache(dirCacheDuration);
}
@@ -3270,8 +3210,7 @@ public class AcidUtils {
// compute and add to cache
if (recompute || (value == null)) {
Directory dirInfo = getAcidState(fileSystem.get(), candidateDirectory, conf,
- writeIdList, useFileIds, ignoreEmptyFiles, tblproperties,
- generateDirSnapshots);
+ writeIdList, useFileIds, ignoreEmptyFiles);
value = new DirInfoValue(writeIdList.writeToString(), dirInfo);
if (value.dirInfo != null && value.dirInfo.getBaseDirectory() != null
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index ca234cf..073a3cb 100755
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@ -696,7 +696,7 @@ public class HiveInputFormat<K extends WritableComparable, V extends Writable>
}
if (hasAcidDirs) {
AcidUtils.Directory dirInfo = AcidUtils.getAcidState(
- fs, dir, conf, validWriteIdList, Ref.from(false), true, null, false);
+ fs, dir, conf, validWriteIdList, Ref.from(false), true);
// Find the base, created for IOW.
Path base = dirInfo.getBaseDirectory();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 1059cb2..bf871f0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -1215,8 +1215,9 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
/**
* For plain or acid tables this is the root of the partition (or table if not partitioned).
* For MM table this is delta/ or base/ dir. In MM case applying of the ValidTxnList that
- * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} normally does has already
- * been done in {@link HiveInputFormat#processPathsForMmRead(List, JobConf, ValidWriteIdList)}.
+ * {@link AcidUtils#getAcidState(FileSystem, Path, Configuration, ValidWriteIdList, Ref, boolean)}
+ * normally does has already been done in
+ * {@link HiveInputFormat#processPathsForMmRead(List, Configuration, ValidWriteIdList, List, List)}
*/
private final Path dir;
private final Ref<Boolean> useFileIds;
@@ -1257,10 +1258,10 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
private Directory getAcidState() throws IOException {
if (context.isAcid && context.splitStrategyKind == SplitStrategyKind.BI) {
return AcidUtils.getAcidStateFromCache(fs, dir, context.conf,
- context.writeIdList, useFileIds, true, null, true);
+ context.writeIdList, useFileIds, true);
} else {
return AcidUtils.getAcidState(fs.get(), dir, context.conf, context.writeIdList,
- useFileIds, true, null, true);
+ useFileIds, true);
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 16c9159..6739a2a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -463,7 +463,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
*/
//the split is from something other than the 1st file of the logical bucket - compute offset
AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf,
- validWriteIdList, Ref.from(false), true, null, true);
+ validWriteIdList, Ref.from(false), true);
for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
if (bucketIdFromPath != bucketId) {
@@ -577,7 +577,7 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
assert options.getOffset() == 0;
assert options.getMaxOffset() == Long.MAX_VALUE;
AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, mergerOptions.getRootPath(), conf,
- validWriteIdList, Ref.from(false), true, null, true);
+ validWriteIdList, Ref.from(false), true);
/**
* Note that for reading base_x/ or delta_x_x/ with non-acid schema,
* {@link Options#getRootPath()} is set to base_x/ or delta_x_x/ which causes all it's
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
index 598220b..51c06c7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcAcidRowBatchReader.java
@@ -725,7 +725,7 @@ public class VectorizedOrcAcidRowBatchReader
//statementId is from directory name (or 0 if there is none)
.statementId(syntheticTxnInfo.statementId).bucket(bucketId));
AcidUtils.Directory directoryState = AcidUtils.getAcidState(null, syntheticTxnInfo.folder, conf,
- validWriteIdList, Ref.from(false), true, null, true);
+ validWriteIdList, Ref.from(false), true);
for (HadoopShims.HdfsFileStatusWithId f : directoryState.getOriginalFiles()) {
int bucketIdFromPath = AcidUtils.parseBucketId(f.getFileStatus().getPath());
if (bucketIdFromPath != bucketId) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 2a15913..abde748 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -221,7 +221,7 @@ public class Cleaner extends MetaStoreCompactorThread {
throws IOException, NoSuchObjectException, MetaException {
Path locPath = new Path(location);
AcidUtils.Directory dir = AcidUtils.getAcidState(locPath.getFileSystem(conf), locPath, conf, writeIdList, Ref.from(
- false), false, null, false);
+ false), false);
List<Path> obsoleteDirs = dir.getObsolete();
/**
* add anything in 'dir' that only has data from aborted transactions - no one should be
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 4e5d5b0..4365817 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -264,7 +264,7 @@ public class CompactorMR {
maxDeltasToHandle, -1, conf, msc, ci.id, jobName);
}
//now recompute state since we've done minor compactions and have different 'best' set of deltas
- dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), false, null, false);
+ dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf, writeIds, Ref.from(false), false);
}
StringableList dirsToSearch = new StringableList();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 5b2c937..552e694 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -322,13 +322,13 @@ public class Initiator extends MetaStoreCompactorThread {
boolean noBase = false;
Path location = new Path(sd.getLocation());
FileSystem fs = location.getFileSystem(conf);
- AcidUtils.Directory dir = AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false, tblproperties, false);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, location, conf, writeIds, Ref.from(false), false);
Path base = dir.getBaseDirectory();
long baseSize = 0;
FileStatus stat = null;
if (base != null) {
stat = fs.getFileStatus(base);
- if (!stat.isDir()) {
+ if (!stat.isDirectory()) {
LOG.error("Was assuming base " + base.toString() + " is directory, but it's a file!");
return null;
}
@@ -344,7 +344,7 @@ public class Initiator extends MetaStoreCompactorThread {
List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
for (AcidUtils.ParsedDelta delta : deltas) {
stat = fs.getFileStatus(delta.getPath());
- if (!stat.isDir()) {
+ if (!stat.isDirectory()) {
LOG.error("Was assuming delta " + delta.getPath().toString() + " is a directory, " +
"but it's a file!");
return null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
index d83a50f..9344933 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
@@ -49,8 +49,7 @@ final class MinorQueryCompactor extends QueryCompactor {
AcidUtils
.setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
AcidUtils.Directory dir = AcidUtils
- .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
- table.getParameters(), false);
+ .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false);
// Set up the session for driver.
HiveConf conf = new HiveConf(hiveConf);
conf.set(HiveConf.ConfVars.HIVE_QUOTEDID_SUPPORT.varname, "column");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index 5e11d8d..bf7309a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -46,8 +46,7 @@ final class MmMajorQueryCompactor extends QueryCompactor {
LOG.debug("Going to delete directories for aborted transactions for MM table " + table.getDbName() + "." + table
.getTableName());
AcidUtils.Directory dir = AcidUtils
- .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false,
- table.getParameters(), false);
+ .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds, Ref.from(false), false);
QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir);
// Set up the session for driver.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
index 1bdec7d..9f1d379 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
@@ -48,9 +48,9 @@ final class MmMinorQueryCompactor extends QueryCompactor {
"Going to delete directories for aborted transactions for MM table " + table.getDbName()
+ "." + table.getTableName());
- AcidUtils.Directory dir = AcidUtils.getAcidState(null,
- new Path(storageDescriptor.getLocation()), hiveConf, writeIds,
- Ref.from(false), false, table.getParameters(), false);
+ AcidUtils.Directory dir = AcidUtils
+ .getAcidState(null, new Path(storageDescriptor.getLocation()), hiveConf, writeIds,
+ Ref.from(false), false);
QueryCompactor.Util.removeFilesForMmTable(hiveConf, dir);
HiveConf driverConf = setUpDriverSession(hiveConf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 75941b3..784c95b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -497,7 +497,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
// Don't start compaction or cleaning if not necessary
AcidUtils.Directory dir = AcidUtils.getAcidState(null, new Path(sd.getLocation()), conf,
- tblValidWriteIds, Ref.from(false), true, null, false);
+ tblValidWriteIds, Ref.from(false), true);
if (!isEnoughToCompact(ci.isMajorCompaction(), dir, sd)) {
if (needsCleaning(dir, sd)) {
msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 337f469..61f2bd2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -932,11 +932,13 @@ public class TestTxnCommands2 {
int[][] tableData = {{1,2},{3,4},{5,6}};
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData));
runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=2) (a,b) " + makeValuesClause(tableData));
- txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.name(), CompactionType.MAJOR));
+ CompactionRequest request = new CompactionRequest("default", Table.ACIDTBLPART.name(), CompactionType.MAJOR);
+ request.setPartitionname("p=1");
+ txnHandler.compact(request);
runWorker(hiveConf);
runCleaner(hiveConf);
runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = b + 1 where a = 3");
- txnHandler.compact(new CompactionRequest("default", Table.ACIDTBLPART.toString(), CompactionType.MAJOR));
+ txnHandler.compact(request);
runWorker(hiveConf);
runCleaner(hiveConf);
List<String> rs = runStatementOnDriver("select p,a,b from " + Table.ACIDTBLPART + " order by p, a, b");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
index f351f04..cadaeeb 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestAcidUtils.java
@@ -165,7 +165,7 @@ public class TestAcidUtils {
new MockFile("mock:/tbl/part1/_done", 0, new byte[0]),
new MockFile("mock:/tbl/part1/subdir/000000_0", 0, new byte[0]));
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "/tbl/part1"), conf,
- new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
+ new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false);
assertEquals(null, dir.getBaseDirectory());
assertEquals(0, dir.getCurrentDirectories().size());
assertEquals(0, dir.getObsolete().size());
@@ -201,7 +201,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
- new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
+ new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false);
assertEquals(null, dir.getBaseDirectory());
List<Path> obsolete = dir.getObsolete();
assertEquals(2, obsolete.size());
@@ -244,7 +244,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
- new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
+ new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false);
assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
List<Path> obsoletes = dir.getObsolete();
assertEquals(5, obsoletes.size());
@@ -275,7 +275,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
- new ValidReaderWriteIdList(), null, false, null, true);
+ new ValidReaderWriteIdList(), null, false);
assertEquals("mock:/tbl/part1/base_0", dir.getBaseDirectory().toString());
assertEquals(0, dir.getObsolete().size());
assertEquals(0, dir.getOriginalFiles().size());
@@ -284,23 +284,6 @@ public class TestAcidUtils {
}
@Test
- public void testRecursiveDirListingIsNotReusedWhenSnapshotFalse() throws IOException {
- Configuration conf = new Configuration();
- MockFileSystem fs = new MockFileSystem(conf,
- new MockFile("mock:/tbl/part1/base_0/bucket_0", 500, new byte[0]),
- new MockFile("mock:/tbl/part1/base_0/_orc_acid_version", 10, new byte[0]));
- conf.set(ValidTxnList.VALID_TXNS_KEY,
- new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
- AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
- new ValidReaderWriteIdList(), null, false, null, false);
- assertEquals("mock:/tbl/part1/base_0", dir.getBaseDirectory().toString());
- assertEquals(0, dir.getObsolete().size());
- assertEquals(0, dir.getOriginalFiles().size());
- assertEquals(0, dir.getCurrentDirectories().size());
- assertEquals(2, fs.getNumOpenFileCalls());
- }
-
- @Test
public void testObsoleteOriginals() throws Exception {
Configuration conf = new Configuration();
MockFileSystem fs = new MockFileSystem(conf,
@@ -312,7 +295,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:150:"
- + Long.MAX_VALUE + ":"), null, false, null, false);
+ + Long.MAX_VALUE + ":"), null, false);
// Obsolete list should include the two original bucket files, and the old base dir
List<Path> obsoletes = dir.getObsolete();
assertEquals(3, obsoletes.size());
@@ -335,7 +318,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
- + Long.MAX_VALUE + ":"), null, false, null, false);
+ + Long.MAX_VALUE + ":"), null, false);
assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
List<Path> obsolete = dir.getObsolete();
assertEquals(2, obsolete.size());
@@ -372,7 +355,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
- + Long.MAX_VALUE + ":"), null, false, null, false);
+ + Long.MAX_VALUE + ":"), null, false);
assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
List<Path> obsolete = dir.getObsolete();
assertEquals(5, obsolete.size());
@@ -401,7 +384,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null,
- false, null, false);
+ false);
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(2, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -426,7 +409,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null,
- false, null, false);
+ false);
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(2, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -443,7 +426,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:4:"
- + Long.MAX_VALUE), null, false, null, false);
+ + Long.MAX_VALUE), null, false);
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(1, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -462,7 +445,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:3:"
- + Long.MAX_VALUE), null, false, null, false);
+ + Long.MAX_VALUE), null, false);
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(1, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -488,7 +471,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, new MockPath(fs, "mock:/tbl/part1"), conf,
- new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false, null, false);
+ new ValidReaderWriteIdList("tbl:100:" + Long.MAX_VALUE + ":"), null, false);
assertEquals("mock:/tbl/part1/base_49", dir.getBaseDirectory().toString());
List<Path> obsoletes = dir.getObsolete();
assertEquals(7, obsoletes.size());
@@ -531,7 +514,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
- + Long.MAX_VALUE + ":"), null, false, null, false);
+ + Long.MAX_VALUE + ":"), null, false);
assertEquals("mock:/tbl/part1/base_50", dir.getBaseDirectory().toString());
List<Path> obsolete = dir.getObsolete();
assertEquals(3, obsolete.size());
@@ -562,7 +545,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:"
- + Long.MAX_VALUE + ":"), null, false, null, false);
+ + Long.MAX_VALUE + ":"), null, false);
List<Path> obsolete = dir.getObsolete();
assertEquals(1, obsolete.size());
assertEquals("mock:/tbl/part1/delete_delta_50_50", obsolete.get(0).toString());
@@ -591,7 +574,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidCompactorWriteIdList("tbl:4:"
- + Long.MAX_VALUE + ":"), null, false, null, false);
+ + Long.MAX_VALUE + ":"), null, false);
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(2, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
@@ -616,7 +599,7 @@ public class TestAcidUtils {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[] {50}, new BitSet(), 1000, 55).writeToString());
AcidUtils.Directory dir = AcidUtils.getAcidState(fs, part, conf, new ValidReaderWriteIdList("tbl:100:4:4"), null,
- false, null, false);
+ false);
List<AcidUtils.ParsedDelta> delts = dir.getCurrentDirectories();
assertEquals(3, delts.size());
assertEquals("mock:/tbl/part1/delta_1_1", delts.get(0).getPath().toString());
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
index e4440e9..0d7b699 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java
@@ -3692,7 +3692,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- assertEquals(6, readOpsDelta);
+ assertEquals(5, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");
@@ -3766,7 +3766,7 @@ public class TestInputOutputFormat {
readOpsDelta = statistics.getReadOps() - readOpsBefore;
}
}
- assertEquals(6, readOpsDelta);
+ assertEquals(5, readOpsDelta);
// revert back to local fs
conf.set("fs.defaultFS", "file:///");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index f63c40a..e6fae44 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -597,7 +597,7 @@ public class TestOrcRawRecordMerger {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
ValidWriteIdList writeIdList = new ValidReaderWriteIdList("testEmpty:200:" + Long.MAX_VALUE);
- AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, false, null, false);
+ AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, false);
Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(),
BUCKET);
@@ -668,8 +668,7 @@ public class TestOrcRawRecordMerger {
conf.set(ValidTxnList.VALID_TXNS_KEY,
new ValidReadTxnList(new long[0], new BitSet(), 1000, Long.MAX_VALUE).writeToString());
ValidWriteIdList writeIdList = new ValidReaderWriteIdList("testNewBaseAndDelta:200:" + Long.MAX_VALUE);
- AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, use130Format, null,
- use130Format);
+ AcidUtils.Directory directory = AcidUtils.getAcidState(fs, root, conf, writeIdList, null, use130Format);
assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory());
assertEquals(new Path(root, use130Format ?
diff --git a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
index 3a3b267..5e0c386 100644
--- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
+++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java
@@ -943,7 +943,7 @@ public class TestStreaming {
private void checkDataWritten(Path partitionPath, long minTxn, long maxTxn, int buckets, int numExpectedFiles,
String... records) throws Exception {
ValidWriteIdList writeIds = getTransactionContext(conf);
- AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false, null, false);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, writeIds, null, false);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -997,7 +997,8 @@ public class TestStreaming {
*/
private void checkDataWritten2(Path partitionPath, long minTxn, long maxTxn, int numExpectedFiles,
String validationQuery, boolean vectorize, String... records) throws Exception {
- AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false, null, false);
+ AcidUtils.Directory dir =
+ AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -1048,7 +1049,8 @@ public class TestStreaming {
return TxnCommonUtils.createValidReaderWriteIdList(v.get(0));
}
private void checkNothingWritten(Path partitionPath) throws Exception {
- AcidUtils.Directory dir = AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false, null, false);
+ AcidUtils.Directory dir =
+ AcidUtils.getAcidState(null, partitionPath, conf, getTransactionContext(conf), null, false);
Assert.assertEquals(0, dir.getObsolete().size());
Assert.assertEquals(0, dir.getOriginalFiles().size());
List<AcidUtils.ParsedDelta> current = dir.getCurrentDirectories();
@@ -1995,7 +1997,7 @@ public class TestStreaming {
/*now both batches have committed (but not closed) so we for each primary file we expect a side
file to exist and indicate the true length of primary file*/
FileSystem fs = partLoc.getFileSystem(conf);
- AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
+ AcidUtils.Directory dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false);
for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());
@@ -2020,7 +2022,7 @@ public class TestStreaming {
//so each of 2 deltas has 1 bucket0 and 1 bucket0_flush_length. Furthermore, each bucket0
//has now received more data(logically - it's buffered) but it is not yet committed.
//lets check that side files exist, etc
- dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false, null, false);
+ dir = AcidUtils.getAcidState(fs, partLoc, conf, getTransactionContext(conf), null, false);
for (AcidUtils.ParsedDelta pd : dir.getCurrentDirectories()) {
for (FileStatus stat : fs.listStatus(pd.getPath(), AcidUtils.bucketFileFilter)) {
Path lengthFile = OrcAcidUtils.getSideFile(stat.getPath());