You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/22 14:51:28 UTC
[incubator-doris] 03/13: [Feature] CTAS support insert data (#9271)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit 733fb3a92af8b0180e8e52f49d7a390479c608d7
Author: Stalary <45...@qq.com>
AuthorDate: Sat May 7 08:51:54 2022 +0800
[Feature] CTAS support insert data (#9271)
---
.../doris/analysis/CreateTableAsSelectStmt.java | 44 +++++++++++-----------
.../java/org/apache/doris/catalog/Catalog.java | 2 +-
.../java/org/apache/doris/qe/StmtExecutor.java | 36 +++++++++++++++++-
3 files changed, 58 insertions(+), 24 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java
index a5d17632f6..235e497e80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateTableAsSelectStmt.java
@@ -21,28 +21,42 @@ import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.UserException;
+import lombok.Getter;
+
import java.util.ArrayList;
import java.util.List;
/**
- * Represents a CREATE TABLE AS SELECT (CTAS) statement
- * Syntax:
- * CREATE TABLE table_name [( column_name_list )]
- * opt_engine opt_partition opt_properties KW_AS query_stmt
+ * Represents a CREATE TABLE AS SELECT (CTAS) statement.
+ * Syntax:
+ * CREATE TABLE table_name [( column_name_list )]
+ * opt_engine opt_partition opt_properties KW_AS query_stmt
*/
public class CreateTableAsSelectStmt extends DdlStmt {
+
+ @Getter
private final CreateTableStmt createTableStmt;
+
+ @Getter
private final List<String> columnNames;
+
+ @Getter
private QueryStmt queryStmt;
-
- public CreateTableAsSelectStmt(CreateTableStmt createTableStmt,
- List<String> columnNames, QueryStmt queryStmt) {
+
+ @Getter
+ private final InsertStmt insertStmt;
+
+ protected CreateTableAsSelectStmt(CreateTableStmt createTableStmt,
+ List<String> columnNames, QueryStmt queryStmt) {
this.createTableStmt = createTableStmt;
this.columnNames = columnNames;
this.queryStmt = queryStmt;
- // Insert is not currently supported
+ this.insertStmt = new InsertStmt(createTableStmt.getDbTbl(), queryStmt.clone());
}
-
+
+ /**
+ * Cannot analyze insertStmt because the table has not been created yet.
+ */
@Override
public void analyze(Analyzer analyzer) throws UserException {
// first: we analyze queryStmt before create table.
@@ -63,16 +77,4 @@ public class CreateTableAsSelectStmt extends DdlStmt {
ErrorReport.reportAnalysisException(ErrorCode.ERR_COL_NUMBER_NOT_MATCH);
}
}
-
- public CreateTableStmt getCreateTableStmt() {
- return createTableStmt;
- }
-
- public List<String> getColumnNames() {
- return columnNames;
- }
-
- public QueryStmt getQueryStmt() {
- return queryStmt;
- }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
index 3bb59eac55..ffded947bf 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java
@@ -3146,7 +3146,7 @@ public class Catalog {
createTableStmt.analyze(dummyRootAnalyzer);
createTable(createTableStmt);
} catch (UserException e) {
- throw new DdlException("Failed to execute CREATE TABLE AS SELECT Reason: " + e.getMessage());
+ throw new DdlException("Failed to execute CTAS Reason: " + e.getMessage());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index e913f9cc96..6ba7364f1a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -20,6 +20,7 @@ package org.apache.doris.qe;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.CreateTableAsSelectStmt;
import org.apache.doris.analysis.DdlStmt;
+import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.EnterStmt;
import org.apache.doris.analysis.ExplainOptions;
import org.apache.doris.analysis.ExportStmt;
@@ -414,6 +415,8 @@ public class StmtExecutor implements ProfileWriter {
handleUseStmt();
} else if (parsedStmt instanceof TransactionStmt) {
handleTransactionStmt();
+ } else if (parsedStmt instanceof CreateTableAsSelectStmt) {
+ handleCtasStmt();
} else if (parsedStmt instanceof InsertStmt) { // Must ahead of DdlStmt because InserStmt is its subclass
try {
handleInsertStmt();
@@ -1237,7 +1240,6 @@ public class StmtExecutor implements ProfileWriter {
context.getMysqlChannel().reset();
// create plan
InsertStmt insertStmt = (InsertStmt) parsedStmt;
-
if (insertStmt.getQueryStmt().hasOutFileClause()) {
throw new DdlException("Not support OUTFILE clause in INSERT statement");
}
@@ -1558,6 +1560,37 @@ public class StmtExecutor implements ProfileWriter {
context.getCatalog().getExportMgr().addExportJob(exportStmt);
}
+ /**
+  * Executes a CREATE TABLE AS SELECT statement in two steps: create the target
+  * table, then run the INSERT statement carried by the CTAS statement to load it.
+  *
+  * <p>If the insert step throws, the freshly created table is dropped again and the
+  * failure is recorded in the connection state, so the client is never told that a
+  * CTAS succeeded when its data could not be loaded.
+  *
+  * <p>Side effect: {@code parsedStmt} is replaced by the insert statement before the
+  * second step runs and is not restored afterwards.
+  */
+ private void handleCtasStmt() {
+     CreateTableAsSelectStmt ctasStmt = (CreateTableAsSelectStmt) this.parsedStmt;
+     try {
+         // step 1: create the target table
+         DdlExecutor.execute(context.getCatalog(), ctasStmt);
+         context.getState().setOk();
+     } catch (Exception e) {
+         // Maybe our bug
+         LOG.warn("CTAS create table error, stmt={}", originStmt.originStmt, e);
+         context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
+         // no table was created, so there is nothing to load or to roll back
+         return;
+     }
+
+     // step 2: the table exists now, so the insert statement can be analyzed and executed
+     try {
+         parsedStmt = ctasStmt.getInsertStmt();
+         execute();
+     } catch (Exception e) {
+         LOG.warn("CTAS insert data error, stmt={}", parsedStmt.toSql(), e);
+         // The state is still the OK set after table creation unless execute() already
+         // reported something more specific; make sure the failure reaches the client.
+         if (MysqlStateType.OK.equals(context.getState().getStateType())) {
+             context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage());
+         }
+         // roll back: drop the table created in step 1
+         // NOTE(review): the two boolean flags are presumably "if exists" and "force" - confirm
+         // against the DropTableStmt constructor.
+         DropTableStmt dropTableStmt = new DropTableStmt(true, ctasStmt.getCreateTableStmt().getDbTbl(), true);
+         try {
+             DdlExecutor.execute(context.getCatalog(), dropTableStmt);
+         } catch (Exception ex) {
+             // log the statement the user actually ran; parsedStmt is the insert statement by now
+             LOG.warn("CTAS drop table error, stmt={}", originStmt.originStmt, ex);
+             context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR,
+                     "Unexpected exception: " + ex.getMessage());
+         }
+     }
+ }
+
public Data.PQueryStatistics getQueryStatisticsForAuditLog() {
if (statisticsForAuditLog == null) {
statisticsForAuditLog = Data.PQueryStatistics.newBuilder();
@@ -1581,4 +1614,3 @@ public class StmtExecutor implements ProfileWriter {
return exprs.stream().map(e -> e.getType().getPrimitiveType()).collect(Collectors.toList());
}
}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org