You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2019/08/01 18:05:00 UTC
[hive] branch master updated: HIVE-22045 : HIVE-21711 introduced
regression in data load (Vineet Garg, reviewed by Gopal V)
This is an automated email from the ASF dual-hosted git repository.
vgarg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new b1a446b HIVE-22045 : HIVE-21711 introduced regression in data load (Vineet Garg, reviewed by Gopal V)
b1a446b is described below
commit b1a446bd7c9a8f18ca19d6e9f31f158beaf85308
Author: Vineet Garg <vg...@apache.org>
AuthorDate: Thu Aug 1 11:04:18 2019 -0700
HIVE-22045 : HIVE-21711 introduced regression in data load (Vineet Garg, reviewed by Gopal V)
---
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 27 ++++++++++++++++++----
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 3 ++-
.../apache/hadoop/hive/ql/plan/FileSinkDesc.java | 21 +++++++++++++++--
.../hadoop/hive/ql/exec/TestFileSinkOperator.java | 2 +-
4 files changed, 44 insertions(+), 9 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 1d32ba0..ac89dd9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -1431,17 +1431,28 @@ public final class Utilities {
// 3) Rename/move the temp directory to specPath
FileSystem fs = specPath.getFileSystem(hconf);
- boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs);
Path tmpPath = Utilities.toTempPath(specPath);
Path taskTmpPath = Utilities.toTaskTempPath(specPath);
+ PerfLogger perfLogger = SessionState.getPerfLogger();
+ boolean isBlobStorage = BlobStorageUtils.isBlobStorageFileSystem(hconf, fs);
+ boolean avoidRename = false;
+ boolean shouldAvoidRename = shouldAvoidRename(conf, hconf);
+
+ if(isBlobStorage && (shouldAvoidRename|| ((conf != null) && conf.isCTASorCM()))
+ || (!isBlobStorage && shouldAvoidRename)) {
+ avoidRename = true;
+ }
if (success) {
- if (!shouldAvoidRename(conf, hconf) && fs.exists(tmpPath) && !isBlobStorage) {
+ if (!avoidRename && fs.exists(tmpPath)) {
// 1) Rename tmpPath to a new directory name to prevent additional files
// from being added by runaway processes.
+ // this is done for all statements except SELECT and, on blob storage, CTAS and Create MV
Path tmpPathOriginal = tmpPath;
tmpPath = new Path(tmpPath.getParent(), tmpPath.getName() + ".moved");
- LOG.debug("Moving/Renaming " + tmpPathOriginal + " to " + tmpPath);
+ LOG.debug("shouldAvoidRename is false therefore moving/renaming " + tmpPathOriginal + " to " + tmpPath);
+ perfLogger.PerfLogBegin("FileSinkOperator", "rename");
Utilities.rename(fs, tmpPathOriginal, tmpPath);
+ perfLogger.PerfLogEnd("FileSinkOperator", "rename");
}
// Remove duplicates from tmpPath
@@ -1449,7 +1460,6 @@ public final class Utilities {
tmpPath, ((dpCtx == null) ? 1 : dpCtx.getNumDPCols()), fs);
FileStatus[] statuses = statusList.toArray(new FileStatus[statusList.size()]);
if(statuses != null && statuses.length > 0) {
- PerfLogger perfLogger = SessionState.getPerfLogger();
Set<FileStatus> filesKept = new HashSet<>();
perfLogger.PerfLogBegin("FileSinkOperator", "RemoveTempOrDuplicateFiles");
// remove any tmp file or double-committed output files
@@ -1471,12 +1481,19 @@ public final class Utilities {
// move to the file destination
Utilities.FILE_OP_LOGGER.trace("Moving tmp dir: {} to: {}", tmpPath, specPath);
if(shouldAvoidRename(conf, hconf)){
+ // for SELECT statements
LOG.debug("Skipping rename/move files. Files to be kept are: " + filesKept.toString());
conf.getFilesToFetch().addAll(filesKept);
- } else if (isBlobStorage) {
+ } else if (conf !=null && conf.isCTASorCM() && isBlobStorage) {
+ // for CTAS or Create MV statements
+ perfLogger.PerfLogBegin("FileSinkOperator", "moveSpecifiedFileStatus");
+ LOG.debug("CTAS/Create MV: Files being renamed: " + filesKept.toString());
Utilities.moveSpecifiedFileStatus(fs, tmpPath, specPath, filesKept);
+ perfLogger.PerfLogEnd("FileSinkOperator", "moveSpecifiedFileStatus");
} else {
+ // for the rest of the statements, e.g. INSERT, LOAD, etc.
perfLogger.PerfLogBegin("FileSinkOperator", "RenameOrMoveFiles");
+ LOG.debug("Final renaming/moving. Source: " + tmpPath + " .Destination: " + specPath);
Utilities.renameOrMoveFiles(fs, tmpPath, specPath);
perfLogger.PerfLogEnd("FileSinkOperator", "RenameOrMoveFiles");
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 8ff00fb..826b23e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -8168,7 +8168,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
FileSinkDesc fileSinkDesc = new FileSinkDesc(queryTmpdir, table_desc,
conf.getBoolVar(HiveConf.ConfVars.COMPRESSRESULT), currentTableId, rsCtx.isMultiFileSpray(),
canBeMerged, rsCtx.getNumFiles(), rsCtx.getTotalFiles(), rsCtx.getPartnCols(), dpCtx,
- dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery());
+ dest_path, mmWriteId, isMmCtas, isInsertOverwrite, qb.getIsQuery(),
+ qb.isCTAS() || qb.isMaterializedView());
boolean isHiveServerQuery = SessionState.get().isHiveServerQuery();
fileSinkDesc.setHiveServerQuery(isHiveServerQuery);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index 61ea28a..72ecde4 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -115,6 +115,8 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
private boolean isQuery = false;
+ private boolean isCTASorCM = false;
+
public FileSinkDesc() {
}
@@ -125,7 +127,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
final boolean compressed, final int destTableId, final boolean multiFileSpray,
final boolean canBeMerged, final int numFiles, final int totalFiles,
final ArrayList<ExprNodeDesc> partitionCols, final DynamicPartitionCtx dpCtx, Path destPath,
- Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery) {
+ Long mmWriteId, boolean isMmCtas, boolean isInsertOverwrite, boolean isQuery, boolean isCTASorCM) {
this.dirName = dirName;
this.tableInfo = tableInfo;
@@ -143,6 +145,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
this.isMmCtas = isMmCtas;
this.isInsertOverwrite = isInsertOverwrite;
this.isQuery = isQuery;
+ this.isCTASorCM = isCTASorCM;
}
public FileSinkDesc(final Path dirName, final TableDesc tableInfo,
@@ -164,7 +167,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
public Object clone() throws CloneNotSupportedException {
FileSinkDesc ret = new FileSinkDesc(dirName, tableInfo, compressed,
destTableId, multiFileSpray, canBeMerged, numFiles, totalFiles,
- partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery);
+ partitionCols, dpCtx, destPath, mmWriteId, isMmCtas, isInsertOverwrite, isQuery, isCTASorCM);
ret.setCompressCodec(compressCodec);
ret.setCompressType(compressType);
ret.setGatherStats(gatherStats);
@@ -181,6 +184,7 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
ret.setIsMerge(isMerge);
ret.setFilesToFetch(filesToFetch);
ret.setIsQuery(isQuery);
+ ret.setIsCTASorCM(isCTASorCM);
return ret;
}
@@ -188,6 +192,10 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
this.filesToFetch = filesToFetch;
}
+ public void setIsCTASorCM(boolean isCTASorCM) {
+ this.isCTASorCM = isCTASorCM;
+ }
+
public void setIsQuery(boolean isQuery) {
this.isQuery = isQuery;
}
@@ -591,6 +599,15 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
return isMmCtas;
}
+ /**
+ * Whether this is a CREATE TABLE AS SELECT (CTAS) or CREATE MATERIALIZED VIEW (CM) statement.
+ * Set by the semantic analyzer; this is required because CTAS/CM needs some special logic
+ * in mvFileToFinalPath.
+ */
+ public boolean isCTASorCM() {
+ return isCTASorCM;
+ }
+
public class FileSinkOperatorExplainVectorization extends OperatorExplainVectorization {
public FileSinkOperatorExplainVectorization(VectorFileSinkDesc vectorFileSinkDesc) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
index a75103d..2c4b69b 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java
@@ -286,7 +286,7 @@ public class TestFileSinkOperator {
DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap, "Sunday", 100);
//todo: does this need the finalDestination?
desc = new FileSinkDesc(basePath, tableDesc, false, 1, false,
- false, 1, 1, partCols, dpCtx, null, null, false, false, false);
+ false, 1, 1, partCols, dpCtx, null, null, false, false, false, false);
} else {
desc = new FileSinkDesc(basePath, tableDesc, false);
}