You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jc...@apache.org on 2020/05/18 20:20:30 UTC

[hive] branch master updated: HIVE-23354: Remove file size sanity checking from compareTempOrDuplicateFiles (John Sherman, reviewed by Jesus Camacho Rodriguez)

This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 61c9b2e  HIVE-23354: Remove file size sanity checking from compareTempOrDuplicateFiles (John Sherman, reviewed by Jesus Camacho Rodriguez)
61c9b2e is described below

commit 61c9b2eebd7411c982e8f33e1fe27636f98897a0
Author: John Sherman <jf...@cloudera.com>
AuthorDate: Mon May 18 13:19:21 2020 -0700

    HIVE-23354: Remove file size sanity checking from compareTempOrDuplicateFiles (John Sherman, reviewed by Jesus Camacho Rodriguez)
    
    Close apache/hive#1022
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |   5 +-
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  | 119 +++++++++++----------
 .../apache/hadoop/hive/ql/exec/mr/ExecDriver.java  |   7 +-
 .../apache/hadoop/hive/ql/exec/tez/DagUtils.java   |   9 +-
 .../hadoop/hive/ql/io/merge/MergeFileTask.java     |   3 +
 .../ql/io/rcfile/truncate/ColumnTruncateTask.java  |   3 +
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  |   4 +
 .../apache/hadoop/hive/ql/exec/TestUtilities.java  |   4 +
 8 files changed, 87 insertions(+), 67 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f5ad3a8..5a39006 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -601,8 +601,9 @@ public class HiveConf extends Configuration {
     EXECPARALLEL("hive.exec.parallel", false, "Whether to execute jobs in parallel"),
     EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8,
         "How many jobs at most can be executed in parallel"),
-    HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true,
-        "Whether speculative execution for reducers should be turned on. "),
+    @Deprecated
+    HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", false,
+        "(Deprecated) Whether speculative execution for reducers should be turned on. "),
     HIVECOUNTERSPULLINTERVAL("hive.exec.counters.pull.interval", 1000L,
         "The interval with which to poll the JobTracker for the counters the running job. \n" +
         "The smaller it is the more load there will be on the jobtracker, the higher it is the less granular the caught will be."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 0e4ce78..811fcc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1802,7 +1802,7 @@ public final class Utilities {
           }
         }
 
-        taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs);
+        taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs, hconf);
         if (filesKept != null && taskIDToFile != null) {
           addFilesToPathSet(taskIDToFile.values(), filesKept);
         }
@@ -1815,7 +1815,7 @@ public final class Utilities {
       }
       Path mmDir = extractNonDpMmDir(writeId, stmtId, fileStats, isBaseDir);
       taskIDToFile = removeTempOrDuplicateFilesNonMm(
-          fs.listStatus(new Path(mmDir, unionSuffix)), fs);
+          fs.listStatus(new Path(mmDir, unionSuffix)), fs, hconf);
       if (filesKept != null && taskIDToFile != null) {
         addFilesToPathSet(taskIDToFile.values(), filesKept);
       }
@@ -1825,13 +1825,13 @@ public final class Utilities {
         return result;
       }
       if (!isMmTable) {
-        taskIDToFile = removeTempOrDuplicateFilesNonMm(fileStats, fs);
+        taskIDToFile = removeTempOrDuplicateFilesNonMm(fileStats, fs, hconf);
         if (filesKept != null && taskIDToFile != null) {
           addFilesToPathSet(taskIDToFile.values(), filesKept);
         }
       } else {
         Path mmDir = extractNonDpMmDir(writeId, stmtId, fileStats, isBaseDir);
-        taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs);
+        taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs, hconf);
         if (filesKept != null && taskIDToFile != null) {
           addFilesToPathSet(taskIDToFile.values(), filesKept);
         }
