Posted to user@flink.apache.org by Krzysztof Chmielewski <kr...@gmail.com> on 2022/01/19 15:16:06 UTC

Hive Source - SplitEnumeratorContext and its callAsync method - possible bug

Hi,
the documentation for the SplitEnumeratorContext::callAsync method says:

"(...)  When this method is invoked multiple times, The Callables may be
executed in a thread pool concurrently.
It is important to make sure that the callable does not modify any shared
state, especially the states that will be a part of the
SplitEnumerator.snapshotState(long) (...)"

The ContinuousHiveSplitEnumerator::start method executes the following code:

enumeratorContext.callAsync(
        monitor, this::handleNewSplits, discoveryInterval, discoveryInterval);

The handleNewSplits method does this:

this.seenPartitionsSinceOffset = newSplitsAndState.seenPartitions;

My question is: on how many threads will the "monitor" callable be executed?
If on more than one (which is possible according to the callAsync
documentation), then I think this reference assignment in the
handleNewSplits method could lead to bugs, since two threads executing the
monitor callable could each return a completely different collection.

ContinuousFileSplitEnumerator solves this differently: filtering of already
processed paths is done by the Source-Coordinator thread, and there is no
reference assignment.
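To make the contrast concrete, here is a minimal Java sketch of the coordinator-side filtering idea. It is not the ContinuousFileSplitEnumerator source: the class CoordinatorSideFiltering, its methods, and the executor setup are hypothetical, with a plain thread pool standing in for the enumerator's worker threads and a single-thread executor standing in for the Source-Coordinator thread. Discovery only returns fresh lists, and the one set of already processed paths is read and written only on the coordinator thread, so overlapping discovery runs cannot corrupt it.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical sketch of coordinator-side filtering; names are illustrative, not Flink's. */
public class CoordinatorSideFiltering {

    // Owned by the coordinator thread; discovery code never reads or writes it.
    private final Set<String> alreadyProcessed = new HashSet<>();
    private final List<String> assigned = new ArrayList<>();

    /** Discovery step (worker threads): returns a fresh list and touches no shared state. */
    static List<String> discover(List<String> listing) {
        return List.copyOf(listing);
    }

    /** Handler step (coordinator thread): all filtering against known paths happens here. */
    void handleDiscovered(Collection<String> discovered) {
        for (String path : discovered) {
            if (alreadyProcessed.add(path)) { // add() returns false for a path seen before
                assigned.add(path);
            }
        }
    }

    /** Runs one discovery per listing on a pool, then filters each result on one thread. */
    static List<String> simulate(List<List<String>> listings) {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        ExecutorService coordinator = Executors.newSingleThreadExecutor();
        CoordinatorSideFiltering enumerator = new CoordinatorSideFiltering();
        try {
            // Discovery rounds may overlap on the worker pool; that is harmless here.
            List<Future<List<String>>> rounds = new ArrayList<>();
            for (List<String> listing : listings) {
                rounds.add(workers.submit(() -> discover(listing)));
            }
            // Each result is handed to the single coordinator thread, in round order.
            for (Future<List<String>> round : rounds) {
                List<String> found = round.get();
                coordinator.submit(() -> enumerator.handleDiscovered(found)).get();
            }
            return coordinator.submit(() -> List.copyOf(enumerator.assigned)).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
            coordinator.shutdown();
        }
    }

    public static void main(String[] args) {
        List<List<String>> listings = List.of(List.of("a", "b"), List.of("b", "c"));
        System.out.println(simulate(listings)); // prints [a, b, c]
    }
}
```

Even if the two discovery rounds overlap, path "b" is assigned only once, because deduplication happens in one place on one thread.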

Regards,
Krzysztof Chmielewski

Re: Hive Source - SplitEnumeratorContext and its callAsync method - possible bug

Posted by Krzysztof Chmielewski <kr...@gmail.com>.
Ok,
I think it was a premature alert :)

1. We have a framework guarantee that the start method will be called only
once per SplitEnumerator instance, hence context.callAsync will be called
only once.
2. callAsync uses ScheduledExecutorService::scheduleAtFixedRate under the
hood, so if any execution of this task takes longer than its period, then
subsequent executions may start late, but will not execute concurrently.
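Point 2 can be checked directly against the JDK. The following Java sketch (the class and method names are made up for illustration) schedules a task whose body is deliberately longer than its period on a four-thread ScheduledExecutorService and records how many runs were ever in flight at once. Per the scheduleAtFixedRate contract, overdue runs are delayed rather than overlapped, so the observed maximum should be 1 even though idle pool threads are available.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FixedRateNoOverlap {

    /**
     * Schedules a task whose body (~50 ms) is longer than its period (10 ms) on a
     * four-thread pool and returns the highest number of runs seen executing at once.
     */
    static int maxObservedConcurrency() {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(4);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger maxRunning = new AtomicInteger();
        AtomicInteger completed = new AtomicInteger();

        pool.scheduleAtFixedRate(() -> {
            maxRunning.accumulateAndGet(running.incrementAndGet(), Math::max);
            try {
                Thread.sleep(50); // deliberately longer than the period
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
                completed.incrementAndGet();
            }
        }, 0, 10, TimeUnit.MILLISECONDS);

        try {
            while (completed.get() < 5) {
                Thread.sleep(10); // let several overdue runs queue up and execute
            }
            pool.shutdownNow();
            pool.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return maxRunning.get();
    }

    public static void main(String[] args) {
        // prints "max concurrent runs: 1"
        System.out.println("max concurrent runs: " + maxObservedConcurrency());
    }
}
```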

So I guess we are safe here: the task is executed one run at a time, and
its results are passed back sequentially to the handleNewSplits method.
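Putting both points together, a simplified model of a periodic callAsync looks roughly like the Java sketch below. It is an assumption about the general shape, not Flink's actual implementation: the CallAsyncModel class, its two executors, and runRounds are hypothetical. The callable runs via scheduleAtFixedRate on a worker pool, and every result is queued onto a single coordinator thread where the handler runs, so the handler sees results one at a time and in the order they were produced.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;

/** Hypothetical, simplified model of a periodic callAsync; not Flink's implementation. */
public class CallAsyncModel {

    private final ScheduledExecutorService worker = Executors.newScheduledThreadPool(4);
    private final ExecutorService coordinator = Executors.newSingleThreadExecutor();

    <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler,
                       long initialDelayMs, long periodMs) {
        worker.scheduleAtFixedRate(() -> {
            T result = null;
            Throwable error = null;
            try {
                result = callable.call();
            } catch (Throwable t) {
                error = t;
            }
            final T r = result;
            final Throwable e = error;
            // The handler is always queued onto the single coordinator thread.
            coordinator.execute(() -> handler.accept(r, e));
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
    }

    void shutdown() throws InterruptedException {
        worker.shutdownNow();   // stop scheduling new discovery runs
        worker.awaitTermination(1, TimeUnit.SECONDS);
        coordinator.shutdown(); // let already-queued handlers finish
        coordinator.awaitTermination(1, TimeUnit.SECONDS);
    }

    /** Runs n discovery rounds; returns the values in the order the handler saw them. */
    static List<Integer> runRounds(int n) {
        CallAsyncModel ctx = new CallAsyncModel();
        AtomicInteger round = new AtomicInteger();
        List<Integer> seen = new ArrayList<>(); // mutated only on the coordinator thread
        CountDownLatch done = new CountDownLatch(n);
        ctx.callAsync(round::incrementAndGet, (value, error) -> {
            if (error == null && seen.size() < n) {
                seen.add(value);
                done.countDown();
            }
        }, 0, 5);
        try {
            done.await();
            ctx.shutdown();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(runRounds(5)); // prints [1, 2, 3, 4, 5]
    }
}
```

Because the handler only ever runs on the coordinator thread and the periodic runs never overlap, a plain field assignment like the one in handleNewSplits does not race with itself in this model.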

