You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/07/19 13:02:10 UTC
[flink] branch master updated: [FLINK-28490][sql-parser] Introduce `ANALYZE TABLE` syntax in sql parser
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e54c86415b1 [FLINK-28490][sql-parser] Introduce `ANALYZE TABLE` syntax in sql parser
e54c86415b1 is described below
commit e54c86415b106f253b58c54670fe5b84b5c40f25
Author: godfreyhe <go...@163.com>
AuthorDate: Sat Jul 9 18:36:55 2022 +0800
[FLINK-28490][sql-parser] Introduce `ANALYZE TABLE` syntax in sql parser
This closes #20242
---
.../src/main/codegen/data/Parser.tdd | 9 ++
.../src/main/codegen/includes/parserImpls.ftl | 93 ++++++++++++++
.../flink/sql/parser/SqlPartitionSpecProperty.java | 99 +++++++++++++++
.../flink/sql/parser/ddl/SqlAnalyzeTable.java | 137 +++++++++++++++++++++
.../flink/sql/parser/FlinkSqlParserImplTest.java | 32 +++++
5 files changed, 370 insertions(+)
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index e4dbf0256a2..05cf6ddffa8 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -71,6 +71,7 @@
"org.apache.flink.sql.parser.ddl.SqlUseDatabase"
"org.apache.flink.sql.parser.ddl.SqlUseModules"
"org.apache.flink.sql.parser.ddl.SqlWatermark"
+ "org.apache.flink.sql.parser.ddl.SqlAnalyzeTable"
"org.apache.flink.sql.parser.dml.RichSqlInsert"
"org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
"org.apache.flink.sql.parser.dml.SqlBeginStatementSet"
@@ -107,6 +108,7 @@
"org.apache.flink.sql.parser.utils.ParserResource"
"org.apache.flink.sql.parser.validate.FlinkSqlConformance"
"org.apache.flink.sql.parser.SqlProperty"
+ "org.apache.flink.sql.parser.SqlPartitionSpecProperty"
"org.apache.calcite.sql.SqlAlienSystemTypeNameSpec"
"org.apache.calcite.sql.SqlCreate"
"org.apache.calcite.sql.SqlDrop"
@@ -159,6 +161,9 @@
"WATERMARKS"
"TIMESTAMP_LTZ"
"TRY_CAST"
+ "ANALYZE"
+ "COMPUTE"
+ "STATISTICS"
]
# List of keywords from "keywords" section that are not reserved.
@@ -501,6 +506,9 @@
"PARTITIONS"
"TRY_CAST"
"VIRTUAL"
+ "ANALYZE"
+ "COMPUTE"
+ "STATISTICS"
]
# List of non-reserved keywords to remove;
@@ -546,6 +554,7 @@
"SqlShowJars()"
"SqlSet()"
"SqlReset()"
+ "SqlAnalyzeTable()"
]
# List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index ac73adea8a4..d6d6284ca0e 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -2203,3 +2203,96 @@ SqlNode TryCastFunctionCall() :
return operator.createCall(s.end(this), args);
}
}
+
+/**
+* Parses a partition key with an optional literal value,
+* e.g. p or p = '10'. The value stays null when only the key is given.
+*/
+SqlPartitionSpecProperty PartitionSpecProperty():
+{
+ final SqlParserPos pos;
+ final SqlIdentifier key;
+ SqlNode value = null;
+}
+{
+ key = SimpleIdentifier() { pos = getPos(); }
+ [
+ LOOKAHEAD(1)
+ <EQ> value = Literal()
+ ]
+ {
+ return new SqlPartitionSpecProperty(key, value, pos);
+ }
+}
+
+/**
+* Parses a parenthesized, comma-separated partition spec and appends each entry to the list,
+* e.g. ANALYZE TABLE tbl1 partition(col1='val1', col2='val2') xxx
+* or
+* ANALYZE TABLE tbl1 partition(col1, col2) xxx
+* or
+* ANALYZE TABLE tbl1 partition(col1='val1', col2) xxx.
+*/
+void ExtendedPartitionSpecCommaList(SqlNodeList list) :
+{
+ SqlPartitionSpecProperty property;
+}
+{
+ <LPAREN>
+ property = PartitionSpecProperty()
+ {
+ list.add(property);
+ }
+ (
+ <COMMA> property = PartitionSpecProperty()
+ {
+ list.add(property);
+ }
+ )*
+ <RPAREN>
+}
+
+/** Parses a comma-separated list of simple identifiers into a SqlNodeList with a position. */
+SqlNodeList SimpleIdentifierCommaListWithPosition() :
+{
+ final Span s;
+ final List<SqlNode> list = new ArrayList<SqlNode>();
+}
+{
+ { s = span(); } // NOTE(review): runs before any identifier is consumed - confirm span start
+ SimpleIdentifierCommaList(list) {
+ return new SqlNodeList(list, s.end(this));
+ }
+}
+
+/**
+* Parses an ANALYZE TABLE statement:
+* ANALYZE TABLE table_name [PARTITION (key1[=val1], key2[=val2], ...)]
+* COMPUTE STATISTICS [FOR COLUMNS col1, col2, ... | FOR ALL COLUMNS].
+* The partition spec and the column list default to SqlNodeList.EMPTY when their clause is
+* absent.
+*/
+SqlNode SqlAnalyzeTable():
+{
+    final Span s;
+    final SqlIdentifier tableName;
+    SqlNodeList partitionSpec = SqlNodeList.EMPTY;
+    SqlNodeList columns = SqlNodeList.EMPTY;
+    boolean allColumns = false;
+}
+{
+    <ANALYZE> <TABLE> { s = span(); }
+    tableName = CompoundIdentifier()
+    [
+        // Only the list allocation is a Java action; the sub-production is referenced as a
+        // grammar expansion so that JavaCC sees it in choice analysis and syntactic lookahead.
+        <PARTITION> { partitionSpec = new SqlNodeList(getPos()); }
+        ExtendedPartitionSpecCommaList(partitionSpec)
+    ]
+
+    <COMPUTE> <STATISTICS> [ <FOR>
+        (
+            <COLUMNS> columns = SimpleIdentifierCommaListWithPosition()
+        |
+            <ALL> <COLUMNS> { allColumns = true; }
+        )
+    ]
+
+    {
+        return new SqlAnalyzeTable(s.end(this), tableName, partitionSpec, columns, allColumns);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlPartitionSpecProperty.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlPartitionSpecProperty.java
new file mode 100644
index 00000000000..23daa801ec0
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlPartitionSpecProperty.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A single entry of a partition spec: a key identifier with an optional value. Different from
+ * {@link SqlProperty}, {@link SqlPartitionSpecProperty} allows the value to be null, which is
+ * the case when only the partition key is given, e.g. {@code PARTITION (p)}.
+ */
+public class SqlPartitionSpecProperty extends SqlCall {
+
+    /** Use this operator only if you don't have a better one. */
+    protected static final SqlOperator OPERATOR = new SqlSpecialOperator("Pair", SqlKind.OTHER);
+
+    private final SqlIdentifier key;
+    private final @Nullable SqlNode value;
+
+    /**
+     * Creates a partition spec entry.
+     *
+     * @param key partition key, must not be null
+     * @param value value node of the key, or null if only the key was specified
+     * @param pos parser position of this node
+     * @throws NullPointerException if {@code key} is null
+     */
+    public SqlPartitionSpecProperty(SqlIdentifier key, @Nullable SqlNode value, SqlParserPos pos) {
+        super(pos);
+        this.key = requireNonNull(key, "Pair key is missing");
+        this.value = value;
+    }
+
+    /** Returns the partition key identifier. */
+    public SqlIdentifier getKey() {
+        return key;
+    }
+
+    /** Returns the value node, or null if only the key was specified. */
+    @Nullable
+    public SqlNode getValue() {
+        return value;
+    }
+
+    /** Returns the string form of the key identifier. */
+    public String getKeyString() {
+        return key.toString();
+    }
+
+    /**
+     * Returns the value as a plain string, or null if this entry has no value.
+     *
+     * <p>A character literal yields its unquoted content. Any other literal is rendered with
+     * {@code toString()}: the parser accepts arbitrary literals as the value (e.g. {@code p = 10}),
+     * so an unconditional cast to {@link NlsString} would fail with a {@link ClassCastException}.
+     * The same conversion is applied by {@code SqlAnalyzeTable#getPartitions()}.
+     */
+    @Nullable
+    public String getValueString() {
+        if (value == null) {
+            return null;
+        }
+        final Comparable<?> literal = SqlLiteral.value(value);
+        return literal instanceof NlsString
+                ? ((NlsString) literal).getValue()
+                : literal.toString();
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    /** Returns {@code [key, value]}, or just {@code [key]} when no value was specified. */
+    @Override
+    public List<SqlNode> getOperandList() {
+        if (value != null) {
+            return ImmutableNullableList.of(key, value);
+        } else {
+            return ImmutableNullableList.of(key);
+        }
+    }
+
+    /** Writes {@code key} or {@code key = value}, depending on whether a value is present. */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        key.unparse(writer, leftPrec, rightPrec);
+        if (value != null) {
+            writer.keyword("=");
+            value.unparse(writer, leftPrec, rightPrec);
+        }
+    }
+}
+
+// End SqlPartitionSpecProperty.java
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java
new file mode 100644
index 00000000000..4d533fc3d80
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlPartitionSpecProperty;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import javax.annotation.Nonnull;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * ANALYZE TABLE to compute the statistics for a given table.
+ *
+ * <p>Syntax:
+ *
+ * <pre>{@code
+ * ANALYZE TABLE table_name [PARTITION (key1[=val1], key2[=val2], ...)]
+ * COMPUTE STATISTICS [FOR COLUMNS col1, col2, ... | FOR ALL COLUMNS]
+ * }</pre>
+ */
+public class SqlAnalyzeTable extends SqlCall {
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("ANALYZE TABLE", SqlKind.OTHER_DDL);
+
+    private final SqlIdentifier tableName;
+    private final SqlNodeList partitions;
+    private final SqlNodeList columns;
+    private final boolean allColumns;
+
+    /**
+     * Creates an ANALYZE TABLE node.
+     *
+     * @param pos parser position of the statement
+     * @param tableName name of the table to analyze, must not be null
+     * @param partitions list of {@link SqlPartitionSpecProperty} nodes, must not be null
+     * @param columns list of column identifiers, must not be null
+     * @param allColumns whether {@code FOR ALL COLUMNS} was specified
+     * @throws NullPointerException if {@code tableName}, {@code partitions} or {@code columns}
+     *     is null
+     */
+    public SqlAnalyzeTable(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList partitions,
+            SqlNodeList columns,
+            boolean allColumns) {
+        super(pos);
+        this.tableName = requireNonNull(tableName, "tableName is null");
+        this.partitions = requireNonNull(partitions, "partitions is null");
+        this.columns = requireNonNull(columns, "columns is null");
+        this.allColumns = allColumns;
+    }
+
+    /** Returns the name components of the table identifier, in order. */
+    public String[] fullTableName() {
+        return tableName.names.toArray(new String[0]);
+    }
+
+    /**
+     * Get partition spec as key-value strings, if only partition key is given, the corresponding
+     * value is null. The iteration order of the returned map follows the order of the partition
+     * list.
+     */
+    public LinkedHashMap<String, String> getPartitions() {
+        final LinkedHashMap<String, String> ret = new LinkedHashMap<>();
+        for (SqlNode node : partitions.getList()) {
+            final SqlPartitionSpecProperty property = (SqlPartitionSpecProperty) node;
+            final String value;
+            if (property.getValue() == null) {
+                value = null;
+            } else {
+                // Character literals yield their unquoted content; other literals fall back to
+                // their toString() form.
+                final Comparable<?> comparable = SqlLiteral.value(property.getValue());
+                value =
+                        comparable instanceof NlsString
+                                ? ((NlsString) comparable).getValue()
+                                : comparable.toString();
+            }
+
+            ret.put(property.getKey().getSimple(), value);
+        }
+        return ret;
+    }
+
+    /** Returns the simple names of the listed columns; empty if the column list is empty. */
+    public String[] getColumnNames() {
+        return columns.getList().stream()
+                .map(col -> ((SqlIdentifier) col).getSimple())
+                .toArray(String[]::new);
+    }
+
+    /** Returns whether {@code FOR ALL COLUMNS} was specified. */
+    public boolean isAllColumns() {
+        return allColumns;
+    }
+
+    @Nonnull
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    /** Returns the table name, the partition list and the column list, in that order. */
+    @Nonnull
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(tableName, partitions, columns);
+    }
+
+    /**
+     * Writes the statement back as SQL. The PARTITION clause is emitted only for a non-empty
+     * partition list; {@code FOR ALL COLUMNS} takes precedence over an explicit column list.
+     */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("ANALYZE TABLE");
+        final int opLeft = getOperator().getLeftPrec();
+        final int opRight = getOperator().getRightPrec();
+        tableName.unparse(writer, opLeft, opRight);
+
+        if (partitions.size() > 0) {
+            writer.keyword("PARTITION");
+            partitions.unparse(writer, opLeft, opRight);
+        }
+
+        writer.keyword("COMPUTE STATISTICS");
+
+        if (allColumns) {
+            writer.keyword("FOR ALL COLUMNS");
+        } else if (columns.size() > 0) {
+            writer.keyword("FOR COLUMNS");
+            // use 0 to disable parentheses
+            columns.unparse(writer, 0, 0);
+        }
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 48a15aa75bd..3408ddf1443 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1856,6 +1856,38 @@ class FlinkSqlParserImplTest extends SqlParserTest {
"TRY_CAST(`A` AS ROW(`F0` INTEGER ARRAY, `F1` MAP< STRING, DECIMAL(10, 2) >, `F2` STRING NOT NULL))");
}
+ @Test
+ void testAnalyzeTable() { // ^...^ brackets the spot each fails() case expects the error at
+ sql("analyze table emp^s^").fails("(?s).*Encountered \"<EOF>\" at line 1, column 18.\n.*");
+ sql("analyze table emps compute statistics").ok("ANALYZE TABLE `EMPS` COMPUTE STATISTICS");
+ sql("analyze table emps partition ^compute^ statistics")
+ .fails("(?s).*Encountered \"compute\" at line 1, column 30.\n.*");
+ sql("analyze table emps partition(^)^ compute statistics")
+ .fails("(?s).*Encountered \"\\)\" at line 1, column 30.\n.*");
+ sql("analyze table emps partition(x='ab') compute statistics")
+ .ok("ANALYZE TABLE `EMPS` PARTITION (`X` = 'ab') COMPUTE STATISTICS");
+ sql("analyze table emps partition(x='ab', y='bc') compute statistics")
+ .ok("ANALYZE TABLE `EMPS` PARTITION (`X` = 'ab', `Y` = 'bc') COMPUTE STATISTICS");
+ sql("analyze table emps compute statistics for column^s^")
+ .fails("(?s).*Encountered \"<EOF>\" at line 1, column 49.\n.*");
+ sql("analyze table emps compute statistics for columns a")
+ .ok("ANALYZE TABLE `EMPS` COMPUTE STATISTICS FOR COLUMNS `A`");
+ sql("analyze table emps compute statistics for columns a, b")
+ .ok("ANALYZE TABLE `EMPS` COMPUTE STATISTICS FOR COLUMNS `A`, `B`");
+ sql("analyze table emps compute statistics for all columns")
+ .ok("ANALYZE TABLE `EMPS` COMPUTE STATISTICS FOR ALL COLUMNS");
+ sql("analyze table emps partition(x, y) compute statistics for all columns")
+ .ok("ANALYZE TABLE `EMPS` PARTITION (`X`, `Y`) COMPUTE STATISTICS FOR ALL COLUMNS");
+ sql("analyze table emps partition(x='ab', y) compute statistics for all columns")
+ .ok(
+ "ANALYZE TABLE `EMPS` PARTITION (`X` = 'ab', `Y`) COMPUTE STATISTICS FOR ALL COLUMNS");
+ sql("analyze table emps partition(x, y='cd') compute statistics for all columns")
+ .ok(
+ "ANALYZE TABLE `EMPS` PARTITION (`X`, `Y` = 'cd') COMPUTE STATISTICS FOR ALL COLUMNS");
+ sql("analyze table emps partition(x=^,^ y) compute statistics for all columns")
+ .fails("(?s).*Encountered \"\\,\" at line 1, column 32.\n.*");
+ }
+
public static BaseMatcher<SqlNode> validated(String validatedSql) {
return new TypeSafeDiagnosingMatcher<SqlNode>() {
@Override