Posted to dev@flink.apache.org by "Igal Shilman (Jira)" <ji...@apache.org> on 2020/02/17 16:37:00 UTC

[jira] [Created] (FLINK-16123) Add routable Kafka connector

Igal Shilman created FLINK-16123:
------------------------------------

             Summary: Add routable Kafka connector
                 Key: FLINK-16123
                 URL: https://issues.apache.org/jira/browse/FLINK-16123
             Project: Flink
          Issue Type: Task
          Components: Stateful Functions
            Reporter: Igal Shilman


In some cases it is beneficial to associate a stateful function instance with a key in a Kafka topic.

In that case, a simplified Kafka ingress definition can be introduced. 

Consider the following example:

Imagine a Kafka topic named "signups" (1) where the keys are UTF-8 strings representing user ids,

and the values are Protobuf messages of type (2) com.user.foo.bar.greeter.SingupMessage.

We would like to have a stateful function of type (3)
{code:java}
FunctionType(com.user.foo.bar, SingupProcessor)
{code}
to be invoked for each incoming signup message.

The following spec definition:
{code:java}
    
  - ingress:
          meta:
            type:  org.apache.flink.statefun.sdk.kafka/routable-kafka-connector
            id: com.user.foo.bar/greeter
          spec:
            properties:
              - consumer.group: greeter
            topics:
              - signups:  # (1)
                  typeUrl: "com.user.foo.bar.greeter.SingupMessage"  # (2)
                  target: "com.user.foo.bar/SingupProcessor"  # (3)

{code}


defines a Kafka ingress that consumes <UTF-8 string, bytes> records from the signups topic
and produces a Routable Protobuf message with the following type and properties:

{code}
message Routable {
       Address target;  // (1)
       Any payload;     // (2)
}
{code}
Where:
(1) is Address(FunctionType(com.user.foo.bar, SingupProcessor),  <a consumer record's key>)
(2) is an Any whose typeUrl is com.user.foo.bar.greeter.SingupMessage and whose value bytes
come directly from the consumer record's value bytes
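
To make the mapping concrete, here is a rough sketch in plain Java of how one consumer record could be turned into a Routable. It is only an illustration: Address, AnyPayload, Routable and toRoutable are hypothetical local stand-ins (not generated Protobuf classes and not an existing Stateful Functions API), and the record key and value are passed in as raw byte arrays instead of a Kafka ConsumerRecord.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch of the proposed mapping; all types are local stand-ins, not SDK classes. */
class RoutableSketch {

    /** Stand-in for Address(FunctionType(namespace, name), id). */
    static final class Address {
        final String namespace;
        final String name;
        final String id;

        Address(String namespace, String name, String id) {
            this.namespace = namespace;
            this.name = name;
            this.id = id;
        }
    }

    /** Stand-in for a Protobuf Any: a type URL plus the raw value bytes. */
    static final class AnyPayload {
        final String typeUrl;
        final byte[] value;

        AnyPayload(String typeUrl, byte[] value) {
            this.typeUrl = typeUrl;
            this.value = value;
        }
    }

    /** Stand-in for the Routable message. */
    static final class Routable {
        final Address target;
        final AnyPayload payload;

        Routable(Address target, AnyPayload payload) {
            this.target = target;
            this.payload = payload;
        }
    }

    /**
     * Builds a Routable from one consumer record.
     * typeUrl and target are the per-topic values from the spec; target is "namespace/name".
     */
    static Routable toRoutable(String typeUrl, String target, byte[] recordKey, byte[] recordValue) {
        int slash = target.lastIndexOf('/');
        if (slash <= 0 || slash == target.length() - 1) {
            throw new IllegalArgumentException("target must look like <namespace>/<name>: " + target);
        }
        String namespace = target.substring(0, slash);
        String name = target.substring(slash + 1);
        // The record key is a UTF-8 string and becomes the id of the function instance.
        String id = new String(recordKey, StandardCharsets.UTF_8);
        // The record value is passed through untouched as the Any's value bytes.
        return new Routable(new Address(namespace, name, id), new AnyPayload(typeUrl, recordValue));
    }

    public static void main(String[] args) {
        Routable r = toRoutable(
            "com.user.foo.bar.greeter.SingupMessage",
            "com.user.foo.bar/SingupProcessor",
            "user-1".getBytes(StandardCharsets.UTF_8),
            new byte[] {1, 2, 3});
        System.out.println(r.target.namespace + "/" + r.target.name + " id=" + r.target.id
            + " typeUrl=" + r.payload.typeUrl + " bytes=" + r.payload.value.length);
    }
}
```

The point of the sketch is that the ingress never needs to deserialize the value: the key alone decides which function instance is addressed, and the value bytes are carried along as-is.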

This would require an additional AutoRoutable router
that simply forwards the payload to the target address.
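
As a rough illustration of how small such a router could be, here is a sketch in plain Java. The Address, Routable, Downstream and AutoRoutableRouter types below are hypothetical, self-contained stand-ins chosen for this example; they only loosely mirror the router/downstream shape of the SDK and are not its actual interfaces.

```java
/** Hypothetical sketch of an auto-routing router; all types are local stand-ins. */
class AutoRoutableSketch {

    /** Stand-in for a function address: "namespace/name" plus the instance id. */
    static final class Address {
        final String functionType;
        final String id;

        Address(String functionType, String id) {
            this.functionType = functionType;
            this.id = id;
        }
    }

    /** Stand-in for the Routable message: a target and an opaque payload. */
    static final class Routable {
        final Address target;
        final byte[] payload;

        Routable(Address target, byte[] payload) {
            this.target = target;
            this.payload = payload;
        }
    }

    /** Stand-in for whatever delivers messages to function instances. */
    interface Downstream {
        void forward(Address to, byte[] payload);
    }

    /** The router has no routing logic of its own: the target travels with the message. */
    static final class AutoRoutableRouter {
        void route(Routable message, Downstream downstream) {
            downstream.forward(message.target, message.payload);
        }
    }

    public static void main(String[] args) {
        Downstream printing = (to, payload) ->
            System.out.println("forward " + payload.length + " bytes to "
                + to.functionType + " id=" + to.id);
        Routable message = new Routable(
            new Address("com.user.foo.bar/SingupProcessor", "user-1"), new byte[] {1, 2, 3});
        new AutoRoutableRouter().route(message, printing);
    }
}
```

Because the target address is already computed by the ingress, the same router could be registered for every routable ingress without any per-topic code.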
