You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/10/25 21:43:47 UTC
[28/28] hive git commit: HIVE-14953 : don't use globStatus on S3 in
MM tables (Sergey Shelukhin)
HIVE-14953 : don't use globStatus on S3 in MM tables (Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0f7f4ed8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0f7f4ed8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0f7f4ed8
Branch: refs/heads/hive-14535
Commit: 0f7f4ed83fffb355666ebf0a0a259872156a133c
Parents: 65a380d
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Oct 25 14:06:14 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Oct 25 14:06:14 2016 -0700
----------------------------------------------------------------------
.../hadoop/hive/common/ValidWriteIds.java | 5 +-
.../org/apache/hadoop/hive/conf/HiveConf.java | 4 +
.../apache/hadoop/hive/ql/exec/Utilities.java | 174 +++++++++++++------
.../apache/hadoop/hive/ql/metadata/Hive.java | 35 ++--
4 files changed, 150 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0f7f4ed8/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
index 6b38247..c61b63a 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
@@ -122,18 +122,17 @@ public class ValidWriteIds {
public static class IdPathFilter implements PathFilter {
- private final String mmDirName, tmpPrefix;
+ private final String mmDirName;
private final boolean isMatch;
public IdPathFilter(long writeId, boolean isMatch) {
this.mmDirName = ValidWriteIds.getMmFilePrefix(writeId);
- this.tmpPrefix = "_tmp." + mmDirName;
this.isMatch = isMatch;
}
@Override
public boolean accept(Path path) {
String name = path.getName();
- return isMatch == (name.equals(mmDirName) || name.equals(tmpPrefix));
+ return isMatch == name.equals(mmDirName);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/0f7f4ed8/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8a00f07..6848811 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3141,6 +3141,10 @@ public class HiveConf extends Configuration {
"MM write ID will not be removed up for that long after it has been aborted;\n" +
"this is to work around potential races e.g. with FS visibility, when deleting files."),
+
+ HIVE_MM_AVOID_GLOBSTATUS_ON_S3("hive.mm.avoid.s3.globstatus", true,
+ "Whether to use listFiles (optimized on S3) instead of globStatus when on S3."),
+
HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
"hive.security.authenticator.manager,hive.security.authorization.manager," +
"hive.security.metastore.authorization.manager,hive.security.metastore.authenticator.manager," +
http://git-wip-us.apache.org/repos/asf/hive/blob/0f7f4ed8/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index a7050ab..e0af81e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -85,8 +85,10 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.common.HiveInterruptCallback;
@@ -1524,6 +1526,18 @@ public final class Utilities {
? conf.getTable().getNumBuckets() : 0;
return removeTempOrDuplicateFiles(fs, fileStats, dpLevels, numBuckets, hconf, null);
}
+
+ private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException {
+ FileStatus[] items = fs.listStatus(path);
+ // remove empty directory since DP insert should not generate empty partitions.
+ // empty directories could be generated by crashed Task/ScriptOperator
+ if (items.length != 0) return false;
+ if (!fs.delete(path, true)) {
+ LOG.error("Cannot delete empty directory " + path);
+ throw new IOException("Cannot delete empty directory " + path);
+ }
+ return true;
+ }
public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
int dpLevels, int numBuckets, Configuration hconf, Long mmWriteId) throws IOException {
@@ -1535,21 +1549,15 @@ public final class Utilities {
if (dpLevels > 0) {
FileStatus parts[] = fileStats;
for (int i = 0; i < parts.length; ++i) {
- assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
+ assert parts[i].isDirectory() : "dynamic partition " + parts[i].getPath()
+ " is not a directory";
- Utilities.LOG14535.info("removeTempOrDuplicateFiles looking at DP " + parts[i].getPath());
- FileStatus[] items = fs.listStatus(parts[i].getPath());
-
- // remove empty directory since DP insert should not generate empty partitions.
- // empty directories could be generated by crashed Task/ScriptOperator
- if (items.length == 0) {
- if (!fs.delete(parts[i].getPath(), true)) {
- LOG.error("Cannot delete empty directory " + parts[i].getPath());
- throw new IOException("Cannot delete empty directory " + parts[i].getPath());
- }
+ Path path = parts[i].getPath();
+ Utilities.LOG14535.info("removeTempOrDuplicateFiles looking at DP " + path);
+ if (removeEmptyDpDirectory(fs, path)) {
parts[i] = null;
continue;
}
+ FileStatus[] items = fs.listStatus(path);
if (mmWriteId != null) {
Path mmDir = parts[i].getPath();
@@ -1575,8 +1583,7 @@ public final class Utilities {
throw new IOException("Unexpected directories for non-DP MM: " + Arrays.toString(items));
}
Path mmDir = items[0].getPath();
- if (!items[0].isDirectory()
- || !mmDir.getName().equals(ValidWriteIds.getMmFilePrefix(mmWriteId))) {
+ if (!mmDir.getName().equals(ValidWriteIds.getMmFilePrefix(mmWriteId))) {
throw new IOException("Unexpected non-MM directory " + mmDir);
}
Utilities.LOG14535.info(
@@ -3803,31 +3810,98 @@ public final class Utilities {
}
}
- public static FileStatus[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels,
- int lbLevels, PathFilter filter, long mmWriteId) throws IOException {
+ public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels,
+ int lbLevels, PathFilter filter, long mmWriteId, Configuration conf) throws IOException {
+ int skipLevels = dpLevels + lbLevels;
+ if (filter == null) {
+ filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
+ }
+ if (skipLevels == 0) {
+ return statusToPath(fs.listStatus(path, filter));
+ }
+ if (fs.getScheme().equalsIgnoreCase("s3a")
+ && HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3)) {
+ return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter);
+ }
+ return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, mmWriteId);
+ }
+
+ private static Path[] statusToPath(FileStatus[] statuses) {
+ if (statuses == null) return null;
+ Path[] paths = new Path[statuses.length];
+ for (int i = 0; i < statuses.length; ++i) {
+ paths[i] = statuses[i].getPath();
+ }
+ return paths;
+ }
+
+ private static Path[] getMmDirectoryCandidatesRecursive(FileSystem fs,
+ Path path, int skipLevels, PathFilter filter) throws IOException {
+ String lastRelDir = null;
+ HashSet<Path> results = new HashSet<Path>();
+ String relRoot = Path.getPathWithoutSchemeAndAuthority(path).toString();
+ if (!relRoot.endsWith(Path.SEPARATOR)) {
+ relRoot += Path.SEPARATOR;
+ }
+ RemoteIterator<LocatedFileStatus> allFiles = fs.listFiles(path, true);
+ while (allFiles.hasNext()) {
+ LocatedFileStatus lfs = allFiles.next();
+ Path dirPath = Path.getPathWithoutSchemeAndAuthority(lfs.getPath());
+ String dir = dirPath.toString();
+ if (!dir.startsWith(relRoot)) {
+ throw new IOException("Path " + lfs.getPath() + " is not under " + relRoot
+ + " (when shortened to " + dir + ")");
+ }
+ String subDir = dir.substring(relRoot.length());
+ Utilities.LOG14535.info("Looking at " + subDir + " from " + lfs.getPath());
+ // If sorted, we'll skip a bunch of files.
+ if (lastRelDir != null && subDir.startsWith(lastRelDir)) continue;
+ int startIx = skipLevels > 0 ? -1 : 0;
+ for (int i = 0; i < skipLevels; ++i) {
+ startIx = subDir.indexOf(Path.SEPARATOR_CHAR, startIx + 1);
+ if (startIx == -1) {
+ Utilities.LOG14535.info("Expected level of nesting (" + skipLevels + ") is not "
+ + " present in " + subDir + " (from " + lfs.getPath() + ")");
+ break;
+ }
+ }
+ if (startIx == -1) continue;
+ int endIx = subDir.indexOf(Path.SEPARATOR_CHAR, startIx + 1);
+ if (endIx == -1) {
+ Utilities.LOG14535.info("Expected level of nesting (" + (skipLevels + 1) + ") is not "
+ + " present in " + subDir + " (from " + lfs.getPath() + ")");
+ continue;
+ }
+ lastRelDir = subDir = subDir.substring(0, endIx);
+ Path candidate = new Path(relRoot, subDir);
+ Utilities.LOG14535.info("Considering MM directory candidate " + candidate);
+ if (!filter.accept(candidate)) continue;
+ results.add(fs.makeQualified(candidate));
+ }
+ return results.toArray(new Path[results.size()]);
+ }
+
+ private static Path[] getMmDirectoryCandidatesGlobStatus(FileSystem fs,
+ Path path, int skipLevels, PathFilter filter, long mmWriteId) throws IOException {
StringBuilder sb = new StringBuilder(path.toUri().getPath());
- for (int i = 0; i < dpLevels + lbLevels; i++) {
+ for (int i = 0; i < skipLevels; i++) {
sb.append(Path.SEPARATOR).append("*");
}
sb.append(Path.SEPARATOR).append(ValidWriteIds.getMmFilePrefix(mmWriteId));
- Utilities.LOG14535.info("Looking for files via: " + sb.toString());
Path pathPattern = new Path(path, sb.toString());
- if (filter == null) {
- // TODO: do we need this? Likely yes; we don't want mm_10 when we use ".../mm_1" pattern.
- filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
- }
- return filter == null ? fs.globStatus(pathPattern) : fs.globStatus(pathPattern, filter);
+ Utilities.LOG14535.info("Looking for files via: " + pathPattern);
+ return statusToPath(fs.globStatus(pathPattern, filter));
}
private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir,
int dpLevels, int lbLevels, String unionSuffix, ValidWriteIds.IdPathFilter filter,
- long mmWriteId) throws IOException {
- FileStatus[] files = getMmDirectoryCandidates(
- fs, specPath, dpLevels, lbLevels, filter, mmWriteId);
+ long mmWriteId, Configuration conf) throws IOException {
+ Path[] files = getMmDirectoryCandidates(
+ fs, specPath, dpLevels, lbLevels, filter, mmWriteId, conf);
if (files != null) {
- for (FileStatus status : files) {
- Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
- tryDelete(fs, status.getPath());
+ for (Path path : files) {
+ Utilities.LOG14535.info("Deleting " + path + " on failure");
+ tryDelete(fs, path);
}
}
Utilities.LOG14535.info("Deleting " + manifestDir + " on failure");
@@ -3882,15 +3956,15 @@ public final class Utilities {
if (!success) {
ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
- unionSuffix, filter, mmWriteId);
+ unionSuffix, filter, mmWriteId, hconf);
return;
}
Utilities.LOG14535.info("Looking for manifests in: " + manifestDir + " (" + mmWriteId + ")");
- FileStatus[] files = fs.listStatus(manifestDir);
+ FileStatus[] manifestFiles = fs.listStatus(manifestDir);
List<Path> manifests = new ArrayList<>();
- if (files != null) {
- for (FileStatus status : files) {
+ if (manifestFiles != null) {
+ for (FileStatus status : manifestFiles) {
Path path = status.getPath();
if (path.getName().endsWith(MANIFEST_EXTENSION)) {
Utilities.LOG14535.info("Reading manifest " + path);
@@ -3901,21 +3975,13 @@ public final class Utilities {
Utilities.LOG14535.info("Looking for files in: " + specPath);
ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
- files = getMmDirectoryCandidates(
- fs, specPath, dpLevels, lbLevels, filter, mmWriteId);
- ArrayList<FileStatus> mmDirectories = new ArrayList<>();
+ Path[] files = getMmDirectoryCandidates(
+ fs, specPath, dpLevels, lbLevels, filter, mmWriteId, hconf);
+ ArrayList<Path> mmDirectories = new ArrayList<>();
if (files != null) {
- for (FileStatus status : files) {
- Path path = status.getPath();
+ for (Path path : files) {
Utilities.LOG14535.info("Looking at path: " + path);
- if (!status.isDirectory()) {
- if (!path.getName().endsWith(MANIFEST_EXTENSION)) {
- Utilities.LOG14535.warn("Unknown file found, deleting: " + path);
- tryDelete(fs, path);
- }
- } else {
- mmDirectories.add(status);
- }
+ mmDirectories.add(path);
}
}
@@ -3944,8 +4010,8 @@ public final class Utilities {
}
}
- for (FileStatus status : mmDirectories) {
- cleanMmDirectory(status.getPath(), fs, unionSuffix, committed);
+ for (Path path : mmDirectories) {
+ cleanMmDirectory(path, fs, unionSuffix, committed);
}
if (!committed.isEmpty()) {
@@ -3957,7 +4023,12 @@ public final class Utilities {
// TODO: see HIVE-14886 - removeTempOrDuplicateFiles is broken for list bucketing,
// so maintain parity here by not calling it at all.
if (lbLevels != 0) return;
- FileStatus[] finalResults = mmDirectories.toArray(new FileStatus[mmDirectories.size()]);
+ // Create fake file statuses to avoid querying the file system. removeTempOrDuplicateFiles
+ // doesn't need to check anything except path and directory status for MM directories.
+ FileStatus[] finalResults = new FileStatus[mmDirectories.size()];
+ for (int i = 0; i < mmDirectories.size(); ++i) {
+ finalResults[i] = new PathOnlyFileStatus(mmDirectories.get(i));
+ }
List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
fs, finalResults, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, mmWriteId);
// create empty buckets if necessary
@@ -3967,6 +4038,12 @@ public final class Utilities {
}
}
+ private static final class PathOnlyFileStatus extends FileStatus {
+ public PathOnlyFileStatus(Path path) {
+ super(0, true, 0, 0, 0, path);
+ }
+ }
+
private static void cleanMmDirectory(Path dir, FileSystem fs,
String unionSuffix, HashSet<String> committed) throws IOException, HiveException {
for (FileStatus child : fs.listStatus(dir)) {
@@ -3975,7 +4052,6 @@ public final class Utilities {
if (committed.remove(childPath.toString())) continue; // A good file.
deleteUncommitedFile(childPath, fs);
} else if (!child.isDirectory()) {
- // TODO# needed? if (childPath.getName().endsWith(MANIFEST_EXTENSION)) continue;
if (committed.contains(childPath.toString())) {
throw new HiveException("Union FSOP has commited "
+ childPath + " outside of union directory" + unionSuffix);
http://git-wip-us.apache.org/repos/asf/hive/blob/0f7f4ed8/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 0a29895..c7ac452 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1589,6 +1589,7 @@ public class Hive {
List<Path> newFiles = null;
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin("MoveTask", "FileMoves");
+ // TODO: this assumes both paths are qualified; which they are, currently.
if (mmWriteId != null && loadPath.equals(newPartPath)) {
// MM insert query, move itself is a no-op.
Utilities.LOG14535.info("not moving " + loadPath + " to " + newPartPath + " (MM)");
@@ -1705,7 +1706,7 @@ public class Hive {
FileSystem srcFs;
try {
srcFs = loadPath.getFileSystem(conf);
- srcs = srcFs.globStatus(loadPath);
+ srcs = srcFs.listStatus(loadPath);
} catch (IOException e) {
LOG.error("Error listing files", e);
throw new HiveException(e);
@@ -1847,29 +1848,30 @@ private void constructOneLBLocationMap(FileStatus fSta,
Set<Path> validPartitions = new HashSet<Path>();
try {
FileSystem fs = loadPath.getFileSystem(conf);
- FileStatus[] leafStatus = null;
if (mmWriteId == null) {
- leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
+ FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
+ // Check for empty partitions
+ for (FileStatus s : leafStatus) {
+ if (!s.isDirectory()) {
+ throw new HiveException("partition " + s.getPath() + " is not a directory!");
+ }
+ Path dpPath = s.getPath();
+ Utilities.LOG14535.info("Found DP " + dpPath);
+ validPartitions.add(dpPath);
+ }
} else {
// The non-MM path only finds new partitions, as it is looking at the temp path.
// To produce the same effect, we will find all the partitions affected by this write ID.
- leafStatus = Utilities.getMmDirectoryCandidates(
- fs, loadPath, numDP, numLB, null, mmWriteId);
- }
- // Check for empty partitions
- for (FileStatus s : leafStatus) {
- if (mmWriteId == null && !s.isDirectory()) {
- throw new HiveException("partition " + s.getPath() + " is not a directory!");
- }
- Path dpPath = s.getPath();
- if (mmWriteId != null) {
- dpPath = dpPath.getParent(); // Skip the MM directory that we have found.
+ Path[] leafStatus = Utilities.getMmDirectoryCandidates(
+ fs, loadPath, numDP, numLB, null, mmWriteId, conf);
+ for (Path p : leafStatus) {
+ Path dpPath = p.getParent(); // Skip the MM directory that we have found.
for (int i = 0; i < numLB; ++i) {
dpPath = dpPath.getParent(); // Now skip the LB directories, if any...
}
+ Utilities.LOG14535.info("Found DP " + dpPath);
+ validPartitions.add(dpPath);
}
- Utilities.LOG14535.info("Found DP " + dpPath);
- validPartitions.add(dpPath);
}
} catch (IOException e) {
throw new HiveException(e);
@@ -2047,6 +2049,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
newFiles = Collections.synchronizedList(new ArrayList<Path>());
}
+ // TODO: this assumes both paths are qualified; which they are, currently.
if (mmWriteId != null && loadPath.equals(tbl.getPath())) {
Utilities.LOG14535.info("not moving " + loadPath + " to " + tbl.getPath());
if (replace) {