Posted to dev@beam.apache.org by Robert Bradshaw <ro...@google.com> on 2018/09/19 11:49:56 UTC

Re: Discussion: Scheduling across runner and SDKHarness in Portability framework

On Fri, Sep 14, 2018 at 3:01 PM Thomas Weise <th...@apache.org> wrote:

> That's actually how the Flink runner already works - bundle processing
> starts when elements are available (see FlinkExecutableStageFunction for
> batch mode).
>
> But we still have the possibility of the SDK getting concurrent requests
> due to parallelism (and pipelined execution).
>

Concurrent requests should be fine, but are there cases (in batch) where a
bundle that was started cannot finish without other bundles finishing
first? What pipelining would we see if the graph is of the form
ExecutableStage - GBK - ExecutableStage - GBK - ... (or is it not always a
digraph of this form, possibly with branching)?


>
> Thanks,
> Thomas
>
>
> On Fri, Sep 14, 2018 at 2:56 AM Robert Bradshaw <ro...@google.com>
> wrote:
>
>> Currently the best solution we've come up with is that we must process an
>> unbounded number of bundles concurrently to avoid deadlock. Especially in
>> the batch case, this may be wasteful as we bring up workers for many stages
>> that are not actually executable until upstream stages finish. Since it may
>> be invasive to require runners to only schedule stages that can be actively
>> worked on, I've been thinking about what we could do in the common runner
>> libraries themselves. One idea is to postpone the actual sending of a
>> process bundle request until there is data on the channel to consume. With
>> Reads as Impulses, and triggers as data, all bundles are driven by some
>> input.
>>
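>> As a rough sketch (all helper names here are invented, not an existing
>> runner-core API): hold back the process bundle request until the first
>> input element arrives, so the SDK only ever sees bundles it can
>> immediately start working on.
>>
>>     def run_stage(control_client, input_elements, bundle_descriptor_id):
>>         # input_elements is an iterator; block until there is data.
>>         first = next(input_elements, None)
>>         if first is None:
>>             return  # Upstream finished without output; never start a bundle.
>>         # Only now send the process bundle request to the SDK.
>>         instruction_id = control_client.process_bundle(bundle_descriptor_id)
>>         control_client.send_data(instruction_id, first)
>>         for element in input_elements:
>>             control_client.send_data(instruction_id, element)
>>         control_client.close_data_channel(instruction_id)
>>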
>> This would mean we never ask the SDK to process bundles it cannot
>> immediately start working on. There is still the open question of whether
>> being able to *start* a bundle implies that one is able to *finish* a
>> bundle (i.e. do any runners start bundles and then block, pending other
>> bundle completion, before closing the data channel (though clearly a runner
>> can chop a bundle off at any point if it wants)).
>>
>> Does this approach sound feasible?
>>
>>
>> On Thu, Aug 30, 2018 at 2:54 AM Ankur Goenka <go...@google.com> wrote:
>>
>>> I managed to write a small document based on the discussion.
>>> Please take a look at
>>> https://docs.google.com/document/d/1oAXVPbJ0dzj2_8LXEWFAgqCP5Tpld3q5B3QU254PQ6A/edit?usp=sharing
>>>
>>>
>>> On Tue, Aug 21, 2018 at 11:01 PM Henning Rohde <he...@google.com>
>>> wrote:
>>>
>>>> Sending bundles that cannot be executed, i.e., the situation described at
>>>> the beginning of the thread as causing deadlock in Flink with mapB. The
>>>> discussion of exposing (or assuming an infinitely large) concurrency level
>>>> -- while a useful concept in its own right -- came around as a way to
>>>> unblock mapB.
>>>>
>>>> On Tue, Aug 21, 2018 at 2:16 PM Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Henning, can you clarify by what you mean with send non-executable
>>>>> bundles to the SDK harness and how it is useful for Flink?
>>>>>
>>>>> On Tue, Aug 21, 2018 at 2:01 PM Henning Rohde <he...@google.com>
>>>>> wrote:
>>>>>
>>>>>> I think it will be useful to the runner to know upfront what the
>>>>>> fundamental threading capabilities are for the SDK harness (say, "fixed",
>>>>>> "linear", "dynamic", ..) so that the runner can upfront make a good static
>>>>>> decision on #harnesses and how many resources they should each have. It's
>>>>>> wasteful to give the Foo SDK a whole many-core machine with TBs of memory,
>>>>>> if it can only support a single bundle at a time. I think this is also in
>>>>>> line with what Thomas and Luke are suggesting.
>>>>>>
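>>>>>> Purely as illustration (nothing like this exists in the Beam protos
>>>>>> today), such a declaration could be as small as:
>>>>>>
>>>>>>     # Hypothetical capability classes an SDK harness could publish.
>>>>>>     THREADING_FIXED = 'fixed'      # a static number of bundles, often 1
>>>>>>     THREADING_LINEAR = 'linear'    # scales with the resources it is given
>>>>>>     THREADING_DYNAMIC = 'dynamic'  # adjusts at runtime based on load
>>>>>>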
>>>>>> However, it still seems to me to be a semantically problematic idea
>>>>>> to send non-executable bundles to the SDK harness. I understand it's useful
>>>>>> for Flink, but is that really the best path forward?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Aug 20, 2018 at 5:44 PM Ankur Goenka <go...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> That's right.
>>>>>>> To add to it: we added multi-threading to Python streaming, as a
>>>>>>> single thread is suboptimal for the streaming use case.
>>>>>>> Shall we move towards a conclusion on the SDK bundle processing
>>>>>>> upper bound?
>>>>>>>
>>>>>>> On Mon, Aug 20, 2018 at 1:54 PM Lukasz Cwik <lc...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Ankur, I can see where you are going with your argument. I believe
>>>>>>>> there is certain information which is static and fixed at pipeline
>>>>>>>> creation time (such as the Python SDK being most efficient doing one
>>>>>>>> bundle at a time) and some which is best determined at runtime, like
>>>>>>>> memory and CPU limits, or worker count.
>>>>>>>>
>>>>>>>> On Mon, Aug 20, 2018 at 1:47 PM Ankur Goenka <go...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I would prefer to keep it dynamic as it can be changed by the
>>>>>>>>> infrastructure or the pipeline author.
>>>>>>>>> In the case of Python, the number of concurrent bundles can be
>>>>>>>>> changed by setting the pipeline option worker_count, and for Java it
>>>>>>>>> can be computed based on the CPUs on the machine.
>>>>>>>>>
>>>>>>>>> For the Flink runner, we can use the worker_count parameter for now
>>>>>>>>> to increase the parallelism. And we can have one container for each
>>>>>>>>> mapPartition task on Flink while reusing containers, as container
>>>>>>>>> creation is expensive, especially for Python where it installs a
>>>>>>>>> bunch of dependencies. There is one caveat though: I have seen
>>>>>>>>> machine crashes because of too many simultaneous container creations.
>>>>>>>>> We can rate limit container creation in the code to avoid this.
>>>>>>>>>
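>>>>>>>>> As a rough sketch of such rate limiting (the pacing interval and
>>>>>>>>> helper names are invented for illustration):
>>>>>>>>>
>>>>>>>>>     import threading
>>>>>>>>>     import time
>>>>>>>>>
>>>>>>>>>     _start_lock = threading.Lock()
>>>>>>>>>     _last_start = [0.0]
>>>>>>>>>     MIN_SECONDS_BETWEEN_STARTS = 2.0  # assumed pacing interval
>>>>>>>>>
>>>>>>>>>     def start_container_throttled(start_container_fn):
>>>>>>>>>         # Space out container start-ups so a burst of new workers
>>>>>>>>>         # does not overwhelm the host.
>>>>>>>>>         with _start_lock:
>>>>>>>>>             wait = _last_start[0] + MIN_SECONDS_BETWEEN_STARTS - time.time()
>>>>>>>>>             if wait > 0:
>>>>>>>>>                 time.sleep(wait)
>>>>>>>>>             _last_start[0] = time.time()
>>>>>>>>>         return start_container_fn()
>>>>>>>>>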
>>>>>>>>> Thanks,
>>>>>>>>> Ankur
>>>>>>>>>
>>>>>>>>> On Mon, Aug 20, 2018 at 9:20 AM Lukasz Cwik <lc...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> +1 on making the resources part of a proto. Based upon what
>>>>>>>>>> Henning linked to, the provisioning API seems like an appropriate place to
>>>>>>>>>> provide this information.
>>>>>>>>>>
>>>>>>>>>> Thomas, I believe the environment proto is the best place to add
>>>>>>>>>> information that a runner may want to know about upfront during
>>>>>>>>>> pipeline creation. I wouldn't stick this into PipelineOptions for
>>>>>>>>>> the long term.
>>>>>>>>>> If you have time to capture these thoughts and update the
>>>>>>>>>> environment proto, I would suggest going down that path. Otherwise anything
>>>>>>>>>> short term like PipelineOptions will do.
>>>>>>>>>>
>>>>>>>>>> On Sun, Aug 19, 2018 at 5:41 PM Thomas Weise <th...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> For SDKs where the upper limit is constant and known upfront,
>>>>>>>>>>> why not communicate this along with the other harness resource info as part
>>>>>>>>>>> of the job submission?
>>>>>>>>>>>
>>>>>>>>>>> Regarding use of GRPC headers: Why not make this explicit in the
>>>>>>>>>>> proto instead?
>>>>>>>>>>>
>>>>>>>>>>> WRT the runner dictating resource constraints: the runner actually
>>>>>>>>>>> may not have that information either. It would need to be supplied
>>>>>>>>>>> as part of the pipeline options? The cluster resource manager needs
>>>>>>>>>>> to allocate resources for both the runner and the SDK harness(es).
>>>>>>>>>>>
>>>>>>>>>>> Finally, what can be done to unblock the Flink runner / Python
>>>>>>>>>>> until the solution discussed here is in place? An extra runner
>>>>>>>>>>> option for SDK singleton on/off?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Aug 18, 2018 at 1:34 AM Ankur Goenka <go...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Sounds good to me.
>>>>>>>>>>>> A gRPC header on the control channel seems to be a good place to
>>>>>>>>>>>> add the upper bound information.
>>>>>>>>>>>> Added JIRAs:
>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-5166
>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-5167
>>>>>>>>>>>>
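>>>>>>>>>>>> As a small sketch of how such a header could carry the bound (the
>>>>>>>>>>>> key name is invented; nothing here is a settled API):
>>>>>>>>>>>>
>>>>>>>>>>>>     # Hypothetical metadata key for the advertised limit.
>>>>>>>>>>>>     CONCURRENCY_HEADER = 'beam-max-concurrent-bundles'
>>>>>>>>>>>>
>>>>>>>>>>>>     def control_metadata(upper_bound):
>>>>>>>>>>>>         # Metadata the SDK harness attaches to its control calls.
>>>>>>>>>>>>         return [(CONCURRENCY_HEADER, str(upper_bound))]
>>>>>>>>>>>>
>>>>>>>>>>>>     def read_upper_bound(servicer_context, default=1):
>>>>>>>>>>>>         # Runner side: servicer_context is a grpc.ServicerContext.
>>>>>>>>>>>>         for key, value in servicer_context.invocation_metadata():
>>>>>>>>>>>>             if key == CONCURRENCY_HEADER:
>>>>>>>>>>>>                 return int(value)
>>>>>>>>>>>>         return default
>>>>>>>>>>>>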
>>>>>>>>>>>> On Fri, Aug 17, 2018 at 10:51 PM Henning Rohde <
>>>>>>>>>>>> herohde@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Regarding resources: the runner can currently dictate the
>>>>>>>>>>>>> mem/cpu/disk resources that the harness is allowed to use via the
>>>>>>>>>>>>> provisioning API. The SDK harness need not -- and should not --
>>>>>>>>>>>>> speculate on what else might be running on the machine:
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/apache/beam/blob/0e14965707b5d48a3de7fa69f09d88ef0aa48c09/model/fn-execution/src/main/proto/beam_provision_api.proto#L69
>>>>>>>>>>>>>
>>>>>>>>>>>>> A realistic startup-time computation in the SDK harness would be
>>>>>>>>>>>>> something simple like max(1, min(cpu*100, mem_mb/10)), say, using
>>>>>>>>>>>>> at most that number of threads. Or just hardcode it to 300. Or use
>>>>>>>>>>>>> a user-provided value. Whatever it is, that value is the maximum
>>>>>>>>>>>>> number of bundles in flight allowed at any given time, and it
>>>>>>>>>>>>> needs to be communicated to the runner via some message. Anything
>>>>>>>>>>>>> beyond it would be rejected (but this shouldn't happen, because
>>>>>>>>>>>>> the runner should respect that number).
>>>>>>>>>>>>>
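>>>>>>>>>>>>> In code, the startup-time variant is just (assuming cpu and
>>>>>>>>>>>>> mem_mb have already been read from the provisioning API response):
>>>>>>>>>>>>>
>>>>>>>>>>>>>     def max_concurrent_bundles(cpu, mem_mb):
>>>>>>>>>>>>>         # max(1, min(cpu*100, mem_mb/10)), as suggested above.
>>>>>>>>>>>>>         return max(1, min(int(cpu * 100), mem_mb // 10))
>>>>>>>>>>>>>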
>>>>>>>>>>>>> A dynamic computation would use the same limits from the SDK,
>>>>>>>>>>>>> but take into account its own resource usage (incl. the usage by running
>>>>>>>>>>>>> bundles).
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 6:20 PM Ankur Goenka <
>>>>>>>>>>>>> goenka@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am thinking of the upper bound as more along the lines of a
>>>>>>>>>>>>>> theoretical upper limit, or any other static high value, beyond
>>>>>>>>>>>>>> which the SDK will loudly reject bundles. The idea is that the
>>>>>>>>>>>>>> SDK will not keep bundles in a queue while waiting on current
>>>>>>>>>>>>>> bundles to finish; it will simply reject any additional bundle.
>>>>>>>>>>>>>> Beyond this I don't have a good answer for a dynamic upper bound.
>>>>>>>>>>>>>> As the SDK does not have a complete picture of the processes on
>>>>>>>>>>>>>> the machine with which it shares resources, resources might not
>>>>>>>>>>>>>> be a good proxy for the upper bound from the SDK's point of view.
>>>>>>>>>>>>>>
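>>>>>>>>>>>>>> A tiny sketch of that reject-don't-queue behavior (the names are
>>>>>>>>>>>>>> invented for illustration):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     import threading
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     class BoundedBundleScheduler(object):
>>>>>>>>>>>>>>         # Accepts up to upper_bound concurrent bundles; anything
>>>>>>>>>>>>>>         # beyond that is rejected loudly instead of being queued.
>>>>>>>>>>>>>>         def __init__(self, upper_bound):
>>>>>>>>>>>>>>             self._slots = threading.Semaphore(upper_bound)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>         def process(self, process_bundle_fn):
>>>>>>>>>>>>>>             if not self._slots.acquire(False):  # non-blocking
>>>>>>>>>>>>>>                 raise RuntimeError('SDK over capacity: bundle rejected')
>>>>>>>>>>>>>>             try:
>>>>>>>>>>>>>>                 return process_bundle_fn()
>>>>>>>>>>>>>>             finally:
>>>>>>>>>>>>>>                 self._slots.release()
>>>>>>>>>>>>>>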
>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 6:01 PM Lukasz Cwik <lc...@google.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Ankur, how would you expect an SDK to compute a realistic
>>>>>>>>>>>>>>> upper bound (upfront or during pipeline computation)?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> My first thought was that the SDK would provide CPU/memory/...
>>>>>>>>>>>>>>> resourcing information, with the runner making a judgement call
>>>>>>>>>>>>>>> as to whether it should ask the SDK to do more or less work, but
>>>>>>>>>>>>>>> not an explicit "don't do more than X bundles in parallel".
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 5:55 PM Ankur Goenka <
>>>>>>>>>>>>>>> goenka@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Makes sense. Exposing the upper bound on concurrency together
>>>>>>>>>>>>>>>> with an optimum concurrency can give a good balance. This is
>>>>>>>>>>>>>>>> good information to expose while keeping the requirements on
>>>>>>>>>>>>>>>> the SDK simple. An SDK can publish 1 as both the optimum
>>>>>>>>>>>>>>>> concurrency and the upper bound to keep things simple.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Runner introspection of the upper bound on concurrency is
>>>>>>>>>>>>>>>> important for correctness, while introspection of the optimum
>>>>>>>>>>>>>>>> concurrency is important for efficiency. This separates the
>>>>>>>>>>>>>>>> efficiency and correctness requirements.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 5:05 PM Henning Rohde <
>>>>>>>>>>>>>>>> herohde@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I agree with Luke's observation, with the caveat that "infinite
>>>>>>>>>>>>>>>>> amount of bundles in parallel" is limited by the available
>>>>>>>>>>>>>>>>> resources. For example, the Go SDK harness will accept an
>>>>>>>>>>>>>>>>> arbitrary amount of parallel work, but too much work will cause
>>>>>>>>>>>>>>>>> either excessive GC pressure with crippling slowness or an
>>>>>>>>>>>>>>>>> outright OOM. Unless it's always 1, a reasonable upper bound
>>>>>>>>>>>>>>>>> will either have to be provided by the user or computed from
>>>>>>>>>>>>>>>>> the mem/cpu resources given. Of course, some bundles take more
>>>>>>>>>>>>>>>>> resources than others, so any static value will be an estimate
>>>>>>>>>>>>>>>>> or will ignore resource limits.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> That said, I do not like that an "efficiency" aspect
>>>>>>>>>>>>>>>>> becomes a subtle requirement for correctness due to Flink internals. I fear
>>>>>>>>>>>>>>>>> that road leads to trouble.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:26 PM Ankur Goenka <
>>>>>>>>>>>>>>>>> goenka@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The latter case, where the SDK supports executing a single
>>>>>>>>>>>>>>>>>> bundle at a time and the runner does not use this flag, is
>>>>>>>>>>>>>>>>>> exactly the reason we got into the deadlock here.
>>>>>>>>>>>>>>>>>> I agree with exposing the SDK's optimum concurrency level (1
>>>>>>>>>>>>>>>>>> in the latter case) and letting the runner decide whether to
>>>>>>>>>>>>>>>>>> use it, while at the same time expecting the SDK to handle an
>>>>>>>>>>>>>>>>>> unbounded number of bundles even if it's not efficient.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>> Ankur
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 4:11 PM Lukasz Cwik <
>>>>>>>>>>>>>>>>>> lcwik@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I believe in practice SDK harnesses will fall into one of
>>>>>>>>>>>>>>>>>>> two capability classes: they can process an effectively
>>>>>>>>>>>>>>>>>>> infinite number of bundles in parallel, or they can only
>>>>>>>>>>>>>>>>>>> process a single bundle at a time.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I believe it is more difficult for a runner to handle the
>>>>>>>>>>>>>>>>>>> latter case well and to perform all the environment
>>>>>>>>>>>>>>>>>>> management that would make it efficient. It may be
>>>>>>>>>>>>>>>>>>> inefficient for an SDK, but I do believe it should be able
>>>>>>>>>>>>>>>>>>> to say "I'm not great at anything more than a single bundle
>>>>>>>>>>>>>>>>>>> at a time", though a runner's use of this information should
>>>>>>>>>>>>>>>>>>> be optional.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 1:53 PM Ankur Goenka <
>>>>>>>>>>>>>>>>>>> goenka@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> To recap the discussion, it seems we have come up with the
>>>>>>>>>>>>>>>>>>>> following points.
>>>>>>>>>>>>>>>>>>>> SDKHarness management and initialization:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>    1. The runner completely owns the work assignment to the
>>>>>>>>>>>>>>>>>>>>    SDKHarness.
>>>>>>>>>>>>>>>>>>>>    2. The runner should know the capabilities and capacity
>>>>>>>>>>>>>>>>>>>>    of the SDKHarness and should assign work accordingly.
>>>>>>>>>>>>>>>>>>>>    3. Spinning up SDKHarnesses is the runner's
>>>>>>>>>>>>>>>>>>>>    responsibility, and it can be done statically (a fixed,
>>>>>>>>>>>>>>>>>>>>    pre-configured number of SDKHarnesses), dynamically, or
>>>>>>>>>>>>>>>>>>>>    based on other configuration/logic the runner chooses.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> SDKHarness expectations. This is more in question, and we
>>>>>>>>>>>>>>>>>>>> should outline the responsibilities of the SDKHarness:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>    1. The SDKHarness should publish how many concurrent
>>>>>>>>>>>>>>>>>>>>    tasks it can execute.
>>>>>>>>>>>>>>>>>>>>    2. The SDKHarness should start executing all assigned
>>>>>>>>>>>>>>>>>>>>    task items in parallel in a timely manner, or fail the task.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Also, on the simplification side: for better adoption, I
>>>>>>>>>>>>>>>>>>>> think we should have a simple SDKHarness as well as simple
>>>>>>>>>>>>>>>>>>>> runner integration, to encourage integration with more
>>>>>>>>>>>>>>>>>>>> runners. Many runners might not expose their internal
>>>>>>>>>>>>>>>>>>>> scheduling characteristics, so we should not require
>>>>>>>>>>>>>>>>>>>> knowledge of them for runner integration. Moreover,
>>>>>>>>>>>>>>>>>>>> scheduling characteristics can change based on pipeline
>>>>>>>>>>>>>>>>>>>> type, infrastructure, available resources, etc. So I am a
>>>>>>>>>>>>>>>>>>>> bit hesitant to bake runner scheduling specifics into
>>>>>>>>>>>>>>>>>>>> runner integration.
>>>>>>>>>>>>>>>>>>>> A good balance between SDKHarness complexity and runner
>>>>>>>>>>>>>>>>>>>> integration effort can help ease adoption.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Ankur
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 12:22 PM Henning Rohde <
>>>>>>>>>>>>>>>>>>>> herohde@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Finding a good balance is indeed the art of
>>>>>>>>>>>>>>>>>>>>> portability, because the range of capability (and assumptions) on both
>>>>>>>>>>>>>>>>>>>>> sides is wide.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The original idea was to allow the SDK harness to be an
>>>>>>>>>>>>>>>>>>>>> extremely simple bundle executor (specifically,
>>>>>>>>>>>>>>>>>>>>> single-threaded execution of one instruction at a time),
>>>>>>>>>>>>>>>>>>>>> however inefficient -- a more sophisticated SDK harness
>>>>>>>>>>>>>>>>>>>>> would support more features and be more efficient. For the
>>>>>>>>>>>>>>>>>>>>> issue described here, it seems problematic to me to send
>>>>>>>>>>>>>>>>>>>>> non-executable bundles to the SDK harness under the
>>>>>>>>>>>>>>>>>>>>> expectation that the SDK harness will concurrently work its
>>>>>>>>>>>>>>>>>>>>> way deeply enough down the instruction queue to unblock
>>>>>>>>>>>>>>>>>>>>> itself. That would be an extremely subtle requirement for
>>>>>>>>>>>>>>>>>>>>> SDK authors, and one practical question becomes: what
>>>>>>>>>>>>>>>>>>>>> should an SDK do with a bundle instruction that it doesn't
>>>>>>>>>>>>>>>>>>>>> have capacity to execute? If a runner needs to make such
>>>>>>>>>>>>>>>>>>>>> assumptions, I think that information should probably
>>>>>>>>>>>>>>>>>>>>> rather be explicit along the lines of proposal 1 -- i.e.,
>>>>>>>>>>>>>>>>>>>>> some kind of negotiation between the resources allotted to
>>>>>>>>>>>>>>>>>>>>> the SDK harness (a preliminary variant is in the
>>>>>>>>>>>>>>>>>>>>> provisioning API) and what the SDK harness in return can do
>>>>>>>>>>>>>>>>>>>>> (and a valid answer might be: 1 bundle at a time
>>>>>>>>>>>>>>>>>>>>> irrespective of resources given), or a per-bundle special
>>>>>>>>>>>>>>>>>>>>> "overloaded" error response. For other aspects, such as
>>>>>>>>>>>>>>>>>>>>> side input readiness, the runner handles that complexity,
>>>>>>>>>>>>>>>>>>>>> and the overall bias has generally been to move complexity
>>>>>>>>>>>>>>>>>>>>> to the runner side.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> The SDK harness and its initialization overhead are
>>>>>>>>>>>>>>>>>>>>> entirely SDK-, job-type- and even pipeline-specific. A
>>>>>>>>>>>>>>>>>>>>> docker container is also just a process, btw, and doesn't
>>>>>>>>>>>>>>>>>>>>> inherently carry much overhead. That said, on a single
>>>>>>>>>>>>>>>>>>>>> host, a static docker configuration is generally a lot
>>>>>>>>>>>>>>>>>>>>> simpler to work with.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Henning
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 10:18 AM Thomas Weise <
>>>>>>>>>>>>>>>>>>>>> thw@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> It is good to see this discussed!
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I think there needs to be a good balance between the SDK
>>>>>>>>>>>>>>>>>>>>>> harness capabilities/complexity and its responsibilities.
>>>>>>>>>>>>>>>>>>>>>> Additionally, the user will need to be able to adjust the
>>>>>>>>>>>>>>>>>>>>>> runner behavior, since the type of workload executed in
>>>>>>>>>>>>>>>>>>>>>> the harness is also a factor. Elsewhere we already
>>>>>>>>>>>>>>>>>>>>>> discussed that the current assumption of a single SDK
>>>>>>>>>>>>>>>>>>>>>> harness instance per Flink task manager brings problems
>>>>>>>>>>>>>>>>>>>>>> with it, and that there needs to be more than one way for
>>>>>>>>>>>>>>>>>>>>>> the runner to spin up SDK harnesses.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> There was the concern that instantiation of multiple SDK
>>>>>>>>>>>>>>>>>>>>>> harnesses per TM host is expensive (resource usage,
>>>>>>>>>>>>>>>>>>>>>> initialization time, etc.). That may hold true for a
>>>>>>>>>>>>>>>>>>>>>> specific scenario, such as batch workloads and the use of
>>>>>>>>>>>>>>>>>>>>>> Docker containers. But it may look totally different for a
>>>>>>>>>>>>>>>>>>>>>> streaming topology, or when the SDK harness is just a
>>>>>>>>>>>>>>>>>>>>>> process on the same host.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>>>> Thomas
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 8:36 AM Lukasz Cwik <
>>>>>>>>>>>>>>>>>>>>>> lcwik@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> SDK harnesses were always responsible for executing all
>>>>>>>>>>>>>>>>>>>>>>> work given to them concurrently. Runners have been
>>>>>>>>>>>>>>>>>>>>>>> responsible for choosing how much work to give to the SDK
>>>>>>>>>>>>>>>>>>>>>>> harness in a way that best utilizes it.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> I understand that multithreading in Python is inefficient
>>>>>>>>>>>>>>>>>>>>>>> due to the global interpreter lock; it would be up to the
>>>>>>>>>>>>>>>>>>>>>>> runner in this case to make sure that the amount of work
>>>>>>>>>>>>>>>>>>>>>>> it gives to each SDK harness best utilizes it, while
>>>>>>>>>>>>>>>>>>>>>>> spinning up an appropriate number of SDK harnesses.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Fri, Aug 17, 2018 at 7:32 AM Maximilian Michels <
>>>>>>>>>>>>>>>>>>>>>>> mxm@apache.org> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Hi Ankur,
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Thanks for looking into this problem. The cause
>>>>>>>>>>>>>>>>>>>>>>>> seems to be Flink's
>>>>>>>>>>>>>>>>>>>>>>>> pipelined execution mode. It runs multiple tasks in
>>>>>>>>>>>>>>>>>>>>>>>> one task slot and
>>>>>>>>>>>>>>>>>>>>>>>> produces a deadlock when the pipelined operators
>>>>>>>>>>>>>>>>>>>>>>>> schedule the SDK
>>>>>>>>>>>>>>>>>>>>>>>> harness DoFns in non-topological order.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> The problem would be resolved if we scheduled the
>>>>>>>>>>>>>>>>>>>>>>>> tasks in topological
>>>>>>>>>>>>>>>>>>>>>>>> order. Doing that is not easy because they run in
>>>>>>>>>>>>>>>>>>>>>>>> separate Flink
>>>>>>>>>>>>>>>>>>>>>>>> operators and the SDK Harness would have to have
>>>>>>>>>>>>>>>>>>>>>>>> insight into the
>>>>>>>>>>>>>>>>>>>>>>>> execution graph (which is not desirable).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> The easiest method, which you proposed in 1), is to
>>>>>>>>>>>>>>>>>>>>>>>> ensure that the number of threads in the SDK harness
>>>>>>>>>>>>>>>>>>>>>>>> matches the number of ExecutableStage DoFn operators.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> The approach in 2) is what Flink does as well. It
>>>>>>>>>>>>>>>>>>>>>>>> glues together
>>>>>>>>>>>>>>>>>>>>>>>> horizontal parts of the execution graph, also in
>>>>>>>>>>>>>>>>>>>>>>>> multiple threads. So I
>>>>>>>>>>>>>>>>>>>>>>>> agree with your proposed solution.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>>>>>>>>> Max
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On 17.08.18 03:10, Ankur Goenka wrote:
>>>>>>>>>>>>>>>>>>>>>>>> > Hi,
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> > tl;dr Deadlock in task execution caused by limited
>>>>>>>>>>>>>>>>>>>>>>>> > task parallelism on the SDKHarness.
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> > *Setup:*
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> >   * Job type: *Beam Portable Python Batch* job on a
>>>>>>>>>>>>>>>>>>>>>>>> >     Flink standalone cluster.
>>>>>>>>>>>>>>>>>>>>>>>> >   * Only a single job is scheduled on the cluster.
>>>>>>>>>>>>>>>>>>>>>>>> >   * Everything is running on a single machine with a
>>>>>>>>>>>>>>>>>>>>>>>> >     single Flink task manager.
>>>>>>>>>>>>>>>>>>>>>>>> >   * Flink Task Manager Slots is 1.
>>>>>>>>>>>>>>>>>>>>>>>> >   * Flink Parallelism is 1.
>>>>>>>>>>>>>>>>>>>>>>>> >   * The Python SDKHarness has 1 thread.
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> > *Example pipeline:*
>>>>>>>>>>>>>>>>>>>>>>>> > Read -> MapA -> GroupBy -> MapB -> WriteToSink
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> > *Issue:*
>>>>>>>>>>>>>>>>>>>>>>>> > With a multi-stage job, Flink schedules the dependent
>>>>>>>>>>>>>>>>>>>>>>>> > sub-tasks concurrently on the Flink worker as long as
>>>>>>>>>>>>>>>>>>>>>>>> > it can get slots. Each map task is then executed on
>>>>>>>>>>>>>>>>>>>>>>>> > the SDKHarness.
>>>>>>>>>>>>>>>>>>>>>>>> > It's possible that MapB gets to the SDKHarness before
>>>>>>>>>>>>>>>>>>>>>>>> > MapA and hence gets into the execution queue before
>>>>>>>>>>>>>>>>>>>>>>>> > MapA. Because we only have 1 execution thread on the
>>>>>>>>>>>>>>>>>>>>>>>> > SDKHarness, MapA will never get a chance to execute,
>>>>>>>>>>>>>>>>>>>>>>>> > as MapB will never release the execution thread. MapB
>>>>>>>>>>>>>>>>>>>>>>>> > will wait for input from MapA. This gets us to a
>>>>>>>>>>>>>>>>>>>>>>>> > deadlock in a simple pipeline.
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> > *Mitigation:*
>>>>>>>>>>>>>>>>>>>>>>>> > Set worker_count in the pipeline options to more than
>>>>>>>>>>>>>>>>>>>>>>>> > the expected number of sub-tasks in the pipeline.
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> > *Proposal:*
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> >  1. We can get the maximum concurrency from the runner
>>>>>>>>>>>>>>>>>>>>>>>> >     and make sure that we have more threads than the
>>>>>>>>>>>>>>>>>>>>>>>> >     max concurrency. This approach assumes that Beam
>>>>>>>>>>>>>>>>>>>>>>>> >     has insight into the runner's execution plan and
>>>>>>>>>>>>>>>>>>>>>>>> >     can make decisions based on it.
>>>>>>>>>>>>>>>>>>>>>>>> >  2. We can dynamically create threads and cache them,
>>>>>>>>>>>>>>>>>>>>>>>> >     with a high upper bound, in the SDKHarness, and
>>>>>>>>>>>>>>>>>>>>>>>> >     warn if we are hitting the upper bound of threads.
>>>>>>>>>>>>>>>>>>>>>>>> >     This approach assumes that the runner does a good
>>>>>>>>>>>>>>>>>>>>>>>> >     job of scheduling and will distribute tasks more or
>>>>>>>>>>>>>>>>>>>>>>>> >     less evenly.
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> > We expect good scheduling from runners, so I prefer
>>>>>>>>>>>>>>>>>>>>>>>> > approach 2. It is simpler to implement, and the
>>>>>>>>>>>>>>>>>>>>>>>> > implementation is not runner specific. This approach
>>>>>>>>>>>>>>>>>>>>>>>> > makes better use of resources, as it creates only as
>>>>>>>>>>>>>>>>>>>>>>>> > many threads as needed instead of the peak thread
>>>>>>>>>>>>>>>>>>>>>>>> > requirement. And last but not least, it gives the
>>>>>>>>>>>>>>>>>>>>>>>> > runner control over managing the truly active tasks.
>>>>>>>>>>>>>>>>>>>>>>>> >
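>>>>>>>>>>>>>>>>>>>>>>>> > A minimal sketch of approach 2 (the cap and names are
>>>>>>>>>>>>>>>>>>>>>>>> > illustrative; ThreadPoolExecutor already creates its
>>>>>>>>>>>>>>>>>>>>>>>> > worker threads lazily and reuses them):
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> >     import logging
>>>>>>>>>>>>>>>>>>>>>>>> >     import threading
>>>>>>>>>>>>>>>>>>>>>>>> >     from concurrent.futures import ThreadPoolExecutor
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> >     HIGH_UPPER_BOUND = 300  # assumed static cap
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> >     class BundleProcessorPool(object):
>>>>>>>>>>>>>>>>>>>>>>>> >         def __init__(self, max_threads=HIGH_UPPER_BOUND):
>>>>>>>>>>>>>>>>>>>>>>>> >             self._max_threads = max_threads
>>>>>>>>>>>>>>>>>>>>>>>> >             self._lock = threading.Lock()
>>>>>>>>>>>>>>>>>>>>>>>> >             self._active = 0
>>>>>>>>>>>>>>>>>>>>>>>> >             self._executor = ThreadPoolExecutor(
>>>>>>>>>>>>>>>>>>>>>>>> >                 max_workers=max_threads)
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> >         def submit(self, process_bundle_fn):
>>>>>>>>>>>>>>>>>>>>>>>> >             # Warn at the cap; beyond it, work queues.
>>>>>>>>>>>>>>>>>>>>>>>> >             with self._lock:
>>>>>>>>>>>>>>>>>>>>>>>> >                 self._active += 1
>>>>>>>>>>>>>>>>>>>>>>>> >                 if self._active >= self._max_threads:
>>>>>>>>>>>>>>>>>>>>>>>> >                     logging.warning(
>>>>>>>>>>>>>>>>>>>>>>>> >                         'Hit upper bound of %d bundles',
>>>>>>>>>>>>>>>>>>>>>>>> >                         self._max_threads)
>>>>>>>>>>>>>>>>>>>>>>>> >             future = self._executor.submit(process_bundle_fn)
>>>>>>>>>>>>>>>>>>>>>>>> >             future.add_done_callback(lambda _: self._done())
>>>>>>>>>>>>>>>>>>>>>>>> >             return future
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> >         def _done(self):
>>>>>>>>>>>>>>>>>>>>>>>> >             with self._lock:
>>>>>>>>>>>>>>>>>>>>>>>> >                 self._active -= 1
>>>>>>>>>>>>>>>>>>>>>>>> >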
>>>>>>>>>>>>>>>>>>>>>>>> > Please let me know if I am missing something, and what
>>>>>>>>>>>>>>>>>>>>>>>> > your thoughts are on the approach.
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>> > Thanks,
>>>>>>>>>>>>>>>>>>>>>>>> > Ankur
>>>>>>>>>>>>>>>>>>>>>>>> >
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>