Posted to issues@flink.apache.org by "Dong Lin (Jira)" <ji...@apache.org> on 2023/04/12 08:45:00 UTC
[jira] [Commented] (FLINK-31753) Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup
[ https://issues.apache.org/jira/browse/FLINK-31753?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17711254#comment-17711254 ]
Dong Lin commented on FLINK-31753:
----------------------------------
Merged to apache/flink-ml master branch d7c9c8b5242a3c161d430a03fc4e4c3b0d1d78ff
> Support DataStream CoGroup in stream Mode with similar performance as DataSet CoGroup
> -------------------------------------------------------------------------------------
>
> Key: FLINK-31753
> URL: https://issues.apache.org/jira/browse/FLINK-31753
> Project: Flink
> Issue Type: Bug
> Components: Library / Machine Learning
> Reporter: Dong Lin
> Assignee: Dong Lin
> Priority: Major
> Labels: pull-request-available
> Fix For: ml-2.3.0
>
>
> DataSet has been deprecated and will be removed from Flink. However, DataStream CoGroup is still considerably slower than DataSet CoGroup when co-grouping two bounded streams.
> Here are the benchmark results of co-grouping two bounded streams, with 4*10^6 records in each stream, under different modes. The co-group function is deliberately very lightweight so that the benchmark is dominated by Flink's co-group overhead.
> DataSet: 5.6 sec
> DataStream batch mode: 15.4 sec
> DataStream stream mode with rocksdb: 81 sec
> We should be able to perform the co-group operation in DataStream stream mode with performance comparable to DataSet, so that users don't have to take a big performance regression in order to migrate from DataSet to DataStream.
> We will first add a utility function in Flink ML to unblock the migration of some algorithms from Alink to Flink ML.
> Here is the code used to benchmark DataSet's CoGroup.
> {code:java}
> // numRecords, DataGenerator and CountingAndDiscardingSink are benchmark helpers not shown in this message.
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> env.getConfig().enableObjectReuse();
> env.getConfig().disableGenericTypes();
> env.setRestartStrategy(RestartStrategies.noRestart());
> env.setParallelism(1);
>
> // Two bounded inputs with identical schema; field f0 is the co-group key.
> DataSet<Tuple3<Integer, Integer, Double>> data1 =
>         env.fromCollection(
>                 new DataGenerator(numRecords),
>                 Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
> DataSet<Tuple3<Integer, Integer, Double>> data2 =
>         env.fromCollection(
>                 new DataGenerator(numRecords),
>                 Types.TUPLE(Types.INT, Types.INT, Types.DOUBLE));
>
> data1.coGroup(data2)
>         .where((KeySelector<Tuple3<Integer, Integer, Double>, Integer>) tuple -> tuple.f0)
>         .equalTo((KeySelector<Tuple3<Integer, Integer, Double>, Integer>) tuple -> tuple.f0)
>         .with(
>                 new RichCoGroupFunction<
>                         Tuple3<Integer, Integer, Double>,
>                         Tuple3<Integer, Integer, Double>,
>                         Integer>() {
>                     @Override
>                     public void open(Configuration parameters) throws Exception {
>                         super.open(parameters);
>                     }
>
>                     @Override
>                     public void close() throws Exception {
>                         super.close();
>                     }
>
>                     // Intentionally trivial: emits one record per key so that
>                     // the measured time is dominated by co-group overhead.
>                     @Override
>                     public void coGroup(
>                             Iterable<Tuple3<Integer, Integer, Double>> iterable,
>                             Iterable<Tuple3<Integer, Integer, Double>> iterable1,
>                             Collector<Integer> collector)
>                             throws Exception {
>                         collector.collect(1);
>                     }
>                 })
>         .write(new CountingAndDiscardingSink(), "/tmp");
> {code}
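
For readers unfamiliar with the operation being benchmarked, here is a minimal plain-Java sketch of co-group semantics on two bounded inputs, using a sort-then-merge approach. It is only an illustration and is not the code merged for this issue or Flink's internal implementation; the class name SortMergeCoGroupSketch, the CoGroupFunction interface, and the int[] record layout (key at index 0, mirroring tuple.f0 above) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class SortMergeCoGroupSketch {

    /** Hypothetical callback: receives one key plus all records with that key from each input. */
    public interface CoGroupFunction {
        void coGroup(int key, List<int[]> left, List<int[]> right);
    }

    /** Co-groups two bounded inputs by record[0]: sort both sides, then merge by key. */
    public static void coGroup(List<int[]> in1, List<int[]> in2, CoGroupFunction fn) {
        // Copy before sorting so the caller's lists are left untouched.
        List<int[]> a = new ArrayList<>(in1);
        List<int[]> b = new ArrayList<>(in2);
        Comparator<int[]> byKey = Comparator.comparingInt(r -> r[0]);
        a.sort(byKey);
        b.sort(byKey);

        int i = 0;
        int j = 0;
        while (i < a.size() || j < b.size()) {
            // The next key is the smallest key not yet consumed on either side.
            int key;
            if (j >= b.size()) {
                key = a.get(i)[0];
            } else if (i >= a.size()) {
                key = b.get(j)[0];
            } else {
                key = Math.min(a.get(i)[0], b.get(j)[0]);
            }

            // Collect the run of records carrying this key from each input.
            List<int[]> left = new ArrayList<>();
            while (i < a.size() && a.get(i)[0] == key) {
                left.add(a.get(i++));
            }
            List<int[]> right = new ArrayList<>();
            while (j < b.size() && b.get(j)[0] == key) {
                right.add(b.get(j++));
            }

            // Called once per distinct key, even if one side is empty.
            fn.coGroup(key, left, right);
        }
    }

    public static void main(String[] args) {
        List<int[]> data1 =
                Arrays.asList(new int[] {2, 10}, new int[] {1, 11}, new int[] {2, 12});
        List<int[]> data2 = Arrays.asList(new int[] {3, 20}, new int[] {2, 21});
        coGroup(
                data1,
                data2,
                (key, left, right) ->
                        System.out.println(
                                "key=" + key + " left=" + left.size() + " right=" + right.size()));
    }
}
```

Running main prints one line per distinct key (1, 2, 3 in that order), including keys that appear in only one input. Sorting a bounded input up front is what makes the single merge pass possible; an unbounded stream would need a different strategy.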