You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to commits@hive.apache.org by ha...@apache.org on 2016/05/12 16:07:50 UTC

hive git commit: HIVE-13726 : Improve dynamic partition loading VI (Ashutosh Chauhan via Rui Li)

Repository: hive
Updated Branches:
  refs/heads/master 38797d212 -> 64c96e1e9


HIVE-13726 : Improve dynamic partition loading VI (Ashutosh Chauhan via Rui Li)

Signed-off-by: Ashutosh Chauhan <ha...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/64c96e1e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/64c96e1e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/64c96e1e

Branch: refs/heads/master
Commit: 64c96e1e92a490a069f9cc924b2ec476187f98ea
Parents: 38797d2
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Mon May 9 18:31:15 2016 -0700
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Thu May 12 08:59:12 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hive/common/FileUtils.java    | 58 +++-----------------
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 49 ++++++++++++++++-
 2 files changed, 54 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/64c96e1e/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
index b65c35b..5cf4d39 100644
--- a/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
+++ b/common/src/java/org/apache/hadoop/hive/common/FileUtils.java
@@ -50,7 +50,6 @@ import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Collection of file manipulation utilities common across Hive.
  */
@@ -575,73 +574,32 @@ public final class FileUtils {
   }
 
   /**
-   * Trashes or deletes all files under a directory. Leaves the directory as is.
-   * @param fs FileSystem to use
-   * @param f path of directory
-   * @param conf hive configuration
-   * @param forceDelete whether to force delete files if trashing does not succeed
-   * @return true if deletion successful
-   * @throws FileNotFoundException
-   * @throws IOException
-   */
-  public static boolean trashFilesUnderDir(FileSystem fs, Path f, Configuration conf,
-      boolean forceDelete) throws FileNotFoundException, IOException {
-    FileStatus[] statuses = fs.listStatus(f, HIDDEN_FILES_PATH_FILTER);
-    boolean result = true;
-    for (FileStatus status : statuses) {
-      result = result & moveToTrash(fs, status.getPath(), conf, forceDelete);
-    }
-    return result;
-  }
-
-  /**
-   * Move a particular file or directory to the trash. If for a certain reason the trashing fails
-   * it will force deletes the file or directory
-   * @param fs FileSystem to use
-   * @param f path of file or directory to move to trash.
-   * @param conf
-   * @return true if move successful
-   * @throws IOException
-   */
-  public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf) throws IOException {
-    return moveToTrash(fs, f, conf, true);
-  }
-
-  /**
    * Move a particular file or directory to the trash.
    * @param fs FileSystem to use
    * @param f path of file or directory to move to trash.
    * @param conf
-   * @param forceDelete whether force delete the file or directory if trashing fails
    * @return true if move successful
    * @throws IOException
    */
-  public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf, boolean forceDelete)
+  public static boolean moveToTrash(FileSystem fs, Path f, Configuration conf)
       throws IOException {
     LOG.debug("deleting  " + f);
-
     boolean result = false;
     try {
       result = Trash.moveToAppropriateTrash(fs, f, conf);
       if (result) {
-        LOG.info("Moved to trash: " + f);
+        LOG.trace("Moved to trash: " + f);
         return true;
       }
     } catch (IOException ioe) {
-      if (forceDelete) {
-        // for whatever failure reason including that trash has lower encryption zone
-        // retry with force delete
-        LOG.warn(ioe.getMessage() + "; Force to delete it.");
-      } else {
-        throw ioe;
-      }
+      // for whatever failure reason including that trash has lower encryption zone
+      // retry with force delete
+      LOG.warn(ioe.getMessage() + "; Force to delete it.");
     }
 
-    if (forceDelete) {
-      result = fs.delete(f, true);
-      if (!result) {
-        LOG.error("Failed to delete " + f);
-      }
+    result = fs.delete(f, true);
+    if (!result) {
+      LOG.error("Failed to delete " + f);
     }
 
     return result;

http://git-wip-us.apache.org/repos/asf/hive/blob/64c96e1e/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index b5e660b..dcfc2b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -44,6 +44,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -1528,7 +1529,7 @@ public class Hive {
       }
       List<Path> newFiles = null;
       if (replace || (oldPart == null && !isAcid)) {
-        Hive.replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
+        replaceFiles(tbl.getPath(), loadPath, newPartPath, oldPartPath, getConf(),
             isSrcLocal);
       } else {
         if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) {
@@ -3080,7 +3081,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
    * @param isSrcLocal
    *          If the source directory is LOCAL
    */
-  protected static void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
+  protected void replaceFiles(Path tablePath, Path srcf, Path destf, Path oldPath, HiveConf conf,
           boolean isSrcLocal) throws HiveException {
     try {
 
@@ -3114,7 +3115,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
               // existing content might result in incorrect (extra) data.
               // But not sure why we changed not to delete the oldPath in HIVE-8750 if it is
               // not the destf or its subdir?
-              oldPathDeleted = FileUtils.trashFilesUnderDir(fs2, oldPath, conf, true);
+              oldPathDeleted = trashFilesUnderDir(fs2, oldPath, conf);
             }
           }
         } catch (IOException e) {
@@ -3162,6 +3163,48 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
   }
 
+
+  /**
+   * Trashes or deletes all files under a directory. Leaves the directory as is.
+   * @param fs FileSystem to use
+   * @param f path of directory
+   * @param conf hive configuration
+   * @param forceDelete whether to force delete files if trashing does not succeed
+   * @return true if deletion successful
+   * @throws IOException
+   */
+  private boolean trashFilesUnderDir(final FileSystem fs, Path f, final Configuration conf)
+      throws IOException {
+    FileStatus[] statuses = fs.listStatus(f, FileUtils.HIDDEN_FILES_PATH_FILTER);
+    boolean result = true;
+    final List<Future<Boolean>> futures = new LinkedList<>();
+    final ExecutorService pool = Executors.newFixedThreadPool(
+        conf.getInt(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT.varname, 25),
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Delete-Thread-%d").build());
+    final SessionState parentSession = SessionState.get();
+    for (final FileStatus status : statuses) {
+      futures.add(pool.submit(new Callable<Boolean>() {
+
+        @Override
+        public Boolean call() throws Exception {
+          SessionState.setCurrentSessionState(parentSession);
+          return FileUtils.moveToTrash(fs, status.getPath(), conf);
+        }
+      }));
+    }
+    pool.shutdown();
+    for (Future<Boolean> future : futures) {
+      try {
+        result &= future.get();
+      } catch (InterruptedException | ExecutionException e) {
+        LOG.error("Failed to delete: ",e);
+        pool.shutdownNow();
+        throw new IOException(e);
+      }
+    }
+    return result;
+  }
+
   public static boolean isHadoop1() {
     return ShimLoader.getMajorVersion().startsWith("0.20");
   }