Posted to issues@flink.apache.org by "Yun Gao (Jira)" <ji...@apache.org> on 2022/04/13 06:24:01 UTC

[jira] [Updated] (FLINK-21634) ALTER TABLE statement enhancement

     [ https://issues.apache.org/jira/browse/FLINK-21634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Yun Gao updated FLINK-21634:
----------------------------
    Fix Version/s: 1.16.0
                       (was: 1.15.0)

> ALTER TABLE statement enhancement
> ---------------------------------
>
>                 Key: FLINK-21634
>                 URL: https://issues.apache.org/jira/browse/FLINK-21634
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table SQL / API, Table SQL / Client
>            Reporter: Jark Wu
>            Assignee: Jark Wu
>            Priority: Major
>              Labels: auto-unassigned, stale-assigned
>             Fix For: 1.16.0
>
>
> We already introduced the ALTER TABLE statement in FLIP-69 [1], but it only supports renaming a table and changing table options. One useful feature of ALTER TABLE is modifying the schema, which is also heavily required for integration with data lakes (e.g. Iceberg).
> Therefore, I propose to support the following ALTER TABLE statements (except for {{SET}} and {{RENAME TO}}, all of them are newly introduced syntax):
> {code:sql}
> ALTER TABLE table_name {
>     ADD { <schema_component> | (<schema_component> [, ...]) }
>   | MODIFY { <schema_component> | (<schema_component> [, ...]) }
>   | DROP {column_name | (column_name, column_name, ...) | PRIMARY KEY | CONSTRAINT constraint_name | WATERMARK}
>   | RENAME old_column_name TO new_column_name
>   | RENAME TO new_table_name
>   | SET (key1=val1, ...)
>   | RESET (key1, ...)
> }
> <schema_component>::
>   { <column_component> | <constraint_component> | <watermark_component> }
> <column_component>::
>   column_name <column_definition> [FIRST | AFTER column_name]
> <constraint_component>::
>   [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
> <watermark_component>::
>   WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
> <column_definition>::
>   { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> } [COMMENT column_comment]
> <physical_column_definition>::
>   column_type
> <metadata_column_definition>::
>   column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
> <computed_column_definition>::
>   AS computed_column_expression
> {code}
> And some examples:
> {code:sql}
> -- add a new column 
> ALTER TABLE mytable ADD new_column STRING COMMENT 'new_column docs';
> -- add columns, constraint, and watermark
> ALTER TABLE mytable ADD (
>     log_ts STRING COMMENT 'log timestamp string' FIRST,
>     ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
>     PRIMARY KEY (id) NOT ENFORCED,
>     WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> );
> -- modify a column type
> ALTER TABLE prod.db.sample MODIFY measurement DOUBLE COMMENT 'unit is bytes per second' AFTER `id`;
> -- modify the definitions of columns log_ts and ts, the primary key, and the watermark; they must already exist in the table schema
> ALTER TABLE mytable MODIFY (
>     log_ts STRING COMMENT 'log timestamp string' AFTER `id`,  -- reorder columns
>     ts AS TO_TIMESTAMP(log_ts) AFTER log_ts,
>     PRIMARY KEY (id) NOT ENFORCED,
>     WATERMARK FOR ts AS ts - INTERVAL '3' SECOND
> );
> -- drop an old column
> ALTER TABLE prod.db.sample DROP measurement;
> -- drop columns
> ALTER TABLE prod.db.sample DROP (col1, col2, col3);
> -- drop a watermark
> ALTER TABLE prod.db.sample DROP WATERMARK;
> -- rename column name
> ALTER TABLE prod.db.sample RENAME `data` TO payload;
> -- rename table name
> ALTER TABLE mytable RENAME TO mytable2;
> -- set options
> ALTER TABLE kafka_table SET (
>     'scan.startup.mode' = 'specific-offsets', 
>     'scan.startup.specific-offsets' = 'partition:0,offset:42'
> );
> -- reset options
> ALTER TABLE kafka_table RESET ('scan.startup.mode', 'scan.startup.specific-offsets');
> {code}
> Note: we don't need to introduce new interfaces, because all ALTER TABLE operations will be forwarded to the catalog through {{Catalog#alterTable(tablePath, newTable, ignoreIfNotExists)}}.
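> A minimal sketch of what that forwarding could look like for a single ADD column, assuming only the existing {{Catalog}}, {{CatalogTable}} and {{Schema}} APIs (the catalog/database/table/column names below are just placeholders, not part of the proposal):
> {code:java}
> import org.apache.flink.table.api.DataTypes;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.catalog.Catalog;
> import org.apache.flink.table.catalog.CatalogTable;
> import org.apache.flink.table.catalog.ObjectPath;
>
> // Sketch: how "ALTER TABLE mytable ADD new_column STRING" could be applied
> // through the existing Catalog#alterTable call, without new interfaces.
> public class AlterTableAddColumnSketch {
>
>     public static void addColumn(Catalog catalog) throws Exception {
>         ObjectPath path = new ObjectPath("db", "mytable");
>
>         // Read the current table definition from the catalog.
>         CatalogTable oldTable = (CatalogTable) catalog.getTable(path);
>
>         // Derive a new schema that appends the column from the ADD clause.
>         Schema newSchema = Schema.newBuilder()
>                 .fromSchema(oldTable.getUnresolvedSchema())
>                 .column("new_column", DataTypes.STRING())
>                 .build();
>
>         // Rebuild the table definition, keeping comment, partition keys and options.
>         CatalogTable newTable = CatalogTable.of(
>                 newSchema,
>                 oldTable.getComment(),
>                 oldTable.getPartitionKeys(),
>                 oldTable.getOptions());
>
>         // The schema change is persisted through the existing alterTable call.
>         catalog.alterTable(path, newTable, false);
>     }
> }
> {code}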
> [1]: [https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/sql/alter/#alter-table]
> [2]: [http://iceberg.apache.org/spark-ddl/#alter-table-alter-column]
> [3]: [https://trino.io/docs/current/sql/alter-table.html]
> [4]: [https://dev.mysql.com/doc/refman/8.0/en/alter-table.html]
> [5]: [https://www.postgresql.org/docs/9.1/sql-altertable.html]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)