You are viewing a plain-text version of this content. The canonical (linked) version is in the Apache mailing-list archive for commits@flink.apache.org.
Posted to commits@flink.apache.org by go...@apache.org on 2022/07/19 13:02:10 UTC

[flink] branch master updated: [FLINK-28490][sql-parser] Introduce `ANALYZE TABLE` syntax in sql parser

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e54c86415b1 [FLINK-28490][sql-parser] Introduce `ANALYZE TABLE` syntax in sql parser
e54c86415b1 is described below

commit e54c86415b106f253b58c54670fe5b84b5c40f25
Author: godfreyhe <go...@163.com>
AuthorDate: Sat Jul 9 18:36:55 2022 +0800

    [FLINK-28490][sql-parser] Introduce `ANALYZE TABLE` syntax in sql parser
    
    This closes #20242
---
 .../src/main/codegen/data/Parser.tdd               |   9 ++
 .../src/main/codegen/includes/parserImpls.ftl      |  93 ++++++++++++++
 .../flink/sql/parser/SqlPartitionSpecProperty.java |  99 +++++++++++++++
 .../flink/sql/parser/ddl/SqlAnalyzeTable.java      | 137 +++++++++++++++++++++
 .../flink/sql/parser/FlinkSqlParserImplTest.java   |  32 +++++
 5 files changed, 370 insertions(+)

diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index e4dbf0256a2..05cf6ddffa8 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -71,6 +71,7 @@
     "org.apache.flink.sql.parser.ddl.SqlUseDatabase"
     "org.apache.flink.sql.parser.ddl.SqlUseModules"
     "org.apache.flink.sql.parser.ddl.SqlWatermark"
+    "org.apache.flink.sql.parser.ddl.SqlAnalyzeTable"
     "org.apache.flink.sql.parser.dml.RichSqlInsert"
     "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword"
     "org.apache.flink.sql.parser.dml.SqlBeginStatementSet"
@@ -107,6 +108,7 @@
     "org.apache.flink.sql.parser.utils.ParserResource"
     "org.apache.flink.sql.parser.validate.FlinkSqlConformance"
     "org.apache.flink.sql.parser.SqlProperty"
+    "org.apache.flink.sql.parser.SqlPartitionSpecProperty"
     "org.apache.calcite.sql.SqlAlienSystemTypeNameSpec"
     "org.apache.calcite.sql.SqlCreate"
     "org.apache.calcite.sql.SqlDrop"
@@ -159,6 +161,9 @@
     "WATERMARKS"
     "TIMESTAMP_LTZ"
     "TRY_CAST"
+    "ANALYZE"
+    "COMPUTE"
+    "STATISTICS"
   ]
 
   # List of keywords from "keywords" section that are not reserved.
@@ -501,6 +506,9 @@
     "PARTITIONS"
     "TRY_CAST"
     "VIRTUAL"
+    "ANALYZE"
+    "COMPUTE"
+    "STATISTICS"
   ]
 
   # List of non-reserved keywords to remove;
@@ -546,6 +554,7 @@
     "SqlShowJars()"
     "SqlSet()"
     "SqlReset()"
+    "SqlAnalyzeTable()"
   ]
 
   # List of methods for parsing custom literals.
diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index ac73adea8a4..d6d6284ca0e 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -2203,3 +2203,96 @@ SqlNode TryCastFunctionCall() :
         return operator.createCall(s.end(this), args);
     }
 }
