You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ab...@apache.org on 2023/01/24 07:18:28 UTC

[hive] branch master updated: HIVE-23891: UNION ALL and multiple task attempts can cause file duplication (#3836) (Zhihua Deng reviewed by Laszlo Bodor)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c02f733d62 HIVE-23891: UNION ALL and multiple task attempts can cause file duplication (#3836) (Zhihua Deng reviewed by Laszlo Bodor)
7c02f733d62 is described below

commit 7c02f733d6251fec3a0fa9537e48817094204dad
Author: dengzh <de...@gmail.com>
AuthorDate: Tue Jan 24 15:18:19 2023 +0800

    HIVE-23891: UNION ALL and multiple task attempts can cause file duplication (#3836) (Zhihua Deng reviewed by Laszlo Bodor)
---
 .../hive/ql/exec/AbstractFileMergeOperator.java    |  2 +-
 .../hadoop/hive/ql/exec/FileSinkOperator.java      |  2 +-
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  | 23 ++++----
 .../io/rcfile/truncate/ColumnTruncateMapper.java   |  2 +-
 .../hadoop/hive/ql/exec/TestFileSinkOperator.java  | 68 +++++++++++++++++++++-
 5 files changed, 82 insertions(+), 15 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
index 264573b17c4..454fc3d5c97 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java
@@ -324,7 +324,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc>
       if (!isMmTable) {
         Path backupPath = backupOutputPath(fs, outputDir);
         Utilities.mvFileToFinalPath(
-            outputDir, hconf, success, LOG, conf.getDpCtx(), null, reporter);
+            outputDir, null, hconf, success, LOG, conf.getDpCtx(), null, reporter);
         if (success) {
           LOG.info("jobCloseOp moved merged files to output dir: " + outputDir);
         }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index 88035ac4027..9d30093c239 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -1596,7 +1596,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           Utilities.FILE_OP_LOGGER.trace("jobCloseOp using specPath " + specPath);
         }
         if (!conf.isMmTable() && !conf.isDirectInsert()) {
-          Utilities.mvFileToFinalPath(specPath, hconf, success, LOG, dpCtx, conf, reporter);
+          Utilities.mvFileToFinalPath(specPath, unionSuffix, hconf, success, LOG, dpCtx, conf, reporter);
         } else {
           int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
               lbLevels = lbCtx == null ? 0 : lbCtx.calculateListBucketingLevel();
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index c205f2c974f..315e00f1593 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1411,7 +1411,7 @@ public final class Utilities {
     }
   }
 
-  public static void mvFileToFinalPath(Path specPath, Configuration hconf,
+  public static void mvFileToFinalPath(Path specPath, String unionSuffix, Configuration hconf,
                                        boolean success, Logger log, DynamicPartitionCtx dpCtx, FileSinkDesc conf,
                                        Reporter reporter) throws IOException,
       HiveException {
@@ -1461,8 +1461,10 @@ public final class Utilities {
         Set<FileStatus> filesKept = new HashSet<>();
         perfLogger.perfLogBegin("FileSinkOperator", "RemoveTempOrDuplicateFiles");
         // remove any tmp file or double-committed output files
-        List<Path> emptyBuckets = Utilities.removeTempOrDuplicateFiles(
-            fs, statuses, dpCtx, conf, hconf, filesKept, false);
+        int dpLevels = dpCtx == null ? 0 : dpCtx.getNumDPCols(),
+            numBuckets = (conf != null && conf.getTable() != null) ? conf.getTable().getNumBuckets() : 0;
+        List<Path> emptyBuckets = removeTempOrDuplicateFiles(
+            fs, statuses, unionSuffix, dpLevels, numBuckets, hconf, null, 0, false, filesKept, false);
         perfLogger.perfLogEnd("FileSinkOperator", "RemoveTempOrDuplicateFiles");
         // create empty buckets if necessary
         if (!emptyBuckets.isEmpty()) {
@@ -1809,15 +1811,14 @@ public final class Utilities {
           if (!path.getName().equals(AcidUtils.baseOrDeltaSubdir(isBaseDir, writeId, writeId, stmtId))) {
             throw new IOException("Unexpected non-MM directory name " + path);
           }
+        }
 
-          Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in MM directory {}", path);
-
-          if (!StringUtils.isEmpty(unionSuffix)) {
-            try {
-              items = fs.listStatus(new Path(path, unionSuffix));
-            } catch (FileNotFoundException e) {
-              continue;
-            }
+        Utilities.FILE_OP_LOGGER.trace("removeTempOrDuplicateFiles processing files in directory {}", path);
+        if (!StringUtils.isEmpty(unionSuffix)) {
+          try {
+            items = fs.listStatus(new Path(path, unionSuffix));
+          } catch (FileNotFoundException e) {
+            continue;
           }
         }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
index c112978ff7d..736b9e39cb2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/truncate/ColumnTruncateMapper.java
@@ -234,7 +234,7 @@ public class ColumnTruncateMapper extends MapReduceBase implements
       ) throws HiveException, IOException {
     FileSystem fs = outputPath.getFileSystem(job);
     Path backupPath = backupOutputPath(fs, outputPath, job);
-    Utilities.mvFileToFinalPath(outputPath, job, success, LOG, dynPartCtx, null,
+    Utilities.mvFileToFinalPath(outputPath, null, job, success, LOG, dynPartCtx, null,
       reporter);
     if (backupPath != null) {
       fs.delete(backupPath, true);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index 05d8f564860..a27215ffd10 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -76,11 +76,15 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Tests for {@link org.apache.hadoop.hive.ql.exec.FileSinkOperator}
@@ -162,6 +166,64 @@ public class TestFileSinkOperator {
     confirmOutput(DataFormat.WITH_PARTITION_VALUE);
   }
 
+  @Test
+  public void testNonAcidRemoveDuplicate() throws Exception {
+    setBasePath("writeDuplicate");
+    setupData(DataFormat.WITH_PARTITION_VALUE);
+
+    FileSinkDesc desc = (FileSinkDesc) getFileSink(AcidUtils.Operation.NOT_ACID, true, 0).getConf().clone();
+    desc.setLinkedFileSink(true);
+    desc.setDirName(new Path(desc.getDirName(), AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "0"));
+    JobConf jobConf = new JobConf(jc);
+    jobConf.set("hive.execution.engine", "tez");
+    jobConf.set("mapred.task.id", "000000_0");
+    FileSinkOperator op1 = (FileSinkOperator)OperatorFactory.get(new CompilationOpContext(), FileSinkDesc.class);
+    op1.setConf(desc);
+    op1.initialize(jobConf, new ObjectInspector[]{inspector});
+
+    JobConf jobConf2 = new JobConf(jobConf);
+    jobConf2.set("mapred.task.id", "000000_1");
+    FileSinkOperator op2 = (FileSinkOperator)OperatorFactory.get(
+        new CompilationOpContext(), FileSinkDesc.class);
+    op2.setConf(desc);
+    op2.initialize(jobConf2, new ObjectInspector[]{inspector});
+
+    for (Object r : rows) {
+      op1.process(r, 0);
+      op2.process(r, 0);
+    }
+
+    op1.close(false);
+    // Assume op2 also ends successfully, this happens in different containers
+    op2.close(false);
+    Path[] paths = findFilesInBasePath();
+    List<Path> mondays = Arrays.stream(paths)
+        .filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
+        .collect(Collectors.toList());
+    Assert.assertEquals("Two result files are expected", 2, mondays.size());
+    Set<String> fileNames = new HashSet<>();
+    fileNames.add(mondays.get(0).getName());
+    fileNames.add(mondays.get(1).getName());
+
+    Assert.assertTrue("000000_1 file is expected", fileNames.contains("000000_1"));
+    Assert.assertTrue("000000_0 file is expected", fileNames.contains("000000_0"));
+
+    // This happens in HiveServer2 when the job is finished, the job will call
+    // jobCloseOp to end his operators. For the FileSinkOperator, a deduplication on the
+    // output files may happen so that only one output file is left for each yarn task.
+    op1.jobCloseOp(jobConf, true);
+    List<Path> resultFiles = new ArrayList<Path>();
+    recurseOnPath(basePath, basePath.getFileSystem(jc), resultFiles);
+    mondays = resultFiles.stream()
+        .filter(path -> path.getParent().toString().endsWith("partval=Monday/HIVE_UNION_SUBDIR_0"))
+        .collect(Collectors.toList());
+    Assert.assertEquals("Only 1 file should be here after cleaning", 1, mondays.size());
+    Assert.assertEquals("000000_1 file is expected", "000000_1", mondays.get(0).getName());
+
+    confirmOutput(DataFormat.WITH_PARTITION_VALUE, resultFiles.toArray(new Path[0]));
+    // Clean out directory after testing
+    basePath.getFileSystem(jc).delete(basePath, true);
+  }
 
   @Test
   public void testInsertDynamicPartitioning() throws Exception {
@@ -290,6 +352,7 @@ public class TestFileSinkOperator {
     } else {
       desc = new FileSinkDesc(basePath, tableDesc, false);
     }
+    desc.setStatsAggPrefix(basePath.toString());
     desc.setWriteType(writeType);
     desc.setGatherStats(true);
     if (writeId > 0) {
@@ -313,7 +376,10 @@ public class TestFileSinkOperator {
   }
 
   private void confirmOutput(DataFormat rType) throws IOException, SerDeException, CloneNotSupportedException {
-    Path[] paths = findFilesInBasePath();
+    confirmOutput(rType, findFilesInBasePath());
+  }
+
+  private void confirmOutput(DataFormat rType, Path[] paths) throws IOException, SerDeException, CloneNotSupportedException {
     TFSOInputFormat input = new TFSOInputFormat(rType);
     FileInputFormat.setInputPaths(jc, paths);