You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2019/08/01 18:05:00 UTC

[hive] branch master updated: HIVE-22045 : HIVE-21711 introduced regression in data load (Vineet Garg, reviewed by Gopal V)

This is an automated email from the ASF dual-hosted git repository.

vgarg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new b1a446b  HIVE-22045 : HIVE-21711 introduced regression in data load (Vineet Garg, reviewed by Gopal V)
b1a446b is described below

commit b1a446bd7c9a8f18ca19d6e9f31f158beaf85308
Author: Vineet Garg <vg...@apache.org>
AuthorDate: Thu Aug 1 11:04:18 2019 -0700

    HIVE-22045 : HIVE-21711 introduced regression in data load (Vineet Garg, reviewed by Gopal V)
---
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  | 27 ++++++++++++++++++----
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     |  3 ++-
 .../apache/hadoop/hive/ql/plan/FileSinkDesc.java   | 21 +++++++++++++++--
 .../hadoop/hive/ql/exec/TestFileSinkOperator.java  |  2 +-
 4 files changed, 44 insertions(+), 9 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 1d32ba0..ac89dd9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1431,17 +1431,28 @@ public final class Utilities {
     //       3) Rename/move the temp directory to specPath
 
     FileSystem fs = specPath.getFileSystem(hconf);
-    boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs);
     Path tmpPath = Utilities.toTempPath(specPath);
     Path taskTmpPath = Utilities.toTaskTempPath(specPath);
+    PerfLogger perfLogger = SessionState.getPerfLogger();
+    boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs);
+    boolean avoidRename = false;
+    boolean shouldAvoidRename = shouldAvoidRename(conf, hconf);
+
+    if(isBlobStorage && (shouldAvoidRename|| ((conf != null) && conf.isCTASorCM()))
+        || (!isBlobStorage && shouldAvoidRename)) {
+      avoidRename = true;
+    }
     if (success) {
-      if (!shouldAvoidRename(conf, hconf) && fs.exists(tmpPath) && !isBlobStorage) {
+      if (!avoidRename && fs.exists(tmpPath)) {
         //   1) Rename tmpPath to a new directory name to prevent additional files
         //      from being added by runaway processes.
+        // skipped for SELECT (shouldAvoidRename) and for CTAS/Create MV on blob storage; done for all other statements
         Path tmpPathOriginal = tmpPath;
         tmpPath = new Path(tmpPath.getParent(), tmpPath.getName() + ".moved");
-        LOG.debug("Moving/Renaming " + tmpPathOriginal + " to " + tmpPath);
+        LOG.debug("shouldAvoidRename is false therefore moving/renaming " + tmpPathOriginal + " to " + tmpPath);
+        perfLogger.PerfLogBegin("FileSinkOperator", "rename");
         Utilities.rename(fs, tmpPathOriginal, tmpPath);
+        perfLogger.PerfLogEnd("FileSinkOperator", "rename");
       }
 
       // Remove duplicates from tmpPath
@@ -1449,7 +1460,6 @@ public final class Utilities {
           tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
       FileStatus[] statuses = statusList.toArray(new FileStatus[statusList.size()]);
       if(statuses != null && statuses.length > 0) {
-        PerfLogger perfLogger = SessionState.getPerfLogger();
         Set<FileStatus> filesKept = new HashSet<>();
         perfLogger.PerfLogBegin("FileSinkOperator", "RemoveTempOrDuplicateFiles");
         // remove any tmp file or double-committed output files
@@ -1471,12 +1481,19 @@ public final class Utilities {
         // move to the file destination
         Utilities.FILE_OP_LOGGER.trace("Moving tmp dir: {} to: {}", tmpPath, specPath);
         if(shouldAvoidRename(conf, hconf)){
+          // for SELECT statements
           LOG.debug("Skipping rename/move files. Files to be kept are: " + filesKept.toString());
           conf.getFilesToFetch().addAll(filesKept);
-        } else if (isBlobStorage) {
+        } else if (conf !=null && conf.isCTASorCM() && isBlobStorage) {
+          // for CTAS or Create MV statements
+          perfLogger.PerfLogBegin("FileSinkOperator", "moveSpecifiedFileStatus");
+          LOG.debug("CTAS/Create MV: Files being renamed:  " + filesKept.toString());
           Utilities.moveSpecifiedFileStatus(fs, tmpPath, specPath, filesKept);
+          perfLogger.PerfLogEnd("FileSinkOperator", "moveSpecifiedFileStatus");
         } else {
+          // for all other statements, e.g. INSERT, LOAD, and CTAS/Create MV on non-blob storage
           perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles");
+          LOG.debug("Final renaming/moving. Source: " + tmpPath + " .Destination: " + specPath);
           Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
           perfLogger.PerfLogEnd("FileSinkOperator", "RenameOrMoveFiles");
         }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 8ff00fb..826b23e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -8168,7 +8168,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc,
         conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(),
         canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx,
-        dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery());
+        dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery(),
+        qb.isCTAS() || qb.isMaterializedView());
 
     boolean isHiveServerQuery = SessionState.get().isHiveServerQuery();
     fileSinkDesc.setHiveServerQuery(isHiveServerQuery);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 61ea28a..72ecde4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -115,6 +115,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
 
   private boolean isQuery = false;
 
+  private boolean isCTASorCM = false;
+
   public FileSinkDesc() {
   }
 
@@ -125,7 +127,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
       final boolean compressed, final int destTableId, final boolean multiFileSpray,
       final boolean canBeMerged, final int numFiles, final int totalFiles,
       final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath,
-      Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery) {
+      Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery, boolean isCTASorCM) {
 
     this.dirName = dirName;
     this.tableInfo = tableInfo;
@@ -143,6 +145,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
     this.isMmCtas = isMmCtas;
     this.isInsertOverwrite = isInsertOverwrite;
     this.isQuery = isQuery;
+    this.isCTASorCM = isCTASorCM;
   }
 
   public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -164,7 +167,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
   public Object clone() throws CloneNotSupportedException {
     FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
         destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
-        partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery);
+        partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery, isCTASorCM);
     ret.setCompressCodec(compressCodec);
     ret.setCompressType(compressType);
     ret.setGatherStats(gatherStats);
@@ -181,6 +184,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
     ret.setIsMerge(isMerge);
     ret.setFilesToFetch(filesToFetch);
     ret.setIsQuery(isQuery);
+    ret.setIsCTASorCM(isCTASorCM);
     return ret;
   }
 
