Posted to commits@hive.apache.org by jc...@apache.org on 2020/05/18 20:20:30 UTC
[hive] branch master updated: HIVE-23354: Remove file size sanity
checking from compareTempOrDuplicateFiles (John Sherman,
reviewed by Jesus Camacho Rodriguez)
This is an automated email from the ASF dual-hosted git repository.
jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 61c9b2e HIVE-23354: Remove file size sanity checking from compareTempOrDuplicateFiles (John Sherman, reviewed by Jesus Camacho Rodriguez)
61c9b2e is described below
commit 61c9b2eebd7411c982e8f33e1fe27636f98897a0
Author: John Sherman <jf...@cloudera.com>
AuthorDate: Mon May 18 13:19:21 2020 -0700
HIVE-23354: Remove file size sanity checking from compareTempOrDuplicateFiles (John Sherman, reviewed by Jesus Camacho Rodriguez)
Close apache/hive#1022
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 5 +-
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 119 +++++++++++----------
.../apache/hadoop/hive/ql/exec/mr/ExecDriver.java | 7 +-
.../apache/hadoop/hive/ql/exec/tez/DagUtils.java | 9 +-
.../hadoop/hive/ql/io/merge/MergeFileTask.java | 3 +
.../ql/io/rcfile/truncate/ColumnTruncateTask.java | 3 +
.../hadoop/hive/ql/txn/compactor/CompactorMR.java | 4 +
.../apache/hadoop/hive/ql/exec/TestUtilities.java | 4 +
8 files changed, 87 insertions(+), 67 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f5ad3a8..5a39006 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -601,8 +601,9 @@ public class HiveConf extends Configuration {
EXECPARALLEL("hive.exec.parallel", false, "Whether to execute jobs in parallel"),
EXECPARALLETHREADNUMBER("hive.exec.parallel.thread.number", 8,
"How many jobs at most can be executed in parallel"),
- HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", true,
- "Whether speculative execution for reducers should be turned on. "),
+ @Deprecated
+ HIVESPECULATIVEEXECREDUCERS("hive.mapred.reduce.tasks.speculative.execution", false,
+ "(Deprecated) Whether speculative execution for reducers should be turned on. "),
HIVECOUNTERSPULLINTERVAL("hive.exec.counters.pull.interval", 1000L,
"The interval with which to poll the JobTracker for the counters the running job. \n" +
"The smaller it is the more load there will be on the jobtracker, the higher it is the less granular the caught will be."),
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 0e4ce78..811fcc0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1802,7 +1802,7 @@ public final class Utilities {
}
}
- taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs);
+ taskIDToFile = removeTempOrDuplicateFilesNonMm(items, fs, hconf);
if (filesKept != null && taskIDToFile != null) {
addFilesToPathSet(taskIDToFile.values(), filesKept);
}
@@ -1815,7 +1815,7 @@ public final class Utilities {
}
Path mmDir = extractNonDpMmDir(writeId, stmtId, fileStats, isBaseDir);
taskIDToFile = removeTempOrDuplicateFilesNonMm(
- fs.listStatus(new Path(mmDir, unionSuffix)), fs);
+ fs.listStatus(new Path(mmDir, unionSuffix)), fs, hconf);
if (filesKept != null && taskIDToFile != null) {
addFilesToPathSet(taskIDToFile.values(), filesKept);
}
@@ -1825,13 +1825,13 @@ public final class Utilities {
return result;
}
if (!isMmTable) {
- taskIDToFile = removeTempOrDuplicateFilesNonMm(fileStats, fs);
+ taskIDToFile = removeTempOrDuplicateFilesNonMm(fileStats, fs, hconf);
if (filesKept != null && taskIDToFile != null) {
addFilesToPathSet(taskIDToFile.values(), filesKept);
}
} else {
Path mmDir = extractNonDpMmDir(writeId, stmtId, fileStats, isBaseDir);
- taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs);
+ taskIDToFile = removeTempOrDuplicateFilesNonMm(fs.listStatus(mmDir), fs, hconf);
if (filesKept != null && taskIDToFile != null) {
addFilesToPathSet(taskIDToFile.values(), filesKept);
}
@@ -1896,12 +1896,20 @@
}
private static HashMap<String, FileStatus> removeTempOrDuplicateFilesNonMm(
- FileStatus[] files, FileSystem fs) throws IOException {
+ FileStatus[] files, FileSystem fs, Configuration conf) throws IOException {
if (files == null || fs == null) {
return null;
}
HashMap<String, FileStatus> taskIdToFile = new HashMap<String, FileStatus>();
+ // This method currently does not support speculative execution due to
+ // compareTempOrDuplicateFiles not being able to de-duplicate speculative
+ // execution created files
+ if (isSpeculativeExecution(conf)) {
+ String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+ throw new IOException("Speculative execution is not supported for engine " + engine);
+ }
+
for (FileStatus one : files) {
if (isTempPath(one)) {
Path onePath = one.getPath();
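This guard makes an unsupported configuration fail fast during duplicate-file cleanup instead of risking incorrect de-duplication; the isSpeculativeExecution helper it calls is added in the next hunk. A minimal sketch of what a caller would now observe (hypothetical demo class, assuming the MR engine with map speculation left enabled):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;

    public class GuardSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "mr");
        conf.setBoolean("mapreduce.map.speculative", true);
        // Any cleanup path that reaches removeTempOrDuplicateFilesNonMm with
        // this conf now throws:
        //   IOException: Speculative execution is not supported for engine mr
      }
    }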
@@ -1912,31 +1920,62 @@ public final class Utilities {
}
} else {
// This would be a single file. See if we need to remove it.
- ponderRemovingTempOrDuplicateFile(fs, one, taskIdToFile);
+ ponderRemovingTempOrDuplicateFile(fs, one, taskIdToFile, conf);
}
}
return taskIdToFile;
}
private static void ponderRemovingTempOrDuplicateFile(FileSystem fs,
- FileStatus file, HashMap<String, FileStatus> taskIdToFile) throws IOException {
+ FileStatus file, HashMap<String, FileStatus> taskIdToFile, Configuration conf)
+ throws IOException {
Path filePath = file.getPath();
String taskId = getPrefixedTaskIdFromFilename(filePath.getName());
Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles looking at {}"
+ ", taskId {}", filePath, taskId);
FileStatus otherFile = taskIdToFile.get(taskId);
taskIdToFile.put(taskId, (otherFile == null) ? file :
- compareTempOrDuplicateFiles(fs, file, otherFile));
+ compareTempOrDuplicateFiles(fs, file, otherFile, conf));
+ }
+
+ private static boolean warnIfSet(Configuration conf, String value) {
+ if (conf.getBoolean(value, false)) {
+ LOG.warn(value + " support is currently deprecated");
+ return true;
+ }
+ return false;
+ }
+
+ private static boolean isSpeculativeExecution(Configuration conf) {
+ String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+ boolean isSpeculative = false;
+ if ("mr".equalsIgnoreCase(engine)) {
+ isSpeculative = warnIfSet(conf, "mapreduce.map.speculative") ||
+ warnIfSet(conf, "mapreduce.reduce.speculative") ||
+ warnIfSet(conf, "mapred.map.tasks.speculative.execution") ||
+ warnIfSet(conf, "mapred.reduce.tasks.speculative.execution");
+ } else if ("tez".equalsIgnoreCase(engine)) {
+ isSpeculative = warnIfSet(conf, "tez.am.speculation.enabled");
+ } // all other engines do not support speculative execution
+
+ return isSpeculative;
}
private static FileStatus compareTempOrDuplicateFiles(FileSystem fs,
- FileStatus file, FileStatus existingFile) throws IOException {
- // Pick the one with mewest attempt ID. For sanity, check the file sizes too.
- // If the file size of newest attempt is less than that for older one,
- // Throw an exception as it maybe a correctness issue causing it.
- // This breaks speculative execution if it ends prematurely.
+ FileStatus file, FileStatus existingFile, Configuration conf) throws IOException {
+ // Pick the one with newest attempt ID. Previously, this function threw an
+ // exception when the file size of the newer attempt was less than the
+ // older attempt. This was an incorrect assumption due to various
+ // techniques like file compression and no guarantee that the new task will
+ // write values in the same order.
FileStatus toDelete = null, toRetain = null;
+ // This method currently does not support speculative execution
+ if (isSpeculativeExecution(conf)) {
+ String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
+ throw new IOException("Speculative execution is not supported for engine " + engine);
+ }
+
// "LOAD .. INTO" and "INSERT INTO" commands will generate files with
// "_copy_x" suffix. These files are usually read by map tasks and the
// task output gets written to some tmp path. The output file names will
@@ -1950,68 +1989,38 @@ public final class Utilities {
// elimination.
Path filePath = file.getPath();
if (isCopyFile(filePath.getName())) {
- LOG.info("{} file identified as duplicate. This file is" +
- " not deleted as it has copySuffix.", filePath);
+ LOG.info("{} file identified as duplicate. This file is"
+ + " not deleted as it has copySuffix.", filePath);
return existingFile;
}
int existingFileAttemptId = getAttemptIdFromFilename(existingFile.getPath().getName());
int fileAttemptId = getAttemptIdFromFilename(file.getPath().getName());
-
- long existingFileSz = getFileSizeRecursively(fs, existingFile);
- long fileSz = getFileSizeRecursively(fs, file);
// Files may come in any order irrespective of their attempt IDs
- if (existingFileAttemptId > fileAttemptId &&
- existingFileSz >= fileSz) {
+ if (existingFileAttemptId > fileAttemptId) {
// keep existing
toRetain = existingFile;
toDelete = file;
- } else if (existingFileAttemptId < fileAttemptId &&
- existingFileSz <= fileSz) {
+ } else if (existingFileAttemptId < fileAttemptId) {
// keep file
toRetain = file;
toDelete = existingFile;
} else {
- throw new IOException(" File " + filePath +
- " with newer attempt ID " + fileAttemptId + " is smaller than the file "
- + existingFile.getPath() + " with older attempt ID " + existingFileAttemptId);
+ throw new IOException(filePath + " has same attempt ID " + fileAttemptId + " as "
+ + existingFile.getPath());
}
+
if (!fs.delete(toDelete.getPath(), true)) {
- throw new IOException(
- "Unable to delete duplicate file: " + toDelete.getPath()
- + ". Existing file: " + toRetain.getPath());
- } else {
- LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length "
- + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length "
- + toRetain.getLen());
+ throw new IOException("Unable to delete duplicate file: " + toDelete.getPath()
+ + ". Existing file: " + toRetain.getPath());
}
+ LOG.warn("Duplicate taskid file removed: " + toDelete.getPath() + " with length "
+ + toDelete.getLen() + ". Existing file: " + toRetain.getPath() + " with length "
+ + toRetain.getLen());
return toRetain;
}
- // This function recurisvely fetches the size of all the files in given directory
- private static long getFileSizeRecursively(FileSystem fs, FileStatus src)
- throws IOException {
- long size = 0;
- if (src.isDirectory()) {
- LOG.debug(" src " + src.getPath() + " is a directory");
- // This is a directory.
- try {
- FileStatus[] files = fs.listStatus(src.getPath(), FileUtils.HIDDEN_FILES_PATH_FILTER);
- // Recursively fetch sizes of each file
- for (FileStatus file : files) {
- size += getFileSizeRecursively(fs, file);
- }
- } catch (IOException e) {
- throw new IOException("Unable to fetch files in directory " + src.getPath(), e);
- }
- } else {
- size = src.getLen();
- LOG.debug("src " + src.getPath() + " is a file of size " + size);
- }
- return size;
- }
-
public static boolean isCopyFile(String filename) {
String taskId = filename;
String copyFileSuffix = null;
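The net effect in compareTempOrDuplicateFiles is that duplicates are now resolved purely by attempt ID: the recursive size check is gone because compression and the lack of any guarantee that a re-run task writes rows in the same order make file size an unreliable correctness signal. A condensed sketch of the new selection rule (hypothetical helper, not the real method, which also skips _copy_ files and deletes the losing attempt):

    import java.io.IOException;
    import org.apache.hadoop.fs.FileStatus;

    public class SelectionRuleSketch {
      // idA and idB stand in for Utilities.getAttemptIdFromFilename(...)
      // applied to each file name.
      static FileStatus pickNewerAttempt(FileStatus a, int idA, FileStatus b, int idB)
          throws IOException {
        if (idA == idB) {
          throw new IOException(a.getPath() + " has same attempt ID " + idA
              + " as " + b.getPath());
        }
        return idA > idB ? a : b; // file sizes are no longer compared
      }
    }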
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
index 8a8822d..2071de3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
@@ -278,10 +278,9 @@ public class ExecDriver extends Task<MapredWork> implements Serializable, Hadoop
// set input format information if necessary
setInputAttributes(job);
- // Turn on speculative execution for reducers
- boolean useSpeculativeExecReducers = HiveConf.getBoolVar(job,
- HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS);
- job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, useSpeculativeExecReducers);
+ // HIVE-23354 enforces that MR speculative execution is disabled
+ job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+ job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
String inpFormat = HiveConf.getVar(job, HiveConf.ConfVars.HIVEINPUTFORMAT);
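ExecDriver now forces both map and reduce speculation off on the submitted JobConf regardless of the deprecated Hive setting; the same two lines recur in MergeFileTask, ColumnTruncateTask, and CompactorMR below. A minimal sketch of the resulting job configuration (hypothetical check, not in the commit):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class SubmittedJobConfSketch {
      public static void main(String[] args) {
        JobConf job = new JobConf();
        // What ExecDriver now does unconditionally:
        job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
        job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
        // Both flags read back false for any job Hive submits.
        System.out.println(job.getBoolean(MRJobConfig.MAP_SPECULATIVE, true));
        System.out.println(job.getBoolean(MRJobConfig.REDUCE_SPECULATIVE, true));
      }
    }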
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
index 3e8ba08..97220c0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
@@ -869,12 +869,9 @@ public class DagUtils {
// Is this required ?
conf.set("mapred.reducer.class", ExecReducer.class.getName());
-
- boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
- HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS);
- conf.setBoolean(org.apache.hadoop.mapreduce.MRJobConfig.REDUCE_SPECULATIVE,
- useSpeculativeExecReducers);
-
+ // HIVE-23354 enforces that MR speculative execution is disabled
+ conf.setBoolean(org.apache.hadoop.mapreduce.MRJobConfig.REDUCE_SPECULATIVE, false);
+ conf.setBoolean(org.apache.hadoop.mapreduce.MRJobConfig.MAP_SPECULATIVE, false);
return conf;
}
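Note that DagUtils only disables the MR-level flags in the configuration it builds; Tez's own speculation switch (tez.am.speculation.enabled) is not forced off here and is instead rejected by the new isSpeculativeExecution check at cleanup time. A minimal sketch of that interaction (hypothetical demo class):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class TezSpeculationSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "tez");
        conf.setBoolean("tez.am.speculation.enabled", true);
        // The isSpeculativeExecution check in Utilities warns that the
        // setting is deprecated, returns true, and duplicate-file cleanup
        // throws rather than attempt to merge speculative outputs.
      }
    }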
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
index 7fb3878..34519fb 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/merge/MergeFileTask.java
@@ -99,6 +99,9 @@ public class MergeFileTask extends Task<MergeFileWork> implements Serializable,
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);
+ // HIVE-23354 enforces that MR speculative execution is disabled
+ job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+ job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
// create the temp directories
Path outputPath = work.getOutputDir();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
index 0458c94..752eea9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateTask.java
@@ -106,6 +106,9 @@ public class ColumnTruncateTask extends Task<ColumnTruncateWork> implements Seri
// zero reducers
job.setNumReduceTasks(0);
+ // HIVE-23354 enforces that MR speculative execution is disabled
+ job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+ job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
if (work.getMinSplitSize() != null) {
HiveConf.setLongVar(job, HiveConf.ConfVars.MAPREDMINSPLITSIZE, work
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 9410a96..05ea38c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskAttemptContext;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.Ref;
@@ -328,6 +329,9 @@ public class CompactorMR {
job.set(DIRS_TO_SEARCH, dirsToSearch.toString());
job.setLong(MIN_TXN, minTxn);
job.setLong(MAX_TXN, maxTxn);
+ // HIVE-23354 enforces that MR speculative execution is disabled
+ job.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+ job.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
// Add tokens for all the file system in the input path.
ArrayList<Path> dirs = new ArrayList<>();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index 163d439..04cfd9e 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -199,6 +200,9 @@ public class TestUtilities {
DynamicPartitionCtx dpCtx = getDynamicPartitionCtx(dPEnabled);
Path tempDirPath = setupTempDirWithSingleOutputFile(hconf);
FileSinkDesc conf = getFileSinkDesc(tempDirPath);
+ // HIVE-23354 enforces that MR speculative execution is disabled
+ hconf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+ hconf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
List<Path> paths = Utilities.removeTempOrDuplicateFiles(localFs, tempDirPath, dpCtx, conf, hconf, false);
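The test setup mirrors the new requirement: removeTempOrDuplicateFiles must see speculation disabled, or the guard throws. A complementary negative test might look like the following sketch (hypothetical, not part of this commit; it assumes placement inside TestUtilities so the existing helpers and JUnit 4 style apply):

    @Test(expected = IOException.class)
    public void testRemoveTempOrDuplicateFilesRejectsSpeculation() throws Exception {
      HiveConf hconf = new HiveConf(this.getClass());
      hconf.set(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE.varname, "mr");
      hconf.setBoolean(MRJobConfig.MAP_SPECULATIVE, true);
      Path tempDirPath = setupTempDirWithSingleOutputFile(hconf);
      FileSinkDesc conf = getFileSinkDesc(tempDirPath);
      DynamicPartitionCtx dpCtx = getDynamicPartitionCtx(false);
      // Expected to fail fast with "Speculative execution is not supported".
      Utilities.removeTempOrDuplicateFiles(localFs, tempDirPath, dpCtx, conf, hconf, false);
    }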