Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/06/03 14:55:59 UTC

[jira] [Commented] (FLINK-3872) Add Kafka TableSource with JSON serialization

    [ https://issues.apache.org/jira/browse/FLINK-3872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15314238#comment-15314238 ] 

ASF GitHub Bot commented on FLINK-3872:
---------------------------------------

GitHub user uce opened a pull request:

    https://github.com/apache/flink/pull/2069

    [FLINK-3872] [table, connector-kafka] Add KafkaJsonTableSource

    Adds `StreamTableSource` variants for Kafka with syntactic sugar for parsing JSON streams.
    
    ```java
    // topic is the Kafka topic name, props the Kafka consumer configuration (java.util.Properties)
    KafkaJsonTableSource source = new Kafka08JsonTableSource(
        topic,
        props,
        new String[] { "id" }, // field names
        new Class<?>[] { Long.class }); // field types
    
    tableEnvironment.registerTableSource("kafka-stream", source);
    ```
    
    You can then continue to work with the stream:
    
    ```java
    Table result = tableEnvironment.ingest("kafka-stream").filter("id > 1000");
    tableEnvironment.toDataStream(result, Row.class).print();
    ```
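    
    Conceptually, the source turns every Kafka message (a `byte[]`) into a `Row` whose fields follow the names and types passed to the constructor. The sketch below illustrates that mapping for flat JSON only. It is not the `JsonRowDeserializationSchema` added in this PR: the class name `FlatJsonRowSketch`, its hand-rolled parser, and the use of `Object[]` in place of Flink's `Row` are assumptions made so that it runs on the plain JDK. A real implementation would more likely delegate parsing to a JSON library.
    
    ```java
    import java.math.BigDecimal;
    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;
    
    // Hypothetical sketch, not Flink's JsonRowDeserializationSchema: turns one flat JSON
    // object (no nested objects/arrays) into an Object[] standing in for a Flink Row.
    final class FlatJsonRowSketch {
        private final String[] fieldNames;
        private final Class<?>[] fieldTypes;
    
        FlatJsonRowSketch(String[] fieldNames, Class<?>[] fieldTypes) {
            if (fieldNames.length != fieldTypes.length) throw new IllegalArgumentException("Number of field names and types differ");
            this.fieldNames = fieldNames.clone();
            this.fieldTypes = fieldTypes.clone();
        }
    
        // Deserializes one Kafka message payload (UTF-8 JSON) into a row, one slot per field.
        Object[] deserialize(byte[] message) {
            Map<String, Object> fields = new Parser(new String(message, StandardCharsets.UTF_8)).parseObject();
            Object[] row = new Object[fieldNames.length];
            for (int i = 0; i < fieldNames.length; i++) {
                // Fields missing from the message (or JSON null) become null.
                row[i] = convert(fields.get(fieldNames[i]), fieldTypes[i]);
            }
            return row;
        }
    
        // Converts a parsed JSON value (String, Boolean or BigDecimal) to the configured type.
        private static Object convert(Object value, Class<?> type) {
            if (value == null) return null;
            if (type == String.class) return value.toString();
            if (type == Boolean.class && value instanceof Boolean) return value;
            if (value instanceof BigDecimal) {
                BigDecimal number = (BigDecimal) value;
                if (type == Long.class) return number.longValueExact();
                if (type == Integer.class) return number.intValueExact();
                if (type == Double.class) return number.doubleValue();
            }
            throw new IllegalArgumentException("Cannot convert " + value + " to " + type.getSimpleName());
        }
    
        // Minimal recursive-descent parser for a single flat JSON object.
        private static final class Parser {
            private final String s;
            private int pos;
            Parser(String s) { this.s = s; }
            Map<String, Object> parseObject() {
                Map<String, Object> result = new HashMap<>();
                expect('{');
                if (peek() == '}') { pos++; return result; }
                while (true) {
                    String key = parseString();
                    expect(':');
                    result.put(key, parseValue());
                    char c = next();
                    if (c == '}') return result;
                    if (c != ',') throw error("expected ',' or '}'");
                }
            }
            private Object parseValue() {
                if (peek() == '"') return parseString();
                if (s.startsWith("true", pos)) { pos += 4; return Boolean.TRUE; }
                if (s.startsWith("false", pos)) { pos += 5; return Boolean.FALSE; }
                if (s.startsWith("null", pos)) { pos += 4; return null; }
                int start = pos;
                while (pos < s.length() && "+-.eE0123456789".indexOf(s.charAt(pos)) >= 0) pos++;
                if (start == pos) throw error("unsupported value (nested objects/arrays are not handled)");
                return new BigDecimal(s.substring(start, pos));
            }
            private String parseString() {
                expect('"');
                StringBuilder sb = new StringBuilder();
                while (pos < s.length()) {
                    char c = s.charAt(pos++);
                    if (c == '"') return sb.toString();
                    if (c != '\\') { sb.append(c); continue; }
                    if (pos >= s.length()) break;
                    char e = s.charAt(pos++);
                    int simple = "\"\\/bfnrt".indexOf(e); // single-character escapes
                    if (simple >= 0) { sb.append("\"\\/\b\f\n\r\t".charAt(simple)); continue; }
                    if (e != 'u' || pos + 4 > s.length()) throw error("invalid escape");
                    sb.append((char) Integer.parseInt(s.substring(pos, pos + 4), 16));
                    pos += 4;
                }
                throw error("unterminated string");
            }
            private char peek() {
                while (pos < s.length() && Character.isWhitespace(s.charAt(pos))) pos++;
                if (pos >= s.length()) throw error("unexpected end of input");
                return s.charAt(pos);
            }
            private char next() { char c = peek(); pos++; return c; }
            private void expect(char c) { if (next() != c) throw error("expected '" + c + "'"); }
            private IllegalArgumentException error(String msg) { return new IllegalArgumentException(msg + " at position " + pos); }
        }
    }
    ```
    
    With the field arrays from the first snippet (`{ "id" }` and `{ Long.class }`), the payload `{"id": 1001}` would yield a one-element row holding `1001L`, which is the value the `id > 1000` filter compares against. Numbers are parsed as `BigDecimal` first so that `longValueExact()` rejects fractional or out-of-range values instead of silently truncating them.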
    
    **Limitations**
    - Assumes flat JSON field access (we can easily extend this to use JSON pointers, allowing us to parse nested fields like `/location/area` as field names).
    - This does not extract any timestamps or watermarks (not an issue right now, as the Table API does not yet support operations that need them).
    - The API is somewhat cumbersome and not idiomatic for users of the Scala Table API.
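    
    To illustrate the JSON pointer idea from the first limitation: a pointer such as `/location/area` (RFC 6901) names a path into a nested document. Below is a minimal sketch of resolving one, assuming the JSON has already been parsed into nested `Map`s and `List`s. The class `JsonPointerSketch` is hypothetical and not part of this PR; JSON libraries such as Jackson already provide pointer support (e.g. `JsonNode.at(String)`), so an actual extension would probably reuse that instead.
    
    ```java
    import java.util.List;
    import java.util.Map;
    
    // Hypothetical helper, not part of this pull request: resolves an RFC 6901 JSON
    // pointer such as "/location/area" against a document already parsed into
    // nested Maps (JSON objects) and Lists (JSON arrays).
    final class JsonPointerSketch {
        private JsonPointerSketch() {}
    
        // Returns the referenced value, or null if the path does not resolve.
        static Object resolve(Object document, String pointer) {
            if (pointer.isEmpty()) {
                return document; // the empty pointer refers to the whole document
            }
            if (pointer.charAt(0) != '/') {
                throw new IllegalArgumentException("JSON pointer must start with '/': " + pointer);
            }
            Object current = document;
            // limit -1 keeps empty tokens, so "/" refers to the key "" as the RFC requires
            for (String rawToken : pointer.substring(1).split("/", -1)) {
                // Unescape in RFC 6901 order: "~1" -> "/" first, then "~0" -> "~".
                String token = rawToken.replace("~1", "/").replace("~0", "~");
                if (current instanceof Map) {
                    current = ((Map<?, ?>) current).get(token);
                } else if (current instanceof List && token.matches("0|[1-9][0-9]{0,8}")) {
                    List<?> array = (List<?>) current;
                    int index = Integer.parseInt(token); // at most 9 digits, so no overflow
                    current = index < array.size() ? array.get(index) : null;
                } else {
                    return null; // scalar, null, or non-numeric array index such as "-"
                }
            }
            return current;
        }
    }
    ```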


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/uce/flink 3872-kafkajson_table

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2069.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2069
    
----
commit 12ec6a594d23bd36bed1e07eeaba2aa75a768f67
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-06-02T20:38:23Z

    [FLINK-3872] [table, connector-kafka] Add JsonRowDeserializationSchema
    
    - Adds a deserialization schema from byte[] to Row to be used in conjunction
      with the Table API.

commit a8dc3aa7ab70a91b12af2adccbbed821bf25ecc9
Author: Ufuk Celebi <uc...@apache.org>
Date:   2016-06-03T13:24:22Z

    [FLINK-3872] [table, connector-kafka] Add KafkaTableSource

----


> Add Kafka TableSource with JSON serialization
> ---------------------------------------------
>
>                 Key: FLINK-3872
>                 URL: https://issues.apache.org/jira/browse/FLINK-3872
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API
>            Reporter: Fabian Hueske
>            Assignee: Ufuk Celebi
>             Fix For: 1.1.0
>
>
> Add a Kafka TableSource which reads JSON serialized data.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)