Posted to user@flink.apache.org by vtygoss <vt...@126.com> on 2021/09/18 09:46:16 UTC

throughput reduced when mini-batch is enabled; how to concat multiset using separator

Hi, Flink community!

I have two questions:

1. How do I concatenate a multiset using a separator?
    In Spark SQL I would write concat_ws(separator, collect_set(column)). In Flink, the result type of 'collect(distinct column)' is a multiset. When writing a UDF, is the corresponding class org.apache.flink.calcite.shaded.com.google.common.collect.Multiset? Is there a better way, such as a built-in function?

2. When I enable mini-batch, the throughput of my Flink application drops by 27%, although mini-batch is designed to increase throughput. Did I do something wrong?

Thank you for any suggestions.


```
  // Scala. How the TableEnvironment is created is elided here.
  val tenv: TableEnvironment = ...
  val conf = tenv.getConfig.getConfiguration

  // Hive source, sink and parallelism settings
  conf.setBoolean("table.exec.hive.infer-source-parallelism", false)
  conf.setBoolean("table.exec.hive.fallback-mapred-reader", true)
  conf.setString("table.exec.sink.not-null-enforcer", "drop")
  conf.setInteger("table.exec.resource.default-parallelism", GlobalProp.instance.getProperty("flink.parallelism.max", "20").toInt)

  // optimizer settings
  conf.setBoolean("table.optimizer.join-reorder-enabled", true)
  conf.setInteger("table.optimizer.distinct-agg.split.bucket-num", 10240)
  conf.setBoolean("table.optimizer.distinct-agg.split.enabled", true)
  conf.setInteger("table.optimizer.join.broadcast-threshold", 128 * 1024 * 1024)
  conf.setBoolean("table.optimizer.reuse-sub-plan-enabled", true)
  conf.setBoolean("table.optimizer.reuse-source-enabled", true)
  conf.setBoolean("table.optimizer.source.predicate-pushdown-enabled", true)
  conf.setInteger("table.exec.async-lookup.buffer-capacity", 10000)

  // mini-batch: flush a bundle after 1 minute or 10000 records
  conf.setBoolean("table.exec.mini-batch.enabled", true)
  conf.setString("table.exec.mini-batch.allow-latency", "1 min")
  conf.setInteger("table.exec.mini-batch.size", 10000)
  tenv
```




Best Regards!

Re: throughput reduced when mini-batch is enabled; how to concat multiset using separator

Posted by JING ZHANG <be...@gmail.com>.
Sorry, I forgot to mention that the MiniBatch optimization is not well suited
to `LISTAGG(DISTINCT column)` or any other aggregate function on DISTINCT
fields, because they need to keep a MapState of historical values.

On Sat, Sep 18, 2021 at 8:00 PM, JING ZHANG <be...@gmail.com> wrote:


Re: throughput reduced when mini-batch is enabled; how to concat multiset using separator

Posted by JING ZHANG <be...@gmail.com>.
Hi,

> But in flink, the result data type of function 'collect(distinct column)'
> is multiset, the corresponding class of multiset is
> org.apache.flink.calcite.shaded.com.google.common.collect.Multiset?

No. The result data type of 'collect(distinct column)' is
`org.apache.flink.table.types.logical.MultisetType`, and its data value is a
`java.util.Map`. If you write a UDF to process the result of the `collect`
function, the input parameter of the UDF's eval method should be of Map type.
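As a rough sketch of what the body of such a UDF could do: a MULTISET value reaches user code as a `java.util.Map` from element to occurrence count, so joining the distinct elements amounts to joining the map's keys. The names `MultisetConcat` and `concat` are made up for illustration, and the Flink `ScalarFunction` wrapper with its type hints is deliberately left out; its eval method would only delegate to this helper. The keys are sorted here purely to make the output deterministic.

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

// Hypothetical helper, not part of Flink.
object MultisetConcat {
  // multiset: element -> number of occurrences (the Map form of a MULTISET value).
  // Returns the distinct non-null elements joined by the separator.
  def concat(multiset: JMap[String, Integer], separator: String): String =
    if (multiset == null) null
    else multiset.keySet.asScala.filter(_ != null).toSeq.sorted.mkString(separator)
}
```

Because only the keys are used, the occurrence counts are ignored, which matches the intent of concat_ws(separator, collect_set(column)).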
> How to concat 'collect(distinct column)' result using separator? Is
> there any better way such as builtin functions?

Yes, you could try `LISTAGG(DISTINCT column, separator)`. See the `LISTAGG`
aggregate function in [1] for more information. For example:

    SELECT LISTAGG(DISTINCT c, ',') FROM MyTable GROUP BY b
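To make the semantics of that query concrete, here is a small model in plain Scala collections. It is not Flink API: `Row`, `ListAggDistinctModel` and `listAggDistinct` are hypothetical names, and the model keeps elements in first-occurrence order, which may differ from the order a real streaming job produces.

```scala
// Plain-collections model of: SELECT b, LISTAGG(DISTINCT c, sep) FROM MyTable GROUP BY b
object ListAggDistinctModel {
  // One row of a hypothetical MyTable(b, c).
  final case class Row(b: String, c: String)

  // For each group key b, join the distinct values of c with the separator.
  def listAggDistinct(rows: Seq[Row], separator: String): Map[String, String] =
    rows.groupBy(_.b).map { case (key, group) =>
      key -> group.map(_.c).distinct.mkString(separator)
    }
}
```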


> when i enable mini-batch, the throughput of flink application is reduced
> by 27%, but the mini-batch is designed to increase throughput. did i do
> something wrong?

First of all, mini-batch has no benefit for some aggregate functions, for
example COUNT(DISTINCT ...), COLLECT(column), and
MAX/MIN/FIRST_VALUE/LAST_VALUE on a retract input stream.

For those functions, a MapState is needed to store the historical column
values.

MiniBatch caches a bundle of inputs in a buffer. When the bundle is triggered
for processing, the aggregate operator still needs to access the map state
once per input.

The MiniBatch optimization is better suited to COUNT and MAX/MIN on an append
input stream. See [2] for more on mini-batch tuning.
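The reasoning above can be illustrated with a deliberately simplified cost model that only counts state accesses. It is not Flink's implementation: `MiniBatchCostModel` and `stateAccesses` are hypothetical names, and the model assumes one accumulator access per distinct key per bundle plus, for functions that need a MapState, one map-state access per input.

```scala
// Simplified model: counts state accesses only, ignores buffering and serialization costs.
object MiniBatchCostModel {
  // inputs: (group key, value) pairs in arrival order.
  // bundleSize: number of inputs buffered before processing; 1 models mini-batch disabled.
  // perInputMapState: true for functions such as COUNT(DISTINCT ...) that probe a MapState per input.
  def stateAccesses(inputs: Seq[(String, String)], bundleSize: Int, perInputMapState: Boolean): Int = {
    require(bundleSize > 0, "bundleSize must be positive")
    inputs.grouped(bundleSize).map { bundle =>
      // The accumulator is read once per distinct key in the bundle.
      val accumulatorAccesses = bundle.map(_._1).distinct.size
      // The map state is still touched once per buffered input.
      val mapStateAccesses = if (perInputMapState) bundle.size else 0
      accumulatorAccesses + mapStateAccesses
    }.sum
  }
}
```

For the four inputs (a,1), (a,2), (a,1), (b,3), bundling all of them cuts the accesses of a plain aggregate from 4 to 2, but only from 8 to 6 when a map state is involved. The per-input part does not shrink, so the remaining gain can be smaller than the cost of buffering.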

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/systemfunctions/#aggregate-functions
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/tuning/

Hope the information is helpful.

Best,
JING ZHANG
