Posted to issues@flink.apache.org by "Matthias Pohl (Jira)" <ji...@apache.org> on 2022/12/09 15:50:00 UTC

[jira] [Updated] (FLINK-30354) Reducing the number of ThreadPools in LookupFullCache and related cache-loading classes

     [ https://issues.apache.org/jira/browse/FLINK-30354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias Pohl updated FLINK-30354:
----------------------------------
    Description: 
In the course of reviewing FLINK-29405, I came up with a proposal to reduce the complexity of the {{LookupFullCache}} implementation and to shrink the number of thread pools used from 3 to 2. Here's the proposal I also shared in the [FLINK-29405 PR comment|https://github.com/apache/flink/pull/20919#pullrequestreview-1208332584]:

The responsibilities as I see them:
* {{LookupFullCache}} is the composite class that combines the {{CacheLoader}} and the {{CacheReloadTrigger}} through the {{ReloadTriggerContext}}.
* {{ReloadTriggerContext}} provides an async call to trigger the reload, but also some utility methods for providing processing or event time. It's not clear to me why these are connected with the reload; based on the TODO comments, they look like a future task.
* {{CacheLoader}} is in charge of loading the data into memory (concurrently, if possible).
* {{CacheReloadTrigger}} provides different strategies for triggering new reloads.

About the different executors:
* The {{CacheReloadTrigger}} implementations utilize a {{SingleThreadScheduledExecutor}} that triggers {{ReloadTriggerContext::reload}} periodically. If loading takes longer than the trigger interval, the triggered calls pile up. Here, I'm wondering whether that's what we want.
* {{CacheLoader}} utilizes a {{SingleThreadExecutor}} in {{CacheLoader#reloadExecutor}}, which acts as the "main" thread for reloading the data. It invokes {{CacheLoader#updateCache}} while holding {{CacheLoader#reloadLock}}. {{(InputFormat)CacheLoader#updateCache}} is implemented synchronously; the data is loaded concurrently where possible using a {{FixedThreadPool}}.

My proposal is to reduce the number of thread pools used: instead of having both a {{SingleThreadExecutor}} and a {{FixedThreadPool}} in the {{CacheLoader}} implementation, couldn't we come up with a custom {{ThreadPoolExecutor}} with a minimum of 1 thread and a maximum equal to the number of cores (similar to what already exists with [ThreadUtils#newThreadPool|https://github.com/apache/flink/blob/d067629d4d200f940d0b58759459d7ff5832b292/flink-table/flink-sql-gateway-api/src/main/java/org/apache/flink/table/gateway/api/utils/ThreadUtils.java#L36])? That would free {{CacheLoader}} from starting and shutting down thread pools by moving their ownership from {{CacheLoader}} to {{LookupFullCache}}, which would hold it as the {{cacheLoadingThreadPool}} (or similar). Additionally, the {{ScheduledThreadPool}} currently living in the {{CacheReloadTrigger}} implementations could move into {{LookupFullCache}} as well, named something like {{cacheLoadSchedulingThreadPool}}. {{LookupFullCache}} would then be in charge of managing all cache-loading-related threads. It would also manage the current execution through {{CompletableFuture}} instances (one for triggering the reload and one for executing it). Triggering a reload would require either cancelling the current future (if it's not completed yet) or ignoring the trigger if we want a running reload to finish before starting a new one.
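As a rough illustration of such a pool (names and sizing are my own sketch, not existing Flink code): the {{SynchronousQueue}} makes the pool spawn new threads up to the maximum instead of queueing tasks, and once all threads are busy, {{CallerRunsPolicy}} runs the task in the submitting thread rather than rejecting it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CacheLoadingPoolSketch {

    /**
     * Creates a pool that keeps at least one thread alive and grows up to the
     * number of available cores under load.
     */
    public static ThreadPoolExecutor newCacheLoadingThreadPool() {
        return new ThreadPoolExecutor(
                1,                                          // minimum number of threads
                Runtime.getRuntime().availableProcessors(), // maximum number of threads
                60L, TimeUnit.SECONDS,                      // keep-alive for idle extra threads
                new SynchronousQueue<>(),                   // hand-off: grow instead of queueing
                new ThreadPoolExecutor.CallerRunsPolicy()); // fall back to the caller when saturated
    }

    /** Submits n counting tasks, waits for all of them, and returns the count. */
    public static int runTasks(int n) throws InterruptedException {
        ThreadPoolExecutor pool = newCacheLoadingThreadPool();
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            pool.execute(() -> {
                counter.incrementAndGet();
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("tasks executed: " + runTasks(8));
    }
}
```

With this shape, a single periodic reload keeps only one live thread, while a reload that fans out over many splits can temporarily use all cores.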

{{CacheLoader#updateCache}} would become {{CacheLoader#updateCacheAsync(ExecutorService)}}, returning a {{CompletableFuture}} that completes as soon as all subtasks are completed. {{CacheLoader#reloadAsync}} would return this {{CompletableFuture}} instead of creating its own future. The lifecycle (as explained in the previous paragraph) would be managed by {{LookupFullCache}}. The benefit would be that we wouldn't have to deal with interrupts in {{CacheLoader}}.
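A possible shape for such a method (the signature is hypothetical, and the loading subtasks are simplified to {{Runnable}} here):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class UpdateCacheAsyncSketch {

    /**
     * Runs all loading subtasks on the given executor and returns a future
     * that completes once every subtask has completed (or completes
     * exceptionally if any subtask fails).
     */
    public static CompletableFuture<Void> updateCacheAsync(
            ExecutorService executor, List<Runnable> subtasks) {
        CompletableFuture<?>[] futures = subtasks.stream()
                .map(task -> CompletableFuture.runAsync(task, executor))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures);
    }

    /** Loads three dummy "splits" concurrently and returns how many completed. */
    public static int runDemo() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        AtomicInteger loaded = new AtomicInteger();
        List<Runnable> subtasks = List.of(
                loaded::incrementAndGet,
                loaded::incrementAndGet,
                loaded::incrementAndGet);
        updateCacheAsync(executor, subtasks).join(); // blocks until all subtasks finish
        executor.shutdown();
        return loaded.get();
    }

    public static void main(String[] args) {
        System.out.println("subtasks completed: " + runDemo());
    }
}
```

The caller ({{LookupFullCache}} in this proposal) can then compose, await, or cancel the returned future without {{CacheLoader}} having to manage threads or interrupts itself.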

I see the following benefits:
* {{ReloadTriggerContext}} becomes obsolete (though we would still have to clarify what the event time and processing time functions are for).
* {{CacheLoader#awaitFirstLoad}} becomes obsolete as well: we can verify completion of the cache loading in {{LookupFullCache}} through the {{CompletableFuture}} instances.
* {{CacheReloadTrigger}} can focus on the strategy implementation without worrying about instantiating threads. This code is currently duplicated in {{PeriodicCacheReloadTrigger}} and {{TimedCacheReloadTrigger}}.
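The cancel-or-ignore trigger handling could be sketched as follows (class and method names are made up for illustration, not existing Flink code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class ReloadCoordinatorSketch {

    // The reload currently in flight; starts out already completed.
    private CompletableFuture<Void> currentReload = CompletableFuture.completedFuture(null);

    /**
     * Triggers a new reload. If a reload is still running, either cancel it
     * and start over, or ignore the trigger and return the running future.
     */
    public synchronized CompletableFuture<Void> triggerReload(
            Supplier<CompletableFuture<Void>> reload, boolean cancelRunning) {
        if (!currentReload.isDone()) {
            if (!cancelRunning) {
                return currentReload; // ignore the trigger; let the running reload finish
            }
            currentReload.cancel(true); // abandon the running reload
        }
        currentReload = reload.get();
        return currentReload;
    }

    public static void main(String[] args) {
        ReloadCoordinatorSketch coordinator = new ReloadCoordinatorSketch();
        CompletableFuture<Void> running = new CompletableFuture<>(); // a reload that never finishes
        coordinator.triggerReload(() -> running, false);
        // While the first reload is still running, a non-cancelling trigger is ignored:
        boolean ignored = coordinator.triggerReload(
                () -> CompletableFuture.completedFuture(null), false) == running;
        System.out.println("second trigger ignored: " + ignored);
    }
}
```

Whether a trigger cancels or is ignored would be the choice of the respective {{CacheReloadTrigger}} strategy, while the future bookkeeping itself lives in {{LookupFullCache}}.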



> Reducing the number of ThreadPools in LookupFullCache and related cache-loading classes
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-30354
>                 URL: https://issues.apache.org/jira/browse/FLINK-30354
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Runtime
>    Affects Versions: 1.17.0
>            Reporter: Matthias Pohl
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)