@@ -1896,12 +1896,20 @@ public final class Utilities {
   }
 
   private static HashMap<String, FileStatus> removeTempOrDuplicateFilesNonMm(
-      FileStatus[] files, FileSystem fs) throws IOException {
+      FileStatus[] files, FileSystem fs, Configuration conf) throws IOException {
     if (files == null || fs == null) {
       return null;
     }
     HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>();
 
+    // This method currently does not support speculative execution due to
+    // compareTempOrDuplicateFiles not being able to de-duplicate speculative
+    // execution created files
+    if (isSpeculativeExecution(conf)) {
+      String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+      throw new IOException("Speculative execution is not supported for engine " + engine);
+    }
+
     for (FileStatus one : files) {
       if (isTempPath(one)) {
         Path onePath = one.getPath();
@@ -1912,31 +1920,62 @@ public final class Utilities {
         }
       } else {
         // This would be a single file. See if we need to remove it.
-        ponderRemovingTempOrDuplicateFile(fs, one, taskIdToFile);
+        ponderRemovingTempOrDuplicateFile(fs, one, taskIdToFile, conf);
       }
     }
     return taskIdToFile;
   }
 
   private static void ponderRemovingTempOrDuplicateFile(FileSystem fs,
-      FileStatus file, HashMap<String, FileStatus> taskIdToFile) throws IOException {
+      FileStatus file, HashMap<String, FileStatus> taskIdToFile, Configuration conf)
+      throws IOException {
     Path filePath = file.getPath();
     String taskId = getPrefixedTaskIdFromFilename(filePath.getName());
     Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles looking at {}"
           + ", taskId {}", filePath, taskId);
     FileStatus otherFile = taskIdToFile.get(taskId);
     taskIdToFile.put(taskId, (otherFile == null) ? file :
-      compareTempOrDuplicateFiles(fs, file, otherFile));
+      compareTempOrDuplicateFiles(fs, file, otherFile, conf));
+  }
+
+  private static boolean warnIfSet(Configuration conf, String value) {
+    if (conf.getBoolean(value, false)) {
+      LOG.warn(value + " support is currently deprecated");
+      return true;
+    }
+    return false;
+  }
+
+  private static boolean isSpeculativeExecution(Configuration conf) {
+    String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+    boolean isSpeculative = false;
+    if ("mr".equalsIgnoreCase(engine)) {
+      isSpeculative = warnIfSet(conf, "mapreduce.map.speculative") ||
+          warnIfSet(conf, "mapreduce.reduce.speculative") ||
+          warnIfSet(conf, "mapred.map.tasks.speculative.execution") ||
+          warnIfSet(conf, "mapred.reduce.tasks.speculative.execution");
+    } else if ("tez".equalsIgnoreCase(engine)) {
+      isSpeculative = warnIfSet(conf, "tez.am.speculation.enabled");
+    } // all other engines do not support speculative execution
+
+    return isSpeculative;
   }
 
   private static FileStatus compareTempOrDuplicateFiles(FileSystem fs,
-      FileStatus file, FileStatus existingFile) throws IOException {
-    // Pick the one with mewest attempt ID. For sanity, check the file sizes too.
-    // If the file size of newest attempt is less than that for older one,
-    // Throw an exception as it maybe a correctness issue causing it.
-    // This breaks speculative execution if it ends prematurely.
+      FileStatus file, FileStatus existingFile, Configuration conf) throws IOException {
+    // Pick the one with newest attempt ID. Previously, this function threw an
+    // exception when the file size of the newer attempt was less than the
+    // older attempt. This was an incorrect assumption due to various
+    // techniques like file compression and no guarantee that the new task will
+    // write values in the same order.
     FileStatus toDelete = null, toRetain = null;
 
+    // This method currently does not support speculative execution
+    if (isSpeculativeExecution(conf)) {
+      String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+      throw new IOException("Speculative execution is not supported for engine " + engine);
+    }
+
     // "LOAD .. INTO" and "INSERT INTO" commands will generate files with
     // "_copy_x" suffix. These files are usually read by map tasks and the
     // task output gets written to some tmp path. The output file names will
@@ -1950,68 +1989,38 @@ public final class Utilities {
     // elimination.
     Path filePath = file.getPath();
     if (isCopyFile(filePath.getName())) {
-      LOG.info("{} file identified as duplicate. This file is" +
-          " not deleted as it has copySuffix.", filePath);
+      LOG.info("{} file identified as duplicate. This file is"
+          + " not deleted as it has copySuffix.", filePath);
       return existingFile;
     }
 
     int existingFileAttemptId = getAttemptIdFromFilename(existingFile.getPath().getName());
     int fileAttemptId = getAttemptIdFromFilename(file.getPath().getName());
-
-    long existingFileSz = getFileSizeRecursively(fs, existingFile);
-    long fileSz = getFileSizeRecursively(fs, file);
     // Files may come in any order irrespective of their attempt IDs
-    if (existingFileAttemptId > fileAttemptId &&
-        existingFileSz >= fileSz) {
+    if (existingFileAttemptId > fileAttemptId) {
       // keep existing
       toRetain = existingFile;
       toDelete = file;
-    } else if (existingFileAttemptId < fileAttemptId &&
-        existingFileSz <= fileSz) {
+    } else if (existingFileAttemptId < fileAttemptId) {
       // keep file
       toRetain = file;
       toDelete = existingFile;
     } else {
-      throw new IOException(" File " + filePath +
-        " with newer attempt ID " + fileAttemptId + " is smaller than the file "
-        + existingFile.getPath() + " with older attempt ID " + existingFileAttemptId);
+      throw new IOException(filePath + " has same attempt ID " + fileAttemptId + " as "
+          + existingFile.getPath());
     }
+
     if (!fs.delete(toDelete.getPath(), true)) {
-      throw new IOException(
-          "Unable to delete duplicate file: " + toDelete.getPath()
-              + ". Existing file: " + toRetain.getPath());
-    } else {
-      LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length "
-          + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length "
-          + toRetain.getLen());
+      throw new IOException("Unable to delete duplicate file: " + toDelete.getPath()
+          + ". Existing file: " + toRetain.getPath());
     }
 
+    LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length "
+        + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length "
+        + toRetain.getLen());
     return toRetain;
   }
 
