You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/07/03 07:26:00 UTC

[jira] [Commented] (FLINK-8866) Create unified interfaces to configure and instatiate TableSinks

    [ https://issues.apache.org/jira/browse/FLINK-8866?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16530939#comment-16530939 ] 

ASF GitHub Bot commented on FLINK-8866:
---------------------------------------

Github user suez1224 commented on the issue:

    https://github.com/apache/flink/pull/6201
  
    @fhueske @twalthr thanks for the comments. In `from-source`, the only system i know of is Kafka10 or Kafka11, which support writing record along with timestamp. To support `from-source` in table sink, I think we can do the following:
    1) add a connector property, e.g. connector.support-timestamp. Only if connector.support-timestamp is true, we will allow the sink table schema to contain a field with rowtime type `from-source`. Otherwise, an exception will be thrown.
    2) if the condition in 1) is satisfied, we will create corresponding rowtime field in the sink table schema with type LONG, in TableEnvironment.insertInto(), we will validate the sink schema against the insertion source. Also, in the TableSink.emitDataStream() implementation, we will need to insert an timestamp assigner operator to set StreamRecord.timestamp (should we reuse existing interface, or create a new timestampInserter interface?) and remove the extra rowtime field from StreamRecord.value before we emit the datastream to the sink. (for kafkaTableSink, we will also need to invoke setWriteTimestampToKafka(true))
    
    Please correct me if I missed something here. What do you think?


> Create unified interfaces to configure and instatiate TableSinks
> ----------------------------------------------------------------
>
>                 Key: FLINK-8866
>                 URL: https://issues.apache.org/jira/browse/FLINK-8866
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API &amp; SQL
>            Reporter: Timo Walther
>            Assignee: Shuyi Chen
>            Priority: Major
>              Labels: pull-request-available
>
> Similar to the efforts done in FLINK-8240. We need unified ways to configure and instantiate TableSinks. Among other applications, this is necessary in order to declare table sinks in an environment file of the SQL client. Such that the sink can be used for {{INSERT INTO}} statements.
> Below are a few major changes in mind. 
> 1) Add TableSinkFactory/TableSinkFactoryService similar to TableSourceFactory/TableSourceFactoryService
> 2) Add a common property called "type" with values (source, sink and both) for both TableSource and TableSink.
> 3) in yaml file, replace "sources" with "tables", and use tableType to identify whether it's source or sink.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)