Posted to commits@hive.apache.org by ha...@apache.org on 2019/10/24 05:42:58 UTC
[hive] branch master updated: HIVE-22396 : CMV creating a Full ACID partitioned table fails because of no writeId (Jesus Camacho Rodriguez via Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new c69bcdc HIVE-22396 : CMV creating a Full ACID partitioned table fails because of no writeId (Jesus Camacho Rodriguez via Ashutosh Chauhan)
c69bcdc is described below
commit c69bcdc848e9974e8eb3ea6eb60d96680069f1b8
Author: Jesus Camacho Rodriguez <jc...@apache.org>
AuthorDate: Wed Oct 23 22:42:08 2019 -0700
HIVE-22396 : CMV creating a Full ACID partitioned table fails because of no writeId (Jesus Camacho Rodriguez via Ashutosh Chauhan)
Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
.../hive/ql/ddl/view/create/CreateViewDesc.java | 23 +++++++
.../hadoop/hive/ql/parse/SemanticAnalyzer.java | 76 +++++++++++++---------
.../apache/hadoop/hive/ql/parse/TaskCompiler.java | 21 +++---
.../clientpositive/materialized_view_create.q | 14 ++++
.../llap/materialized_view_create.q.out | 57 ++++++++++++++++
5 files changed, 152 insertions(+), 39 deletions(-)
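In short: before this patch, a compile-time write id was allocated only when a CreateTableDesc was present (CTAS), so a CREATE MATERIALIZED VIEW over a transactional table reached the file sink with no writeId and failed for full ACID partitioned tables; the patch hoists the allocation so it covers CreateViewDesc as well. As a minimal repro sketch of the failing statement shape (assuming a reachable HiveServer2 and the standard Hive JDBC driver; the connection URL is a placeholder):

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CmvAcidRepro {
        public static void main(String[] args) throws Exception {
            // Placeholder endpoint; point at a real HiveServer2.
            try (Connection conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
                 Statement stmt = conn.createStatement()) {
                // ACID prerequisites, mirroring the test added below.
                stmt.execute("set hive.support.concurrency=true");
                stmt.execute("set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager");
                // Before this fix, this partitioned full ACID CMV failed
                // because no table write id had been allocated.
                stmt.execute("create materialized view acid_cmv_part disable rewrite partitioned on (k) "
                    + "stored as orc tblproperties ('transactional'='true') "
                    + "as select key k, value from src order by k limit 5");
            }
        }
    }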
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java
index 1f30478..d1f3694 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
import org.apache.hadoop.hive.ql.plan.Explain;
import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +78,10 @@ public class CreateViewDesc implements DDLDesc, Serializable {
private List<String> distributeColNames; // only used for materialized views
private List<FieldSchema> distributeCols; // only used for materialized views
private ReplicationSpec replicationSpec = null;
+ private Long initialMmWriteId; // Initial MM write ID for CMV and import.
+ // Configuration for the FileSinkOperator (FSOP) that will write the initial data during CMV.
+ // This is not needed beyond compilation, so it is transient.
+ private transient FileSinkDesc writer;
private String ownerName = null;
/**
@@ -458,6 +463,24 @@ public class CreateViewDesc implements DDLDesc, Serializable {
return tbl;
}
+ public void setInitialMmWriteId(Long mmWriteId) {
+ this.initialMmWriteId = mmWriteId;
+ }
+
+ public Long getInitialMmWriteId() {
+ return initialMmWriteId;
+ }
+
+ public FileSinkDesc getAndUnsetWriter() {
+ FileSinkDesc fsd = writer;
+ writer = null;
+ return fsd;
+ }
+
+ public void setWriter(FileSinkDesc writer) {
+ this.writer = writer;
+ }
+
public void setOwnerName(String ownerName) {
this.ownerName = ownerName;
}
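The setWriter/getAndUnsetWriter pair above implements a set-once, consume-once hand-off: SemanticAnalyzer stashes the FileSinkDesc during compilation, TaskCompiler reads and clears it, and the transient modifier keeps the field out of any serialized plan. A minimal, self-contained sketch of the same pattern (the generic wrapper and its names are illustrative, not from the patch):

    import java.io.Serializable;

    // Illustrative stand-in for the transient writer hand-off above.
    class CompileTimeHandoff<T> implements Serializable {
        // transient: only meaningful during compilation, so it must not
        // travel with the serialized plan.
        private transient T payload;

        void set(T value) { this.payload = value; }

        // Return the payload and clear it, so a later caller sees null
        // rather than acting on stale compile-time state.
        T getAndUnset() {
            T p = payload;
            payload = null;
            return p;
        }
    }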
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 30d3791..2257cc1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7296,7 +7296,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
LoadTableDesc ltd = null;
ListBucketingCtx lbCtx = null;
Map<String, String> partSpec = null;
- boolean isMmTable = false, isMmCtas = false;
+ boolean isMmTable = false, isMmCreate = false;
Long writeId = null;
HiveTxnManager txnMgr = getTxnMgr();
@@ -7568,6 +7568,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
List<ColumnInfo> fileSinkColInfos = null;
List<ColumnInfo> sortColInfos = null;
List<ColumnInfo> distributeColInfos = null;
+ String dbName = null;
+ String tableName = null;
+ Map<String, String> tblProps = null;
CreateTableDesc tblDesc = qb.getTableDesc();
CreateViewDesc viewDesc = qb.getViewDesc();
if (tblDesc != null) {
@@ -7577,31 +7580,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
fileSinkColInfos = new ArrayList<>();
destTableIsTemporary = tblDesc.isTemporary();
destTableIsMaterialization = tblDesc.isMaterialization();
- if (AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps())) {
- try {
- if (ctx.getExplainConfig() != null) {
- writeId = 0L; // For explain plan, txn won't be opened and doesn't make sense to allocate write id
- } else {
- String dbName = tblDesc.getDatabaseName();
- String tableName = tblDesc.getTableName();
-
- // CreateTableDesc stores table name as db.table. So, need to decode it before allocating
- // write id.
- if (tableName.contains(".")) {
- String[] names = Utilities.getDbTableName(tableName);
- dbName = names[0];
- tableName = names[1];
- }
- writeId = txnMgr.getTableWriteId(dbName, tableName);
- }
- } catch (LockException ex) {
- throw new SemanticException("Failed to allocate write Id", ex);
- }
- if (AcidUtils.isInsertOnlyTable(tblDesc.getTblProps(), true)) {
- isMmTable = isMmCtas = true;
- tblDesc.setInitialMmWriteId(writeId);
- }
- }
+ dbName = tblDesc.getDatabaseName();
+ tableName = tblDesc.getTableName();
+ // CreateTableDesc stores table name as db.table. So, need to decode it before allocating
+ // write id.
+ if (tableName.contains(".")) {
+ String[] names = Utilities.getDbTableName(tableName);
+ dbName = names[0];
+ tableName = names[1];
+ }
+ tblProps = tblDesc.getTblProps();
} else if (viewDesc != null) {
fieldSchemas = new ArrayList<>();
partitionColumns = new ArrayList<>();
@@ -7615,6 +7603,30 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
distributeColInfos = new ArrayList<>();
destTableIsTemporary = false;
destTableIsMaterialization = false;
+ String[] names = Utilities.getDbTableName(viewDesc.getViewName());
+ dbName = names[0];
+ tableName = names[1];
+ tblProps = viewDesc.getTblProps();
+ }
+
+ if (tblProps != null && AcidUtils.isTablePropertyTransactional(tblProps)) {
+ try {
+ if (ctx.getExplainConfig() != null) {
+ writeId = 0L; // For an explain plan, no txn is opened, so it makes no sense to allocate a write id
+ } else {
+ writeId = txnMgr.getTableWriteId(dbName, tableName);
+ }
+ } catch (LockException ex) {
+ throw new SemanticException("Failed to allocate write Id", ex);
+ }
+ if (AcidUtils.isInsertOnlyTable(tblProps, true)) {
+ isMmTable = isMmCreate = true;
+ if (tblDesc != null) {
+ tblDesc.setInitialMmWriteId(writeId);
+ } else {
+ viewDesc.setInitialMmWriteId(writeId);
+ }
+ }
}
if (isLocal) {
@@ -7797,7 +7809,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
colTypes,
destTableIsFullAcid ?//there is a change here - prev version had 'transactional', one before 'acid'
Operation.INSERT : Operation.NOT_ACID,
- isMmCtas));
+ isMmCreate));
if (!outputs.add(new WriteEntity(destinationPath, !isDfsDir, isDestTempFile))) {
throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
.getMsg(destinationPath.toUri().toString()));
@@ -7855,10 +7867,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, tableDescriptor, destinationPartition,
destinationPath, currentTableId, destTableIsFullAcid, destTableIsTemporary,//this was 1/4 acid
destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
- canBeMerged, destinationTable, writeId, isMmCtas, destType, qb);
- if (isMmCtas) {
+ canBeMerged, destinationTable, writeId, isMmCreate, destType, qb);
+ if (isMmCreate) {
// Add FSD so that the LoadTask compilation could fix up its path to avoid the move.
- tableDesc.setWriter(fileSinkDesc);
+ if (tableDesc != null) {
+ tableDesc.setWriter(fileSinkDesc);
+ } else {
+ createVwDesc.setWriter(fileSinkDesc);
+ }
}
if (fileSinkDesc.getInsertOverwrite()) {
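Both branches above normalize to a (dbName, tableName) pair before asking the transaction manager for a write id, because CreateTableDesc stores the table name as db.table and the view name arrives in the same qualified form. A simplified stand-in for Utilities.getDbTableName showing that decoding step (the real helper does more; "default" as the fallback database is an assumption of this sketch):

    // Simplified sketch of splitting a possibly qualified "db.table" name.
    static String[] splitDbTableName(String name) {
        int dot = name.indexOf('.');
        if (dot < 0) {
            return new String[] {"default", name}; // assumed fallback
        }
        return new String[] {name.substring(0, dot), name.substring(dot + 1)};
    }
    // splitDbTableName("default.acid_cmv_part") -> ["default", "acid_cmv_part"]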
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index cdd212c..ec46280 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -446,22 +446,25 @@ public abstract class TaskCompiler {
private void setLoadFileLocation(
final ParseContext pCtx, LoadFileDesc lfd) throws SemanticException {
// CTAS or CMV: make the MoveTask's destination directory the table's destination.
- Long txnIdForCtas = null;
- int stmtId = 0; // CTAS cannot be part of multi-txn stmt
- FileSinkDesc dataSinkForCtas = null;
+ Long txnId = null;
+ int stmtId = 0; // CTAS or CMV cannot be part of multi-txn stmt
+ FileSinkDesc dataSink = null;
String loc = null;
if (pCtx.getQueryProperties().isCTAS()) {
CreateTableDesc ctd = pCtx.getCreateTable();
- dataSinkForCtas = ctd.getAndUnsetWriter();
- txnIdForCtas = ctd.getInitialMmWriteId();
+ dataSink = ctd.getAndUnsetWriter();
+ txnId = ctd.getInitialMmWriteId();
loc = ctd.getLocation();
} else {
- loc = pCtx.getCreateViewDesc().getLocation();
+ CreateViewDesc cmv = pCtx.getCreateViewDesc();
+ dataSink = cmv.getAndUnsetWriter();
+ txnId = cmv.getInitialMmWriteId();
+ loc = cmv.getLocation();
}
Path location = (loc == null) ? getDefaultCtasLocation(pCtx) : new Path(loc);
- if (txnIdForCtas != null) {
- dataSinkForCtas.setDirName(location);
- location = new Path(location, AcidUtils.deltaSubdir(txnIdForCtas, txnIdForCtas, stmtId));
+ if (txnId != null) {
+ dataSink.setDirName(location);
+ location = new Path(location, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
lfd.setSourcePath(location);
if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
Utilities.FILE_OP_LOGGER.trace("Setting MM CTAS to " + location);
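The location handed to the LoadFileDesc gains a delta subdirectory named after the write id; AcidUtils.deltaSubdir is the authoritative implementation, but a sketch of the naming convention for the first write (writeId=1, stmtId=0), with the zero-padding widths assumed rather than copied from AcidUtils, looks like:

    // Sketch of the MM/ACID delta directory name; padding widths
    // (7 digits for write ids, 4 for the statement id) are assumed.
    static String deltaSubdir(long minWriteId, long maxWriteId, int stmtId) {
        return String.format("delta_%07d_%07d_%04d", minWriteId, maxWriteId, stmtId);
    }
    // deltaSubdir(1, 1, 0) -> "delta_0000001_0000001_0000"
    // so the load source path becomes <table-location>/delta_0000001_0000001_0000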
diff --git a/ql/src/test/queries/clientpositive/materialized_view_create.q b/ql/src/test/queries/clientpositive/materialized_view_create.q
index c65cde5..77cc60f 100644
--- a/ql/src/test/queries/clientpositive/materialized_view_create.q
+++ b/ql/src/test/queries/clientpositive/materialized_view_create.q
@@ -1,3 +1,4 @@
+--! qt:dataset:src
set hive.vectorized.execution.enabled=false;
create table cmv_basetable_n4 (a int, b varchar(256), c decimal(10,2));
@@ -40,3 +41,16 @@ drop materialized view cmv_mat_view2_n1;
drop materialized view cmv_mat_view3;
drop materialized view cmv_mat_view4;
drop materialized view cmv_mat_view5;
+
+-- ACID CMV
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.stats.autogather=false;
+
+create materialized view acid_cmv_part disable rewrite partitioned on (k)
+ stored as orc TBLPROPERTIES ('transactional'='true')
+ as select key k, value from src order by k limit 5;
+select k, value from acid_cmv_part;
+
+explain formatted
+select k, value from acid_cmv_part;
diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create.q.out
index 105203d..1eb5b69 100644
--- a/ql/src/test/results/clientpositive/llap/materialized_view_create.q.out
+++ b/ql/src/test/results/clientpositive/llap/materialized_view_create.q.out
@@ -292,3 +292,60 @@ POSTHOOK: query: drop materialized view cmv_mat_view5
POSTHOOK: type: DROP_MATERIALIZED_VIEW
POSTHOOK: Input: default@cmv_mat_view5
POSTHOOK: Output: default@cmv_mat_view5
+PREHOOK: query: create materialized view acid_cmv_part disable rewrite partitioned on (k)
+ stored as orc TBLPROPERTIES ('transactional'='true')
+ as select key k, value from src order by k limit 5
+PREHOOK: type: CREATE_MATERIALIZED_VIEW
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@acid_cmv_part
+PREHOOK: Output: default@acid_cmv_part
+POSTHOOK: query: create materialized view acid_cmv_part disable rewrite partitioned on (k)
+ stored as orc TBLPROPERTIES ('transactional'='true')
+ as select key k, value from src order by k limit 5
+POSTHOOK: type: CREATE_MATERIALIZED_VIEW
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@acid_cmv_part
+POSTHOOK: Output: default@acid_cmv_part@k=0
+POSTHOOK: Output: default@acid_cmv_part@k=10
+POSTHOOK: Output: default@acid_cmv_part@k=100
+POSTHOOK: Lineage: acid_cmv_part PARTITION(k=0).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: acid_cmv_part PARTITION(k=100).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: acid_cmv_part PARTITION(k=10).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select k, value from acid_cmv_part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_cmv_part
+PREHOOK: Input: default@acid_cmv_part@k=0
+PREHOOK: Input: default@acid_cmv_part@k=10
+PREHOOK: Input: default@acid_cmv_part@k=100
+#### A masked pattern was here ####
+POSTHOOK: query: select k, value from acid_cmv_part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_cmv_part
+POSTHOOK: Input: default@acid_cmv_part@k=0
+POSTHOOK: Input: default@acid_cmv_part@k=10
+POSTHOOK: Input: default@acid_cmv_part@k=100
+#### A masked pattern was here ####
+0 val_0
+0 val_0
+0 val_0
+10 val_10
+100 val_100
+PREHOOK: query: explain formatted
+select k, value from acid_cmv_part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_cmv_part
+PREHOOK: Input: default@acid_cmv_part@k=0
+PREHOOK: Input: default@acid_cmv_part@k=10
+PREHOOK: Input: default@acid_cmv_part@k=100
+#### A masked pattern was here ####
+POSTHOOK: query: explain formatted
+select k, value from acid_cmv_part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_cmv_part
+POSTHOOK: Input: default@acid_cmv_part@k=0
+POSTHOOK: Input: default@acid_cmv_part@k=10
+POSTHOOK: Input: default@acid_cmv_part@k=100
+#### A masked pattern was here ####
+{"CBOPlan":"{\n \"rels\": [\n {\n \"id\": \"0\",\n \"relOp\": \"org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan\",\n \"table\": [\n \"default\",\n \"acid_cmv_part\"\n ],\n \"table:alias\": \"acid_cmv_part\",\n \"inputs\": [],\n \"rowCount\": 202.0,\n \"avgRowSize\": 200.0,\n \"rowType\": [\n {\n \"type\": \"VARCHAR\",\n \"nullable\": true,\n \"precision\": 2147483647,\n [...]