You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/10/25 21:43:47 UTC

[28/28] hive git commit: HIVE-14953 : don't use globStatus on S3 in MM tables (Sergey Shelukhin)

HIVE-14953 : don't use globStatus on S3 in MM tables (Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0f7f4ed8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0f7f4ed8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0f7f4ed8

Branch: refs/heads/hive-14535
Commit: 0f7f4ed83fffb355666ebf0a0a259872156a133c
Parents: 65a380d
Author: Sergey Shelukhin <se...@apache.org>
Authored: Tue Oct 25 14:06:14 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Tue Oct 25 14:06:14 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/common/ValidWriteIds.java       |   5 +-
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 174 +++++++++++++------
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  35 ++--
 4 files changed, 150 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0f7f4ed8/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
index 6b38247..c61b63a 100644
--- a/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
+++ b/common/src/java/org/apache/hadoop/hive/common/ValidWriteIds.java
@@ -122,18 +122,17 @@ public class ValidWriteIds {
 
 
   public static class IdPathFilter implements PathFilter {
-    private final String mmDirName, tmpPrefix;
+    private final String mmDirName;
     private final boolean isMatch;
     public IdPathFilter(long writeId, boolean isMatch) {
       this.mmDirName = ValidWriteIds.getMmFilePrefix(writeId);
-      this.tmpPrefix = "_tmp." + mmDirName;
       this.isMatch = isMatch;
     }
 
     @Override
     public boolean accept(Path path) {
       String name = path.getName();
-      return isMatch == (name.equals(mmDirName) || name.equals(tmpPrefix));
+      return isMatch == name.equals(mmDirName);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0f7f4ed8/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8a00f07..6848811 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3141,6 +3141,10 @@ public class HiveConf extends Configuration {
         "MM write ID will not be removed up for that long after it has been aborted;\n" +
         "this is to work around potential races e.g. with FS visibility, when deleting files."),
 
+
+    HIVE_MM_AVOID_GLOBSTATUS_ON_S3("hive.mm.avoid.s3.globstatus", true,
+        "Whether to use listFiles (optimized on S3) instead of globStatus when on S3."),
+
     HIVE_CONF_RESTRICTED_LIST("hive.conf.restricted.list",
         "hive.security.authenticator.manager,hive.security.authorization.manager," +
         "hive.security.metastore.authorization.manager,hive.security.metastore.authenticator.manager," +

http://git-wip-us.apache.org/repos/asf/hive/blob/0f7f4ed8/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index a7050ab..e0af81e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -85,8 +85,10 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveInterruptCallback;
@@ -1524,6 +1526,18 @@ public final class Utilities {
           ? conf.getTable().getNumBuckets() : 0;
     return removeTempOrDuplicateFiles(fs, fileStats, dpLevels, numBuckets, hconf, null);
   }
+  
+  private static boolean removeEmptyDpDirectory(FileSystem fs, Path path) throws IOException {
+    FileStatus[] items = fs.listStatus(path);
+    // remove empty directory since DP insert should not generate empty partitions.
+    // empty directories could be generated by crashed Task/ScriptOperator
+    if (items.length != 0) return false;
+    if (!fs.delete(path, true)) {
+      LOG.error("Cannot delete empty directory " + path);
+      throw new IOException("Cannot delete empty directory " + path);
+    }
+    return true;
+  }
 
   public static List<Path> removeTempOrDuplicateFiles(FileSystem fs, FileStatus[] fileStats,
       int dpLevels, int numBuckets, Configuration hconf, Long mmWriteId) throws IOException {
@@ -1535,21 +1549,15 @@ public final class Utilities {
     if (dpLevels > 0) {
       FileStatus parts[] = fileStats;
       for (int i = 0; i < parts.length; ++i) {
-        assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
+        assert parts[i].isDirectory() : "dynamic partition " + parts[i].getPath()
             + " is not a directory";
-        Utilities.LOG14535.info("removeTempOrDuplicateFiles looking at DP " + parts[i].getPath());
-        FileStatus[] items = fs.listStatus(parts[i].getPath());
-
-        // remove empty directory since DP insert should not generate empty partitions.
-        // empty directories could be generated by crashed Task/ScriptOperator
-        if (items.length == 0) {
-          if (!fs.delete(parts[i].getPath(), true)) {
-            LOG.error("Cannot delete empty directory " + parts[i].getPath());
-            throw new IOException("Cannot delete empty directory " + parts[i].getPath());
-          }
+        Path path = parts[i].getPath();
+        Utilities.LOG14535.info("removeTempOrDuplicateFiles looking at DP " + path);
+        if (removeEmptyDpDirectory(fs, path)) {
           parts[i] = null;
           continue;
         }
+        FileStatus[] items = fs.listStatus(path);
 
         if (mmWriteId != null) {
           Path mmDir = parts[i].getPath();
@@ -1575,8 +1583,7 @@ public final class Utilities {
           throw new IOException("Unexpected directories for non-DP MM: " + Arrays.toString(items));
         }
         Path mmDir = items[0].getPath();
-        if (!items[0].isDirectory()
-            || !mmDir.getName().equals(ValidWriteIds.getMmFilePrefix(mmWriteId))) {
+        if (!mmDir.getName().equals(ValidWriteIds.getMmFilePrefix(mmWriteId))) {
           throw new IOException("Unexpected non-MM directory " + mmDir);
         }
         Utilities.LOG14535.info(
@@ -3803,31 +3810,98 @@ public final class Utilities {
     }
   }
 
-  public static FileStatus[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels,
-      int lbLevels, PathFilter filter, long mmWriteId) throws IOException {
+  public static Path[] getMmDirectoryCandidates(FileSystem fs, Path path, int dpLevels,
+      int lbLevels, PathFilter filter, long mmWriteId, Configuration conf) throws IOException {
+    int skipLevels = dpLevels + lbLevels;
+    if (filter == null) {
+      filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
+    }
+    if (skipLevels == 0) {
+      return statusToPath(fs.listStatus(path, filter));
+    }
+    if (fs.getScheme().equalsIgnoreCase("s3a")
+        && HiveConf.getBoolVar(conf, ConfVars.HIVE_MM_AVOID_GLOBSTATUS_ON_S3)) {
+      return getMmDirectoryCandidatesRecursive(fs, path, skipLevels, filter);
+    }
+    return getMmDirectoryCandidatesGlobStatus(fs, path, skipLevels, filter, mmWriteId);
+  }
+
+  private static Path[] statusToPath(FileStatus[] statuses) {
+    if (statuses == null) return null;
+    Path[] paths = new Path[statuses.length];
+    for (int i = 0; i < statuses.length; ++i) {
+      paths[i] = statuses[i].getPath();
+    }
+    return paths;
+  }
+
+  private static Path[] getMmDirectoryCandidatesRecursive(FileSystem fs,
+      Path path, int skipLevels, PathFilter filter) throws IOException {
+    String lastRelDir = null;
+    HashSet<Path> results = new HashSet<Path>();
+    String relRoot = Path.getPathWithoutSchemeAndAuthority(path).toString();
+    if (!relRoot.endsWith(Path.SEPARATOR)) {
+      relRoot += Path.SEPARATOR;
+    }
+    RemoteIterator<LocatedFileStatus> allFiles = fs.listFiles(path, true);
+    while (allFiles.hasNext()) {
+      LocatedFileStatus lfs = allFiles.next();
+      Path dirPath = Path.getPathWithoutSchemeAndAuthority(lfs.getPath());
+      String dir = dirPath.toString();
+      if (!dir.startsWith(relRoot)) {
+        throw new IOException("Path " + lfs.getPath() + " is not under " + relRoot
+            + " (when shortened to " + dir + ")");
+      }
+      String subDir = dir.substring(relRoot.length());
+      Utilities.LOG14535.info("Looking at " + subDir + " from " + lfs.getPath());
+      // If sorted, we'll skip a bunch of files.
+      if (lastRelDir != null && subDir.startsWith(lastRelDir)) continue;
+      int startIx = skipLevels > 0 ? -1 : 0;
+      for (int i = 0; i < skipLevels; ++i) {
+        startIx = subDir.indexOf(Path.SEPARATOR_CHAR, startIx + 1);
+        if (startIx == -1) {
+          Utilities.LOG14535.info("Expected level of nesting (" + skipLevels + ") is not "
+              + " present in " + subDir + " (from " + lfs.getPath() + ")");
+          break;
+        }
+      }
+      if (startIx == -1) continue;
+      int endIx = subDir.indexOf(Path.SEPARATOR_CHAR, startIx + 1);
+      if (endIx == -1) {
+        Utilities.LOG14535.info("Expected level of nesting (" + (skipLevels + 1) + ") is not "
+            + " present in " + subDir + " (from " + lfs.getPath() + ")");
+        continue;
+      }
+      lastRelDir = subDir = subDir.substring(0, endIx);
+      Path candidate = new Path(relRoot, subDir);
+      Utilities.LOG14535.info("Considering MM directory candidate " + candidate);
+      if (!filter.accept(candidate)) continue;
+      results.add(fs.makeQualified(candidate));
+    }
+    return results.toArray(new Path[results.size()]);
+  }
+
+  private static Path[] getMmDirectoryCandidatesGlobStatus(FileSystem fs,
+      Path path, int skipLevels, PathFilter filter, long mmWriteId) throws IOException {
     StringBuilder sb = new StringBuilder(path.toUri().getPath());
-    for (int i = 0; i < dpLevels + lbLevels; i++) {
+    for (int i = 0; i < skipLevels; i++) {
       sb.append(Path.SEPARATOR).append("*");
     }
     sb.append(Path.SEPARATOR).append(ValidWriteIds.getMmFilePrefix(mmWriteId));
-    Utilities.LOG14535.info("Looking for files via: " + sb.toString());
     Path pathPattern = new Path(path, sb.toString());
-    if (filter == null) {
-      // TODO: do we need this? Likely yes; we don't want mm_10 when we use ".../mm_1" pattern.
-      filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
-    }
-    return filter == null ? fs.globStatus(pathPattern) : fs.globStatus(pathPattern, filter);
+    Utilities.LOG14535.info("Looking for files via: " + pathPattern);
+    return statusToPath(fs.globStatus(pathPattern, filter));
   }
 
   private static void tryDeleteAllMmFiles(FileSystem fs, Path specPath, Path manifestDir,
       int dpLevels, int lbLevels, String unionSuffix, ValidWriteIds.IdPathFilter filter,
-      long mmWriteId) throws IOException {
-    FileStatus[] files = getMmDirectoryCandidates(
-        fs, specPath, dpLevels, lbLevels, filter, mmWriteId);
+      long mmWriteId, Configuration conf) throws IOException {
+    Path[] files = getMmDirectoryCandidates(
+        fs, specPath, dpLevels, lbLevels, filter, mmWriteId, conf);
     if (files != null) {
-      for (FileStatus status : files) {
-        Utilities.LOG14535.info("Deleting " + status.getPath() + " on failure");
-        tryDelete(fs, status.getPath());
+      for (Path path : files) {
+        Utilities.LOG14535.info("Deleting " + path + " on failure");
+        tryDelete(fs, path);
       }
     }
     Utilities.LOG14535.info("Deleting " + manifestDir + " on failure");
@@ -3882,15 +3956,15 @@ public final class Utilities {
     if (!success) {
       ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
       tryDeleteAllMmFiles(fs, specPath, manifestDir, dpLevels, lbLevels,
-          unionSuffix, filter, mmWriteId);
+          unionSuffix, filter, mmWriteId, hconf);
       return;
     }
 
     Utilities.LOG14535.info("Looking for manifests in: " + manifestDir + " (" + mmWriteId + ")");
-    FileStatus[] files = fs.listStatus(manifestDir);
+    FileStatus[] manifestFiles = fs.listStatus(manifestDir);
     List<Path> manifests = new ArrayList<>();
-    if (files != null) {
-      for (FileStatus status : files) {
+    if (manifestFiles != null) {
+      for (FileStatus status : manifestFiles) {
         Path path = status.getPath();
         if (path.getName().endsWith(MANIFEST_EXTENSION)) {
           Utilities.LOG14535.info("Reading manifest " + path);
@@ -3901,21 +3975,13 @@ public final class Utilities {
 
     Utilities.LOG14535.info("Looking for files in: " + specPath);
     ValidWriteIds.IdPathFilter filter = new ValidWriteIds.IdPathFilter(mmWriteId, true);
-    files = getMmDirectoryCandidates(
-        fs, specPath, dpLevels, lbLevels, filter, mmWriteId);
-    ArrayList<FileStatus> mmDirectories = new ArrayList<>();
+    Path[] files = getMmDirectoryCandidates(
+        fs, specPath, dpLevels, lbLevels, filter, mmWriteId, hconf);
+    ArrayList<Path> mmDirectories = new ArrayList<>();
     if (files != null) {
-      for (FileStatus status : files) {
-        Path path = status.getPath();
+      for (Path path : files) {
         Utilities.LOG14535.info("Looking at path: " + path);
-        if (!status.isDirectory()) {
-          if (!path.getName().endsWith(MANIFEST_EXTENSION)) {
-            Utilities.LOG14535.warn("Unknown file found, deleting: " + path);
-            tryDelete(fs, path);
-          }
-        } else {
-          mmDirectories.add(status);
-        }
+        mmDirectories.add(path);
       }
     }
 
@@ -3944,8 +4010,8 @@ public final class Utilities {
       }
     }
 
-    for (FileStatus status : mmDirectories) {
-      cleanMmDirectory(status.getPath(), fs, unionSuffix, committed);
+    for (Path path : mmDirectories) {
+      cleanMmDirectory(path, fs, unionSuffix, committed);
     }
 
     if (!committed.isEmpty()) {
@@ -3957,7 +4023,12 @@ public final class Utilities {
     // TODO: see HIVE-14886 - removeTempOrDuplicateFiles is broken for list bucketing,
     //       so maintain parity here by not calling it at all.
     if (lbLevels != 0) return;
-    FileStatus[] finalResults = mmDirectories.toArray(new FileStatus[mmDirectories.size()]);
+    // Create fake file statuses to avoid querying the file system. removeTempOrDuplicateFiles
+    // doesn't need to check anything except path and directory status for MM directories.
+    FileStatus[] finalResults = new FileStatus[mmDirectories.size()];
+    for (int i = 0; i < mmDirectories.size(); ++i) {
+      finalResults[i] = new PathOnlyFileStatus(mmDirectories.get(i));
+    }
     List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
         fs, finalResults, dpLevels, mbc == null ? 0 : mbc.numBuckets, hconf, mmWriteId);
     // create empty buckets if necessary
@@ -3967,6 +4038,12 @@ public final class Utilities {
     }
   }
 
+  private static final class PathOnlyFileStatus extends FileStatus {
+    public PathOnlyFileStatus(Path path) {
+      super(0, true, 0, 0, 0, path);
+    }
+  }
+
   private static void cleanMmDirectory(Path dir, FileSystem fs,
       String unionSuffix, HashSet<String> committed) throws IOException, HiveException {
     for (FileStatus child : fs.listStatus(dir)) {
@@ -3975,7 +4052,6 @@ public final class Utilities {
         if (committed.remove(childPath.toString())) continue; // A good file.
         deleteUncommitedFile(childPath, fs);
       } else if (!child.isDirectory()) {
-        // TODO# needed? if (childPath.getName().endsWith(MANIFEST_EXTENSION)) continue;
         if (committed.contains(childPath.toString())) {
           throw new HiveException("Union FSOP has commited "
               + childPath + " outside of union directory" + unionSuffix);

http://git-wip-us.apache.org/repos/asf/hive/blob/0f7f4ed8/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 0a29895..c7ac452 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -1589,6 +1589,7 @@ public class Hive {
       List<Path> newFiles = null;
       PerfLogger perfLogger = SessionState.getPerfLogger();
       perfLogger.PerfLogBegin("MoveTask", "FileMoves");
+      // TODO: this assumes both paths are qualified; which they are, currently.
       if (mmWriteId != null && loadPath.equals(newPartPath)) {
         // MM insert query, move itself is a no-op.
         Utilities.LOG14535.info("not moving " + loadPath + " to " + newPartPath + " (MM)");
@@ -1705,7 +1706,7 @@ public class Hive {
     FileSystem srcFs;
     try {
       srcFs = loadPath.getFileSystem(conf);
-      srcs = srcFs.globStatus(loadPath);
+      srcs = srcFs.listStatus(loadPath);
     } catch (IOException e) {
       LOG.error("Error listing files", e);
       throw new HiveException(e);
@@ -1847,29 +1848,30 @@ private void constructOneLBLocationMap(FileStatus fSta,
     Set<Path> validPartitions = new HashSet<Path>();
     try {
       FileSystem fs = loadPath.getFileSystem(conf);
-      FileStatus[] leafStatus = null;
       if (mmWriteId == null) {
-        leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
+        FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP, fs);
+        // Check for empty partitions
+        for (FileStatus s : leafStatus) {
+          if (!s.isDirectory()) {
+            throw new HiveException("partition " + s.getPath() + " is not a directory!");
+          }
+          Path dpPath = s.getPath();
+          Utilities.LOG14535.info("Found DP " + dpPath);
+          validPartitions.add(dpPath);
+        }
       } else {
         // The non-MM path only finds new partitions, as it is looking at the temp path.
         // To produce the same effect, we will find all the partitions affected by this write ID.
-        leafStatus = Utilities.getMmDirectoryCandidates(
-            fs, loadPath, numDP, numLB, null, mmWriteId);
-      }
-      // Check for empty partitions
-      for (FileStatus s : leafStatus) {
-        if (mmWriteId == null && !s.isDirectory()) {
-          throw new HiveException("partition " + s.getPath() + " is not a directory!");
-        }
-        Path dpPath = s.getPath();
-        if (mmWriteId != null) {
-          dpPath = dpPath.getParent(); // Skip the MM directory that we have found.
+        Path[] leafStatus = Utilities.getMmDirectoryCandidates(
+            fs, loadPath, numDP, numLB, null, mmWriteId, conf);
+        for (Path p : leafStatus) {
+          Path dpPath = p.getParent(); // Skip the MM directory that we have found.
           for (int i = 0; i < numLB; ++i) {
             dpPath = dpPath.getParent(); // Now skip the LB directories, if any...
           }
+          Utilities.LOG14535.info("Found DP " + dpPath);
+          validPartitions.add(dpPath);
         }
-        Utilities.LOG14535.info("Found DP " + dpPath);
-        validPartitions.add(dpPath);
       }
     } catch (IOException e) {
       throw new HiveException(e);
@@ -2047,6 +2049,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
       newFiles = Collections.synchronizedList(new ArrayList<Path>());
     }
+    // TODO: this assumes both paths are qualified; which they are, currently.
     if (mmWriteId != null && loadPath.equals(tbl.getPath())) {
       Utilities.LOG14535.info("not moving " + loadPath + " to " + tbl.getPath());
       if (replace) {