Posted to user@flink.apache.org by Timothy Bess <td...@gmail.com> on 2021/04/16 21:48:22 UTC

Flink Statefun Python Batch

Hi everyone,

Is there a good way to access the batch of leads that Statefun sends to the
Python SDK rather than processing events one by one? We're trying to run
our data scientist's machine learning model through the SDK, but the code
is very slow when we do single events and we don't get many of the benefits
of Pandas/etc.

Thanks,

Tim

Re: Flink Statefun Python Batch

Posted by Igal Shilman <ig...@ververica.com>.
Hi Tim,

I've created a tiny PoC; let me know if this helps.
I can't guarantee, though, that this is how we'll eventually approach this,
but it should be somewhere along these lines.

https://github.com/igalshilman/flink-statefun/tree/tim

Thanks,
Igal.


In reply to Timothy Bess <td...@gmail.com>, Thu, Apr 22, 2021 at 6:53 AM.

Re: Flink Statefun Python Batch

Posted by Timothy Bess <td...@gmail.com>.
Hi Igal and Konstantin,

Wow! I appreciate the offer of creating a branch to test with, but for now
we were able to get it working by tuning a few configs and moving other
blocking IO out of statefun, so no rush there. That said, if you do add
that, I'd definitely switch over.

That's great! I'll try to think up some suggestions to put into those
tickets. Yeah, I'd be up for a call on Thursday or Friday if you're free
then; just let me know (my timezone is EDT).

Thanks,

Tim

In reply to Konstantin Knauf <kn...@gmail.com>, Wed, Apr 21, 2021, 4:18 AM.

Re: Flink Statefun Python Batch

Posted by Konstantin Knauf <kn...@gmail.com>.
Hi Igal, Hi Timothy,

This sounds very interesting. Both state introspection and OpenTracing
support have been requested by multiple users before, so this is certainly
something we are willing to invest in. Timothy, would you have time for a
30-minute call in the next few days so we can understand your use case and
requirements better? In the meantime, let's document these feature requests
in Jira.

* Exposing Batches to SDKs:
https://issues.apache.org/jira/browse/FLINK-22389
* Support for OpenTracing:
https://issues.apache.org/jira/browse/FLINK-22390
* Support for State Introspection:
https://issues.apache.org/jira/browse/FLINK-22391

Please feel free to edit or comment on these issues directly, too.

Cheers,

Konstantin



In reply to Igal Shilman <igal@ververica.com>, Wed, Apr 21, 2021 at 09:15.

-- 
*Konstantin Knauf*
Schneckenburgerstr. 21
81675 München
Germany
Mobil +49 174 3413182
knauf.konstantin@gmail.com

Re: Flink Statefun Python Batch

Posted by Igal Shilman <ig...@ververica.com>.
Hi Tim,

Yes, I think this feature can be implemented relatively quickly.
If this is blocking you at the moment, I can prepare a branch for you to
experiment with in the coming days.

Regarding OpenTracing integration, I think the community could benefit a
lot from this, and contributions are definitely welcome!

@Konstantin Knauf <kn...@apache.org>, would you like to understand Tim's
use case with OpenTracing in more depth?

Thanks,
Igal.



In reply to Timothy Bess <td...@gmail.com>, Tue, Apr 20, 2021 at 8:10 PM.

Re: Flink Statefun Python Batch

Posted by Timothy Bess <td...@gmail.com>.
Hi Igal,

Yes! That's exactly what I was thinking. The batching will naturally happen
as the model applies backpressure. We're using pandas and it's pretty
costly to create a dataframe and everything to process a single event.
Internally the SDK has access to the batch and is calling my function,
which creates a dataframe for each individual event. This causes a ton of
overhead since we basically get destroyed by the constant factors around
creating and operating on dataframes.
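
A minimal sketch of the overhead described above, using only the standard library so it stays self-contained. `CountingModel` is a hypothetical stand-in for a model whose per-call setup (for example, building a dataframe) dominates the cost; it is not part of StateFun or pandas. The counter shows that the fixed cost is paid once per call, so five single-event calls pay it five times while one batched call pays it once.

```python
from typing import Dict, List


class CountingModel:
    """Hypothetical model: every predict() call pays one fixed setup cost."""

    def __init__(self) -> None:
        self.setup_calls = 0

    def predict(self, rows: List[Dict[str, float]]) -> List[float]:
        # Stand-in for the expensive part (e.g. building a dataframe).
        self.setup_calls += 1
        return [2.0 * row["x"] + 1.0 for row in rows]


def score_one_by_one(model: CountingModel, events: List[Dict[str, float]]) -> List[float]:
    # One predict() call per event: the setup cost is paid len(events) times.
    return [model.predict([event])[0] for event in events]


def score_as_batch(model: CountingModel, events: List[Dict[str, float]]) -> List[float]:
    # One predict() call for all events: the setup cost is paid once.
    return model.predict(events)


events = [{"x": float(i)} for i in range(5)]
per_event_model = CountingModel()
batch_model = CountingModel()

# Both strategies produce the same scores; only the number of setups differs.
assert score_one_by_one(per_event_model, events) == score_as_batch(batch_model, events)
print(per_event_model.setup_calls, batch_model.setup_calls)  # 5 1
```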

Knowing how the SDK works, it seems like it'd be easy to do something like
your example and maybe have a different decorator for "batch functions"
where the SDK just passes in everything at once.

Also, just out of curiosity, are there plans to build out more introspection
into statefun's Flink state? I was thinking it would be super useful to add
either Queryable State or some control topic that statefun listens to,
which would let me send events to introspect or modify Flink state.

For example:

// control topic request
{"type": "FunctionIdsReq", "namespace": "foo", "type": "bar"}
// response
{"type": "FunctionIdsResp", "ids": [ "1", "2", "3", ... ] }

Or

{"type": "SetState", "namespace": "foo", "type": "bar", "id": "1", "value": "base64bytes"}
{"type": "DeleteState", "namespace": "foo", "type": "bar", "id": "1"}
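
A rough sketch of how a handler for control messages like these might look, run against an in-memory dict rather than real Flink state. Everything here is hypothetical: `handle_control`, the `Ack` response, and the `function_type` key are assumptions made for illustration, not an existing StateFun feature. The separate `function_type` key is used because a JSON object with two "type" keys is ambiguous; Python's `json` module, for instance, keeps only the last one.

```python
import base64
import json
from typing import Dict, Tuple

# (namespace, function_type, id) -> raw state bytes; a stand-in for Flink state.
StateKey = Tuple[str, str, str]


def handle_control(raw: str, state: Dict[StateKey, bytes]) -> dict:
    """Apply one hypothetical control message and return a response dict."""
    msg = json.loads(raw)
    kind = msg["type"]
    ns, fn = msg["namespace"], msg["function_type"]  # assumed key names
    if kind == "FunctionIdsReq":
        ids = sorted(i for (n, f, i) in state if (n, f) == (ns, fn))
        return {"type": "FunctionIdsResp", "ids": ids}
    if kind == "SetState":
        state[(ns, fn, msg["id"])] = base64.b64decode(msg["value"])
        return {"type": "Ack"}
    if kind == "DeleteState":
        state.pop((ns, fn, msg["id"]), None)
        return {"type": "Ack"}
    raise ValueError(f"unknown control message type: {kind!r}")


state: Dict[StateKey, bytes] = {}
target = {"namespace": "foo", "function_type": "bar"}

# Set state for id "1", list the known ids, then delete the state again.
value = base64.b64encode(b"hello").decode("ascii")
handle_control(json.dumps({"type": "SetState", "id": "1", "value": value, **target}), state)
resp = handle_control(json.dumps({"type": "FunctionIdsReq", **target}), state)
handle_control(json.dumps({"type": "DeleteState", "id": "1", **target}), state)
```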

Also having opentracing integration where Statefun passes b3 headers with
each request so we can trace a message's route through statefun would be
_super_ useful. We'd literally be able to see the entire path of an event
from ingress to egress and time spent in each function. Not sure if there
are any plans around that, but since we're live with a statefun project
now, it's possible we could contribute some if you guys are open to it.
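
To illustrate the idea, here is a small sketch of B3 header propagation between hops. The header names (`X-B3-TraceId`, `X-B3-SpanId`, `X-B3-ParentSpanId`, `X-B3-Sampled`) come from the B3 propagation format; the `next_hop_headers` helper is hypothetical and does not reflect anything StateFun does today. It also treats header names as case-sensitive dict keys, which is a simplification.

```python
import secrets
from typing import Dict, Optional


def next_hop_headers(incoming: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Build B3 headers for an outgoing request, continuing the incoming trace if any."""
    incoming = incoming or {}
    # Reuse the trace id so every hop belongs to the same trace;
    # start a new 128-bit trace (32 hex characters) at the ingress.
    trace_id = incoming.get("X-B3-TraceId") or secrets.token_hex(16)
    headers = {
        "X-B3-TraceId": trace_id,
        "X-B3-SpanId": secrets.token_hex(8),  # fresh 64-bit span id per hop
        "X-B3-Sampled": incoming.get("X-B3-Sampled", "1"),
    }
    # The caller's span becomes the parent of the new span.
    parent = incoming.get("X-B3-SpanId")
    if parent:
        headers["X-B3-ParentSpanId"] = parent
    return headers


ingress = next_hop_headers()      # first hop: starts a new trace
hop = next_hop_headers(ingress)   # second hop: same trace, new child span
```

With headers like these on each request, a tracing backend could stitch the spans of one event together from ingress to egress.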

Thanks,

Tim

In reply to Igal Shilman <ig...@ververica.com>, Tue, Apr 20, 2021 at 9:25 AM.

Re: Flink Statefun Python Batch

Posted by Igal Shilman <ig...@ververica.com>.
Hi Tim!

Indeed, the StateFun SDK / StateFun runtime has an internal concept of
batching that kicks in in the presence of a slow or congested remote
function. Keep in mind that under normal circumstances batching does not
happen (effectively, a batch of size 1 will be sent). [1]
This batch is currently not exposed via the SDKs (both Java and Python), as
it is an implementation detail (see [2]).
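
The behaviour described above can be pictured with a toy simulation. This is only a sketch under simplifying assumptions (one request in flight at a time, a fixed `service_time` per request regardless of batch size, arrival times sorted in ascending order); it is not the StateFun runtime's actual algorithm, and `simulate_batches` is a made-up name.

```python
from typing import List


def simulate_batches(arrivals: List[float], service_time: float) -> List[int]:
    """Return the size of each batch sent to the (simulated) remote function."""
    batch_sizes: List[int] = []
    free_at = 0.0  # time at which the in-flight request completes
    i = 0
    n = len(arrivals)
    while i < n:
        # The next request goes out once the function is free and
        # at least one message is waiting.
        send_at = max(free_at, arrivals[i])
        j = i
        # Everything that arrived while the function was busy joins the batch.
        while j < n and arrivals[j] <= send_at:
            j += 1
        batch_sizes.append(j - i)
        free_at = send_at + service_time
        i = j
    return batch_sizes


arrivals = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]
print(simulate_batches(arrivals, service_time=0.5))  # [1, 1, 1, 1, 1, 1]
print(simulate_batches(arrivals, service_time=2.5))  # [1, 2, 3]
```

A function that keeps up with the input only ever sees batches of size 1, while a slow one sees batches that grow as messages queue up behind the in-flight request.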

The way I understand your message (please correct me if I'm wrong) is that
evaluating the ML model is costly, and that it would benefit from some sort
of batching (as pandas does, I assume?) instead of being applied to every
event individually.
If this is the case, perhaps exposing this batch would be a useful feature
to add.

For example:

@functions.bind_tim(..)
def ml(context, messages: typing.List[Message]):
  ...
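
To make the shape of such an API concrete, here is a self-contained toy version of a batch-binding decorator. `ToyFunctions`, `bind_batch`, and `dispatch` are hypothetical names invented for this sketch; they are not the StateFun Python SDK and say nothing about how the feature would actually be implemented.

```python
from typing import Callable, Dict, List

BatchHandler = Callable[[dict, List[float]], List[float]]


class ToyFunctions:
    """Toy registry whose handlers receive a whole batch of messages at once."""

    def __init__(self) -> None:
        self._batch_handlers: Dict[str, BatchHandler] = {}

    def bind_batch(self, typename: str) -> Callable[[BatchHandler], BatchHandler]:
        def decorator(fn: BatchHandler) -> BatchHandler:
            self._batch_handlers[typename] = fn
            return fn
        return decorator

    def dispatch(self, typename: str, context: dict, messages: List[float]) -> List[float]:
        # A per-message SDK would loop here and call the handler once per
        # message; this toy version hands over the whole list in one call.
        return self._batch_handlers[typename](context, messages)


functions = ToyFunctions()


@functions.bind_batch("example/ml")
def ml(context: dict, messages: List[float]) -> List[float]:
    # One pass over the whole batch (stand-in for a vectorized model call).
    return [2 * m for m in messages]


results = functions.dispatch("example/ml", {}, [1, 2, 3])
print(results)  # [2, 4, 6]
```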



Let me know what you think,
Igal.



[1]
https://github.com/apache/flink-statefun/blob/master/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto#L80
[2]
https://github.com/apache/flink-statefun/blob/master/statefun-sdk-python/statefun/request_reply_v3.py#L219

In reply to Timothy Bess <td...@gmail.com>, Fri, Apr 16, 2021 at 11:48 PM.