Posted to commits@hive.apache.org by ha...@apache.org on 2016/03/29 20:27:28 UTC

hive git commit: HIVE-12988 : Improve dynamic partition loading IV (Ashutosh Chauhan via Prasanth J)

Repository: hive
Updated Branches:
  refs/heads/master 1de97bc5f -> a14ef8abe


HIVE-12988 : Improve dynamic partition loading IV (Ashutosh Chauhan via Prasanth J)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a14ef8ab
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a14ef8ab
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a14ef8ab

Branch: refs/heads/master
Commit: a14ef8abe1df1516b8b9f486030bc3d584f940a9
Parents: 1de97bc
Author: Ashutosh Chauhan <ha...@apache.org>
Authored: Tue Feb 2 18:03:44 2016 -0800
Committer: Ashutosh Chauhan <ha...@apache.org>
Committed: Tue Mar 29 11:27:12 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   4 +-
 .../apache/hadoop/hive/ql/metadata/Hive.java    | 252 +++++++++++--------
 .../org/apache/hadoop/fs/ProxyFileSystem.java   |   5 +-
 3 files changed, 155 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
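
At a glance: loadTable()/loadPartition() used to move files one at a time; copyFiles() and moveFile() below now fan the per-file renames out to a daemon thread pool sized by the new hive.mv.files.thread setting (default 25), collect the results as Futures, and fail fast on the first error. What follows is a minimal, self-contained sketch of that pattern using plain java.nio in place of Hadoop's FileSystem; the names (ParallelMoveSketch, moveAll, POOL_SIZE) are illustrative, not Hive's.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelMoveSketch {
  private static final int POOL_SIZE = 25; // default of hive.mv.files.thread

  static void moveAll(List<Path> srcs, Path destDir) throws IOException {
    ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
    List<Future<Path>> futures = new ArrayList<>();
    for (Path src : srcs) {
      // One task per file, as the patch does for each source FileStatus.
      futures.add(pool.submit(() -> Files.move(src, destDir.resolve(src.getFileName()))));
    }
    pool.shutdown(); // accept no new tasks; queued moves keep running
    for (Future<Path> f : futures) {
      try {
        f.get(); // surfaces the first failure
      } catch (InterruptedException | ExecutionException e) {
        pool.shutdownNow(); // cancel the remaining moves, as the patch does
        throw new IOException("Failed to move files", e);
      }
    }
  }
}

The same shutdown()/get()/shutdownNow() sequence appears in both new code paths in Hive.java below.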


http://git-wip-us.apache.org/repos/asf/hive/blob/a14ef8ab/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b8870f2..f03c1ab 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2326,6 +2326,8 @@ public class HiveConf extends Configuration {
     HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist", "set,reset,dfs,add,list,delete,reload,compile",
         "Comma separated list of non-SQL Hive commands users are authorized to execute"),
 
+    HIVE_MOVE_FILES_THREAD_COUNT("hive.mv.files.thread", 25, new SizeValidator(1L, true, 1024L, true), "Number of threads"
+        + " used to move files in move task"),
     // If this is set all move tasks at the end of a multi-insert query will only begin once all
     // outputs are ready
     HIVE_MULTI_INSERT_MOVE_TASKS_SHARE_DEPENDENCIES(
@@ -2771,7 +2773,7 @@ public class HiveConf extends Configuration {
     SPARK_RPC_SASL_MECHANISM("hive.spark.client.rpc.sasl.mechanisms", "DIGEST-MD5",
       "Name of the SASL mechanism to use for authentication."),
     SPARK_RPC_SERVER_ADDRESS("hive.spark.client.rpc.server.address", "",
-      "The server address of HiverServer2 host to be used for communication between Hive client and remote Spark driver. " + 
+      "The server address of HiverServer2 host to be used for communication between Hive client and remote Spark driver. " +
       "Default is empty, which means the address will be determined in the same way as for hive.server2.thrift.bind.host." +
       "This is only necessary if the host has mutiple network addresses and if a different network address other than " +
       "hive.server2.thrift.bind.host is to be used."),

http://git-wip-us.apache.org/repos/asf/hive/blob/a14ef8ab/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 6d27f55..c27481f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -32,19 +32,25 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.ImmutableMap;
+
 import javax.jdo.JDODataStoreException;
 
 import org.apache.hadoop.conf.Configuration;
@@ -132,6 +138,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * This class has functions that implement meta data/DDL operations using calls
@@ -1504,7 +1511,7 @@ public class Hive {
             isSrcLocal);
       } else {
         if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary() && oldPart != null) {
-          newFiles = new ArrayList<>();
+          newFiles = Collections.synchronizedList(new ArrayList<Path>());
         }
 
         FileSystem fs = tbl.getDataLocation().getFileSystem(conf);
@@ -1751,9 +1758,13 @@ private void constructOneLBLocationMap(FileStatus fSta,
   public void loadTable(Path loadPath, String tableName, boolean replace,
       boolean isSrcLocal, boolean isSkewedStoreAsSubdir, boolean isAcid)
       throws HiveException {
-    List<Path> newFiles = new ArrayList<Path>();
+
+    List<Path> newFiles = null;
     Table tbl = getTable(tableName);
     HiveConf sessionConf = SessionState.getSessionConf();
+    if (conf.getBoolVar(ConfVars.FIRE_EVENTS_FOR_DML) && !tbl.isTemporary()) {
+      newFiles = Collections.synchronizedList(new ArrayList<Path>());
+    }
     if (replace) {
       Path tableDest = tbl.getPath();
       replaceFiles(tableDest, loadPath, tableDest, tableDest, sessionConf, isSrcLocal);
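
Note that the two hunks above switch newFiles from a plain ArrayList to Collections.synchronizedList: once the moves run on a pool (see copyFiles() below), several worker threads append to newFiles concurrently, and a bare ArrayList can drop elements or throw under concurrent add(). A small self-contained demonstration (the class name and Integer stand-ins are illustrative):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SynchronizedListSketch {
  public static void main(String[] args) throws InterruptedException {
    // The synchronized wrapper makes each add() atomic, so no updates are lost.
    List<Integer> newFiles = Collections.synchronizedList(new ArrayList<>());
    ExecutorService pool = Executors.newFixedThreadPool(25);
    for (int i = 0; i < 1000; i++) {
      final int n = i;
      pool.submit(() -> newFiles.add(n));
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
    System.out.println(newFiles.size()); // expect 1000; a bare ArrayList may print less
  }
}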
@@ -2579,75 +2590,91 @@ private void constructOneLBLocationMap(FileStatus fSta,
     }
   }
 
-  // for each file or directory in 'srcs', make mapping for every file in src to safe name in dest
-  private static List<List<Path[]>> checkPaths(HiveConf conf, FileSystem fs,
-      FileStatus[] srcs, FileSystem srcFs, Path destf, boolean replace)
-      throws HiveException {
+  private static void copyFiles(final HiveConf conf, final FileSystem destFs,
+      FileStatus[] srcs, final FileSystem srcFs, final Path destf, final boolean isSrcLocal, final List<Path> newFiles)
+          throws HiveException {
 
-    List<List<Path[]>> result = new ArrayList<List<Path[]>>();
+    final HadoopShims.HdfsFileStatus fullDestStatus;
     try {
-      FileStatus destStatus = !replace ? FileUtils.getFileStatusOrNull(fs, destf) : null;
-      if (destStatus != null && !destStatus.isDir()) {
-        throw new HiveException("checkPaths: destination " + destf
-            + " should be a directory");
-      }
-      for (FileStatus src : srcs) {
-        FileStatus[] items;
-        if (src.isDir()) {
-          items = srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
-          Arrays.sort(items);
+      fullDestStatus = ShimLoader.getHadoopShims().getFullFileStatus(conf, destFs, destf);
+    } catch (IOException e1) {
+      throw new HiveException(e1);
+    }
+
+    if (!fullDestStatus.getFileStatus().isDirectory()) {
+      throw new HiveException(destf + " is not a directory.");
+    }
+    final boolean inheritPerms = HiveConf.getBoolVar(conf,
+        HiveConf.ConfVars.HIVE_WAREHOUSE_SUBDIR_INHERIT_PERMS);
+    final List<Future<ObjectPair<Path, Path>>> futures = new LinkedList<>();
+    final ExecutorService pool = Executors.newFixedThreadPool(
+        conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
+        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
+
+    for (FileStatus src : srcs) {
+      FileStatus[] files;
+      if (src.isDirectory()) {
+        try {
+          files = srcFs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
+        } catch (IOException e) {
+          pool.shutdownNow();
+          throw new HiveException(e);
+        }
+      } else {
+        files = new FileStatus[] {src};
+      }
+
+      for (FileStatus srcFile : files) {
+
+        final Path srcP = srcFile.getPath();
+        final boolean needToCopy = needToCopy(srcP, destf, srcFs, destFs);
+        // Strip off the file type, if any, so we don't make:
+        // 000000_0.gz -> 000000_0.gz_copy_1
+        final String name;
+        final String filetype;
+        String itemName = srcP.getName();
+        int index = itemName.lastIndexOf('.');
+        if (index >= 0) {
+          filetype = itemName.substring(index);
+          name = itemName.substring(0, index);
         } else {
-          items = new FileStatus[] {src};
+          name = itemName;
+          filetype = "";
         }
-
-        List<Path[]> srcToDest = new ArrayList<Path[]>();
-        for (FileStatus item : items) {
-
-          Path itemSource = item.getPath();
-
-          if (Utilities.isTempPath(item)) {
-            // This check is redundant because temp files are removed by
-            // execution layer before
-            // calling loadTable/Partition. But leaving it in just in case.
-            srcFs.delete(itemSource, true);
-            continue;
-          }
-
-          Path itemDest = new Path(destf, itemSource.getName());
-
-          if (!replace) {
-            // Strip off the file type, if any so we don't make:
-            // 000000_0.gz -> 000000_0.gz_copy_1
-            String name = itemSource.getName();
-            String filetype;
-            int index = name.lastIndexOf('.');
-            if (index >= 0) {
-              filetype = name.substring(index);
-              name = name.substring(0, index);
+        futures.add(pool.submit(new Callable<ObjectPair<Path, Path>>() {
+          @Override
+          public ObjectPair<Path, Path> call() throws Exception {
+            Path destPath = new Path(destf, srcP.getName());
+            if (!needToCopy && !isSrcLocal) {
+              for (int counter = 1; !destFs.rename(srcP,destPath); counter++) {
+                destPath = new Path(destf, name + ("_copy_" + counter) + filetype);
+              }
             } else {
-              filetype = "";
+              destPath = mvFile(conf, srcP, destPath, isSrcLocal, srcFs, destFs, name, filetype);
             }
-            // It's possible that the file we're copying may have the same
-            // relative name as an existing file in the "destf" directory.
-            // So let's make a quick check to see if we can rename any
-            // potential offenders so as to allow them to move into the
-            // "destf" directory. The scheme is dead simple: simply tack
-            // on "_copy_N" where N starts at 1 and works its way up until
-            // we find a free space.
-
-            // removed source file staging.. it's more confusing when failed.
-            for (int counter = 1; fs.exists(itemDest) || destExists(result, itemDest); counter++) {
-              itemDest = new Path(destf, name + ("_copy_" + counter) + filetype);
+
+            if (inheritPerms) {
+              ShimLoader.getHadoopShims().setFullFileStatus(conf, fullDestStatus, destFs, destf);
             }
+            if (null != newFiles) {
+              newFiles.add(destPath);
+            }
+            return ObjectPair.create(srcP, destPath);
           }
-          srcToDest.add(new Path[]{itemSource, itemDest});
-        }
-        result.add(srcToDest);
+        }));
+      }
+    }
+    pool.shutdown();
+    for (Future<ObjectPair<Path, Path>> future : futures) {
+      try {
+        ObjectPair<Path, Path> pair = future.get();
+        LOG.debug("Moved src: {}", pair.getFirst().toString(), ", to dest: {}", pair.getSecond().toString());
+      } catch (Exception e) {
+        LOG.error("Failed to move: {}", e.getMessage());
+        pool.shutdownNow();
+        throw new HiveException(e.getCause());
       }
-    } catch (IOException e) {
-      throw new HiveException("checkPaths: filesystem error in check phase. " + e.getMessage(), e);
     }
-    return result;
   }
 
   private static boolean destExists(List<List<Path[]>> result, Path proposed) {
@@ -2704,14 +2731,34 @@ private void constructOneLBLocationMap(FileStatus fSta,
     return ShimLoader.getHadoopShims().getPathWithoutSchemeAndAuthority(path).toString();
   }
 
+  private static Path mvFile(HiveConf conf, Path srcf, Path destf, boolean isSrcLocal,
+      FileSystem srcFs, FileSystem destFs, String srcName, String filetype) throws IOException {
+
+    for (int counter = 1; destFs.exists(destf); counter++) {
+      destf = new Path(destf.getParent(), srcName + ("_copy_" + counter) + filetype);
+    }
+    if (isSrcLocal) {
+      // For local src file, copy to hdfs
+      destFs.copyFromLocalFile(srcf, destf);
+    } else {
+      // Copy if we are across file systems or encryption zones.
+      LOG.info("Copying source " + srcf + " to " + destf + " because they are on different file systems or encryption zones.");
+      FileUtils.copy(srcFs, srcf, destFs, destf,
+          true,    // delete source
+          false, // overwrite destination
+          conf);
+    }
+    return destf;
+  }
+
   //it is assumed that the parent directory of destf already exists when this
   //method is called. when replace is true, this method works a little differently
   //from the mv command: if destf is a directory, it replaces destf instead of moving under
   //it. in this case, the replaced destf still preserves the original destf's permission
-  public static boolean moveFile(HiveConf conf, Path srcf, Path destf,
+  public static boolean moveFile(HiveConf conf, Path srcf, final Path destf,
       boolean replace, boolean isSrcLocal) throws HiveException {
     boolean success = false;
-    FileSystem srcFs, destFs;
+    final FileSystem srcFs, destFs;
     try {
       destFs = destf.getFileSystem(conf);
     } catch (IOException e) {
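
The new mvFile() above keeps the long-standing collision scheme that checkPaths() used to implement: strip the extension, then probe name_copy_1, name_copy_2, ... until destFs.exists() finds a free slot, so 000000_0.gz becomes 000000_0_copy_1.gz rather than 000000_0.gz_copy_1. A standalone sketch of just that naming logic, using java.nio and an illustrative helper name:

import java.nio.file.Files;
import java.nio.file.Path;

final class CopySuffixSketch {
  // Mirrors the _copy_N probing in mvFile(): keep trying suffixed names
  // until one does not already exist in destDir.
  static Path nextFreeName(Path destDir, String fileName) {
    int dot = fileName.lastIndexOf('.');
    String name = dot >= 0 ? fileName.substring(0, dot) : fileName;
    String type = dot >= 0 ? fileName.substring(dot) : "";
    Path candidate = destDir.resolve(fileName);
    for (int counter = 1; Files.exists(candidate); counter++) {
      candidate = destDir.resolve(name + "_copy_" + counter + type);
    }
    return candidate;
  }
}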
@@ -2775,31 +2822,38 @@ private void constructOneLBLocationMap(FileStatus fSta,
             FileStatus[] srcs = destFs.listStatus(srcf, FileUtils.HIDDEN_FILES_PATH_FILTER);
             if (srcs.length == 0) {
               success = true; // Nothing to move.
-            }
-            /* Move files one by one because source is a subdirectory of destination */
-            for (FileStatus status : srcs) {
-              Path destFile;
-
-              /* Append the source filename to the destination directory */
-              if (destFs.isDirectory(destf)) {
-                destFile = new Path(destf, status.getPath().getName());
-              } else {
-                destFile = destf;
+            } else {
+              List<Future<Boolean>> futures = new LinkedList<>();
+              final ExecutorService pool = Executors.newFixedThreadPool(
+                  conf.getIntVar(ConfVars.HIVE_MOVE_FILES_THREAD_COUNT),
+                  new ThreadFactoryBuilder().setDaemon(true).setNameFormat("MoveDir-Thread-%d").build());
+              /* Move files one by one because source is a subdirectory of destination */
+              for (final FileStatus status : srcs) {
+                futures.add(pool.submit(new Callable<Boolean>() {
+                  @Override
+                  public Boolean call() throws Exception {
+                    return destFs.rename(status.getPath(), destf);
+                  }
+                }));
               }
-
-              // Destination should be replaced, so we delete it first
-              if (destFs.exists(destFile)) {
-                if (!destFs.delete(destFile, true)) {
-                  throw new HiveException(String.format("File to replace could not be deleted: %s", destFile));
+              pool.shutdown();
+              boolean allFutures = true;
+              for (Future<Boolean> future : futures) {
+                try {
+                  Boolean result = future.get();
+                  allFutures &= result;
+                  if (!result) {
+                    LOG.debug("Failed to rename.");
+                    pool.shutdownNow();
+                  }
+                } catch (Exception e) {
+                  LOG.debug("Failed to rename.", e.getMessage());
+                  pool.shutdownNow();
+                  throw new HiveException(e.getCause());
                 }
               }
-
-              if (!(destFs.rename(status.getPath(), destFile))) {
-                throw new HiveException("Unable to move source " + status.getPath() + " to destination " + destf);
-              }
+              success = allFutures;
             }
-
-            success = true;
           } else {
             success = destFs.rename(srcf, destf);
           }
@@ -2825,8 +2879,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
   /**
   * If moving across different FileSystems or different encryption zones, we need to do a file copy instead of a rename.
    * TODO- consider if need to do this for different file authority.
+   * @throws HiveException
    */
-  static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs) throws HiveException, IOException {
+  static protected boolean needToCopy(Path srcf, Path destf, FileSystem srcFs, FileSystem destFs) throws HiveException {
     //Check if different FileSystems
     if (!srcFs.getClass().equals(destFs.getClass())) {
       return true;
@@ -2834,8 +2889,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
 
     //Check if different encryption zones
     HadoopShims.HdfsEncryptionShim hdfsEncryptionShim = SessionState.get().getHdfsEncryptionShim();
-    return hdfsEncryptionShim != null && (hdfsEncryptionShim.isPathEncrypted(srcf) || hdfsEncryptionShim.isPathEncrypted(destf))
-      && !hdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf);
+    try {
+      return hdfsEncryptionShim != null && (hdfsEncryptionShim.isPathEncrypted(srcf) || hdfsEncryptionShim.isPathEncrypted(destf))
+        && !hdfsEncryptionShim.arePathsOnSameEncryptionZone(srcf, destf);
+    } catch (IOException e) {
+      throw new HiveException(e);
+    }
   }
 
   /**
@@ -2886,22 +2945,7 @@ private void constructOneLBLocationMap(FileStatus fSta,
     if (isAcid) {
       moveAcidFiles(srcFs, srcs, destf, newFiles);
     } else {
-    // check that source and target paths exist
-      List<List<Path[]>> result = checkPaths(conf, fs, srcs, srcFs, destf, false);
-      // move it, move it
-      try {
-        for (List<Path[]> sdpairs : result) {
-          for (Path[] sdpair : sdpairs) {
-            if (!moveFile(conf, sdpair[0], sdpair[1], false, isSrcLocal)) {
-              throw new IOException("Cannot move " + sdpair[0] + " to "
-                  + sdpair[1]);
-            }
-            if (newFiles != null) newFiles.add(sdpair[1]);
-          }
-        }
-      } catch (IOException e) {
-        throw new HiveException("copyFiles: error while moving files!!! " + e.getMessage(), e);
-      }
+      copyFiles(conf, fs, srcs, srcFs, destf, isSrcLocal, newFiles);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/a14ef8ab/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java
----------------------------------------------------------------------
diff --git a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java
index cb1e2b7..2c37a51 100644
--- a/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java
+++ b/shims/common/src/main/java/org/apache/hadoop/fs/ProxyFileSystem.java
@@ -82,6 +82,7 @@ public class ProxyFileSystem extends FilterFileSystem {
    * @return
    * @throws IOException
    */
+  @Override
   public Path resolvePath(final Path p) throws IOException {
     // Return the fully-qualified path of path f resolving the path
     // through any symlinks or mount point
@@ -174,7 +175,9 @@ public class ProxyFileSystem extends FilterFileSystem {
 
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
-    return super.rename(swizzleParamPath(src), swizzleParamPath(dst));
+    Path dest = swizzleParamPath(dst);
+    // Make sure we return false for an existing destination, as the FileSystem API contract requires
+    return super.isFile(dest) ? false : super.rename(swizzleParamPath(src), dest);
   }
 
   @Override
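
The ProxyFileSystem change ties back to the parallel mover: copyFiles() now loops on destFs.rename() returning false to pick a free _copy_N name, so rename() must report false for an existing destination file rather than appearing to succeed. A hedged expectation sketch against the local FileSystem (paths and class name are hypothetical, and exact local-filesystem behaviour can vary by platform):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RenameContractSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path src = new Path("/tmp/rename_sketch_src");
    Path dst = new Path("/tmp/rename_sketch_dst");
    fs.create(src).close();
    fs.create(dst).close(); // destination already exists
    // Per the FileSystem contract this should print false, not overwrite dst:
    System.out.println(fs.rename(src, dst));
  }
}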