Posted to dev@beam.apache.org by Shen Li <cs...@gmail.com> on 2018/03/26 18:33:04 UTC

Source split consistency in distributed environment

Hi,

Does the split API in Bounded/UnboundedSource guarantee to return the same
result if invoked in different parallel instances in a distributed
environment?

For example, assume the original source can split into 3 sub-sources. Say
the runner creates 3 parallel source operator instances (perhaps running in
different servers) and uses each instance to handle 1 of the 3 sub-sources.
In this case, if each operator instance invokes the split method in a
distributed manner, will they get the same split result?

My understanding is that the current API does not guarantee that the 3
operator instances will receive the same split result. It is possible that
1 of the 3 instances receives 4 sub-sources and the other two receive 3.
Or, even if they all get 3 sub-sources, there could be gaps and overlaps in
the data streams. If so, shall we add an API to indicate whether a source
can be split at runtime?
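
To make the gap/overlap concern concrete, here is a minimal sketch in plain Python. It is a toy model, not the Beam API: `split`, `read_counts`, the offset range 0..120, and the idea that one instance sees a 4-way split are all assumptions made up for illustration.

```python
from collections import Counter

def split(start, end, num_parts):
    """Cut [start, end) into num_parts contiguous ranges covering the whole range."""
    size = end - start
    bounds = [start + size * i // num_parts for i in range(num_parts + 1)]
    return list(zip(bounds[:-1], bounds[1:]))

def read_counts(ranges):
    """Count how many times each offset is read across the given [lo, hi) ranges."""
    return Counter(x for lo, hi in ranges for x in range(lo, hi))

# Assumption: instance 1 happens to get a 4-way split, instances 0 and 2 a 3-way split.
parts_seen = [3, 4, 3]

# Each instance calls split() on its own and keeps the sub-source at its own index.
picked = [split(0, 120, n)[i] for i, n in enumerate(parts_seen)]

counts = read_counts(picked)
missed = [x for x in range(120) if counts[x] == 0]      # offsets nobody reads
duplicated = [x for x in range(120) if counts[x] > 1]   # offsets read twice

print(picked)
print(len(missed), "offsets missed,", len(duplicated), "offsets duplicated")
```

Each individual split result covers the whole range, yet the three independently picked sub-sources do not.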

One solution to avoid this problem is to split the source at translation
time and directly pass sub-sources to operator instances. But this is not
ideal. The server that runs the translation might not have access to the
source (DB, KV, MQ, etc). Or the application may want to dynamically change
the source parallel width at runtime. Hence, the runner/engine sometimes has
to split the source during runtime in a distributed environment.

Thanks,
Shen

Re: Source split consistency in distributed environment

Posted by Shen Li <cs...@gmail.com>.
Thank you!

Shen

On Mon, Mar 26, 2018 at 5:34 PM, Eugene Kirpichov <ki...@google.com>
wrote:

>
>
> On Mon, Mar 26, 2018, 2:08 PM Shen Li <cs...@gmail.com> wrote:
>
>> Hi Eugene,
>>
>> Thanks. Does it mean the application cannot dynamically change the
>> parallel width of an UnboundedSource during runtime?
>>
> Correct, it's limited by how many parts it was split into initially. So it
> makes sense to initially split into a fairly large number of parts,
> assigning more than one at the same time to each worker, so if you need
> more workers then you have more parts ready.
>
>
>> > Correct. split() is applied to a single argument, so there's nothing
>> > to execute in parallel here. It executes sequentially, and produces a
>> > number of sources that can then be executed in parallel. It's pretty
>> > similar to executing a DoFn on a single element.
>>
>> I was thinking about the following scenario where the split() will be
>> called in parallel. Suppose initially the translation phase invoked the
>> UnboundedSource#split() API and got 3 sub-sources. It then starts the
>> runtime where the source operator has 3 instances running in parallel, one
>> for each sub-source. This part works fine, and there will only be one
>> split() invocation during the translation. However, say after a while, the
>> application would like to increase the source parallelism from 3 to 4. But,
>> as it has already finished the translation, this change will be done
>> dynamically during runtime. The runtime will add another source instance.
>> Then, the four source instances will all call the split() API in parallel.
>> If this API consistently returns 4 sub-sources, then each source operator
>> instance can retrieve its own sub-source and proceed from there.
>>
>
>>
>>
>> Thanks,
>> Shen
>>

Re: Source split consistency in distributed environment

Posted by Eugene Kirpichov <ki...@google.com>.
On Mon, Mar 26, 2018, 2:08 PM Shen Li <cs...@gmail.com> wrote:

> Hi Eugene,
>
> Thanks. Does it mean the application cannot dynamically change the
> parallel width of an UnboundedSource during runtime?
>
Correct, it's limited by how many parts it was split into initially. So it
makes sense to initially split into a fairly large number of parts,
assigning more than one at the same time to each worker, so if you need
more workers then you have more parts ready.
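
A minimal sketch of that over-splitting idea, in plain Python rather than the Beam API. The `assign` helper, the round-robin policy, and the count of 12 parts are assumptions for illustration. A real runner would also have to move each part's reader state when reassigning it, which is omitted here.

```python
def assign(parts, num_workers):
    """Round-robin a fixed list of parts over num_workers workers."""
    buckets = [[] for _ in range(num_workers)]
    for index, part in enumerate(parts):
        buckets[index % num_workers].append(part)
    return buckets

# Pretend a single up-front split produced 12 parts (names are placeholders).
parts = [f"part-{i}" for i in range(12)]

three = assign(parts, 3)  # 4 parts per worker
four = assign(parts, 4)   # 3 parts per worker, no new split needed

# Beyond 12 workers the extra ones stay idle: parallelism is capped by the initial split.
busy_at_20 = sum(1 for bucket in assign(parts, 20) if bucket)
print(three, four, busy_at_20)
```

Going from 3 to 4 workers only reshuffles the existing parts, so split() is never called a second time.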


> > Correct. split() is applied to a single argument, so there's nothing to
> > execute in parallel here. It executes sequentially, and produces a number
> > of sources that can then be executed in parallel. It's pretty similar to
> > executing a DoFn on a single element.
>
> I was thinking about the following scenario where the split() will be
> called in parallel. Suppose initially the translation phase invoked the
> UnboundedSource#split() API and got 3 sub-sources. It then starts the
> runtime where the source operator has 3 instances running in parallel, one
> for each sub-source. This part works fine, and there will only be one
> split() invocation during the translation. However, say after a while, the
> application would like to increase the source parallelism from 3 to 4. But,
> as it has already finished the translation, this change will be done
> dynamically during runtime. The runtime will add another source instance.
> Then, the four source instances will all call the split() API in parallel.
> If this API consistently returns 4 sub-sources, then each source operator
> instance can retrieve its own sub-source and proceed from there.
>

>
>
> Thanks,
> Shen
>

Re: Source split consistency in distributed environment

Posted by Shen Li <cs...@gmail.com>.
Hi Eugene,

Thanks. Does it mean the application cannot dynamically change the parallel
width of an UnboundedSource during runtime?

> Correct. split() is applied to a single argument, so there's nothing to
> execute in parallel here. It executes sequentially, and produces a number
> of sources that can then be executed in parallel. It's pretty similar to
> executing a DoFn on a single element.

I was thinking about the following scenario where the split() will be
called in parallel. Suppose initially the translation phase invoked the
UnboundedSource#split() API and got 3 sub-sources. It then starts the
runtime where the source operator has 3 instances running in parallel, one
for each sub-source. This part works fine, and there will only be one
split() invocation during the translation. However, say after a while, the
application would like to increase the source parallelism from 3 to 4. But,
as it has already finished the translation, this change will be done
dynamically during runtime. The runtime will add another source instance.
Then, the four source instances will all call the split() API in parallel.
If this API consistently returns 4 sub-sources, then each source operator
instance can retrieve its own sub-source and proceed from there.



Thanks,
Shen


On Mon, Mar 26, 2018 at 4:48 PM, Eugene Kirpichov <ki...@google.com>
wrote:

>
>
> On Mon, Mar 26, 2018 at 1:09 PM Shen Li <cs...@gmail.com> wrote:
>
>> Hi Lukasz,
>>
>> Thanks for your response.
>>
>> > Each call to split may return a different set of sub sources but they
>> > always represent the entire original source.
>>
>> Inconsistent sets of sub-sources prevent runners/engines from calling the
>> split API in a distributed manner during runtime.
>>
> Correct. split() is applied to a single argument, so there's nothing to
> execute in parallel here. It executes sequentially, and produces a number
> of sources that can then be executed in parallel. It's pretty similar to
> executing a DoFn on a single element.
>
>
>> Besides, the splitAtFraction(double fraction) is only available in
>> BoundedSources. How do you perform dynamic splitting for UnboundedSources?
>>
> There is no analogous API for unbounded sources.
>
>
>>
>> Another question: will Source transforms eventually become deprecated and
>> be replaced by the SplittableParDo?
>>
> Yes; this is already the case in the Portability framework.
>
>
>>
>> Thanks,
>> Shen
>>

Re: Source split consistency in distributed environment

Posted by Eugene Kirpichov <ki...@google.com>.
On Mon, Mar 26, 2018 at 1:09 PM Shen Li <cs...@gmail.com> wrote:

> Hi Lukasz,
>
> Thanks for your response.
>
> > Each call to split may return a different set of sub sources but they
> > always represent the entire original source.
>
> Inconsistent sets of sub-sources prevent runners/engines from calling the
> split API in a distributed manner during runtime.
>
Correct. split() is applied to a single argument, so there's nothing to
execute in parallel here. It executes sequentially, and produces a number
of sources that can then be executed in parallel. It's pretty similar to
executing a DoFn on a single element.
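
As a rough sketch of that "split once, then run the results in parallel" shape (plain Python, not the Beam API; `split`, `read`, and the thread pool standing in for workers are assumptions for illustration):

```python
from concurrent.futures import ThreadPoolExecutor

def split(start, end, num_parts):
    """Cut [start, end) into num_parts contiguous ranges covering the whole range."""
    size = end - start
    bounds = [start + size * i // num_parts for i in range(num_parts + 1)]
    return list(zip(bounds[:-1], bounds[1:]))

def read(sub_source):
    """Read every offset of one [lo, hi) sub-source."""
    lo, hi = sub_source
    return list(range(lo, hi))

# One sequential call on a single input, like a DoFn processing a single element.
sub_sources = split(0, 10, 3)

# Only the resulting sub-sources are processed in parallel.
with ThreadPoolExecutor(max_workers=3) as pool:
    chunks = list(pool.map(read, sub_sources))

records = [x for chunk in chunks for x in chunk]
print(sub_sources, records)
```

Because every worker receives a sub-source from the same split result, the question of two calls disagreeing never arises.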


> Besides, the splitAtFraction(double fraction) is only available in
> BoundedSources. How do you perform dynamic splitting for UnboundedSources?
>
There is no analogous API for unbounded sources.
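
For readers unfamiliar with the bounded-side call, here is a toy sketch of what splitting at a fraction means for an offset range. It is plain Python loosely modeled on the idea, not the Beam reader class. `RangeReader`, its fields, and the rule for rejecting a split are assumptions for illustration.

```python
class RangeReader:
    """Toy reader over offsets [start, end); a stand-in, not a Beam class."""

    def __init__(self, start, end):
        self.start = start
        self.end = end
        self.position = start  # next offset to read

    def read_next(self):
        """Return the next offset, or None when the range is exhausted."""
        if self.position >= self.end:
            return None
        value = self.position
        self.position += 1
        return value

    def split_at_fraction(self, fraction):
        """Keep [start, split) here and return the residual (split, end).

        Returns None if the split point is at or before the current
        position, or would leave an empty residual.
        """
        split = self.start + int((self.end - self.start) * fraction)
        if split <= self.position or split >= self.end:
            return None
        residual = (split, self.end)
        self.end = split
        return residual

reader = RangeReader(0, 100)
for _ in range(10):
    reader.read_next()

residual = reader.split_at_fraction(0.5)   # hands off the unread tail
too_late = reader.split_at_fraction(0.05)  # split point already consumed
print(residual, reader.end, too_late)
```

The split works because a bounded range has a known end to take a fraction of.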


>
> Another question: will Source transforms eventually become deprecated and
> be replaced by the SplittableParDo?
>
Yes; this is already the case in the Portability framework.


>
> Thanks,
> Shen
>

Re: Source split consistency in distributed environment

Posted by Shen Li <cs...@gmail.com>.
Hi Lukasz,

Thanks for your response.

> Each call to split may return a different set of sub sources but they
> always represent the entire original source.

Inconsistent sets of sub-sources prevent runners/engines from calling the
split API in a distributed manner during runtime. Besides,
the splitAtFraction(double fraction) is only available in BoundedSources.
How do you perform dynamic splitting for UnboundedSources?

Another question: will Source transforms eventually become deprecated and
be replaced by the SplittableParDo?

Thanks,
Shen



On Mon, Mar 26, 2018 at 3:41 PM, Lukasz Cwik <lc...@google.com> wrote:

> Contractually, the sources returned by splitting must represent the
> original source. Each call to split may return a different set of sub
> sources but they always represent the entire original source.
>
> Note that Dataflow does call split effectively during translation and then
> later calls APIs on sources to perform dynamic splitting[1].
>
> Note that this is being replaced with SplittableDoFn. It is worthwhile to
> look at this doc[2] and presentation[3].
>
> 1: https://github.com/apache/beam/blob/28665490f6ad0cad091f4f936a8f113617fd3f27/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java#L387
> 2: https://s.apache.org/splittable-do-fn
> 3: https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/63696
>

Re: Source split consistency in distributed environment

Posted by Lukasz Cwik <lc...@google.com>.
Contractually, the sources returned by splitting must represent the
original source. Each call to split may return a different set of sub
sources but they always represent the entire original source.
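
One way to picture this contract is a check that a set of sub-ranges tiles the original range exactly. The following is a plain-Python sketch, not Beam code. `is_exact_cover` and the example ranges are assumptions for illustration.

```python
def is_exact_cover(original, sub_ranges):
    """True if the [lo, hi) sub_ranges tile `original` with no gap or overlap."""
    start, end = original
    position = start
    for lo, hi in sorted(sub_ranges):
        if lo != position or hi <= lo:
            return False
        position = hi
    return position == end

# Two different results of splitting the same source: both satisfy the contract.
three_way = [(0, 40), (40, 80), (80, 120)]
four_way = [(0, 30), (30, 60), (60, 90), (90, 120)]

# Pieces taken from different split results do not.
mixed = [(0, 40), (30, 60), (80, 120)]

print(is_exact_cover((0, 120), three_way),
      is_exact_cover((0, 120), four_way),
      is_exact_cover((0, 120), mixed))
```

The guarantee holds per call: any single result is complete, but pieces from separate calls cannot be combined.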

Note that Dataflow does call split effectively during translation and then
later calls APIs on sources to perform dynamic splitting[1].

Note that this is being replaced with SplittableDoFn. It is worthwhile to
look at this doc[2] and presentation[3].

1:
https://github.com/apache/beam/blob/28665490f6ad0cad091f4f936a8f113617fd3f27/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java#L387
2: https://s.apache.org/splittable-do-fn
3:
https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/63696



On Mon, Mar 26, 2018 at 11:33 AM Shen Li <cs...@gmail.com> wrote:

> Hi,
>
> Does the split API in Bounded/UnboundedSource guarantee to return the same
> result if invoked in different parallel instances in a distributed
> environment?
>
> For example, assume the original source can split into 3 sub-sources. Say
> the runner creates 3 parallel source operator instances (perhaps running in
> different servers) and uses each instance to handle 1 of the 3 sub-sources.
> In this case, if each operator instance invokes the split method in a
> distributed manner, will they get the same split result?
>
> My understanding is that the current API does not guarantee that the 3
> operator instances will receive the same split result. It is possible that
> 1 of the 3 instances receives 4 sub-sources and the other two receive 3.
> Or, even if they all get 3 sub-sources, there could be gaps and overlaps in
> the data streams. If so, shall we add an API to indicate whether a source
> can be split at runtime?
>
> One solution to avoid this problem is to split the source at
> translation time and directly pass sub-sources to operator instances. But
> this is not ideal. The server that runs the translation might not have
> access to the source (DB, KV, MQ, etc). Or the application may want to
> dynamically change the source parallel width at runtime. Hence, the
> runner/engine sometimes has to split the source during runtime in a
> distributed environment.
>
> Thanks,
> Shen
>
>