-  // This function recurisvely fetches the size of all the files in given directory
-  private static long getFileSizeRecursively(FileSystem fs, FileStatus src)
-  throws IOException {
-    long size = 0;
-    if (src.isDirectory()) {
-      LOG.debug(" src " + src.getPath() + " is a directory");
-      // This is a directory.
-      try {
-        FileStatus[] files = fs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
-        // Recursively fetch sizes of each file
-        for (FileStatus file : files) {
-          size += getFileSizeRecursively(fs, file);
-        }
-      } catch (IOException e) {
-        throw new IOException("Unable to fetch files in directory " + src.getPath(), e);
-      }
-    } else {
-      size = src.getLen();
-      LOG.debug("src " + src.getPath() + " is a file of size " + size);
-    }
-    return size;
-  }
-
   public static boolean isCopyFile(String filename) {
     String taskId = filename;
     String copyFileSuffix = null;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 8a8822d..2071de3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -278,10 +278,9 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
     // set input format information if necessary
     setInputAttributes(job);
 
-    // Turn on speculative execution for reducers
-    boolean useSpeculativeExecReducers = HiveConf.getBoolVar(job,
-        HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS);
-    job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, useSpeculativeExecReducers);
+    // HIVE-23354 enforces that MR speculative execution is disabled
+    job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+    job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
 
     String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT);
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 3e8ba08..97220c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -869,12 +869,9 @@ public class DagUtils {
 
     // Is this required ?
     conf.set("mapred.reducer.class", ExecReducer.class.getName());
-
-    boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
-        HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS);
-    conf.setBoolean(org.apache.hadoop.mapreduce.MRJobConfig.REDUCE_SPECULATIVE,
-        useSpeculativeExecReducers);
-
+    // HIVE-23354 enforces that MR speculative execution is disabled
+    conf.setBoolean(org.apache.hadoop.mapreduce.MRJobConfig.REDUCE_SPECULATIVE, false);
+    conf.setBoolean(org.apache.hadoop.mapreduce.MRJobConfig.MAP_SPECULATIVE, false);
     return conf;
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index 7fb3878..34519fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -99,6 +99,9 @@ public class MergeFileTask extends Task<MergeFileWork> implements Serializable,
       job.setOutputKeyClass(NullWritable.class);
       job.setOutputValueClass(NullWritable.class);
       job.setNumReduceTasks(0);
+      // HIVE-23354 enforces that MR speculative execution is disabled
+      job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+      job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
 
       // create the temp directories
       Path outputPath = work.getOutputDir();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index 0458c94..752eea9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -106,6 +106,9 @@ public class ColumnTruncateTask extends Task<ColumnTruncateWork> implements Seri
 
     // zero reducers
     job.setNumReduceTasks(0);
+    // HIVE-23354 enforces that MR speculative execution is disabled
+    job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+    job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
 
     if (work.getMinSplitSize() != null) {
       HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 9410a96..05ea38c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskAttemptContext;
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hive.common.util.Ref;
@@ -328,6 +329,9 @@ public class CompactorMR {
     job.set(DIRS_TO_SEARCH, dirsToSearch.toString());
     job.setLong(MIN_TXN, minTxn);
     job.setLong(MAX_TXN, maxTxn);
+    // HIVE-23354 enforces that MR speculative execution is disabled
+    job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+    job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
 
     // Add tokens for all the file system in the input path.
     ArrayList<Path> dirs = new ArrayList<>();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index 163d439..04cfd9e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -199,6 +200,9 @@ public class TestUtilities {
     DynamicPartitionCtx dpCtx = getDynamicPartitionCtx(dPEnabled);
     Path tempDirPath = setupTempDirWithSingleOutputFile(hconf);
     FileSinkDesc conf = getFileSinkDesc(tempDirPath);
+    // HIVE-23354 enforces that MR speculative execution is disabled
+    hconf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+    hconf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
 
     List<Path> paths = Utilities.removeTempOrDuplicateFiles(localFs, tempDirPath, dpCtx, conf, hconf, false);