Posted to issues@flink.apache.org by "Tzu-Li (Gordon) Tai (Jira)" <ji...@apache.org> on 2020/02/26 16:29:00 UTC

[jira] [Reopened] (FLINK-16123) Add routable Kafka connector

     [ https://issues.apache.org/jira/browse/FLINK-16123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Tzu-Li (Gordon) Tai reopened FLINK-16123:
-----------------------------------------

Reopening. This should not be closed until the PR that adds the E2E test (https://github.com/apache/flink-statefun/pull/37) is merged as well.

> Add routable Kafka connector
> ----------------------------
>
>                 Key: FLINK-16123
>                 URL: https://issues.apache.org/jira/browse/FLINK-16123
>             Project: Flink
>          Issue Type: Task
>          Components: Stateful Functions
>            Reporter: Igal Shilman
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: statefun-1.1
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> In some cases it is beneficial to associate a stateful function instance with a key in a Kafka topic.
> In that case, a simplified Kafka ingress definition can be introduced. 
> Consider the following example:
> Imagine a Kafka topic named "signups" (1) where the keys are UTF-8 strings representing user ids,
> and the values are Protobuf messages of type (2) com.user.foo.bar.greeter.SingupMessage.
> We would like to have a stateful function of type (3)
> {code:java}
> FunctionType(com.user.foo.bar, SingupProcessor)
> {code}
> to be invoked for each incoming signup message.
> The following spec definition:
> {code:java}
>   - ingress:
>           meta:
>             type: org.apache.flink.statefun.sdk.kafka/routable-kafka-connector
>             id: com.user.foo.bar/greeter
>           spec:
>             properties:
>               - consumer.group: greeter
>             topics:
>               - signups:   # (1)
>                   typeUrl: "com.user.foo.bar.greeter.SingupMessage"   # (2)
>                   target: "com.user.foo.bar/SingupProcessor"   # (3)
> {code}
> defines a Kafka ingress that consumes <UTF-8 string, bytes> records from the signups topic,
> and produces a Routable Protobuf message with the following type and properties:
> {code}
> message Routable {
>        Address target;  // (1)
>        Any payload;     // (2)
> }
> {code}
> Where:
> (1) is Address(FunctionType(com.user.foo.bar, SingupProcessor),  <a consumer record's key>)
> (2) the Any's typeUrl would be com.user.foo.bar.greeter.SingupMessage and the value bytes
> would come directly from the consumer record value bytes
> This would require an additional AutoRoutable router
> that simply forwards the payload to the target address.
>  
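
The mapping described above, from a consumer record to a Routable message and on to the target function, can be sketched in plain Java. This is only an illustration under stated assumptions, not the connector's implementation: every class and method name below (FunctionType, Address, Routable, TopicRouting, Downstream, route) is a hypothetical stand-in rather than the Stateful Functions SDK API, and the Any payload is modelled as a typeUrl plus raw bytes.

```java
import java.nio.charset.StandardCharsets;

public class RoutableKafkaSketch {

    // Hypothetical stand-in for the SDK's function type (namespace + name).
    static final class FunctionType {
        final String namespace;
        final String name;
        FunctionType(String namespace, String name) {
            this.namespace = namespace;
            this.name = name;
        }
    }

    // Hypothetical stand-in for an address: a function type plus an instance id.
    static final class Address {
        final FunctionType type;
        final String id;
        Address(FunctionType type, String id) {
            this.type = type;
            this.id = id;
        }
    }

    // Mirrors the Routable message: a target plus an Any-like payload (typeUrl + bytes).
    static final class Routable {
        final Address target;
        final String typeUrl;
        final byte[] value;
        Routable(Address target, String typeUrl, byte[] value) {
            this.target = target;
            this.typeUrl = typeUrl;
            this.value = value;
        }
    }

    // Per-topic routing settings, taken from the typeUrl and target entries of the spec.
    static final class TopicRouting {
        final String typeUrl;
        final FunctionType target;
        TopicRouting(String typeUrl, String target) {
            int slash = target.lastIndexOf('/');
            if (slash <= 0 || slash == target.length() - 1) {
                throw new IllegalArgumentException("expected <namespace>/<name>: " + target);
            }
            this.typeUrl = typeUrl;
            this.target = new FunctionType(target.substring(0, slash), target.substring(slash + 1));
        }

        // The record key (UTF-8) becomes the function id; the value bytes pass through untouched.
        Routable toRoutable(byte[] key, byte[] value) {
            String id = new String(key, StandardCharsets.UTF_8);
            return new Routable(new Address(target, id), typeUrl, value);
        }
    }

    // Hypothetical sink representing "send this payload to that address".
    interface Downstream {
        void forward(Address target, String typeUrl, byte[] value);
    }

    // The AutoRoutable router idea: forward the payload to the address carried in the message.
    static void route(Routable message, Downstream downstream) {
        downstream.forward(message.target, message.typeUrl, message.value);
    }

    public static void main(String[] args) {
        TopicRouting routing = new TopicRouting(
            "com.user.foo.bar.greeter.SingupMessage", "com.user.foo.bar/SingupProcessor");
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        Routable routable = routing.toRoutable(key, new byte[] {1, 2, 3});
        route(routable, (target, typeUrl, payload) -> System.out.println(
            target.type.namespace + "/" + target.type.name
                + " <- id=" + target.id + ", typeUrl=" + typeUrl + ", bytes=" + payload.length));
    }
}
```

Because the router only reads the target already carried by the message, the per-topic configuration is the single place where the function type is decided.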



--
This message was sent by Atlassian Jira
(v8.3.4#803005)