Posted to user@flink.apache.org by "Tzu-Li (Gordon) Tai" <tz...@apache.org> on 2020/06/17 04:45:11 UTC

Re: Any python example with json data from Kafka using flink-statefun

(forwarding this to user@ as it is better suited there)

Hi Sunil,

With remote functions (using the Python SDK), messages sent to / from them
must be Protobuf messages.
This is a requirement since remote functions can be written in any
language, and we use Protobuf as a means for cross-language messaging.
If you are defining Kafka ingresses in a remote module (via textual YAML
module configs), then records in the Kafka ingress will be directly routed
to the remote functions, and therefore they are required to be Protobuf
messages as well.

With embedded functions (using the current Java SDK), what you are trying
to do is possible.
When using the Java SDK, the Kafka ingress allows providing a
`KafkaIngressDeserializer` [1], where you can convert the bytes in Kafka
into any type you intend for messaging within the StateFun application. So
there, you can convert your JSON records.

If you still want to write your main application logic in Python, but the
input and output messages in Kafka are required to be JSON,
what you can currently do is combine a remote module [2], containing the
application logic as Python functions,
with a separate embedded module [3], containing the Java Kafka ingress and
egresses.
Concretely, your two modules will contain:

Remote module:
- Your Python functions implementing the main business logic.

Embedded module:
- A Java Kafka ingress with a deserializer that converts JSON to Protobuf
messages, along the lines of the sketch above. Here you have the freedom to
extract only the fields that you need.
- A Java router [4] that routes those converted messages to the remote
functions by their logical address (see the sketch after this list).
- A Java Kafka egress with a serializer that converts Protobuf messages from
remote functions into JSON Kafka records (sketched further below).
- A Java function that simply forwards input messages to the Kafka
egress. If the remote functions need to write JSON messages to Kafka, they
send a Protobuf message to this function (also sketched after this list).
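
To make the router and the forwarding function concrete, here is a minimal
sketch, again assuming the `MyRow` message from the original question. The
function type "example/tokenizer", the egress identifier "example/json-out",
and the class names are hypothetical, chosen only for illustration:

import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.Router;

// Routes each converted MyRow to the remote (Python) function,
// keyed by the record's id.
public class MyRowRouter implements Router<MyRow> {

  private static final FunctionType REMOTE_FN =
      new FunctionType("example", "tokenizer");

  @Override
  public void route(MyRow message, Downstream<MyRow> downstream) {
    downstream.forward(REMOTE_FN, message.getId(), message);
  }
}

And, in a separate file, the forwarding function that hands messages from
the remote functions over to the Kafka egress:

import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;

// Forwards whatever it receives to the JSON Kafka egress; remote
// functions send a MyRow here whenever they want to write to Kafka.
public class KafkaForwardFunction implements StatefulFunction {

  public static final EgressIdentifier<MyRow> JSON_EGRESS =
      new EgressIdentifier<>("example", "json-out", MyRow.class);

  @Override
  public void invoke(Context context, Object input) {
    context.send(JSON_EGRESS, (MyRow) input);
  }
}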


Hope this helps.
Note that the egress side of things could definitely be easier (without the
extra forwarding through a Java function) if the Python SDK's
`kafka_egress_record` method allowed supplying arbitrary bytes.
Then you would already be able to write JSON messages to Kafka directly
from the Python functions.
This isn't supported yet, but technically it is quite easy to achieve. I've
just filed an issue for this [5], in case you'd like to follow that.
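
In the meantime, the Java Kafka egress serializer from the list above
handles the Protobuf-to-JSON direction. A minimal sketch, again assuming
`MyRow`; the output topic name and the class name are hypothetical:

import com.google.protobuf.util.JsonFormat;
import java.nio.charset.StandardCharsets;
import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Serializes MyRow Protobuf messages coming back from the remote
// functions into JSON Kafka records, keyed by id.
public class MyRowToJsonSerializer implements KafkaEgressSerializer<MyRow> {

  @Override
  public ProducerRecord<byte[], byte[]> serialize(MyRow out) {
    try {
      // Print the Struct payload back out as a JSON string.
      String json = JsonFormat.printer().print(out.getMessage());
      return new ProducerRecord<>(
          "json-output-topic",
          out.getId().getBytes(StandardCharsets.UTF_8),
          json.getBytes(StandardCharsets.UTF_8));
    } catch (Exception e) {
      throw new RuntimeException("Could not serialize record: " + out, e);
    }
  }
}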

Cheers,
Gordon

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/apache-kafka.html#kafka-deserializer
[2]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#remote-module
[3]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/sdk/modules.html#embedded-module
[4]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/io-module/index.html#router
[5] https://issues.apache.org/jira/browse/FLINK-18340

On Wed, Jun 17, 2020 at 9:25 AM Sunil <su...@gmail.com> wrote:

> Checking to see if this is currently possible:
> Read JSON data from Kafka topic => process using StateFun => write out to
> Kafka in JSON format.
>
> I could have a separate process that reads the source JSON data, converts
> it to Protobuf, and writes it into another Kafka topic, but that sounds
> inefficient.
> e.g.
> Read JSON data from Kafka topic => convert JSON to Protobuf => process
> using StateFun => write out to Kafka in Protobuf format => convert
> Protobuf to JSON message
>
> Appreciate any advice on how to process JSON messages using StateFun.
> Also, if this is not possible in the current Python SDK, can I do that
> using the Java/Scala SDK?
>
> Thanks.
>
> On 2020/06/15 15:34:39, Sunil Sattiraju <su...@gmail.com> wrote:
> > Thanks Igal,
> > I don't have control over the data source inside Kafka (the current Kafka
> > topic contains either JSON or Avro formats only; I am trying to reproduce
> > this scenario using my test data generator).
> >
> > Is it possible to convert the JSON to proto at the receiving end of the
> > StateFun application?
> >
> > On 2020/06/15 14:51:01, Igal Shilman <ig...@ververica.com> wrote:
> > > Hi,
> > >
> > > The values must be valid encoded Protobuf messages [1], while in your
> > > attached code snippet you are sending UTF-8 encoded JSON strings.
> > > You can take a look at this example with a generator that produces
> > > Protobuf messages [2][3]:
> > >
> > > [1] https://developers.google.com/protocol-buffers/docs/pythontutorial
> > > [2] https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/generator/event-generator.py#L37
> > > [3] https://github.com/apache/flink-statefun/blob/8376afa6b064bfa2374eefbda5e149cd490985c0/statefun-examples/statefun-python-greeter-example/greeter/messages.proto#L25
> > >
> > > On Mon, Jun 15, 2020 at 4:25 PM Sunil Sattiraju <sunilsattiraju@gmail.com>
> > > wrote:
> > >
> > > > Hi, based on the example from
> > > > https://github.com/apache/flink-statefun/tree/master/statefun-examples/statefun-python-greeter-example
> > > > I am trying to ingest JSON data from Kafka, but have been unable to
> > > > achieve this based on the examples.
> > > >
> > > > event-generator.py
> > > >
> > > > import json
> > > > import sys
> > > > import time
> > > >
> > > > from kafka import KafkaProducer
> > > >
> > > > def produce():
> > > >     # Example of the record shape; random_requests_dict() (defined
> > > >     # elsewhere) yields dicts like this one.
> > > >     request = {}
> > > >     request['id'] = "abc-123"
> > > >     request['field1'] = "field1-1"
> > > >     request['field2'] = "field2-2"
> > > >     request['field3'] = "field3-3"
> > > >     if len(sys.argv) == 2:
> > > >         delay_seconds = int(sys.argv[1])
> > > >     else:
> > > >         delay_seconds = 1
> > > >     producer = KafkaProducer(bootstrap_servers=[KAFKA_BROKER])
> > > >     for request in random_requests_dict():
> > > >         producer.send(topic='test-topic',
> > > >                       value=json.dumps(request).encode('utf-8'))
> > > >         producer.flush()
> > > >         time.sleep(delay_seconds)
> > > >
> > > > Below is the proto definition of the JSON data (I don't always know
> > > > all the fields, but I know the id field definitely exists).
> > > > message.proto
> > > >
> > > > syntax = "proto3";
> > > >
> > > > import "google/protobuf/struct.proto";
> > > >
> > > > message MyRow {
> > > >     string id = 1;
> > > >     google.protobuf.Struct message = 2;
> > > > }
> > > >
> > > > Below is the greeter that receives the data, tokenizer.py (same as
> > > > greeter.py, but saving the state of id instead of counting).
> > > >
> > > >
> > > > @app.route('/statefun', methods=['POST'])
> > > > def handle():
> > > >     my_row = MyRow()
> > > >     data = my_row.ParseFromString(request.data)  # Is this the right way to do it?
> > > >     response_data = handler(request.data)
> > > >     response = make_response(response_data)
> > > >     response.headers.set('Content-Type', 'application/octet-stream')
> > > >     return response
> > > >
> > > >
> > > > But below is the error message. I am a newbie with proto and
> > > > appreciate any help.
> > > >
> > > > 11:55:17,996 tokenizer ERROR Exception on /statefun [POST]
> > > > Traceback (most recent call last):
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
> > > >     response = self.full_dispatch_request()
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
> > > >     rv = self.handle_user_exception(e)
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
> > > >     reraise(exc_type, exc_value, tb)
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
> > > >     raise value
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
> > > >     rv = self.dispatch_request()
> > > >   File "/usr/local/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
> > > >     return self.view_functions[rule.endpoint](**req.view_args)
> > > >   File "/app/tokenizer.py", line 101, in handle
> > > >     response_data = handler(data)
> > > >   File "/usr/local/lib/python3.8/site-packages/statefun/request_reply.py", line 38, in __call__
> > > >     request.ParseFromString(request_bytes)
> > > >   File "/usr/local/lib/python3.8/site-packages/google/protobuf/message.py", line 199, in ParseFromString
> > > >     return self.MergeFromString(serialized)
> > > >   File "/usr/local/lib/python3.8/site-packages/google/protobuf/internal/python_message.py", line 1131, in MergeFromString
> > > >     serialized = memoryview(serialized)
> > > > TypeError: memoryview: a bytes-like object is required, not 'int'
> > > >
> > > >
> > >
> >
>

Re: Any python example with json data from Kafka using flink-statefun

Posted by Sunil <su...@gmail.com>.
Thanks Gordon.
Really appreciate your detailed response and this definitely helps.

