Posted to commits@hive.apache.org by ab...@apache.org on 2023/01/24 07:18:28 UTC
[hive] branch master updated: HIVE-23891: UNION ALL and multiple task attempts can cause file duplication (#3836) (Zhihua Deng reviewed by Laszlo Bodor)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 7c02f733d62 HIVE-23891: UNION ALL and multiple task attempts can cause file duplication (#3836) (Zhihua Deng reviewed by Laszlo Bodor)
7c02f733d62 is described below
commit 7c02f733d6251fec3a0fa9537e48817094204dad
Author: dengzh <de...@gmail.com>
AuthorDate: Tue Jan 24 15:18:19 2023 +0800
HIVE-23891: UNION ALL and multiple task attempts can cause file duplication (#3836) (Zhihua Deng reviewed by Laszlo Bodor)
---
.../hive/ql/exec/AbstractFileMergeOperator.java | 2 +-
.../hadoop/hive/ql/exec/FileSinkOperator.java | 2 +-
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 23 ++++----
.../io/rcfile/truncate/ColumnTruncateMapper.java | 2 +-
.../hadoop/hive/ql/exec/TestFileSinkOperator.java | 68 +++++++++++++++++++++-
5 files changed, 82 insertions(+), 15 deletions(-)
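Background for the change: a UNION ALL query writes each branch of the union into its own HIVE_UNION_SUBDIR_<n> subdirectory under the final output path. When a task is retried, both attempts can commit their output into that subdirectory, for example (layout taken from the new test below):

    .../partval=Monday/HIVE_UNION_SUBDIR_0/000000_0
    .../partval=Monday/HIVE_UNION_SUBDIR_0/000000_1

For non-MM tables, the duplicate cleanup in Utilities.mvFileToFinalPath only inspected the top level of the output directory, so duplicates nested under the union subdirectory survived and both attempts' files were read back. The patch threads the union suffix through mvFileToFinalPath into removeTempOrDuplicateFiles so the cleanup also lists and deduplicates inside the union subdirectory.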
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index 264573b17c4..454fc3d5c97 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -324,7 +324,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
if (!isMmTable) {
Path backupPath = backupOutputPath(fs, outputDir);
Utilities.mvFileToFinalPath(
- outputDir, hconf, success, LOG, conf.getDpCtx(), null, reporter);
+ outputDir, null, hconf, success, LOG, conf.getDpCtx(), null, reporter);
if (success) {
LOG.info("jobCloseOp moved merged files to output dir: " + outputDir);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 88035ac4027..9d30093c239 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -1596,7 +1596,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
Utilities.FILE_OP_LOGGER.trace("jobCloseOp using specPath " + specPath);
}
if (!conf.isMmTable() && !conf.isDirectInsert()) {
- Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter);
+ Utilities.mvFileToFinalPath(specPath, unionSuffix, hconf, success, LOG, dpCtx, conf, reporter);
} else {
int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
lbLevels = lbCtx == null ? 0 : lbCtx.calculateListBucketingLevel();
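Here unionSuffix, already in scope in jobCloseOp, is the HIVE_UNION_SUBDIR_<n> component of the sink's directory name when the sink is a linked file sink writing one branch of a union (the new test sets this up via setLinkedFileSink and setDirName), and null otherwise; the non-union call sites in this patch pass null explicitly.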
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index c205f2c974f..315e00f1593 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1411,7 +1411,7 @@ public final class Utilities {
}
}
- public static void mvFileToFinalPath(Path specPath, Configuration hconf,
+ public static void mvFileToFinalPath(Path specPath, String unionSuffix, Configuration hconf,
boolean success, Logger log, DynamicPartitionCtx dpCtx, FileSinkDesc conf,
Reporter reporter) throws IOException,
HiveException {
@@ -1461,8 +1461,10 @@ public final class Utilities {
Set<FileStatus> filesKept = new HashSet<>();
perfLogger.perfLogBegin("FileSinkOperator", "RemoveTempOrDuplicateFiles");
// remove any tmp file or double-committed output files
- List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
- fs, statuses, dpCtx, conf, hconf, filesKept, false);
+ int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
+ numBuckets = (conf != null && conf.getTable() != null) ? conf.getTable().getNumBuckets() : 0;
+ List<Path> emptyBuckets = removeTempOrDuplicateFiles(
+ fs, statuses, unionSuffix, dpLevels, numBuckets, hconf, null, 0, false, filesKept, false);
perfLogger.perfLogEnd("FileSinkOperator", "RemoveTempOrDuplicateFiles");
// create empty buckets if necessary
if (!emptyBuckets.isEmpty()) {
@@ -1809,15 +1811,14 @@ public final class Utilities {
if (!path.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) {
throw new IOException("Unexpected non-MM directory name " + path);
}
+ }
- Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in MM directory {}", path);
-
- if (!StringUtils.isEmpty(unionSuffix)) {
- try {
- items = fs.listStatus(new Path(path, unionSuffix));
- } catch (FileNotFoundException e) {
- continue;
- }
+ Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in directory {}", path);
+ if (!StringUtils.isEmpty(unionSuffix)) {
+ try {
+ items = fs.listStatus(new Path(path, unionSuffix));
+ } catch (FileNotFoundException e) {
+ continue;
}
}
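To make the deduplication rule concrete: among competing attempt outputs, only one file per task id may survive. The following standalone sketch models just that rule; it is not Hive's removeTempOrDuplicateFiles (which, as the signature above shows, also handles tmp files, dynamic-partition levels and buckets). It assumes the usual <taskId>_<attemptId> file-name pattern and lets the later attempt win, matching the new test's expectation that 000000_1 survives:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Simplified model of per-task attempt deduplication: file names follow
    // the "<taskId>_<attemptId>" pattern (e.g. "000000_1"); for each task id
    // only the highest-numbered attempt is kept.
    public class AttemptDedupSketch {

      public static List<String> keepOnePerTask(List<String> fileNames) {
        Map<String, String> byTask = new HashMap<>();
        for (String name : fileNames) {
          int sep = name.lastIndexOf('_');
          String taskId = name.substring(0, sep);
          int attempt = Integer.parseInt(name.substring(sep + 1));
          String kept = byTask.get(taskId);
          int keptAttempt = (kept == null)
              ? -1 : Integer.parseInt(kept.substring(kept.lastIndexOf('_') + 1));
          if (attempt > keptAttempt) {
            byTask.put(taskId, name); // later attempt replaces the earlier one
          }
        }
        return new ArrayList<>(byTask.values());
      }

      public static void main(String[] args) {
        // Mirrors the new test: both attempts of task 000000 committed.
        System.out.println(keepOnePerTask(Arrays.asList("000000_0", "000000_1")));
        // prints [000000_1]
      }
    }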
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
index c112978ff7d..736b9e39cb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
@@ -234,7 +234,7 @@ public class ColumnTruncateMapper extends MapReduceBase implements
) throws HiveException, IOException {
FileSystem fs = outputPath.getFileSystem(job);
Path backupPath = backupOutputPath(fs, outputPath, job);
- Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null,
+ Utilities.mvFileToFinalPath(outputPath, null, job, success, LOG, dynPartCtx, null,
reporter);
if (backupPath != null) {
fs.delete(backupPath, true);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 05d8f564860..a27215ffd10 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -76,11 +76,15 @@ import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* Tests for {@link org.apache.hadoop.hive.ql.exec.FileSinkOperator}
@@ -162,6 +166,64 @@ public class TestFileSinkOperator {
confirmOutput(DataFormat.WITH_PARTITION_VALUE);
}
+ @Test
+ public void testNonAcidRemoveDuplicate() throws Exception {
+ setBasePath("writeDuplicate");
+ setupData(DataFormat.WITH_PARTITION_VALUE);
+
+ FileSinkDesc desc = (FileSinkDesc) getFileSink(AcidUtils.Operation.NOT_ACID, true, 0).getConf().clone();
+ desc.setLinkedFileSink(true);
+ desc.setDirName(new Path(desc.getDirName(), AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "0"));
+ JobConf jobConf = new JobConf(jc);
+ jobConf.set("hive.execution.engine", "tez");
+ jobConf.set("mapred.task.id", "000000_0");
+ FileSinkOperator op1 = (FileSinkOperator)OperatorFactory.get(new CompilationOpContext(), FileSinkDesc.class);
+ op1.setConf(desc);
+ op1.initialize(jobConf, new ObjectInspector[]{inspector});
+
+ JobConf jobConf2 = new JobConf(jobConf);
+ jobConf2.set("mapred.task.id", "000000_1");
+ FileSinkOperator op2 = (FileSinkOperator)OperatorFactory.get(
+ new CompilationOpContext(), FileSinkDesc.class);
+ op2.setConf(desc);
+ op2.initialize(jobConf2, new ObjectInspector[]{inspector});
+
+ for (Object r : rows) {
+ op1.process(r, 0);
+ op2.process(r, 0);
+ }
+
+ op1.close(false);
+ // Assume op2 also finishes successfully; the two attempts run in different containers
+ op2.close(false);
+ Path[] paths = findFilesInBasePath();
+ List<Path> mondays = Arrays.stream(paths)
+ .filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
+ .collect(Collectors.toList());
+ Assert.assertEquals("Two result files are expected", 2, mondays.size());
+ Set<String> fileNames = new HashSet<>();
+ fileNames.add(mondays.get(0).getName());
+ fileNames.add(mondays.get(1).getName());
+
+ Assert.assertTrue("000000_1 file is expected", fileNames.contains("000000_1"));
+ Assert.assertTrue("000000_0 file is expected", fileNames.contains("000000_0"));
+
+ // This happens in HiveServer2 when the job is finished: the job calls
+ // jobCloseOp to close its operators. For the FileSinkOperator, a deduplication of the
+ // output files may happen so that only one output file is left for each YARN task.
+ op1.jobCloseOp(jobConf, true);
+ List<Path> resultFiles = new ArrayList<Path>();
+ recurseOnPath(basePath, basePath.getFileSystem(jc), resultFiles);
+ mondays = resultFiles.stream()
+ .filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
+ .collect(Collectors.toList());
+ Assert.assertEquals("Only 1 file should be here after cleaning", 1, mondays.size());
+ Assert.assertEquals("000000_1 file is expected", "000000_1", mondays.get(0).getName());
+
+ confirmOutput(DataFormat.WITH_PARTITION_VALUE, resultFiles.toArray(new Path[0]));
+ // Clean out directory after testing
+ basePath.getFileSystem(jc).delete(basePath, true);
+ }
@Test
public void testInsertDynamicPartitioning() throws Exception {
@@ -290,6 +352,7 @@ public class TestFileSinkOperator {
} else {
desc = new FileSinkDesc(basePath, tableDesc, false);
}
+ desc.setStatsAggPrefix(basePath.toString());
desc.setWriteType(writeType);
desc.setGatherStats(true);
if (writeId > 0) {
@@ -313,7 +376,10 @@ public class TestFileSinkOperator {
}
private void confirmOutput(DataFormat rType) throws IOException, SerDeException, CloneNotSupportedException {
- Path[] paths = findFilesInBasePath();
+ confirmOutput(rType, findFilesInBasePath());
+ }
+
+ private void confirmOutput(DataFormat rType, Path[] paths) throws IOException, SerDeException, CloneNotSupportedException {
TFSOInputFormat input = new TFSOInputFormat(rType);
FileInputFormat.setInputPaths(jc, paths);
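The confirmOutput overload added above lets the new test validate an explicit list of result files (the survivors collected after jobCloseOp) instead of rescanning everything under basePath, which is what the original single-argument version keeps doing.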