Posted to issues@flink.apache.org by "Tzu-Li (Gordon) Tai (Jira)" <ji...@apache.org> on 2020/02/26 16:28:01 UTC
[jira] [Closed] (FLINK-16123) Add routable Kafka connector
[ https://issues.apache.org/jira/browse/FLINK-16123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tzu-Li (Gordon) Tai closed FLINK-16123.
---------------------------------------
Fix Version/s: statefun-1.1
Resolution: Fixed
Merged to master via da2b09da7b984e3fec92c672998cf623ce63b9ad
> Add routable Kafka connector
> ----------------------------
>
> Key: FLINK-16123
> URL: https://issues.apache.org/jira/browse/FLINK-16123
> Project: Flink
> Issue Type: Task
> Components: Stateful Functions
> Reporter: Igal Shilman
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Major
> Labels: pull-request-available
> Fix For: statefun-1.1
>
> Time Spent: 0.5h
> Remaining Estimate: 0h
>
> In some cases it is beneficial to associate a stateful function instance with a key in a Kafka topic.
> In that case, a simplified Kafka ingress definition can be introduced.
> Consider the following example:
> Imagine a Kafka topic named "signups" (1) where the keys are UTF-8 strings representing user ids,
> and the values are Protobuf messages of type (2) com.user.foo.bar.greeter.SingupMessage.
> We would like to have a stateful function of type (3)
> {code:java}
> FunctionType(com.user.foo.bar, SingupProcessor)
> {code}
> to be invoked for each incoming signup message.
> The following spec definition:
> {code:java}
> - ingress:
>     meta:
>       type: org.apache.flink.statefun.sdk.kafka/routable-kafka-connector
>       id: com.user.foo.bar/greeter
>     spec:
>       properties:
>         - consumer.group: greeter
>       topics:
>         - signups: (1)
>           typeUrl: (2) "com.user.foo.bar.greeter.SingupMessage"
>           target: (3) "com.user.foo.bar/SingupProcessor"
> {code}
> Defines a Kafka ingress that consumes <UTF-8 string, bytes> records from the signups topic,
> and produces a Routable Protobuf message with the following type and properties:
> {code}
> message Routable {
>   Address target; (1)
>   Any payload; (2)
> }
> {code}
> Where:
> (1) is Address(FunctionType(com.user.foo.bar, SingupProcessor), <a consumer record's key>)
> (2) the Any's typeUrl would be com.user.foo.bar.greeter.SingupMessage and the value bytes
> would come directly from the consumer record value bytes
> This would require an additional AutoRoutable router
> that simply forwards the payload to the target address.
>
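The mapping described in the issue (record key becomes the function instance id, record value becomes the payload bytes, and an auto-routing step forwards the payload to the target) can be sketched roughly as follows. This is only an illustration under stated assumptions: SketchAddress, SketchRoutable, SketchDownstream and RoutableKafkaSketch are hypothetical stand-ins, not the StateFun SDK or Protobuf classes, and the target is assumed to be written as <namespace>/<name> as in the spec above.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a StateFun Address: function type (namespace + name) and instance id.
final class SketchAddress {
    final String namespace;
    final String name;
    final String id;

    SketchAddress(String namespace, String name, String id) {
        this.namespace = namespace;
        this.name = name;
        this.id = id;
    }
}

// Hypothetical stand-in for the Routable message: a target plus an Any-like payload.
final class SketchRoutable {
    final SketchAddress target;
    final String typeUrl;
    final byte[] payload;

    SketchRoutable(SketchAddress target, String typeUrl, byte[] payload) {
        this.target = target;
        this.typeUrl = typeUrl;
        this.payload = payload;
    }
}

// Hypothetical callback meaning "deliver these bytes to that function instance".
interface SketchDownstream {
    void forward(SketchAddress target, String typeUrl, byte[] payload);
}

final class RoutableKafkaSketch {
    private RoutableKafkaSketch() {}

    // Builds a Routable from one consumer record, using the topic's configured
    // target ("<namespace>/<name>") and typeUrl.
    static SketchRoutable toRoutable(
            String targetSpec, String typeUrl, byte[] recordKey, byte[] recordValue) {
        int slash = targetSpec.lastIndexOf('/');
        if (slash <= 0 || slash == targetSpec.length() - 1) {
            throw new IllegalArgumentException("expected <namespace>/<name>, got: " + targetSpec);
        }
        String namespace = targetSpec.substring(0, slash);
        String name = targetSpec.substring(slash + 1);
        // The record key is assumed to be a UTF-8 string and becomes the instance id.
        String id = new String(recordKey, StandardCharsets.UTF_8);
        // The record value bytes are passed through untouched as the payload.
        return new SketchRoutable(new SketchAddress(namespace, name, id), typeUrl, recordValue);
    }

    // The auto-routing step: forward the payload to the address carried by the message.
    static void route(SketchRoutable message, SketchDownstream downstream) {
        downstream.forward(message.target, message.typeUrl, message.payload);
    }
}
```

With the example spec, a record with key "user-42" on the signups topic would be addressed to the SingupProcessor instance whose id is "user-42", and its value bytes would be forwarded unchanged.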
--
This message was sent by Atlassian Jira
(v8.3.4#803005)