Posted to user@flink.apache.org by Felipe Gutierrez <fe...@gmail.com> on 2020/11/09 12:41:35 UTC

Stream aggregation using Flink Table API (Blink plan)

Hi community,

I am testing the "Split Distinct Aggregation" [1] while consuming the taxi
ride data set. The SQL query I run on the table environment is the one
below:

Table tableCountDistinct = tableEnv.sqlQuery(
    "SELECT startDate, COUNT(driverId) FROM TaxiRide GROUP BY startDate");

and I am enabling:
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "3 s");
configuration.setString("table.exec.mini-batch.size", "5000");
configuration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");
and finally
configuration.setString("table.optimizer.distinct-agg.split.enabled", "true");

I expected the query plan in the web UI to show two hash phases, as in
the image at [1]. Instead, it shows the same plan with a single hash
phase, as if I were deploying only one Local aggregate and one Global
aggregate (taking the parallel instances into consideration, of course).
Please see the attached image of the query execution plan.

Is there something I am missing when I configure the Table API?
By the way, I am a bit confused by the "MiniBatch Aggregation" [2].
Does the "MiniBatch Aggregation" aggregate like a processing-time window
on the operator after the hash phase? If so, isn't it the same as a
window aggregation, rather than an unbounded one as the example
presents?

Thanks!
Felipe

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#split-distinct-aggregation
[2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html#minibatch-aggregation
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
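
Note on [1]: the "Split Distinct Aggregation" section of the documentation
describes rewriting a distinct aggregate into two levels. The first level
groups by the original key plus a bucket such as
MOD(HASH_CODE(distinct_key), 1024), and the second level sums the partial
results per original key. The plain-Java simulation below is only a sketch
of that idea, applied to a COUNT(DISTINCT driverId) variant of the query in
this post. It does not use the Flink API; the Ride class, the method names
and the way the bucket is computed are assumptions made up for illustration.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy simulation of a two-level COUNT(DISTINCT driverId) ... GROUP BY startDate. */
final class SplitDistinctSketch {
    /** Hypothetical ride record; field names follow the query in the post. */
    static final class Ride {
        final String startDate;
        final long driverId;

        Ride(String startDate, long driverId) {
            this.startDate = startDate;
            this.driverId = driverId;
        }
    }

    static final int BUCKETS = 1024;

    /** One-level plan: a single distinct set per startDate. */
    static Map<String, Long> countDistinctDirect(List<Ride> rides) {
        Map<String, Set<Long>> seen = new HashMap<>();
        for (Ride r : rides) {
            seen.computeIfAbsent(r.startDate, k -> new HashSet<>()).add(r.driverId);
        }
        Map<String, Long> result = new HashMap<>();
        seen.forEach((day, ids) -> result.put(day, (long) ids.size()));
        return result;
    }

    /** Two-level plan: distinct per (startDate, bucket), then a sum per startDate. */
    static Map<String, Long> countDistinctSplit(List<Ride> rides) {
        // First level: the grouping key is (startDate, bucket of driverId).
        Map<String, Map<Integer, Set<Long>>> partial = new HashMap<>();
        for (Ride r : rides) {
            int bucket = Math.floorMod(Long.hashCode(r.driverId), BUCKETS);
            partial.computeIfAbsent(r.startDate, k -> new HashMap<>())
                   .computeIfAbsent(bucket, k -> new HashSet<>())
                   .add(r.driverId);
        }
        // Second level: the grouping key is startDate; sum the per-bucket counts.
        Map<String, Long> result = new HashMap<>();
        partial.forEach((day, buckets) -> {
            long total = 0;
            for (Set<Long> ids : buckets.values()) {
                total += ids.size();
            }
            result.put(day, total);
        });
        return result;
    }
}
```

Both methods return the same counts, because equal driverId values always
land in the same bucket. The difference is that the first level spreads a
hot startDate over up to 1024 keys, which is where the second hash phase in
the documented plan comes from.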

Re: Stream aggregation using Flink Table API (Blink plan)

Posted by Felipe Gutierrez <fe...@gmail.com>.
I see. Now it has different query plans. It was documented on another
page, so I got confused. Thanks!

On Thu, Nov 12, 2020 at 12:41 PM Jark Wu <im...@gmail.com> wrote:
>
> Hi Felipe,
>
> The default value of `table.optimizer.agg-phase-strategy` is AUTO: if mini-batch is enabled,
> it will use TWO_PHASE, otherwise ONE_PHASE.
>
> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-optimizer-agg-phase-strategy
>
>> On Tue, Nov 10, 2020 at 6:25 PM Felipe Gutierrez
>> <fe...@gmail.com> wrote:
>> >
>> > I see, thanks Timo
>> >
>> > On Tue, Nov 10, 2020 at 3:22 PM Timo Walther <tw...@apache.org> wrote:
>> > >
>> > > Hi Felipe,
>> > >
>> > > By "non-deterministic", Jark meant that you never know whether the mini-batch
>> > > timer (every 3 s) or the mini-batch threshold (e.g. 3 rows) fires the
>> > > execution. This depends on how fast records arrive at the operator.
>> > >
>> > > In general, processing time can be considered non-deterministic, because
>> > > 100 ms is not necessarily 100 ms. This depends on the CPU load and other tasks
>> > > such as garbage collection. Only event time (and thus event-time
>> > > windows), which works on the timestamp in the data instead of machine time,
>> > > is deterministic.
>> > >
>> > > Regards,
>> > > Timo
>> > >
>> > >
>> > > On 10.11.20 12:02, Felipe Gutierrez wrote:
>> > > > Hi Jark,
>> > > >
>> > > > Thanks for your reply. Indeed, I forgot to write DISTINCT in the query,
>> > > > and now the query plan splits into two hash partition phases.
>> > > >
>> > > > What do you mean by deterministic time? Why is only the window aggregate
>> > > > deterministic? If I implement the ProcessingTimeCallback [1]
>> > > > interface, is it deterministic?
>> > > >
>> > > > [1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html
>> > > > Thanks
>> > > >
>> > > > On Tue, Nov 10, 2020 at 7:55 AM Jark Wu <im...@gmail.com> wrote:
>> > > >>
>> > > >> Hi Felipe,
>> > > >>
>> > > >> The "Split Distinct Aggregation", i.e. the "table.optimizer.distinct-agg.split.enabled" option,
>> > > >> only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).
>> > > >>
>> > > >> However, the query in your example uses COUNT(driverId).
>> > > >> You can update it to COUNT(DISTINCT driverId), and it should show two hash phases.
>> > > >>
>> > > >> Regarding "MiniBatch Aggregation", it is not the same as a processing-time window aggregation.
>> > > >>
>> > > >> 1) MiniBatch is just an optimization for unbounded aggregation: it buffers some input records in memory
>> > > >>    and processes them together to reduce state access, whereas a processing-time window still accesses
>> > > >>    state per record. Besides, the local aggregation also applies mini-batch: it only sends the accumulator
>> > > >>    of the current mini-batch to the downstream global aggregation, which improves performance a lot.
>> > > >> 2) The size of a MiniBatch is not deterministic. It may be triggered by the number of records or by a timeout.
>> > > >>    A window aggregate, by contrast, is triggered at a deterministic time.
>> > > >>
>> > > >>
>> > > >> Best,
>> > > >> Jark
>> > > >>
>> > > >>
>> > > >> On Mon, 9 Nov 2020 at 21:45, Felipe Gutierrez <fe...@gmail.com> wrote:
>> > > >>>
>> > > >>> I realized that I forgot the image. Now it is attached.
>> > >
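
The two mini-batch triggers discussed in the quoted replies above (record
count versus timeout) can be illustrated with a toy buffer. This is only a
sketch in plain Java, not Flink's operator: the class MiniBatchSketch, its
fields, and the explicit onTimer() call that stands in for the
processing-time timer are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy mini-batch COUNT: buffers records and flushes on size or on a timer tick. */
final class MiniBatchSketch {
    private final int maxSize;
    // Per-key partial counts of the current, not yet flushed, mini-batch.
    private final Map<String, Long> buffer = new HashMap<>();
    // Stands in for the keyed state of the aggregation.
    private final Map<String, Long> state = new HashMap<>();
    private int buffered = 0;

    /** Which trigger caused each flush, in order ("size" or "timeout"). */
    final List<String> flushReasons = new ArrayList<>();
    /** Number of state updates performed so far. */
    int stateAccesses = 0;

    MiniBatchSketch(int maxSize) {
        this.maxSize = maxSize;
    }

    /** A record only touches the in-memory buffer; state is untouched until a flush. */
    void onRecord(String key) {
        buffer.merge(key, 1L, Long::sum);
        buffered++;
        if (buffered >= maxSize) {
            flush("size");
        }
    }

    /** Simulated timer tick, e.g. once per allowed-latency interval. */
    void onTimer() {
        if (buffered > 0) {
            flush("timeout");
        }
    }

    private void flush(String reason) {
        for (Map.Entry<String, Long> e : buffer.entrySet()) {
            state.merge(e.getKey(), e.getValue(), Long::sum);
            stateAccesses++; // one state update per key per batch, not per record
        }
        buffer.clear();
        buffered = 0;
        flushReasons.add(reason);
    }

    long count(String key) {
        return state.getOrDefault(key, 0L);
    }
}
```

With a size limit of 3, three quick records for the same key are flushed by
the size trigger with a single state update, while a lone record that
arrives afterwards waits for the next timer tick. Which trigger fires
depends only on how fast records arrive, which is the non-determinism
described in the replies.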

Re: Stream aggregation using Flink Table API (Blink plan)

Posted by Jark Wu <im...@gmail.com>.
Hi Felipe,

The default value of `table.optimizer.agg-phase-strategy` is AUTO: if
mini-batch is enabled, it will use TWO_PHASE, otherwise ONE_PHASE.

https://ci.apache.org/projects/flink/flink-docs-master/dev/table/config.html#table-optimizer-agg-phase-strategy
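
The rule stated in this reply can be written down as a tiny function. This
is a sketch of the rule exactly as described here, not Flink's planner
code: the enum and method names are hypothetical, and returning an
explicitly configured value unchanged is an assumption.

```java
/** Sketch of how AUTO is resolved, following the rule described in this reply. */
final class AggPhaseStrategySketch {
    enum Strategy { AUTO, ONE_PHASE, TWO_PHASE }

    /** Returns the strategy that would take effect for the given settings. */
    static Strategy effective(Strategy configured, boolean miniBatchEnabled) {
        if (configured != Strategy.AUTO) {
            // Assumption: an explicit setting is taken as-is.
            return configured;
        }
        // AUTO: two-phase only when mini-batch is enabled.
        return miniBatchEnabled ? Strategy.TWO_PHASE : Strategy.ONE_PHASE;
    }
}
```

Under this rule, with mini-batch enabled, leaving the option at AUTO and
setting it to TWO_PHASE resolve to the same strategy, which is consistent
with the two identical plans reported in this thread.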


Re: Stream aggregation using Flink Table API (Blink plan)

Posted by Felipe Gutierrez <fe...@gmail.com>.
Hi Jark,

I don't get the difference between the "MiniBatch Aggregation" and
the "Local-Global Aggregation". The web page [1] says that I have to
enable the TWO_PHASE parameter, so I compared the query plans with and
without the TWO_PHASE parameter, and they are the same. I therefore
conclude that mini-batch already is a TWO_PHASE strategy, since it
already pre-aggregates locally. Is that correct?

Here are both query plans:
Thanks, Felipe
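
For readers following along, the local/global split that appears as
LocalGroupAggregate and GlobalGroupAggregate in the plans that follow can be
simulated in a few lines of plain Java. This is a toy sketch, not Flink's
operators: the class and method names are made up, each list stands for the
mini-batch of one parallel subtask, and the hash shuffle is reduced to a
merge by key.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy simulation of a local-global COUNT(...) GROUP BY taxiId. */
final class LocalGlobalSketch {
    /** Local phase: one subtask pre-aggregates its own mini-batch into per-key counts. */
    static Map<Integer, Long> localAggregate(List<Integer> taxiIds) {
        Map<Integer, Long> accumulators = new HashMap<>();
        for (int taxiId : taxiIds) {
            accumulators.merge(taxiId, 1L, Long::sum);
        }
        return accumulators;
    }

    /** Global phase: after the shuffle by key, accumulators with the same key are merged. */
    static Map<Integer, Long> globalAggregate(List<Map<Integer, Long>> localAccumulators) {
        Map<Integer, Long> result = new HashMap<>();
        for (Map<Integer, Long> accumulators : localAccumulators) {
            accumulators.forEach((taxiId, count) -> result.merge(taxiId, count, Long::sum));
        }
        return result;
    }
}
```

In this sketch only the per-key accumulators cross the shuffle, so a
subtask that saw the same taxiId many times in one mini-batch sends a single
entry downstream instead of one entry per record.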

Table API: mini-batch.enable                            : true
Table API: distinct-agg.split.enabled                   : false
Table API: parallelism                                  : 4
Table API: mini-batch.latency                           : 1 s
Table API: mini_batch.size                              : 1000
Table API: mini_batch.two_phase                         : false

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: source",
    "pact" : "Data Source",
    "contents" : "Source: source",
    "parallelism" : 4
  }, {
    "id" : 2,
    "type" : "tokenizer",
    "pact" : "Operator",
    "contents" : "tokenizer",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 3,
    "type" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "pact" : "Operator",
    "contents" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "pact" : "Operator",
    "contents" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 3,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "pact" : "Operator",
    "contents" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 7,
    "type" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "pact" : "Operator",
    "contents" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 5,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 8,
    "type" : "SinkConversionToTuple2",
    "pact" : "Operator",
    "contents" : "SinkConversionToTuple2",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 7,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 9,
    "type" : "flat-output",
    "pact" : "Operator",
    "contents" : "flat-output",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 8,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 10,
    "type" : "Sink: sink",
    "pact" : "Data Sink",
    "contents" : "Sink: sink",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 9,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}

Table API: mini-batch.enable                            : true
Table API: distinct-agg.split.enabled                   : false
Table API: parallelism                                  : 4
Table API: mini-batch.latency                           : 1 s
Table API: mini_batch.size                              : 1000
Table API: mini_batch.two_phase                         : true

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: source",
    "pact" : "Data Source",
    "contents" : "Source: source",
    "parallelism" : 4
  }, {
    "id" : 2,
    "type" : "tokenizer",
    "pact" : "Operator",
    "contents" : "tokenizer",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 3,
    "type" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "pact" : "Operator",
    "contents" : "SourceConversion(table=[Unregistered_DataStream_2], fields=[dayOfTheYear, driverId, endLat, endLon, endTime, isStart, passengerCnt, rideId, startLat, startLon, startTime, taxiId])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "pact" : "Operator",
    "contents" : "MiniBatchAssigner(interval=[1000ms], mode=[ProcTime])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 3,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "pact" : "Operator",
    "contents" : "LocalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(passengerCnt) AS count$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 7,
    "type" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "pact" : "Operator",
    "contents" : "GlobalGroupAggregate(groupBy=[taxiId], select=[taxiId, COUNT(count$0) AS EXPR$0])",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 5,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 8,
    "type" : "SinkConversionToTuple2",
    "pact" : "Operator",
    "contents" : "SinkConversionToTuple2",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 7,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 9,
    "type" : "flat-output",
    "pact" : "Operator",
    "contents" : "flat-output",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 8,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 10,
    "type" : "Sink: sink",
    "pact" : "Data Sink",
    "contents" : "Sink: sink",
    "parallelism" : 4,
    "predecessors" : [ {
      "id" : 9,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
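
As a rough illustration of what the LocalGroupAggregate -> HASH -> GlobalGroupAggregate chain in both plans does conceptually, here is a minimal plain-Java sketch. It is a toy model, not Flink's operator code: the class and method names are hypothetical, and each list stands in for the records seen by one parallel instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of local/global aggregation (hypothetical names, not Flink code). */
class LocalGlobalSketch {

    /** Local phase: one partial count per key within a single parallel instance. */
    static Map<String, Long> localAggregate(List<String> taxiIds) {
        Map<String, Long> partial = new HashMap<>();
        for (String id : taxiIds) {
            partial.merge(id, 1L, Long::sum);
        }
        return partial;
    }

    /** Global phase: partial counts that share a key are merged by summing. */
    static Map<String, Long> globalAggregate(List<Map<String, Long>> partials) {
        Map<String, Long> total = new HashMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((id, count) -> total.merge(id, count, Long::sum));
        }
        return total;
    }

    public static void main(String[] args) {
        // Two parallel instances, each pre-aggregating its own records.
        List<Map<String, Long>> partials = new ArrayList<>();
        partials.add(localAggregate(Arrays.asList("taxi-1", "taxi-1", "taxi-2")));
        partials.add(localAggregate(Arrays.asList("taxi-1", "taxi-2", "taxi-2")));
        System.out.println(globalAggregate(partials));
    }
}
```

Only one partial count per key leaves each instance, so the hash shuffle carries fewer records than the raw input would.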


[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html


--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: Stream aggregation using Flink Table API (Blink plan)

Posted by Felipe Gutierrez <fe...@gmail.com>.
I see, thanks Timo

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: Stream aggregation using Flink Table API (Blink plan)

Posted by Timo Walther <tw...@apache.org>.
Hi Felipe,

by non-deterministic, Jark meant that you never know whether the mini-batch
timer (every 3 s) or the mini-batch threshold (e.g. 3 rows) fires the
execution. This depends on how fast records arrive at the operator.

In general, processing time can be considered non-deterministic, because
100 ms need not be exactly 100 ms. This depends on the CPU load and other
tasks such as garbage collection. Only event time (and thus event-time
windows), which works on the timestamps in the data instead of machine
time, is deterministic.
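
To make the event-time point concrete, here is a minimal plain-Java sketch of tumbling-window assignment by event timestamp. It is only an illustration under stated assumptions: the class and method names are hypothetical, and watermarks and lateness are ignored entirely. The window a record falls into depends only on the timestamp it carries, so the result is the same no matter when or in which order the records arrive.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy tumbling-window assignment by event timestamp (hypothetical names). */
class EventTimeWindowSketch {

    /** Start of the window containing the timestamp; depends only on the data. */
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - Math.floorMod(timestampMs, sizeMs);
    }

    /** Counts records per window, keyed by window start. */
    static Map<Long, Integer> countPerWindow(long[] eventTimestampsMs, long sizeMs) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (long ts : eventTimestampsMs) {
            counts.merge(windowStart(ts, sizeMs), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] inOrder = {1000L, 2500L, 3100L};
        long[] shuffled = {3100L, 2500L, 1000L};
        // Same windows and counts regardless of arrival order.
        System.out.println(countPerWindow(inOrder, 3000L));
        System.out.println(countPerWindow(shuffled, 3000L));
    }
}
```

A processing-time window would use the machine clock at arrival instead of `timestampMs`, which is why its contents can differ from run to run.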

Regards,
Timo




Re: Stream aggregation using Flink Table API (Blink plan)

Posted by Felipe Gutierrez <fe...@gmail.com>.
Hi Jark,

thanks for your reply. Indeed, I forgot to write DISTINCT in the query,
and now the query plan splits into two hash partition phases.

What do you mean by deterministic time? Why is only the window aggregate
deterministic? If I implement the ProcessingTimeCallback [1]
interface, is it deterministic?

[1] https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/runtime/tasks/ProcessingTimeCallback.html
Thanks

--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com


Re: Stream aggregation using Flink Table API (Blink plan)

Posted by Jark Wu <im...@gmail.com>.
Hi Felipe,

The "Split Distinct Aggregation", i.e. the
"table.optimizer.distinct-agg.split.enabled" option,
only works for distinct aggregations (e.g. COUNT(DISTINCT ...)).

However, the query in your example uses COUNT(driverId).
You can change it to COUNT(DISTINCT driverId), and it should show two hash
phases.
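
To show why a distinct aggregation can be split into two hash phases, here is a minimal plain-Java sketch of the idea. It is a toy model and not Flink's implementation: the class name, the bucket count of 4, and the row layout (startDate, driverId) are assumptions made for illustration. The first phase groups by (startDate, bucket of driverId) and keeps distinct values per bucket; the second phase groups by startDate alone and sums the bucket sizes.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy two-level COUNT(DISTINCT) (hypothetical names, not Flink code). */
class SplitDistinctSketch {

    static final int BUCKETS = 4; // assumed bucket count, chosen for the example

    /** Each row is {startDate, driverId}; returns distinct drivers per date. */
    static Map<String, Long> countDistinct(List<String[]> rows) {
        // Phase 1: distinct driverIds per (startDate, bucket). A given driverId
        // always maps to the same bucket, so buckets never share a value.
        Map<String, Map<Integer, Set<String>>> partial = new HashMap<>();
        for (String[] row : rows) {
            int bucket = Math.floorMod(row[1].hashCode(), BUCKETS);
            partial.computeIfAbsent(row[0], d -> new HashMap<>())
                   .computeIfAbsent(bucket, b -> new HashSet<>())
                   .add(row[1]);
        }
        // Phase 2: per startDate, sum the distinct counts of all buckets.
        Map<String, Long> result = new HashMap<>();
        partial.forEach((date, buckets) -> {
            long total = 0;
            for (Set<String> drivers : buckets.values()) {
                total += drivers.size();
            }
            result.put(date, total);
        });
        return result;
    }

    public static void main(String[] args) {
        List<String[]> rows = Arrays.asList(
            new String[] {"2020-11-09", "d1"},
            new String[] {"2020-11-09", "d2"},
            new String[] {"2020-11-09", "d1"},
            new String[] {"2020-11-10", "d3"});
        System.out.println(countDistinct(rows));
    }
}
```

Because the first phase is keyed by date and bucket rather than date alone, a single hot date is spread over several workers before the final sum.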

Regarding "MiniBatch Aggregation", it is not the same as a processing-time
window aggregation.

1) MiniBatch is just an optimization for unbounded aggregation: it buffers
   some input records in memory and processes them together to reduce
   state access. A processing-time window, in contrast, still accesses
   state per record. Besides, the local aggregation also applies
   mini-batch: it only sends the accumulator of the current mini-batch to
   the downstream global aggregation, which improves performance a lot.
2) The size of a MiniBatch is not deterministic. It may be triggered by the
   number of records or by a timeout. A window aggregate, however, is
   triggered at a deterministic time.
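
The two points above can be sketched in plain Java as follows. This is a toy model under stated assumptions, not Flink's code: the class and field names are hypothetical, a HashMap stands in for keyed state, and the timeout is only checked when a record arrives, whereas a real operator would rely on a timer.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy mini-batch counter (hypothetical names, not Flink code). */
class MiniBatchCounter {
    private final int maxSize;        // plays the role of the mini-batch size
    private final long maxLatencyMs;  // plays the role of the allowed latency
    private final Map<String, Long> buffer = new HashMap<>(); // in-memory partial counts
    private final Map<String, Long> state = new HashMap<>();  // stands in for keyed state
    private int buffered = 0;
    private long batchStartMs = 0;
    int stateAccesses = 0;

    MiniBatchCounter(int maxSize, long maxLatencyMs) {
        this.maxSize = maxSize;
        this.maxLatencyMs = maxLatencyMs;
    }

    /** Buffers one record; flushes when the size or the latency limit is hit. */
    void add(String key, long nowMs) {
        if (buffered == 0) {
            batchStartMs = nowMs;
        }
        buffer.merge(key, 1L, Long::sum);
        buffered++;
        if (buffered >= maxSize || nowMs - batchStartMs >= maxLatencyMs) {
            flush();
        }
    }

    /** Touches state once per distinct key in the batch, not once per record. */
    void flush() {
        for (Map.Entry<String, Long> entry : buffer.entrySet()) {
            state.merge(entry.getKey(), entry.getValue(), Long::sum);
            stateAccesses++;
        }
        buffer.clear();
        buffered = 0;
    }

    long count(String key) {
        return state.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        MiniBatchCounter counter = new MiniBatchCounter(3, 1000L);
        counter.add("a", 0L);
        counter.add("a", 10L);
        counter.add("b", 20L);    // third record: flushed by the size limit
        counter.add("a", 100L);
        counter.add("a", 1200L);  // 1100 ms into the batch: flushed by the timeout
        System.out.println(counter.count("a") + " " + counter.stateAccesses);
    }
}
```

Which of the two conditions fires depends on the arrival rate, so the batch boundaries are not reproducible, while the final counts are.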


Best,
Jark



Re: Stream aggregation using Flink Table API (Blink plan)

Posted by Felipe Gutierrez <fe...@gmail.com>.
I realized that I forgot the image. Now it is attached.
--
-- Felipe Gutierrez
-- skype: felipe.o.gutierrez
-- https://felipeogutierrez.blogspot.com
