Posted to issues@flink.apache.org by "Dong Lin (Jira)" <ji...@apache.org> on 2023/04/19 01:27:00 UTC

[jira] [Resolved] (FLINK-31753) Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup

     [ https://issues.apache.org/jira/browse/FLINK-31753?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin resolved FLINK-31753.
------------------------------
    Resolution: Fixed

> Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-31753
>                 URL: https://issues.apache.org/jira/browse/FLINK-31753
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / Machine Learning
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: ml-2.3.0
>
>
> DataSet has been deprecated and will be removed from Flink. However, DataStream CoGroup is still considerably slower than DataSet CoGroup when co-grouping two bounded streams.
> Here are the benchmark results of co-grouping two bounded streams, with 4*10^6 records in each stream, under different modes. The co-group function is chosen to be very lightweight so that the benchmark is dominated by Flink's co-group overhead.
> DataSet: 5.6 sec
> DataStream batch mode: 15.4 sec
> DataStream stream mode with rocksdb: 81 sec
> We should be able to perform the co-group operation in DataStream stream mode with performance similar to DataSet CoGroup, so that users don't have to take a big regression in order to migrate from DataSet to DataStream.
> We will first add a utility function in Flink ML to unblock the migration of some algorithms from Alink to Flink ML.
> Here is the code used to benchmark DataSet's CoGroup.
> {code:java}
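> // Batch (DataSet) environment. Object reuse, no restarts and parallelism 1
> // keep the measurement focused on the co-group overhead itself.
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().enableObjectReuse();
> env.getConfig().disableGenericTypes();
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.setParallelism(1);
> // numRecords and DataGenerator are not shown in this snippet.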
> DataSet<Tuple3<Integer, Integer, Double>> data1 =
>         env.fromCollection(
>                 new DataGenerator(numRecords),
>                 Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
> DataSet<Tuple3<Integer, Integer, Double>> data2 =
>         env.fromCollection(
>                 new DataGenerator(numRecords),
>                 Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
> data1.coGroup(data2)
>         .where((KeySelector<Tuple3<Integer, Integer, Double>, Integer>) tuple -> tuple.f0)
>         .equalTo((KeySelector<Tuple3<Integer, Integer, Double>, Integer>) tuple -> tuple.f0)
>         .with(
>                 new RichCoGroupFunction<
>                         Tuple3<Integer, Integer, Double>,
>                         Tuple3<Integer, Integer, Double>,
>                         Integer>() {
>                     @Override
>                     public void open(Configuration parameters) throws Exception {
>                         super.open(parameters);
>                     }
>                     @Override
>                     public void close() throws Exception {
>                         super.close();
>                     }
>                     @Override
>                     public void coGroup(
>                             Iterable<Tuple3<Integer, Integer, Double>> iterable,
>                             Iterable<Tuple3<Integer, Integer, Double>> iterable1,
>                             Collector<Integer> collector)
>                             throws Exception {
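>                         // Deliberately lightweight: emit one value per key and ignore both groups.
>                         collector.collect(1);
>                     }
>                 })
>         // CountingAndDiscardingSink is not shown in this snippet.
>         .write(new CountingAndDiscardingSink(), "/tmp");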
> {code}
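
For readers unfamiliar with the operation being benchmarked, the following is a minimal sketch of co-group semantics on two bounded inputs, using only the JDK. It is not Flink's implementation and not the utility function mentioned in the issue. The names CoGroupSketch, CoGroupFn, coGroup and demo are hypothetical and chosen for illustration only. The sketch groups both inputs by key and calls the user function once per distinct key, passing the (possibly empty) group from each side.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical illustration of co-group semantics; not part of Flink or Flink ML.
class CoGroupSketch {

    // Called once per distinct key with the records of that key from each input.
    interface CoGroupFn<L, R, O> {
        void coGroup(Iterable<L> left, Iterable<R> right, Consumer<O> out);
    }

    static <L, R, K extends Comparable<K>, O> List<O> coGroup(
            List<L> left,
            List<R> right,
            Function<L, K> leftKey,
            Function<R, K> rightKey,
            CoGroupFn<L, R, O> fn) {
        // Group each bounded input by its key, in sorted key order.
        Map<K, List<L>> leftGroups = new TreeMap<>();
        for (L l : left) {
            leftGroups.computeIfAbsent(leftKey.apply(l), k -> new ArrayList<>()).add(l);
        }
        Map<K, List<R>> rightGroups = new TreeMap<>();
        for (R r : right) {
            rightGroups.computeIfAbsent(rightKey.apply(r), k -> new ArrayList<>()).add(r);
        }

        // Visit the union of keys; a key missing on one side yields an empty group.
        TreeSet<K> keys = new TreeSet<>(leftGroups.keySet());
        keys.addAll(rightGroups.keySet());

        List<O> out = new ArrayList<>();
        for (K key : keys) {
            List<L> l = leftGroups.getOrDefault(key, Collections.emptyList());
            List<R> r = rightGroups.getOrDefault(key, Collections.emptyList());
            fn.coGroup(l, r, out::add);
        }
        return out;
    }

    static int size(Iterable<?> it) {
        int n = 0;
        for (Object ignored : it) {
            n++;
        }
        return n;
    }

    // Small example: records are {key, value} pairs stored as int[2].
    static List<String> demo() {
        List<int[]> left =
                Arrays.asList(new int[] {1, 10}, new int[] {1, 11}, new int[] {2, 20});
        List<int[]> right = Arrays.asList(new int[] {2, 30}, new int[] {3, 40});
        Function<int[], Integer> byKey = t -> t[0];
        CoGroupFn<int[], int[], String> sizes =
                (l, r, out) -> out.accept(size(l) + "/" + size(r));
        return coGroup(left, right, byKey, byKey, sizes);
    }

    public static void main(String[] args) {
        // Prints [2/0, 1/1, 0/1]: keys 1, 2, 3 with left/right group sizes.
        System.out.println(demo());
    }
}
```

A function that emits a single constant per key, like the one in the benchmark above, would produce one output per distinct key here as well, so nearly all the cost lies in the grouping rather than in the user function.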



--
This message was sent by Atlassian Jira
(v8.20.10#820010)