Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2017/07/11 08:24:00 UTC
[jira] [Commented] (FLINK-7146) FLINK SQLs support DDL
[ https://issues.apache.org/jira/browse/FLINK-7146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16081851#comment-16081851 ]
Fabian Hueske commented on FLINK-7146:
--------------------------------------
[~yuemeng] Can you split this issue into two sub-issues:
1. DDL for tables.
2. DDL to register user-defined functions.
DDL for tables will require a very careful design to be applicable to a wide variety of use cases:
- batch and streaming
- flat and nested data
- different encodings: JSON, CSV, Avro, Parquet, ORC, ...
- timestamp & watermark assignment strategies
- time attributes
- custom parameters for the source / sink system, e.g., Kafka properties
We are also currently discussing how to refactor the {{TableSource}} and {{TableSink}} (and related) interfaces to support these use cases.
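As a rough illustration of the list above, the following is a minimal sketch of the information a table DDL statement would have to carry. It is not Flink API and not a proposed design: the names TableDefinition, Field, validate, and all attribute names are hypothetical, chosen only to mirror the bullet points.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class Field:
    """One column; `children` is non-empty for nested (row-typed) data."""
    name: str
    type: str  # e.g. "INT", "DOUBLE", "TIMESTAMP", "ROW"
    children: List["Field"] = field(default_factory=list)


@dataclass
class TableDefinition:
    """Hypothetical container for what a CREATE TABLE statement would describe."""
    name: str
    fields: List[Field]
    mode: str = "streaming"                   # batch and streaming
    encoding: str = "json"                    # JSON, CSV, Avro, Parquet, ORC, ...
    time_attribute: Optional[str] = None      # name of the time attribute column
    watermark_strategy: Optional[str] = None  # timestamp & watermark assignment
    properties: Dict[str, str] = field(default_factory=dict)  # e.g. Kafka properties

    def validate(self) -> List[str]:
        """Return a list of human-readable problems; empty if none were found."""
        problems = []
        if self.mode not in ("batch", "streaming"):
            problems.append(f"unknown mode: {self.mode}")
        top_level = {f.name for f in self.fields}
        if self.time_attribute is not None and self.time_attribute not in top_level:
            problems.append(f"time attribute {self.time_attribute} is not a column")
        if self.watermark_strategy is not None and self.time_attribute is None:
            problems.append("watermark strategy given without a time attribute")
        return problems


# The kafka_source table from the issue description, expressed in this model.
kafka_source = TableDefinition(
    name="kafka_source",
    fields=[Field("id", "INT"), Field("price", "INT")],
    encoding="csv",
    properties={"topic": "test", "group_id": "test"},
)
print(kafka_source.validate())
```

The point of the sketch is only that schema, encoding, time handling, and connector properties are separate concerns that a DDL grammar would need to express together; how they map onto SQL syntax is exactly the open design question.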
> FLINK SQLs support DDL
> ----------------------
>
> Key: FLINK-7146
> URL: https://issues.apache.org/jira/browse/FLINK-7146
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: yuemeng
>
> For now, Flink SQL does not support DDL; a table can only be registered by calling registerTableInternal in TableEnvironment.
> We should support DDL in SQL, such as statements to create a table or a function, like:
> {code}
> CREATE TABLE kafka_source (
>   id INT,
>   price INT
> ) PROPERTIES (
>   category = 'source',
>   type = 'kafka',
>   version = '0.9.0.1',
>   separator = ',',
>   topic = 'test',
>   brokers = 'xxxxxx:9092',
>   group_id = 'test'
> );
> CREATE TABLE db_sink (
>   id INT,
>   price DOUBLE
> ) PROPERTIES (
>   category = 'sink',
>   type = 'mysql',
>   table_name = 'udaf_test',
>   url = 'jdbc:mysql://127.0.0.1:3308/ds?useUnicode=true&characterEncoding=UTF8',
>   username = 'ds_dev',
>   password = 's]k51_(>R'
> );
> CREATE TEMPORARY FUNCTION 'AVGUDAF' AS 'com.xxxxx.server.codegen.aggregate.udaf.avg.IntegerAvgUDAF';
> INSERT INTO db_sink SELECT id, AVGUDAF(price) FROM kafka_source GROUP BY id
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)