@@ -188,6 +192,10 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
     this.filesToFetch = filesToFetch;
   }
 
+  public void setIsCTASorCM(boolean isCTASorCM) {
+    this.isCTASorCM = isCTASorCM;
+  }
+
   public void setIsQuery(boolean isQuery) {
     this.isQuery = isQuery;
   }
@@ -591,6 +599,15 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
     return isMmCtas;
   }
 
+  /**
+   * Whether this is a CREATE TABLE AS SELECT or CREATE MATERIALIZED VIEW statement.
+   * Set by the semantic analyzer; this is required because CTAS/CM needs special
+   * handling in mvFileToFinalPath.
+   */
+  public boolean isCTASorCM() {
+    return isCTASorCM;
+  }
+
   public class FileSinkOperatorExplainVectorization extends OperatorExplainVectorization {
 
     public FileSinkOperatorExplainVectorization(VectorFileSinkDesc vectorFileSinkDesc) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index a75103d..2c4b69b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -286,7 +286,7 @@ public class TestFileSinkOperator {
       DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap, "Sunday", 100);
       //todo: does this need the finalDestination?
       desc = new FileSinkDesc(basePath, tableDesc, false, 1, false,
-          false, 1, 1, partCols, dpCtx, null, null, false, false, false);
+          false, 1, 1, partCols, dpCtx, null, null, false, false, false, false);
     } else {
       desc = new FileSinkDesc(basePath, tableDesc, false);
     }