+
+/**
+* Parses one partition spec entry: a simple identifier key, optionally followed
+* by '=' and a literal value, e.g. p or p = '10'.
+*/
+SqlPartitionSpecProperty PartitionSpecProperty():
+{
+    final SqlParserPos pos;
+    final SqlIdentifier key;
+    SqlNode value = null; // stays null when only the key is given
+}
+{
+    key = SimpleIdentifier() { pos = getPos(); }
+    [
+        LOOKAHEAD(1)
+        <EQ> value = Literal()
+    ]
+    {
+        return new SqlPartitionSpecProperty(key, value, pos);
+    }
+}
+
+/**
+* Parses a parenthesized, comma-separated list of partition spec entries and
+* appends each one to the given list; at least one entry is required.
+* Used for the "(...)" part of, e.g.,
+* ANALYZE TABLE tbl1 PARTITION (col1='val1', col2='val2') xxx,
+* ANALYZE TABLE tbl1 PARTITION (col1, col2) xxx, or
+* ANALYZE TABLE tbl1 PARTITION (col1='val1', col2) xxx.
+*/
+void ExtendedPartitionSpecCommaList(SqlNodeList list) :
+{
+    SqlPartitionSpecProperty property;
+}
+{
+    <LPAREN>
+    property = PartitionSpecProperty()
+    {
+       list.add(property);
+    }
+    (
+        <COMMA> property = PartitionSpecProperty()
+        {
+            list.add(property);
+        }
+    )*
+    <RPAREN>
+}
+
+/** Parses a comma-separated list of simple identifiers into a SqlNodeList spanning the whole list. */
+SqlNodeList SimpleIdentifierCommaListWithPosition() :
+{
+    final Span s;
+    final List<SqlNode> list = new ArrayList<SqlNode>();
+}
+{
+    { s = span(); }
+    SimpleIdentifierCommaList(list) {
+        return new SqlNodeList(list, s.end(this));
+    }
+}
+
+/** Parses ANALYZE TABLE t [PARTITION (...)] COMPUTE STATISTICS [FOR COLUMNS c1, c2 | FOR ALL COLUMNS]. */
+SqlNode SqlAnalyzeTable():
+{
+       final Span s;
+       final SqlIdentifier tableName;
+       SqlNodeList partitionSpec = SqlNodeList.EMPTY; // replaced only when a PARTITION clause is present
+       SqlNodeList columns = SqlNodeList.EMPTY; // replaced only by FOR COLUMNS
+       boolean allColumns = false; // set only by FOR ALL COLUMNS
+}
+{
+    <ANALYZE> <TABLE> { s = span(); }
+    tableName = CompoundIdentifier()
+    [
+        <PARTITION> {
+            partitionSpec = new SqlNodeList(getPos());
+            ExtendedPartitionSpecCommaList(partitionSpec);
+        }
+    ]
+
+    <COMPUTE> <STATISTICS> [ <FOR>
+        (
+           <COLUMNS> { columns = SimpleIdentifierCommaListWithPosition(); }
+        |
+           <ALL> <COLUMNS> { allColumns = true; }
+        )
+    ]
+
+    {
+        return new SqlAnalyzeTable(s.end(this), tableName, partitionSpec, columns, allColumns);
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlPartitionSpecProperty.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlPartitionSpecProperty.java
new file mode 100644
index 00000000000..23daa801ec0
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlPartitionSpecProperty.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Represents one partition spec entry, {@code key [= literal]}. Unlike {@link SqlProperty}, the
+ * value may be null when only the key is given, e.g. {@code PARTITION (p)}.
+ */
+public class SqlPartitionSpecProperty extends SqlCall {
+    /** Use this operator only if you don't have a better one. */
+    protected static final SqlOperator OPERATOR = new SqlSpecialOperator("Pair", SqlKind.OTHER);
+
+    private final SqlIdentifier key;
+    private final @Nullable SqlNode value;
+
+    public SqlPartitionSpecProperty(SqlIdentifier key, @Nullable SqlNode value, SqlParserPos pos) {
+        super(pos);
+        this.key = requireNonNull(key, "Pair key is missing");
+        this.value = value;
+    }
+
+    public SqlIdentifier getKey() {
+        return key;
+    }
+
+    @Nullable
+    public SqlNode getValue() {
+        return value;
+    }
+
+    public String getKeyString() {
+        return key.toString();
+    }
+
+    /** Returns the value as a string (NlsString literals unwrapped), or null if absent. */
+    public @Nullable String getValueString() {
+        if (value == null) {
+            return null;
+        }
+        final Comparable<?> lit = SqlLiteral.value(value);
+        return lit instanceof NlsString ? ((NlsString) lit).getValue() : lit.toString();
+    }
+
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Override
+    public List<SqlNode> getOperandList() {
+        if (value != null) {
+            return ImmutableNullableList.of(key, value);
+        } else {
+            return ImmutableNullableList.of(key);
+        }
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        key.unparse(writer, leftPrec, rightPrec);
+        if (value != null) {
+            writer.keyword("=");
+            value.unparse(writer, leftPrec, rightPrec);
+        }
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java
new file mode 100644
index 00000000000..4d533fc3d80
--- /dev/null
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.SqlPartitionSpecProperty;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+import org.apache.calcite.util.NlsString;
+
+import javax.annotation.Nonnull;
+
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/** ANALYZE TABLE to compute the statistics for a given table. */
+public class SqlAnalyzeTable extends SqlCall {
+    public static final SqlSpecialOperator OPERATOR =
+            new SqlSpecialOperator("ANALYZE TABLE", SqlKind.OTHER_DDL);
+
+    private final SqlIdentifier tableName;
+    private final SqlNodeList partitions;
+    private final SqlNodeList columns;
+    private final boolean allColumns;
+
+    public SqlAnalyzeTable(
+            SqlParserPos pos,
+            SqlIdentifier tableName,
+            SqlNodeList partitions,
+            SqlNodeList columns,
+            boolean allColumns) {
+        super(pos);
+        this.tableName = requireNonNull(tableName, "tableName is null");
+        this.partitions = requireNonNull(partitions, "partitions is null");
+        this.columns = requireNonNull(columns, "columns is null");
+        this.allColumns = allColumns;
+    }
+
+    /** Returns the (possibly qualified) table name split into its identifier parts. */
+    public String[] fullTableName() {
+        return tableName.names.toArray(new String[0]);
+    }
+
+    /**
+     * Returns the partition spec as an insertion-ordered map from partition key to value. A value
+     * is null when only the partition key is given, e.g. {@code PARTITION (p)}.
+     */
+    public LinkedHashMap<String, String> getPartitions() {
+        LinkedHashMap<String, String> ret = new LinkedHashMap<>();
+        for (SqlNode node : partitions.getList()) {
+            SqlPartitionSpecProperty property = (SqlPartitionSpecProperty) node;
+            final String value;
+            if (property.getValue() == null) {
+                value = null;
+            } else {
+                Comparable<?> lit = SqlLiteral.value(property.getValue());
+                value = lit instanceof NlsString ? ((NlsString) lit).getValue() : lit.toString();
+            }
+
+            ret.put(property.getKey().getSimple(), value);
+        }
+        return ret;
+    }
+
+    /** Returns the simple names of the columns listed in the FOR COLUMNS clause. */
+    public String[] getColumnNames() {
+        return columns.getList().stream()
+                .map(col -> ((SqlIdentifier) col).getSimple())
+                .toArray(String[]::new);
+    }
+
+    public boolean isAllColumns() {
+        return allColumns;
+    }
+
+    @Nonnull
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    @Nonnull
+    @Override
+    public List<SqlNode> getOperandList() {
+        return ImmutableNullableList.of(tableName, partitions, columns);
+    }
+
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("ANALYZE TABLE");
+        final int opLeft = getOperator().getLeftPrec();
+        final int opRight = getOperator().getRightPrec();
+        tableName.unparse(writer, opLeft, opRight);
+
+        if (partitions.size() > 0) {
+            writer.keyword("PARTITION");
+            partitions.unparse(writer, opLeft, opRight);
+        }
+
+        writer.keyword("COMPUTE STATISTICS");
+
+        if (allColumns) {
+            writer.keyword("FOR ALL COLUMNS");
+        } else if (columns.size() > 0) {
+            writer.keyword("FOR COLUMNS");
+            // use 0 to disable parentheses
+            columns.unparse(writer, 0, 0);
+        }
+    }
+}
diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 48a15aa75bd..3408ddf1443 100644
--- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1856,6 +1856,38 @@ class FlinkSqlParserImplTest extends SqlParserTest {
                         "TRY_CAST(`A` AS ROW(`F0` INTEGER ARRAY, `F1` MAP< STRING, DECIMAL(10, 2) >, `F2` STRING NOT NULL))");
     }
 
+    @Test
+    void testAnalyzeTable() { // caret-marked parse failures, then unparse round-trips
+        sql("analyze table emp^s^").fails("(?s).*Encountered \"<EOF>\" at line 1, column 18.\n.*");
+        sql("analyze table emps compute statistics").ok("ANALYZE TABLE `EMPS` COMPUTE STATISTICS");
+        sql("analyze table emps partition ^compute^ statistics")
+                .fails("(?s).*Encountered \"compute\" at line 1, column 30.\n.*");
+        sql("analyze table emps partition(^)^ compute statistics")
+                .fails("(?s).*Encountered \"\\)\" at line 1, column 30.\n.*");
+        sql("analyze table emps partition(x='ab') compute statistics")
+                .ok("ANALYZE TABLE `EMPS` PARTITION (`X` = 'ab') COMPUTE STATISTICS");
+        sql("analyze table emps partition(x='ab', y='bc') compute statistics")
+                .ok("ANALYZE TABLE `EMPS` PARTITION (`X` = 'ab', `Y` = 'bc') COMPUTE STATISTICS");
+        sql("analyze table emps compute statistics for column^s^")
+                .fails("(?s).*Encountered \"<EOF>\" at line 1, column 49.\n.*");
+        sql("analyze table emps compute statistics for columns a")
+                .ok("ANALYZE TABLE `EMPS` COMPUTE STATISTICS FOR COLUMNS `A`");
+        sql("analyze table emps compute statistics for columns a, b")
+                .ok("ANALYZE TABLE `EMPS` COMPUTE STATISTICS FOR COLUMNS `A`, `B`");
+        sql("analyze table emps compute statistics for all columns")
+                .ok("ANALYZE TABLE `EMPS` COMPUTE STATISTICS FOR ALL COLUMNS");
+        sql("analyze table emps partition(x, y) compute statistics for all columns")
+                .ok("ANALYZE TABLE `EMPS` PARTITION (`X`, `Y`) COMPUTE STATISTICS FOR ALL COLUMNS");
+        sql("analyze table emps partition(x='ab', y) compute statistics for all columns")
+                .ok(
+                        "ANALYZE TABLE `EMPS` PARTITION (`X` = 'ab', `Y`) COMPUTE STATISTICS FOR ALL COLUMNS");
+        sql("analyze table emps partition(x, y='cd') compute statistics for all columns")
+                .ok(
+                        "ANALYZE TABLE `EMPS` PARTITION (`X`, `Y` = 'cd') COMPUTE STATISTICS FOR ALL COLUMNS");
+        sql("analyze table emps partition(x=^,^ y) compute statistics for all columns")
+                .fails("(?s).*Encountered \"\\,\" at line 1, column 32.\n.*");
+    }
+
     public static BaseMatcher<SqlNode> validated(String validatedSql) {
         return new TypeSafeDiagnosingMatcher<SqlNode>() {
             @Override