Posted to issues@flink.apache.org by "gang ye (Jira)" <ji...@apache.org> on 2022/04/26 04:07:00 UTC

[jira] [Created] (FLINK-27405) Refactor SourceCoordinator to abstract BaseCoordinator implementation

gang ye created FLINK-27405:
-------------------------------

             Summary: Refactor SourceCoordinator to abstract BaseCoordinator implementation
                 Key: FLINK-27405
                 URL: https://issues.apache.org/jira/browse/FLINK-27405
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Coordination
            Reporter: gang ye


To solve the small-files issue caused by data skew, Flink Iceberg data shuffling was proposed (design doc: [https://docs.google.com/document/d/13N8cMqPi-ZPSKbkXGOBMPOzbv2Fua59j8bIjjtxLWqo/edit]). The basic idea is to use a statistics operator to collect local statistics on traffic distribution at the taskmanagers (workers). Local statistics are periodically sent to the statistics coordinator (running in the jobmanager). Once globally aggregated statistics are ready, the statistics coordinator broadcasts them to all operator instances. A customized partitioner then uses the global statistics, passed down from the statistics operator, to distribute data to the Iceberg writers.
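
The flow above (local statistics, global aggregation, statistics-aware partitioning) can be illustrated with a minimal sketch. Everything in it is an assumption made for illustration: the class ShuffleStatisticsSketch, its method names, the per-key record counts, and the greedy "heaviest key to least-loaded writer" rule are hypothetical and are not taken from the design doc or from Flink or Iceberg code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; not the algorithm from the design doc.
final class ShuffleStatisticsSketch {

    // Coordinator side: merge per-subtask key counts into global counts.
    static Map<String, Long> aggregate(List<Map<String, Long>> localStatistics) {
        Map<String, Long> global = new HashMap<>();
        for (Map<String, Long> local : localStatistics) {
            local.forEach((key, count) -> global.merge(key, count, Long::sum));
        }
        return global;
    }

    // Partitioner side: assign each key to a writer subtask, heaviest key first,
    // always picking the writer with the smallest load so far (assumed policy).
    static Map<String, Integer> assignKeysToWriters(Map<String, Long> global, int numWriters) {
        List<Map.Entry<String, Long>> entries = new ArrayList<>(global.entrySet());
        entries.sort((a, b) -> {
            int byCount = Long.compare(b.getValue(), a.getValue());
            // Break ties by key so the assignment is deterministic.
            return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
        });

        long[] load = new long[numWriters];
        Map<String, Integer> assignment = new HashMap<>();
        for (Map.Entry<String, Long> entry : entries) {
            int lightest = 0;
            for (int w = 1; w < numWriters; w++) {
                if (load[w] < load[lightest]) {
                    lightest = w;
                }
            }
            assignment.put(entry.getKey(), lightest);
            load[lightest] += entry.getValue();
        }
        return assignment;
    }
}
```

In this sketch a single hot key ends up alone on one writer while the light keys share another, which is the kind of balancing the global statistics are meant to enable.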

While implementing Flink Iceberg data shuffling, we found that StatisticsCoordinator can share the functionality of SourceCoordinator#runInEventLoop, that StatisticsCoordinatorContext needs a function similar to SourceCoordinatorContext#callInCoordinatorThread, and that the ExecutorThreadFactory logic in StatisticsCoordinatorProvider is almost the same as SourceCoordinatorProvider#CoordinatorExecutorThreadFactory. We would therefore like to refactor the source coordinator classes to extract a general coordinator implementation, reducing duplicated code when other coordinators are added.
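
As a rough sketch of what such a shared base could look like, the example below puts the three duplicated pieces (an event-loop runner, a "call in coordinator thread" helper, and a named thread factory) into one abstract class. It uses only java.util.concurrent and does not depend on Flink. The names BaseCoordinatorSketch, CoordinatorThreadFactory, StatisticsCoordinatorSketch, handleFailure and handleLocalStatistics are hypothetical; the method names runInEventLoop and callInCoordinatorThread are borrowed from the text above, but their signatures here are assumptions and not Flink's actual ones.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a shared base class; not Flink's actual API.
abstract class BaseCoordinatorSketch implements AutoCloseable {

    // Creates the named coordinator thread and remembers it for identity checks.
    static final class CoordinatorThreadFactory implements ThreadFactory {
        private final String threadName;
        private volatile Thread thread;

        CoordinatorThreadFactory(String threadName) {
            this.threadName = threadName;
        }

        @Override
        public synchronized Thread newThread(Runnable r) {
            thread = new Thread(r, threadName);
            thread.setDaemon(true);
            return thread;
        }

        boolean isCurrentThreadCoordinatorThread() {
            return Thread.currentThread() == thread;
        }
    }

    private final CoordinatorThreadFactory threadFactory;
    private final ExecutorService executor;

    protected BaseCoordinatorSketch(String threadName) {
        this.threadFactory = new CoordinatorThreadFactory(threadName);
        this.executor = Executors.newSingleThreadExecutor(threadFactory);
    }

    // Fire-and-forget: run an action on the single coordinator thread.
    protected void runInEventLoop(Runnable action, String description) {
        executor.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                handleFailure(description, t);
            }
        });
    }

    // Run a callable on the coordinator thread and block until its result is available.
    protected <V> V callInCoordinatorThread(Callable<V> callable, String errorMessage) {
        try {
            if (threadFactory.isCurrentThreadCoordinatorThread()) {
                return callable.call(); // already on the coordinator thread; avoid deadlock
            }
            return executor.submit(callable).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(errorMessage, e);
        } catch (ExecutionException e) {
            throw new RuntimeException(errorMessage, e.getCause());
        } catch (Exception e) {
            throw new RuntimeException(errorMessage, e);
        }
    }

    // Subclasses decide how a failed event-loop action is reported.
    protected abstract void handleFailure(String description, Throwable t);

    @Override
    public void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

// Hypothetical subclass: state is only touched from the coordinator thread.
final class StatisticsCoordinatorSketch extends BaseCoordinatorSketch {
    private long totalRecords = 0;

    StatisticsCoordinatorSketch() {
        super("StatisticsCoordinatorSketch");
    }

    void handleLocalStatistics(long count) {
        runInEventLoop(() -> totalRecords += count, "handling local statistics");
    }

    long totalRecords() {
        return callInCoordinatorThread(() -> totalRecords, "Failed to read total records");
    }

    @Override
    protected void handleFailure(String description, Throwable t) {
        System.err.println("Failure while " + description + ": " + t);
    }
}
```

Because all state changes go through one single-threaded executor, a subclass like the one above needs no locking of its own, and a source coordinator could reuse the same base in the same way.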


--
This message was sent by Atlassian Jira
(v8.20.7#820007)