Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2016/06/03 14:55:59 UTC
[jira] [Commented] (FLINK-3872) Add Kafka TableSource with JSON serialization
[ https://issues.apache.org/jira/browse/FLINK-3872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15314238#comment-15314238 ]
ASF GitHub Bot commented on FLINK-3872:
---------------------------------------
GitHub user uce opened a pull request:
https://github.com/apache/flink/pull/2069
[FLINK-3872] [table, connector-kafka] Add KafkaJsonTableSource
Adds `StreamTableSource` variants for Kafka with syntactic sugar for parsing JSON streams.
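```java
// Reads JSON messages from a Kafka 0.8 topic into rows with a single Long field "id".
// `topic` (String), `props` (Kafka consumer java.util.Properties) and
// `tableEnvironment` (a StreamTableEnvironment) are assumed to be defined elsewhere.
KafkaJsonTableSource source = new Kafka08JsonTableSource(
    topic,
    props,
    new String[] { "id" },          // field names
    new Class<?>[] { Long.class }); // field types
tableEnvironment.registerTableSource("kafka-stream", source);
```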
You can then continue to work with the stream:
```java
Table result = tableEnvironment.ingest("kafka-stream").filter("id > 1000");
tableEnvironment.toDataStream(result, Row.class).print();
```
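Conceptually, each Kafka message has to be turned into a row whose positions follow the field names and field types arrays passed to the table source. The following is a minimal, dependency-free sketch of that mapping for flat JSON objects. `FlatJsonRowSketch` is a hypothetical name, rows are modelled as `Object[]` instead of Flink's `Row`, and this is not the code from this pull request.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch, not Flink's JsonRowDeserializationSchema: turns one
 * flat JSON message into a row (modelled as Object[]) whose positions
 * follow the configured field names and types.
 */
final class FlatJsonRowSketch {

    static Object[] deserialize(byte[] message, String[] fieldNames, Class<?>[] fieldTypes) {
        Map<String, String> raw = parseFlatObject(new String(message, StandardCharsets.UTF_8));
        Object[] row = new Object[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            // Missing fields and JSON nulls both become null in the row.
            String value = raw.get(fieldNames[i]);
            row[i] = value == null ? null : convert(value, fieldTypes[i]);
        }
        return row;
    }

    // Converts a raw JSON token into the declared field type.
    private static Object convert(String value, Class<?> type) {
        if (type == String.class) return value;
        if (type == Long.class) return Long.valueOf(value);
        if (type == Integer.class) return Integer.valueOf(value);
        if (type == Double.class) return Double.valueOf(value);
        if (type == Boolean.class) return Boolean.valueOf(value);
        throw new IllegalArgumentException("Unsupported field type: " + type);
    }

    // Minimal parser for {"key": value, ...}: no nesting, no arrays, no escape sequences.
    private static Map<String, String> parseFlatObject(String json) {
        String s = json.trim();
        if (s.length() < 2 || s.charAt(0) != '{' || s.charAt(s.length() - 1) != '}') {
            throw new IllegalArgumentException("Expected a JSON object: " + json);
        }
        Map<String, String> fields = new HashMap<>();
        int end = s.length() - 1; // index of the closing brace
        int i = skipWhitespace(s, 1);
        while (i < end) {
            if (s.charAt(i) != '"') throw new IllegalArgumentException("Expected key at " + i);
            int keyEnd = s.indexOf('"', i + 1);
            String key = s.substring(i + 1, keyEnd);
            i = skipWhitespace(s, keyEnd + 1);
            if (s.charAt(i) != ':') throw new IllegalArgumentException("Expected ':' at " + i);
            i = skipWhitespace(s, i + 1);
            String value;
            if (s.charAt(i) == '"') {
                // String value: everything up to the next quote.
                int valueEnd = s.indexOf('"', i + 1);
                value = s.substring(i + 1, valueEnd);
                i = valueEnd + 1;
            } else {
                // Number, boolean or null: everything up to the next comma or the closing brace.
                int valueEnd = i;
                while (valueEnd < end && s.charAt(valueEnd) != ',') valueEnd++;
                String token = s.substring(i, valueEnd).trim();
                value = token.equals("null") ? null : token;
                i = valueEnd;
            }
            fields.put(key, value);
            i = skipWhitespace(s, i);
            if (i < end && s.charAt(i) == ',') i = skipWhitespace(s, i + 1);
        }
        return fields;
    }

    private static int skipWhitespace(String s, int i) {
        while (i < s.length() && Character.isWhitespace(s.charAt(i))) i++;
        return i;
    }
}
```

A real deserializer would use a full JSON library; the hand-rolled parser only keeps the sketch self-contained. Mapping missing fields to `null` rather than failing is one possible design choice for records that omit optional fields.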
**Limitations**
- Only flat JSON field access is supported (we can easily extend this to use JSON pointers, allowing us to parse nested fields like `/location/area` as field names).
- No timestamps or watermarks are extracted (not an issue right now, as the Table API currently does not support operations that need them).
- The API is somewhat cumbersome and not idiomatic (non-Scalaesque) for the Scala Table API.
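The JSON pointer extension mentioned in the first limitation could look roughly like this. The sketch resolves an RFC 6901 pointer such as `/location/area` against a document that is assumed to be already parsed into nested `java.util.Map`/`java.util.List` values; the PR does not say how parsed JSON would be represented, and `JsonPointerSketch` is a hypothetical class, not part of Flink.

```java
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch: resolves an RFC 6901 JSON pointer against a document
 * parsed into nested Maps (objects) and Lists (arrays).
 */
final class JsonPointerSketch {

    /** Returns the value at the pointer, or null if any step is missing. */
    static Object resolve(Object document, String pointer) {
        if (pointer.isEmpty()) {
            return document; // "" refers to the whole document
        }
        if (pointer.charAt(0) != '/') {
            throw new IllegalArgumentException("Pointer must start with '/': " + pointer);
        }
        Object current = document;
        // Limit -1 keeps empty tokens, so "/" resolves the key "".
        for (String rawToken : pointer.substring(1).split("/", -1)) {
            // RFC 6901 unescaping: "~1" -> "/" first, then "~0" -> "~".
            String token = rawToken.replace("~1", "/").replace("~0", "~");
            if (current instanceof Map) {
                current = ((Map<?, ?>) current).get(token);
            } else if (current instanceof List) {
                List<?> list = (List<?>) current;
                int index;
                try {
                    index = Integer.parseInt(token);
                } catch (NumberFormatException e) {
                    return null; // non-numeric token on an array
                }
                if (index < 0 || index >= list.size()) {
                    return null;
                }
                current = list.get(index);
            } else {
                return null; // cannot descend into a scalar or a missing value
            }
        }
        return current;
    }
}
```

Returning `null` for a missing path, rather than throwing, would let a table source emit a null field for records that lack an optional nested value.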
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/uce/flink 3872-kafkajson_table
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/2069.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #2069
----
commit 12ec6a594d23bd36bed1e07eeaba2aa75a768f67
Author: Ufuk Celebi <uc...@apache.org>
Date: 2016-06-02T20:38:23Z
[FLINK-3872] [table, connector-kafka] Add JsonRowDeserializationSchema
- Adds a deserialization schema from byte[] to Row to be used in conjunction
with the Table API.
commit a8dc3aa7ab70a91b12af2adccbbed821bf25ecc9
Author: Ufuk Celebi <uc...@apache.org>
Date: 2016-06-03T13:24:22Z
[FLINK-3872] [table, connector-kafka] Add KafkaTableSource
----
> Add Kafka TableSource with JSON serialization
> ---------------------------------------------
>
> Key: FLINK-3872
> URL: https://issues.apache.org/jira/browse/FLINK-3872
> Project: Flink
> Issue Type: New Feature
> Components: Table API
> Reporter: Fabian Hueske
> Assignee: Ufuk Celebi
> Fix For: 1.1.0
>
>
> Add a Kafka TableSource which reads JSON serialized data.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)