Posted to issues@flink.apache.org by "Steven Zhen Wu (Jira)" <ji...@apache.org> on 2022/04/26 22:38:00 UTC

[jira] [Commented] (FLINK-27405) Refactor SourceCoordinator to abstract BaseCoordinator implementation

    [ https://issues.apache.org/jira/browse/FLINK-27405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17528443#comment-17528443 ] 

Steven Zhen Wu commented on FLINK-27405:
----------------------------------------

cc [~pnowojski] [~dwysakowicz] [~thw] Please share your thoughts on extracting a `CoordinatorBase` abstract class from `SourceCoordinator` to promote code reuse. If this is OK with you, [~gang ye] can create a PR later.

> Refactor SourceCoordinator to abstract BaseCoordinator implementation
> ---------------------------------------------------------------------
>
>                 Key: FLINK-27405
>                 URL: https://issues.apache.org/jira/browse/FLINK-27405
>             Project: Flink
>          Issue Type: Improvement
>          Components: Runtime / Coordination
>            Reporter: gang ye
>            Priority: Major
>
> To solve the small-files issue caused by data skew, Flink Iceberg data shuffling was proposed (design doc: [https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/edit]#). The basic idea is to use a statistics operator to collect local statistics on traffic distribution at the taskmanagers (workers). Local statistics are periodically sent to the statistics coordinator (running in the jobmanager). Once globally aggregated statistics are ready, the statistics coordinator broadcasts them to all operator instances. A customized partitioner then uses the global statistics passed down from the statistics operator to distribute data to the Iceberg writers.
> While implementing Flink Iceberg data shuffling, we found that StatisticsCoordinator can share the functionality of SourceCoordinator#runInEventLoop, that StatisticsCoordinatorContext needs a function similar to SourceCoordinatorContext#callInCoordinatorThread, and that the StatisticsCoordinatorProvider ExecutorThreadFactory logic is almost the same as SourceCoordinatorProvider#CoordinatorExecutorThreadFactory. We would therefore like to refactor the source coordinator classes into a general coordinator abstraction, to reduce duplicated code when adding other coordinators.
>  
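
For illustration only, here is a minimal sketch of what such a shared base class could look like. It is not the actual Flink code: the method names `runInEventLoop` and `callInCoordinatorThread` are taken from the issue description, but their signatures, the `CoordinatorThreadFactory` helper, the `handleFailure` hook and the `StatisticsCoordinatorSketch` subclass are all assumptions made up for this example, and it uses only the JDK rather than Flink's `OperatorCoordinator` interfaces.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

// Hypothetical base class: owns a single "coordinator thread" and the helpers
// that the issue says are duplicated between coordinators.
abstract class CoordinatorBase implements AutoCloseable {

    /** Creates the coordinator thread and remembers it (assumed helper). */
    static final class CoordinatorThreadFactory implements ThreadFactory {
        private final String threadName;
        private volatile Thread thread;

        CoordinatorThreadFactory(String threadName) {
            this.threadName = threadName;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            thread = t;
            return t;
        }

        boolean isCurrentThreadCoordinatorThread() {
            return Thread.currentThread() == thread;
        }
    }

    private final CoordinatorThreadFactory threadFactory;
    private final ExecutorService executor;

    protected CoordinatorBase(String threadName) {
        this.threadFactory = new CoordinatorThreadFactory(threadName);
        this.executor = Executors.newSingleThreadExecutor(threadFactory);
    }

    /** Runs an action on the coordinator thread; errors go to handleFailure. */
    protected CompletableFuture<Void> runInEventLoop(Runnable action, String description) {
        return CompletableFuture.runAsync(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                handleFailure(description, t);
            }
        }, executor);
    }

    /** Evaluates a callable on the coordinator thread and waits for the result. */
    protected <V> V callInCoordinatorThread(Callable<V> callable) throws Exception {
        if (threadFactory.isCurrentThreadCoordinatorThread()) {
            return callable.call(); // already on the right thread, avoid self-deadlock
        }
        return executor.submit(callable).get();
    }

    /** Subclasses decide how a failed event-loop action is reported. */
    protected abstract void handleFailure(String description, Throwable cause);

    @Override
    public void close() {
        executor.shutdownNow();
    }
}

// Hypothetical subclass: sums record counts reported by operator instances.
final class StatisticsCoordinatorSketch extends CoordinatorBase {
    private long recordCount; // only read and written on the coordinator thread

    StatisticsCoordinatorSketch() {
        super("statistics-coordinator");
    }

    CompletableFuture<Void> addLocalCount(long count) {
        return runInEventLoop(() -> recordCount += count, "adding local statistics");
    }

    long globalCount() throws Exception {
        return callInCoordinatorThread(() -> recordCount);
    }

    @Override
    protected void handleFailure(String description, Throwable cause) {
        System.err.println("Failure while " + description + ": " + cause);
    }
}
```

In this sketch all state changes go through the single coordinator thread, so a subclass such as the statistics coordinator needs no locking of its own; a source coordinator could extend the same base class and keep only its source-specific logic.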



--
This message was sent by Atlassian Jira
(v8.20.7#820007)