You are viewing a plain-text version of this content. The canonical link to the original message (a hyperlink in the archived page) was not preserved in this copy.
Posted to commits@hive.apache.org by ha...@apache.org on 2019/10/24 05:42:58 UTC

[hive] branch master updated: HIVE-22396 : CMV creating a Full ACID partitioned table fails because of no writeId (Jesus Camacho Rodriguez via Ashutosh Chauhan)

This is an automated email from the ASF dual-hosted git repository.

hashutosh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new c69bcdc  HIVE-22396 : CMV creating a Full ACID partitioned table fails because of no writeId (Jesus Camacho Rodriguez via Ashutosh Chauhan)
c69bcdc is described below

commit c69bcdc848e9974e8eb3ea6eb60d96680069f1b8
Author: Jesus Camacho Rodriguez <jc...@apache.org>
AuthorDate: Wed Oct 23 22:42:08 2019 -0700

    HIVE-22396 : CMV creating a Full ACID partitioned table fails because of no writeId (Jesus Camacho Rodriguez via Ashutosh Chauhan)
    
    Signed-off-by: Ashutosh Chauhan <ha...@apache.org>
---
 .../hive/ql/ddl/view/create/CreateViewDesc.java    | 23 +++++++
 .../hadoop/hive/ql/parse/SemanticAnalyzer.java     | 76 +++++++++++++---------
 .../apache/hadoop/hive/ql/parse/TaskCompiler.java  | 21 +++---
 .../clientpositive/materialized_view_create.q      | 14 ++++
 .../llap/materialized_view_create.q.out            | 57 ++++++++++++++++
 5 files changed, 152 insertions(+), 39 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java
