You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/22 14:51:28 UTC

[incubator-doris] 03/13: [Feature] CTAS support insert data (#9271)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 733fb3a92af8b0180e8e52f49d7a390479c608d7
Author: Stalary <45...@qq.com>
AuthorDate: Sat May 7 08:51:54 2022 +0800

    [Feature] CTAS support insert data (#9271)
---
 .../doris/analysis/CreateTableAsSelectStmt.java    | 44 +++++++++++-----------
 .../java/org/apache/doris/catalog/Catalog.java     |  2 +-
 .../java/org/apache/doris/qe/StmtExecutor.java     | 36 +++++++++++++++++-
 3 files changed, 58 insertions(+), 24 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java
index a5d17632f6..235e497e80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java
@@ -21,28 +21,42 @@ import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
 import org.apache.doris.common.UserException;
 
+import lombok.Getter;
+
 import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Represents a CREATE TABLE AS SELECT (CTAS) statement
- *  Syntax:
- *      CREATE TABLE table_name [( column_name_list )]
- *          opt_engine opt_partition opt_properties KW_AS query_stmt
+ * Represents a CREATE TABLE AS SELECT (CTAS) statement.
+ * Syntax:
+ * CREATE TABLE table_name [( column_name_list )]
+ * opt_engine opt_partition opt_properties KW_AS query_stmt
  */
 public class CreateTableAsSelectStmt extends DdlStmt {
+
+    @Getter
     private final CreateTableStmt createTableStmt;
+
+    @Getter
     private final List<String> columnNames;
+
+    @Getter
     private QueryStmt queryStmt;
-    
-    public CreateTableAsSelectStmt(CreateTableStmt createTableStmt,
-                                   List<String> columnNames, QueryStmt queryStmt) {
+
+    @Getter
+    private final InsertStmt insertStmt;
+
+    protected CreateTableAsSelectStmt(CreateTableStmt createTableStmt,
+                                      List<String> columnNames, QueryStmt queryStmt) {
         this.createTableStmt = createTableStmt;
         this.columnNames = columnNames;
         this.queryStmt = queryStmt;
-        // Insert is not currently supported
+        this.insertStmt = new InsertStmt(createTableStmt.getDbTbl(), queryStmt.clone());
     }
-    
+
+    /**
+     * Cannot analyze insertStmt because the table has not been created yet.
+     */
     @Override
     public void analyze(Analyzer analyzer) throws UserException {
         // first: we analyze queryStmt before create table.
@@ -63,16 +77,4 @@ public class CreateTableAsSelectStmt extends DdlStmt {
             ErrorReport.reportAnalysisException(ErrorCode.ERR_COL_NUMBER_NOT_MATCH);
         }
     }
-    
-    public CreateTableStmt getCreateTableStmt() {
-        return createTableStmt;
-    }
-    
-    public List<String> getColumnNames() {
-        return columnNames;
-    }
-    
-    public QueryStmt getQueryStmt() {
-        return queryStmt;
-    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 3bb59eac55..ffded947bf 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -3146,7 +3146,7 @@ public class Catalog {
             createTableStmt.analyze(dummyRootAnalyzer);
             createTable(createTableStmt);
         } catch (UserException e) {
-            throw new DdlException("Failed to execute CREATE TABLE AS SELECT Reason: " + e.getMessage());
+            throw new DdlException("Failed to execute CTAS Reason: " + e.getMessage());
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index e913f9cc96..6ba7364f1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -20,6 +20,7 @@ package org.apache.doris.qe;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.CreateTableAsSelectStmt;
 import org.apache.doris.analysis.DdlStmt;
+import org.apache.doris.analysis.DropTableStmt;
 import org.apache.doris.analysis.EnterStmt;
 import org.apache.doris.analysis.ExplainOptions;
 import org.apache.doris.analysis.ExportStmt;
@@ -414,6 +415,8 @@ public class StmtExecutor implements ProfileWriter {
                 handleUseStmt();
             } else if (parsedStmt instanceof TransactionStmt) {
                 handleTransactionStmt();
+            } else if (parsedStmt instanceof CreateTableAsSelectStmt) {
+                handleCtasStmt();
             } else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InserStmt is its subclass
                 try {
                     handleInsertStmt();
@@ -1237,7 +1240,6 @@ public class StmtExecutor implements ProfileWriter {
         context.getMysqlChannel().reset();
         // create plan
         InsertStmt insertStmt = (InsertStmt) parsedStmt;
-
         if (insertStmt.getQueryStmt().hasOutFileClause()) {
             throw new DdlException("Not support OUTFILE clause in INSERT statement");
         }
@@ -1558,6 +1560,37 @@ public class StmtExecutor implements ProfileWriter {
         context.getCatalog().getExportMgr().addExportJob(exportStmt);
     }
 
+    private void handleCtasStmt() {
+        CreateTableAsSelectStmt ctasStmt = (CreateTableAsSelectStmt) this.parsedStmt;
+        try {
+            // create table
+            DdlExecutor.execute(context.getCatalog(), ctasStmt);
+            context.getState().setOk();
+        }  catch (Exception e) {
+            // Maybe our bug
+            LOG.warn("CTAS create table error, stmt={}", originStmt.originStmt, e);
+            context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
+        }
+        // after success create table insert data
+        if (MysqlStateType.OK.equals(context.getState().getStateType())) {
+            try {
+                parsedStmt = ctasStmt.getInsertStmt();
+                execute();
+            } catch (Exception e) {
+                LOG.warn("CTAS insert data error, stmt={}", parsedStmt.toSql(), e);
+                // insert error drop table
+                DropTableStmt dropTableStmt = new DropTableStmt(true, ctasStmt.getCreateTableStmt().getDbTbl(), true);
+                try {
+                    DdlExecutor.execute(context.getCatalog(), dropTableStmt);
+                } catch (Exception ex) {
+                    LOG.warn("CTAS drop table error, stmt={}", parsedStmt.toSql(), ex);
+                    context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
+                            "Unexpected exception: " + ex.getMessage());
+                }
+            }
+        }
+    }
+
     public Data.PQueryStatistics getQueryStatisticsForAuditLog() {
         if (statisticsForAuditLog == null) {
             statisticsForAuditLog = Data.PQueryStatistics.newBuilder();
@@ -1581,4 +1614,3 @@ public class StmtExecutor implements ProfileWriter {
         return exprs.stream().map(e -> e.getType().getPrimitiveType()).collect(Collectors.toList());
     }
 }
-


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org