You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Tomas Mazukna <to...@gmail.com> on 2017/10/30 18:11:50 UTC

Batch job per stream message?

Trying to figure out the best design in Flink.
Reading from a kafka topic which has messages with pointers to files to be
processed.
I am thinking to somehow kick off a batch job per file... unless there is
an easy way to get a separate dataset per file.
I can do almost all of this in the stream, parse file with flat map ->
explode its contents into multiple data elements -> map, etc...
On of these steps would be to grab another dataset from JDBC source and
join with the stream's contents...
I think I am mixing the two concepts here and the right approach would be
to kick of this mini batch job per file,
where I have file datase t+ jdbc dataset to join with.

So how would I go about kicking a batch from from streaming job?

Thanks,
Tomas

Re: Batch job per stream message?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Tomas,

I'm not familiar with the details of the AsyncFunction, but I'd interpret
this as follows:

- you can make one async call in the asyncInvoke method.
- this call will result in a callback and from that one callback you can
emit a single result by calling AsyncCollector.collect()

The asyncInvoke method is called once per event in the stream, so each
stream event results in one async call and one result.
It's kind of like a MapFunction that talks to an external service.

So if you need to make multiple calls per event, you need multiple
AsyncFunctions.

Best, Fabian

2017-11-01 16:12 GMT+01:00 Tomas Mazukna <to...@gmail.com>:

> Hi Fabian,
>
> thanks for pointing me in the right direction....
> reading through the documentation here:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/
> dev/stream/asyncio.html
>
> seems like I can accomplish what I need with async call to a rest service
> or jdbc query per stream item being processed.
> The only confusion for is this statement:
>
> The AsyncCollector is completed with the first call of
> AsyncCollector.collect. All subsequent collect calls will be ignored.
>
> so basically there has to be an accumulator implemented inside
> AsyncFunction to gather up all results and return them in a single
> .collect() call.
> but how to know when to do so? or I am completely off track here....
>
>
>
> On Wed, 1 Nov 2017 at 03:57 Fabian Hueske <fh...@gmail.com> wrote:
>
>> Hi Tomas,
>>
>> triggering a batch DataSet job from a DataStream program for each input
>> record doesn't sound like a good idea to me.
>> You would have to make sure that the cluster always has sufficient
>> resources and handle failures.
>>
>> It would be preferable to have all data processing in a DataStream job.
>> You mentioned that the challenge is to join the data of the files with a
>> JDBC database.
>> I see two ways to do that in a DataStream program:
>> - replicate the JDBC table in a stateful operator. This means that you
>> have to publish updates on the database to the Flink program.
>> - query the JDBC table with an AsyncFunction. This operator concurrently
>> executes multiple calls to an external service which improves latency and
>> throughput. The operator ensures that checkpoints and watermarks are
>> correctly handled.
>>
>> Best, Fabian
>>
>> 2017-10-30 19:11 GMT+01:00 Tomas Mazukna <to...@gmail.com>:
>>
>>> Trying to figure out the best design in Flink.
>>> Reading from a kafka topic which has messages with pointers to files to
>>> be processed.
>>> I am thinking to somehow kick off a batch job per file... unless there
>>> is an easy way to get a separate dataset per file.
>>> I can do almost all of this in the stream, parse file with flat map ->
>>> explode its contents into multiple data elements -> map, etc...
>>> On of these steps would be to grab another dataset from JDBC source and
>>> join with the stream's contents...
>>> I think I am mixing the two concepts here and the right approach would
>>> be to kick of this mini batch job per file,
>>> where I have file datase t+ jdbc dataset to join with.
>>>
>>> So how would I go about kicking a batch from from streaming job?
>>>
>>> Thanks,
>>> Tomas
>>>
>>
>>

Re: Batch job per stream message?

Posted by Tomas Mazukna <to...@gmail.com>.
Hi Fabian,

thanks for pointing me in the right direction....
reading through the documentation here:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html

seems like I can accomplish what I need with async call to a rest service
or jdbc query per stream item being processed.
The only confusion for is this statement:

The AsyncCollector is completed with the first call of
AsyncCollector.collect. All subsequent collect calls will be ignored.

so basically there has to be an accumulator implemented inside
AsyncFunction to gather up all results and return them in a single
.collect() call.
but how to know when to do so? or I am completely off track here....



On Wed, 1 Nov 2017 at 03:57 Fabian Hueske <fh...@gmail.com> wrote:

> Hi Tomas,
>
> triggering a batch DataSet job from a DataStream program for each input
> record doesn't sound like a good idea to me.
> You would have to make sure that the cluster always has sufficient
> resources and handle failures.
>
> It would be preferable to have all data processing in a DataStream job.
> You mentioned that the challenge is to join the data of the files with a
> JDBC database.
> I see two ways to do that in a DataStream program:
> - replicate the JDBC table in a stateful operator. This means that you
> have to publish updates on the database to the Flink program.
> - query the JDBC table with an AsyncFunction. This operator concurrently
> executes multiple calls to an external service which improves latency and
> throughput. The operator ensures that checkpoints and watermarks are
> correctly handled.
>
> Best, Fabian
>
> 2017-10-30 19:11 GMT+01:00 Tomas Mazukna <to...@gmail.com>:
>
>> Trying to figure out the best design in Flink.
>> Reading from a kafka topic which has messages with pointers to files to
>> be processed.
>> I am thinking to somehow kick off a batch job per file... unless there is
>> an easy way to get a separate dataset per file.
>> I can do almost all of this in the stream, parse file with flat map ->
>> explode its contents into multiple data elements -> map, etc...
>> On of these steps would be to grab another dataset from JDBC source and
>> join with the stream's contents...
>> I think I am mixing the two concepts here and the right approach would be
>> to kick of this mini batch job per file,
>> where I have file datase t+ jdbc dataset to join with.
>>
>> So how would I go about kicking a batch from from streaming job?
>>
>> Thanks,
>> Tomas
>>
>
>

Re: Batch job per stream message?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Tomas,

triggering a batch DataSet job from a DataStream program for each input
record doesn't sound like a good idea to me.
You would have to make sure that the cluster always has sufficient
resources and handle failures.

It would be preferable to have all data processing in a DataStream job. You
mentioned that the challenge is to join the data of the files with a JDBC
database.
I see two ways to do that in a DataStream program:
- replicate the JDBC table in a stateful operator. This means that you have
to publish updates on the database to the Flink program.
- query the JDBC table with an AsyncFunction. This operator concurrently
executes multiple calls to an external service which improves latency and
throughput. The operator ensures that checkpoints and watermarks are
correctly handled.

Best, Fabian

2017-10-30 19:11 GMT+01:00 Tomas Mazukna <to...@gmail.com>:

> Trying to figure out the best design in Flink.
> Reading from a kafka topic which has messages with pointers to files to be
> processed.
> I am thinking to somehow kick off a batch job per file... unless there is
> an easy way to get a separate dataset per file.
> I can do almost all of this in the stream, parse file with flat map ->
> explode its contents into multiple data elements -> map, etc...
> On of these steps would be to grab another dataset from JDBC source and
> join with the stream's contents...
> I think I am mixing the two concepts here and the right approach would be
> to kick of this mini batch job per file,
> where I have file datase t+ jdbc dataset to join with.
>
> So how would I go about kicking a batch from from streaming job?
>
> Thanks,
> Tomas
>