index 1f30478..d1f3694 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/view/create/CreateViewDesc.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.Explain;
 import org.apache.hadoop.hive.ql.plan.Explain.Level;
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -77,6 +78,10 @@ public class CreateViewDesc implements DDLDesc, Serializable {
   private List<String> distributeColNames;  // only used for materialized views
   private List<FieldSchema> distributeCols;  // only used for materialized views
   private ReplicationSpec replicationSpec = null;
+  private Long initialMmWriteId; // Initial MM write ID for CMV and import.
+  // The FSOP configuration for the FSOP that is going to write initial data during cmv.
+  // This is not needed beyond compilation, so it is transient.
+  private transient FileSinkDesc writer;
   private String ownerName = null;
 
   /**
@@ -458,6 +463,24 @@ public class CreateViewDesc implements DDLDesc, Serializable {
     return tbl;
   }
 
+  public void setInitialMmWriteId(Long mmWriteId) {
+    this.initialMmWriteId = mmWriteId;
+  }
+
+  public Long getInitialMmWriteId() {
+    return initialMmWriteId;
+  }
+
+  public FileSinkDesc getAndUnsetWriter() {
+    FileSinkDesc fsd = writer;
+    writer = null;
+    return fsd;
+  }
+
+  public void setWriter(FileSinkDesc writer) {
+    this.writer = writer;
+  }
+
   public void setOwnerName(String ownerName) {
     this.ownerName = ownerName;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
index 30d3791..2257cc1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
@@ -7296,7 +7296,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     LoadTableDesc ltd = null;
     ListBucketingCtx lbCtx = null;
     Map<String, String> partSpec = null;
-    boolean isMmTable = false, isMmCtas = false;
+    boolean isMmTable = false, isMmCreate = false;
     Long writeId = null;
     HiveTxnManager txnMgr = getTxnMgr();
 
@@ -7568,6 +7568,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
       List<ColumnInfo> fileSinkColInfos = null;
       List<ColumnInfo> sortColInfos = null;
       List<ColumnInfo> distributeColInfos = null;
+      String dbName = null;
+      String tableName = null;
+      Map<String, String> tblProps = null;
       CreateTableDesc tblDesc = qb.getTableDesc();
       CreateViewDesc viewDesc = qb.getViewDesc();
       if (tblDesc != null) {
@@ -7577,31 +7580,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         fileSinkColInfos = new ArrayList<>();
         destTableIsTemporary = tblDesc.isTemporary();
         destTableIsMaterialization = tblDesc.isMaterialization();
-        if (AcidUtils.isTablePropertyTransactional(tblDesc.getTblProps())) {
-          try {
-            if (ctx.getExplainConfig() != null) {
-              writeId = 0L; // For explain plan, txn won't be opened and doesn't make sense to allocate write id
-            } else {
-              String dbName = tblDesc.getDatabaseName();
-              String tableName = tblDesc.getTableName();
-
-              // CreateTableDesc stores table name as db.table. So, need to decode it before allocating
-              // write id.
-              if (tableName.contains(".")) {
-                String[] names = Utilities.getDbTableName(tableName);
-                dbName = names[0];
-                tableName = names[1];
-              }
-              writeId = txnMgr.getTableWriteId(dbName, tableName);
-            }
-          } catch (LockException ex) {
-            throw new SemanticException("Failed to allocate write Id", ex);
-          }
-          if (AcidUtils.isInsertOnlyTable(tblDesc.getTblProps(), true)) {
-            isMmTable = isMmCtas = true;
-            tblDesc.setInitialMmWriteId(writeId);
-          }
-        }
+        dbName = tblDesc.getDatabaseName();
+        tableName = tblDesc.getTableName();
+        // CreateTableDesc stores table name as db.table. So, need to decode it before allocating
+        // write id.
+        if (tableName.contains(".")) {
+          String[] names = Utilities.getDbTableName(tableName);
+          dbName = names[0];
+          tableName = names[1];
+        }
+        tblProps = tblDesc.getTblProps();
       } else if (viewDesc != null) {
         fieldSchemas = new ArrayList<>();
         partitionColumns = new ArrayList<>();
@@ -7615,6 +7603,30 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
         distributeColInfos = new ArrayList<>();
         destTableIsTemporary = false;
         destTableIsMaterialization = false;
+        String[] names = Utilities.getDbTableName(viewDesc.getViewName());
+        dbName = names[0];
+        tableName = names[1];
+        tblProps = viewDesc.getTblProps();
+      }
+
+      if (tblProps != null && AcidUtils.isTablePropertyTransactional(tblProps)) {
+        try {
+          if (ctx.getExplainConfig() != null) {
+            writeId = 0L; // For explain plan, txn won't be opened and doesn't make sense to allocate write id
+          } else {
+            writeId = txnMgr.getTableWriteId(dbName, tableName);
+          }
+        } catch (LockException ex) {
+          throw new SemanticException("Failed to allocate write Id", ex);
+        }
+        if (AcidUtils.isInsertOnlyTable(tblProps, true)) {
+          isMmTable = isMmCreate = true;
+          if (tblDesc != null) {
+            tblDesc.setInitialMmWriteId(writeId);
+          } else {
+            viewDesc.setInitialMmWriteId(writeId);
+          }
+        }
       }
 
       if (isLocal) {
@@ -7797,7 +7809,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
             colTypes,
             destTableIsFullAcid ?//there is a change here - prev version had 'transactional', one before 'acid'
                 Operation.INSERT : Operation.NOT_ACID,
-            isMmCtas));
+            isMmCreate));
         if (!outputs.add(new WriteEntity(destinationPath, !isDfsDir, isDestTempFile))) {
           throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES
               .getMsg(destinationPath.toUri().toString()));
@@ -7855,10 +7867,14 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer {
     FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, tableDescriptor, destinationPartition,
         destinationPath, currentTableId, destTableIsFullAcid, destTableIsTemporary,//this was 1/4 acid
         destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS,
-        canBeMerged, destinationTable, writeId, isMmCtas, destType, qb);
-    if (isMmCtas) {
+        canBeMerged, destinationTable, writeId, isMmCreate, destType, qb);
+    if (isMmCreate) {
       // Add FSD so that the LoadTask compilation could fix up its path to avoid the move.
-      tableDesc.setWriter(fileSinkDesc);
+      if (tableDesc != null) {
+        tableDesc.setWriter(fileSinkDesc);
+      } else {
+        createVwDesc.setWriter(fileSinkDesc);
+      }
     }
 
     if (fileSinkDesc.getInsertOverwrite()) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
index cdd212c..ec46280 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java
@@ -446,22 +446,25 @@ public abstract class TaskCompiler {
   private void setLoadFileLocation(
       final ParseContext pCtx, LoadFileDesc lfd) throws SemanticException {
     // CTAS; make the movetask's destination directory the table's destination.
-    Long txnIdForCtas = null;
-    int stmtId = 0; // CTAS cannot be part of multi-txn stmt
-    FileSinkDesc dataSinkForCtas = null;
+    Long txnId = null;
+    int stmtId = 0; // CTAS or CMV cannot be part of multi-txn stmt
+    FileSinkDesc dataSink = null;
     String loc = null;
     if (pCtx.getQueryProperties().isCTAS()) {
       CreateTableDesc ctd = pCtx.getCreateTable();
-      dataSinkForCtas = ctd.getAndUnsetWriter();
-      txnIdForCtas = ctd.getInitialMmWriteId();
+      dataSink = ctd.getAndUnsetWriter();
+      txnId = ctd.getInitialMmWriteId();
       loc = ctd.getLocation();
     } else {
-      loc = pCtx.getCreateViewDesc().getLocation();
+      CreateViewDesc cmv = pCtx.getCreateViewDesc();
+      dataSink = cmv.getAndUnsetWriter();
+      txnId = cmv.getInitialMmWriteId();
+      loc = cmv.getLocation();
     }
     Path location = (loc == null) ? getDefaultCtasLocation(pCtx) : new Path(loc);
-    if (txnIdForCtas != null) {
-      dataSinkForCtas.setDirName(location);
-      location = new Path(location, AcidUtils.deltaSubdir(txnIdForCtas, txnIdForCtas, stmtId));
+    if (txnId != null) {
+      dataSink.setDirName(location);
+      location = new Path(location, AcidUtils.deltaSubdir(txnId, txnId, stmtId));
       lfd.setSourcePath(location);
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("Setting MM CTAS to " + location);
diff --git a/ql/src/test/queries/clientpositive/materialized_view_create.q b/ql/src/test/queries/clientpositive/materialized_view_create.q
index c65cde5..77cc60f 100644
--- a/ql/src/test/queries/clientpositive/materialized_view_create.q
+++ b/ql/src/test/queries/clientpositive/materialized_view_create.q
@@ -1,3 +1,4 @@
+--! qt:dataset:src
 set hive.vectorized.execution.enabled=false;
 create table cmv_basetable_n4 (a int, b varchar(256), c decimal(10,2));
 
@@ -40,3 +41,16 @@ drop materialized view cmv_mat_view2_n1;
 drop materialized view cmv_mat_view3;
 drop materialized view cmv_mat_view4;
 drop materialized view cmv_mat_view5;
+
+-- ACID CMV
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+set hive.stats.autogather=false;
+
+create materialized view acid_cmv_part disable rewrite partitioned on (k)
+  stored as orc TBLPROPERTIES ('transactional'='true')
+  as select key k, value from src order by k limit 5;
+select k, value from acid_cmv_part;
+
+explain formatted
+select k, value from acid_cmv_part;
diff --git a/ql/src/test/results/clientpositive/llap/materialized_view_create.q.out b/ql/src/test/results/clientpositive/llap/materialized_view_create.q.out
index 105203d..1eb5b69 100644
--- a/ql/src/test/results/clientpositive/llap/materialized_view_create.q.out
+++ b/ql/src/test/results/clientpositive/llap/materialized_view_create.q.out
@@ -292,3 +292,60 @@ POSTHOOK: query: drop materialized view cmv_mat_view5
 POSTHOOK: type: DROP_MATERIALIZED_VIEW
 POSTHOOK: Input: default@cmv_mat_view5
 POSTHOOK: Output: default@cmv_mat_view5
+PREHOOK: query: create materialized view acid_cmv_part disable rewrite partitioned on (k)
+  stored as orc TBLPROPERTIES ('transactional'='true')
+  as select key k, value from src order by k limit 5
+PREHOOK: type: CREATE_MATERIALIZED_VIEW
+PREHOOK: Input: default@src
+PREHOOK: Output: database:default
+PREHOOK: Output: default@acid_cmv_part
+PREHOOK: Output: default@acid_cmv_part
+POSTHOOK: query: create materialized view acid_cmv_part disable rewrite partitioned on (k)
+  stored as orc TBLPROPERTIES ('transactional'='true')
+  as select key k, value from src order by k limit 5
+POSTHOOK: type: CREATE_MATERIALIZED_VIEW
+POSTHOOK: Input: default@src
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@acid_cmv_part
+POSTHOOK: Output: default@acid_cmv_part@k=0
+POSTHOOK: Output: default@acid_cmv_part@k=10
+POSTHOOK: Output: default@acid_cmv_part@k=100
+POSTHOOK: Lineage: acid_cmv_part PARTITION(k=0).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: acid_cmv_part PARTITION(k=100).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+POSTHOOK: Lineage: acid_cmv_part PARTITION(k=10).value SIMPLE [(src)src.FieldSchema(name:value, type:string, comment:default), ]
+PREHOOK: query: select k, value from acid_cmv_part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_cmv_part
+PREHOOK: Input: default@acid_cmv_part@k=0
+PREHOOK: Input: default@acid_cmv_part@k=10
+PREHOOK: Input: default@acid_cmv_part@k=100
+#### A masked pattern was here ####
+POSTHOOK: query: select k, value from acid_cmv_part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_cmv_part
+POSTHOOK: Input: default@acid_cmv_part@k=0
+POSTHOOK: Input: default@acid_cmv_part@k=10
+POSTHOOK: Input: default@acid_cmv_part@k=100
+#### A masked pattern was here ####
+0	val_0
+0	val_0
+0	val_0
+10	val_10
+100	val_100
+PREHOOK: query: explain formatted
+select k, value from acid_cmv_part
+PREHOOK: type: QUERY
+PREHOOK: Input: default@acid_cmv_part
+PREHOOK: Input: default@acid_cmv_part@k=0
+PREHOOK: Input: default@acid_cmv_part@k=10
+PREHOOK: Input: default@acid_cmv_part@k=100
+#### A masked pattern was here ####
+POSTHOOK: query: explain formatted
+select k, value from acid_cmv_part
+POSTHOOK: type: QUERY
+POSTHOOK: Input: default@acid_cmv_part
+POSTHOOK: Input: default@acid_cmv_part@k=0
+POSTHOOK: Input: default@acid_cmv_part@k=10
+POSTHOOK: Input: default@acid_cmv_part@k=100
+#### A masked pattern was here ####
+{"CBOPlan":"{\n  \"rels\": [\n    {\n      \"id\": \"0\",\n      \"relOp\": \"org.apache.hadoop.hive.ql.optimizer.calcite.reloperators.HiveTableScan\",\n      \"table\": [\n        \"default\",\n        \"acid_cmv_part\"\n      ],\n      \"table:alias\": \"acid_cmv_part\",\n      \"inputs\": [],\n      \"rowCount\": 202.0,\n      \"avgRowSize\": 200.0,\n      \"rowType\": [\n        {\n          \"type\": \"VARCHAR\",\n          \"nullable\": true,\n          \"precision\": 2147483647,\n [...]