You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ma...@apache.org on 2022/07/12 03:24:05 UTC
[hive] branch master updated: HIVE-26382 : Stats generation fails during CTAS for external partitioned table. (Mahesh Kumar Behera, reviewed by Ayush Saxena)
This is an automated email from the ASF dual-hosted git repository.
mahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new eac61a4532e HIVE-26382 : Stats generation fails during CTAS for external partitioned table. (Mahesh Kumar Behera, reviewed by Ayush Saxena)
eac61a4532e is described below
commit eac61a4532ee645441daf8b1c11e909e95df810f
Author: mahesh kumar behera <ma...@apache.org>
AuthorDate: Tue Jul 12 08:53:56 2022 +0530
HIVE-26382 : Stats generation fails during CTAS for external partitioned table. (Mahesh Kumar Behera, reviewed by Ayush Saxena)
---
.../org/apache/hadoop/hive/ql/exec/MoveTask.java | 50 +++++++++++++--------
.../apache/hadoop/hive/ql/parse/TaskCompiler.java | 3 +-
ql/src/test/queries/clientpositive/ctas_blob.q | 6 ++-
.../results/clientpositive/llap/ctas_blob.q.out | 51 ++++++++++++++++++++++
4 files changed, 90 insertions(+), 20 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
index d4514f4812f..250debfd9b7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
@@ -104,6 +104,29 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
super();
}
+ private boolean moveFilesUsingManifestFile(FileSystem fs, Path sourcePath, Path targetPath)
+ throws HiveException, IOException {
+ if (work.isCTAS() && BlobStorageUtils.isBlobStorageFileSystem(conf, fs)) {
+ if (fs.exists(new Path(sourcePath, BLOB_MANIFEST_FILE))) {
+ LOG.debug("Attempting to copy using the paths available in {}", new Path(sourcePath, BLOB_MANIFEST_FILE));
+ ArrayList<String> filesKept;
+ try (FSDataInputStream inStream = fs.open(new Path(sourcePath, BLOB_MANIFEST_FILE))) {
+ String paths = IOUtils.toString(inStream, Charset.defaultCharset());
+ filesKept = new ArrayList(Arrays.asList(paths.split(System.lineSeparator())));
+ }
+ // Remove the first entry from the list; it is the source path.
+ Path srcPath = new Path(filesKept.remove(0));
+ LOG.info("Copying files {} from {} to {}", filesKept, srcPath, targetPath);
+ // Do the move using the filesKept now directly to the target dir.
+ Utilities.moveSpecifiedFilesInParallel(conf, fs, srcPath, targetPath, new HashSet<>(filesKept));
+ return true;
+ }
+ // Fallback case: if the _blob_files_kept file wasn't created, we can fall back to the normal logic.
+ // The file won't be created in the case of an empty source table either.
+ }
+ return false;
+ }
+
private void moveFile(Path sourcePath, Path targetPath, boolean isDfsDir)
throws HiveException {
try {
@@ -117,25 +140,12 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
FileSystem fs = sourcePath.getFileSystem(conf);
- if (work.isCTAS() && BlobStorageUtils.isBlobStorageFileSystem(conf, fs)) {
- if (fs.exists(new Path(sourcePath, BLOB_MANIFEST_FILE))) {
- LOG.debug("Attempting to copy using the paths available in {}", new Path(sourcePath, BLOB_MANIFEST_FILE));
- ArrayList<String> filesKept;
- try (FSDataInputStream inStream = fs.open(new Path(sourcePath, BLOB_MANIFEST_FILE))) {
- String paths = IOUtils.toString(inStream, Charset.defaultCharset());
- filesKept = new ArrayList(Arrays.asList(paths.split(System.lineSeparator())));
- }
- // Remove the first entry from the list, it is the source path.
- Path srcPath = new Path(filesKept.remove(0));
- LOG.info("Copying files {} from {} to {}", filesKept, srcPath, targetPath);
- // Do the move using the filesKept now directly to the target dir.
- Utilities.moveSpecifiedFilesInParallel(conf, fs, srcPath, targetPath, new HashSet<>(filesKept));
- perfLogger.perfLogEnd("MoveTask", PerfLogger.FILE_MOVES);
- return;
- }
- // Fallback case, in any case the _blob_files_kept isn't created, we can do the normal logic. The file won't
- // be created in case of empty source table as well
+ // If _blob_files_kept is present, use it to move the files; otherwise fall back to the normal case.
+ if (moveFilesUsingManifestFile(fs, sourcePath, targetPath)) {
+ perfLogger.perfLogEnd("MoveTask", PerfLogger.FILE_MOVES);
+ return;
}
+
if (isDfsDir) {
moveFileInDfs (sourcePath, targetPath, conf);
} else {
@@ -470,6 +480,10 @@ public class MoveTask extends Task<MoveWork> implements Serializable {
// deal with dynamic partitions
DynamicPartitionCtx dpCtx = tbd.getDPCtx();
if (dpCtx != null && dpCtx.getNumDPCols() > 0) { // dynamic partitions
+ // if _blob_files_kept is present, use it to move the files to the target path
+ // before loading the partitions.
+ moveFilesUsingManifestFile(tbd.getSourcePath().getFileSystem(conf),
+ tbd.getSourcePath(), dpCtx.getRootPath());
dc = handleDynParts(db, table, tbd, ti, dpCtx);
} else { // static partitions
dc = handleStaticParts(db, table, tbd, ti);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index 255baabb93b..8cc755e7d13 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -270,7 +270,8 @@ public abstract class TaskCompiler {
} else if (!isCStats) {
for (LoadTableDesc ltd : loadTableWork) {
Task<MoveWork> tsk = TaskFactory
- .get(new MoveWork(null, null, ltd, null, false));
+ .get(new MoveWork(pCtx.getQueryProperties().isCTAS() && pCtx.getCreateTable().isExternal(),
+ null, null, ltd, null, false));
mvTask.add(tsk);
}
diff --git a/ql/src/test/queries/clientpositive/ctas_blob.q b/ql/src/test/queries/clientpositive/ctas_blob.q
index 7d871311bc9..4dc998f6b8a 100644
--- a/ql/src/test/queries/clientpositive/ctas_blob.q
+++ b/ql/src/test/queries/clientpositive/ctas_blob.q
@@ -22,6 +22,10 @@ create external table t1_ctas as select * from t1;
select * from t1_ctas order by key;
+create external table t1_ctas_part partitioned by (key) as select * from t1;
+
+select * from t1_ctas_part order by key;
+
-- try CTAS with empty external table.
create external table t2 (key int, value string);
@@ -64,4 +68,4 @@ drop table t1_ctas;
drop table t2_ctas;
drop table t3_ctas;
drop table texternal;
-drop table tmanaged;
\ No newline at end of file
+drop table tmanaged;
diff --git a/ql/src/test/results/clientpositive/llap/ctas_blob.q.out b/ql/src/test/results/clientpositive/llap/ctas_blob.q.out
index 9387c97e354..79967b4888f 100644
--- a/ql/src/test/results/clientpositive/llap/ctas_blob.q.out
+++ b/ql/src/test/results/clientpositive/llap/ctas_blob.q.out
@@ -62,6 +62,57 @@ POSTHOOK: Input: default@t1_ctas
4 JKLM
5 NOPQ
6 RSTUV
+PREHOOK: query: create external table t1_ctas_part partitioned by (key) as select * from t1
+PREHOOK: type: CREATETABLE_AS_SELECT
+PREHOOK: Input: default@t1
+PREHOOK: Output: database:default
+PREHOOK: Output: default@t1_ctas_part
+PREHOOK: Output: default@t1_ctas_part
+POSTHOOK: query: create external table t1_ctas_part partitioned by (key) as select * from t1
+POSTHOOK: type: CREATETABLE_AS_SELECT
+POSTHOOK: Input: default@t1
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@t1_ctas_part
+POSTHOOK: Output: default@t1_ctas_part
+POSTHOOK: Output: default@t1_ctas_part@key=1
+POSTHOOK: Output: default@t1_ctas_part@key=2
+POSTHOOK: Output: default@t1_ctas_part@key=3
+POSTHOOK: Output: default@t1_ctas_part@key=4
+POSTHOOK: Output: default@t1_ctas_part@key=5
+POSTHOOK: Output: default@t1_ctas_part@key=6
+POSTHOOK: Lineage: t1_ctas_part.value SIMPLE [(t1)t1.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: t1_ctas_part PARTITION(key=1).value SIMPLE [(t1)t1.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: t1_ctas_part PARTITION(key=2).value SIMPLE [(t1)t1.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: t1_ctas_part PARTITION(key=3).value SIMPLE [(t1)t1.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: t1_ctas_part PARTITION(key=4).value SIMPLE [(t1)t1.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: t1_ctas_part PARTITION(key=5).value SIMPLE [(t1)t1.FieldSchema(name:value, type:string, comment:null), ]
+POSTHOOK: Lineage: t1_ctas_part PARTITION(key=6).value SIMPLE [(t1)t1.FieldSchema(name:value, type:string, comment:null), ]
+PREHOOK: query: select * from t1_ctas_part order by key
+PREHOOK: type: QUERY
+PREHOOK: Input: default@t1_ctas_part
+PREHOOK: Input: default@t1_ctas_part@key=1
+PREHOOK: Input: default@t1_ctas_part@key=2
+PREHOOK: Input: default@t1_ctas_part@key=3
+PREHOOK: Input: default@t1_ctas_part@key=4
+PREHOOK: Input: default@t1_ctas_part@key=5
+PREHOOK: Input: default@t1_ctas_part@key=6
+#### A masked pattern was here ####
+POSTHOOK: query: select * from t1_ctas_part order by key
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@t1_ctas_part
+POSTHOOK: Input: default@t1_ctas_part@key=1
+POSTHOOK: Input: default@t1_ctas_part@key=2
+POSTHOOK: Input: default@t1_ctas_part@key=3
+POSTHOOK: Input: default@t1_ctas_part@key=4
+POSTHOOK: Input: default@t1_ctas_part@key=5
+POSTHOOK: Input: default@t1_ctas_part@key=6
+#### A masked pattern was here ####
+ABCD 1
+BCDE 2
+FGHI 3
+JKLM 4
+NOPQ 5
+RSTUV 6
PREHOOK: query: create external table t2 (key int, value string)
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default