You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text rendering.
Posted to commits@hive.apache.org by jd...@apache.org on 2017/11/07 18:20:30 UTC

hive git commit: HIVE-17963: Fix for HIVE-17113 can be improved for non-blobstore filesystems (Jason Dere, reviewed by Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 76485de6c -> 71a92c2a2


HIVE-17963: Fix for HIVE-17113 can be improved for non-blobstore filesystems (Jason Dere, reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/71a92c2a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/71a92c2a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/71a92c2a

Branch: refs/heads/master
Commit: 71a92c2a2d2f2b092bfe4a81f4d04859f6f784f4
Parents: 76485de
Author: Jason Dere <jd...@hortonworks.com>
Authored: Tue Nov 7 10:19:54 2017 -0800
Committer: Jason Dere <jd...@hortonworks.com>
Committed: Tue Nov 7 10:19:54 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  5 ---
 .../apache/hadoop/hive/ql/exec/Utilities.java   | 40 +++++++++++++++++++-
 .../insert_with_move_files_from_source_dir.q    |  4 +-
 ql/src/test/queries/clientpositive/skewjoin.q   |  1 -
 4 files changed, 40 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/71a92c2a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 305e9dc..a26ea21 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3592,11 +3592,6 @@ public class HiveConf extends Configuration {
     HIVE_EXEC_INPUT_LISTING_MAX_THREADS("hive.exec.input.listing.max.threads", 0, new  SizeValidator(0L, true, 1024L, true),
         "Maximum number of threads that Hive uses to list file information from file systems (recommended > 1 for blobstore)."),
 
-    HIVE_EXEC_MOVE_FILES_FROM_SOURCE_DIR("hive.exec.move.files.from.source.dir", false,
-        "When moving/renaming a directory from source to destination, individually move each \n" +
-        "file in the source directory, rather than renaming the source directory. This may \n" +
-        "help protect against files written to temp directories by runaway task attempts."),
-
     /* BLOBSTORE section */
 
     HIVE_BLOBSTORE_SUPPORTED_SCHEMES("hive.blobstore.supported.schemes", "s3,s3a,s3n",

http://git-wip-us.apache.org/repos/asf/hive/blob/71a92c2a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index b78c930..71fa42c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -93,6 +93,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.common.BlobStorageUtils;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.common.HiveInterruptCallback;
 import org.apache.hadoop.hive.common.HiveInterruptUtils;
@@ -1451,11 +1452,43 @@ public final class Utilities {
       boolean success, Logger log, DynamicPartitionCtx dpCtx, FileSinkDesc conf,
       Reporter reporter) throws IOException,
       HiveException {
-    
+
+    //
+    // Runaway task attempts (which are unable to be killed by MR/YARN) can cause HIVE-17113,
+    // where they can write duplicate output files to tmpPath after de-duplicating the files,
+    // but before tmpPath is moved to specPath.
+    // Fixing this issue will be done differently for blobstore (e.g. S3)
+    // vs non-blobstore (local filesystem, HDFS) filesystems due to differences in
+    // implementation - a directory move in a blobstore effectively results in file-by-file
+    // moves for every file in a directory, while in HDFS/localFS a directory move is just a
+    // single filesystem operation.
+    // - For non-blobstore FS, do the following:
+    //   1) Rename tmpPath to a new directory name to prevent additional files
+    //      from being added by runaway processes.
+    //   2) Remove duplicates from the temp directory
+    //   3) Rename/move the temp directory to specPath
+    //
+    // - For blobstore FS, do the following:
+    //   1) Remove duplicates from tmpPath
+    //   2) Use moveSpecifiedFiles() to perform a file-by-file move of the de-duped files
+    //      to specPath. On blobstore FS, assuming n files in the directory, this results
+    //      in n file moves, compared to 2*n file moves with the previous solution
+    //      (each directory move would result in a file-by-file move of the files in the directory)
+    //
     FileSystem fs = specPath.getFileSystem(hconf);
+    boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
     if (success) {
+      if (!isBlobStorage && fs.exists(tmpPath)) {
+        //   1) Rename tmpPath to a new directory name to prevent additional files
+        //      from being added by runaway processes.
+        Path tmpPathOriginal = tmpPath;
+        tmpPath = new Path(tmpPath.getParent(), tmpPath.getName() + ".moved");
+        Utilities.rename(fs, tmpPathOriginal, tmpPath);
+      }
+
+      // Remove duplicates from tmpPath
       FileStatus[] statuses = HiveStatsUtils.getFileStatusRecurse(
           tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
       if(statuses != null && statuses.length > 0) {
@@ -1474,15 +1507,18 @@ public final class Utilities {
           filesKept.addAll(emptyBuckets);
           perfLogger.PerfLogEnd("FileSinkOperator", "CreateEmptyBuckets");
         }
+
         // move to the file destination
         Utilities.FILE_OP_LOGGER.trace("Moving tmp dir: {} to: {}", tmpPath, specPath);
 
         perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles");
-        if (HiveConf.getBoolVar(hconf, HiveConf.ConfVars.HIVE_EXEC_MOVE_FILES_FROM_SOURCE_DIR)) {
+        if (isBlobStorage) {
           // HIVE-17113 - avoid copying files that may have been written to the temp dir by runaway tasks,
           // by moving just the files we've tracked from removeTempOrDuplicateFiles().
           Utilities.moveSpecifiedFiles(fs, tmpPath, specPath, filesKept);
         } else {
+          // For non-blobstore case, can just move the directory - the initial directory rename
+          // at the start of this method should prevent files written by runaway tasks.
           Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
         }
         perfLogger.PerfLogEnd("FileSinkOperator", "RenameOrMoveFiles");

http://git-wip-us.apache.org/repos/asf/hive/blob/71a92c2a/ql/src/test/queries/clientpositive/insert_with_move_files_from_source_dir.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/insert_with_move_files_from_source_dir.q b/ql/src/test/queries/clientpositive/insert_with_move_files_from_source_dir.q
index 0117755..16d74d5 100644
--- a/ql/src/test/queries/clientpositive/insert_with_move_files_from_source_dir.q
+++ b/ql/src/test/queries/clientpositive/insert_with_move_files_from_source_dir.q
@@ -1,5 +1,4 @@
 
-set hive.exec.move.files.from.source.dir=true;
 set hive.enforce.bucketing=true;
 set hive.exec.dynamic.partition=true;
 set hive.exec.dynamic.partition.mode=nonstrict;
@@ -8,7 +7,8 @@ create table emp1 (id int, name string, dept int, country string) row format del
 load data local inpath '../../data/files/employee_part.txt' overwrite into table emp1;
 select * from emp1 order by id;
 
--- Testing inserts with hive.exec.move.files.from.source.dir=true
+set hive.blobstore.supported.schemes=pfile;
+-- Setting pfile to be treated as blobstore to test mvFileToFinalPath() behavior for blobstore case
 -- inserts into non-partitioned/non-bucketed table
 create table emp2 (id int, name string, dept int, country string) stored as textfile;
 insert overwrite table emp2 select * from emp1;

http://git-wip-us.apache.org/repos/asf/hive/blob/71a92c2a/ql/src/test/queries/clientpositive/skewjoin.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientpositive/skewjoin.q b/ql/src/test/queries/clientpositive/skewjoin.q
index 9ad4b77..e4b178a 100644
--- a/ql/src/test/queries/clientpositive/skewjoin.q
+++ b/ql/src/test/queries/clientpositive/skewjoin.q
@@ -2,7 +2,6 @@ set hive.mapred.mode=nonstrict;
 set hive.explain.user=false;
 set hive.optimize.skewjoin = true;
 set hive.skewjoin.key = 2;
-set hive.exec.move.files.from.source.dir=true;
 
 -- SORT_QUERY_RESULTS