Posted to user@flink.apache.org by Colin Williams <co...@gmail.com> on 2017/12/09 02:06:49 UTC

Parallelizing a tumbling group window

Hello,

I've inherited some Flink application code.

We're currently creating a table using a tumbling group-window SQL query
similar to the first example in

 https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/sql.html#group-windows

Each generated SQL query looks something like:

SELECT measurement, `tag_AppId`(tagset), P99(`field_Adds`(fieldset)),
TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)

We are also using a UDFAGG (user-defined aggregate) function in some of the
queries, which I think could be cleaned up and optimized a bit (it uses Scala
types and may not be well implemented).
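
For context, such a user-defined aggregate is implemented as a Table API
AggregateFunction and registered on the table environment. A stripped-down
sketch of the shape (the P99 name matches the query above, but the Double
value type and the naive percentile logic are stand-ins, not our actual
implementation):

import org.apache.flink.table.functions.AggregateFunction
import scala.collection.mutable.ArrayBuffer

// Accumulator that simply collects every value seen in the window.
class P99Acc { val values: ArrayBuffer[Double] = ArrayBuffer() }

class P99 extends AggregateFunction[Double, P99Acc] {
  override def createAccumulator(): P99Acc = new P99Acc

  // accumulate() is resolved by the Table API via reflection.
  def accumulate(acc: P99Acc, value: Double): Unit = acc.values += value

  // Naive 99th percentile: buffers and sorts all values per key and window;
  // a production version should use a sketch/digest instead.
  override def getValue(acc: P99Acc): Double =
    if (acc.values.isEmpty) 0.0
    else {
      val sorted = acc.values.sorted
      sorted(math.min(sorted.length - 1, (sorted.length * 0.99).toInt))
    }
}

// Registered so SQL can call it by name:
// tableEnv.registerFunction("P99", new P99)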

We then turn the result table back into a DataStream using toAppendStream and
eventually add a derived stream to a sink. We've set the TimeCharacteristic to
event time.
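
The overall wiring looks roughly like the sketch below (the field list, the
'rowtime.rowtime declaration and the lines / someSink placeholders are
assumptions; the real job has more plumbing around the Kafka source and sink):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val tableEnv = TableEnvironment.getTableEnvironment(env)

// lines is the timestamped/watermarked input stream (placeholder); the
// trailing 'rowtime.rowtime marks the event-time attribute used by TUMBLE().
tableEnv.registerDataStream("LINES", lines,
  'measurement, 'tagset, 'fieldset, 'rowtime.rowtime)

// query is the SQL string shown above.
val result = tableEnv.sqlQuery(query)   // sqlQuery() in Flink 1.4, sql() in 1.3

// Back to a DataStream and on to the sink (someSink is a placeholder).
val rows: DataStream[Row] = tableEnv.toAppendStream[Row](result)
rows.addSink(someSink)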

In some streaming scenarios everything is working fine with a parallelism
of 1, but in others it appears that we can't keep up with the event source.

We are now investigating how to enable parallelism specifically for the SQL
table query or the aggregator.

Can anyone suggest a good way to go about this? It wasn't clear from the
documentation.

Best,

Colin Williams

Re: Parallelizing a tumbling group window

Posted by Colin Williams <co...@gmail.com>.
Thanks for the reply. Unfortunately that project was unexpectedly cancelled,
though for unrelated reasons. I was happy to work on it and hopefully gained
some insight. I have another, unrelated question today about Elasticsearch
sinks, and will ask it in a separate thread.


Re: Parallelizing a tumbling group window

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Colin,

There are two things that come to my mind:

1) You mentioned "suspect jobs are grouping by a field of constant values".
Does that mean that the grouping key is always constant? Flink parallelizes
the window computation per key, i.e., there is one thread per key. Although
it would be possible to perform pre-aggregations, this is not done yet.
There is an effort to add support for this to the DataStream API [1]. The
Table API will hopefully leverage this once it has been added to the
DataStream API.
2) Another reason for backpressure can be non-aligned watermarks, i.e., the
watermarks of different partitions diverge too much from each other. In
this case, windows cannot be finalized because everything is aligned to the
lowest watermark.
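
If the divergence originates in the Kafka source, assigning the
timestamp/watermark extractor directly on the consumer lets Flink track a
watermark per Kafka partition and emit the minimum across them. A minimal
sketch (the Line type, its timestamp field and the 30 second bound are
assumptions):

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time

// kafkaConsumer is the FlinkKafkaConsumer instance for the source topic.
kafkaConsumer.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[Line](Time.seconds(30)) {
    // Must return the event time in epoch milliseconds.
    override def extractTimestamp(line: Line): Long = line.timestampMillis
  })

val lines = env.addSource(kafkaConsumer)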

Hope this helps to clarify things.

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-7561


Re: Parallelizing a tumbling group window

Posted by Colin Williams <co...@gmail.com>.
Hi Timo and flink-user,


It's been a few weeks and we've made some changes to the application
mentioned in this email. We've also updated to Flink 1.4. We are using
the SQL / Table API with a tumbling window and a user-defined aggregate to
generate a SQL query string like:


SELECT measurement, `tag_AppId`(tagset), UDFAGG(`field_Adds`(fieldset)),
TUMBLE_START(rowtime, INTERVAL '10' MINUTE) FROM LINES GROUP BY
measurement, `tag_AppId`(tagset), TUMBLE(rowtime, INTERVAL '10' MINUTE)



I've experimented with the parallelism of the operators and with setting the
environment's parallelism, as suggested. I've been setting parallelism values
of 2 or 4 on all operators except the consumer and sink.


For some jobs with large Kafka source topics, we experience backpressure under
load and see some lag. But when trying to address this via parallelism, so far
I've only seen badly degraded performance from the increased parallelism
settings.


Furthermore, the suspect jobs are grouping by a field of constant values.
These jobs usually have 40,000 or so grouped records entering the aggregator
for each minute window.



I would have thought that tumbling windows would allow the job to process each
window in a different task slot, parallelizing across windows. But maybe
that's not happening?



Can you help us understand why parallelizing the job only degrades
performance, and what I can do to change this?




Happy New Year!



Colin Williams

Re: Parallelizing a tumbling group window

Posted by Timo Walther <tw...@apache.org>.
Hi Colin,

Unfortunately, selecting the parallelism for parts of a SQL query is not
supported yet. By default, tumbling window operators use the default 
parallelism of the environment. Simple project and select operations 
have the same parallelism as the inputs they are applied on.

I think the easiest solution so far is to explicitly set the parallelism
of operators that are not part of the Table API and use the 
environment's parallelism to scale the SQL query.
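
Concretely, that could look like the following sketch (the parallelism values
and the kafkaConsumer / someSink / rows names are only placeholders):

// The environment parallelism drives the SQL/Table operators, including the
// tumbling window aggregation.
env.setParallelism(4)

// Operators outside the Table API get their parallelism pinned explicitly,
// e.g. to the Kafka topic's partition count or the sink's capacity.
val lines = env
  .addSource(kafkaConsumer)
  .setParallelism(2)

// ... register the stream, run the SQL query, convert with toAppendStream ...

rows
  .addSink(someSink)
  .setParallelism(1)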

I hope that helps.

Regards,
Timo

