Posted to dev@beam.apache.org by Thomas Weise <th...@apache.org> on 2018/06/06 17:10:50 UTC

SDK Harness Deployment

Hi,

The current plan for running the SDK harness is to execute docker to launch
SDK containers with service endpoints provided by the runner in the docker
command line.

In the case of Flink runner (prototype), the service endpoints are
dynamically allocated per executable stage. There is typically one Flink
task manager running per machine. Each TM has multiple task slots. A subset
of these task slots will run the Beam executable stages. Flink allows
multiple jobs in one TM, so we could have executable stages of different
pipelines running in a single TM, depending on how users deploy. The
prototype also has no cleanup for the SDK containers; they remain running
and orphaned once the runner is gone.

I'm trying to find out how this approach can be augmented for deployment on
Kubernetes. Our deployments won't allow multiple jobs per task manager, so
all task slots will belong to the same pipeline context. The intent is to
deploy SDK harness containers along with TMs in the same pod. No assumption
can be made about the order in which the containers are started, and the
SDK container wouldn't know the connect address at startup (it can only be
discovered after the pipeline gets deployed into the TMs).

I talked about that a while ago with Henning and one idea was to set a
fixed endpoint address so that the boot code in the SDK container knows
upfront where to connect to, even when that endpoint isn't available yet.
This approach may work with minimal changes to runner and little or no
change to SDK container (as long as the SDK is prepared to retry). The
downside is that all (parallel) task slots of the TM will use the same SDK
worker, which will likely lead to performance issues, at least with the
Python SDK that we are planning to use.
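
A minimal sketch of the retry behaviour the fixed-endpoint idea relies on,
assuming a plain TCP reachability check stands in for the real boot code. The
function name wait_for_endpoint, its parameters and the default timings are
hypothetical and not taken from the Beam boot code.

```python
import socket
import time


def wait_for_endpoint(host, port, deadline_s=120.0, interval_s=1.0,
                      attempt=None, clock=time.monotonic, sleep=time.sleep):
    """Retry until the endpoint accepts a connection or the deadline passes.

    Returns True on success and False on timeout. All names and defaults
    here are illustrative assumptions.
    """
    if attempt is None:
        def attempt():
            # Plain TCP connect as a stand-in for the real handshake.
            with socket.create_connection((host, port), timeout=5.0):
                pass

    give_up_at = clock() + deadline_s
    while True:
        try:
            attempt()
            return True
        except OSError:
            # Endpoint not up yet: keep the process alive and try again.
            if clock() >= give_up_at:
                return False
            sleep(interval_s)
```

Returning False instead of exiting leaves it to the caller to decide whether
to exit (and be restarted by the environment) or to keep waiting.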

An alternative may be to define an SDK worker pool per pod, with a
discovery mechanism for workers to find the runner endpoints and a
coordination mechanism that distributes the dynamically allocated endpoints
that are provided by the executable stage task slots over the available
workers.
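
To make the worker pool idea a bit more concrete, here is a rough sketch of
the coordination part, assuming an in-process registry per pod. The class
WorkerPool, its method names and the stage/worker identifiers are all
hypothetical; nothing like this exists in the runner today.

```python
from collections import deque


class WorkerPool:
    """Hands out idle SDK workers to executable stages (hypothetical sketch)."""

    def __init__(self, worker_ids):
        self._idle = deque(worker_ids)
        self._by_stage = {}    # stage id -> worker id
        self._endpoints = {}   # worker id -> runner endpoint to connect to

    def assign(self, stage_id, endpoint):
        """Bind a stage's dynamically allocated endpoint to an idle worker."""
        if stage_id in self._by_stage:
            return self._by_stage[stage_id]
        if not self._idle:
            raise RuntimeError("no idle SDK worker for stage %s" % stage_id)
        worker_id = self._idle.popleft()
        self._by_stage[stage_id] = worker_id
        self._endpoints[worker_id] = endpoint
        return worker_id

    def discover(self, worker_id):
        """Discovery lookup for a worker; None until an endpoint is assigned."""
        return self._endpoints.get(worker_id)

    def release(self, stage_id):
        """Return the stage's worker to the idle pool."""
        worker_id = self._by_stage.pop(stage_id)
        del self._endpoints[worker_id]
        self._idle.append(worker_id)
```

A worker would poll discover() with its own ID until it gets an endpoint, and
the task slot would call assign() once its endpoint has been allocated.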

Any thoughts on this? Is anyone else looking at a docker-free deployment?

Thanks,
Thomas

Re: SDK Harness Deployment

Posted by Henning Rohde <he...@google.com>.
> a Flink task manager lifecycle isn't tied to a single job (it is a worker
that exists even before the pipeline was submitted). Therefore we will need
to fall back to (1). Boot code would fail to connect until the job was
deployed [...].

The pipeline defines which environment (= SDK harness) to use, so we
wouldn't in general be able to run it before the job is submitted. If you have
some more specialized scenario in mind -- which I assume here is the case
-- where the setup supports only a fixed set of SDK harnesses (or a custom
one you author) then you may also be able to use the same SDK harness to
run multiple jobs. How practical that is depends on the SDK, whether it
uses job-specific information, what kind of isolation between jobs is
needed and whether the artifacts are identical.

For Go, for example, the default container image is identical across jobs
and the sole artifact is the user binary, so to be usable across jobs all
DoFns for all jobs must be compiled into a single binary. Nothing currently
uses the job-specific information. If the SDK harness is not given
persistent storage through the 'semi_persistent_path' flag, it is stateless
and could serve multiple jobs in sequence if restarted -- assuming
provisioning/artifacts are updated each time to serve information for the new
job. An "exit" instruction might be helpful to force the SDK harness to
exit gracefully. If the setup is not accepting arbitrary container images
(or not using containers at all), you could also use a special container
image to restart just the inner Go process and re-pull
provisioning/artifacts instead of the whole container if that works better.
Either way, you could have a setup where a fixed set of such Go containers
match the same number of slots of the TM and can handle multiple jobs. If
the setup needs to support arbitrary SDKs and container images, however,
then you'd be back in the world of dynamically starting them somehow.

Thanks,
 Henning

On Fri, Jun 8, 2018 at 11:45 AM Thomas Weise <th...@apache.org> wrote:

> Sounds good.
>
> Regarding (2), the bootstrapping endpoints serve job specific info, but a
> Flink task manager lifecycle isn't tied to a single job (it is a worker
> that exists even before the pipeline was submitted). Therefore we will need
> to fall back to (1). Boot code would fail to connect until the job was
> deployed and the Flink runner has established the endpoint. This should
> work fine, as long as the boot code is retried without causing the entire
> container to exit; it may just be some noise in the logs?
>
> In my scenario there won't be a second job that runs on the same task
> manager, since we are planning to deploy Flink along with the application.
> But Flink in general also supports a "session" mode where multiple jobs can
> share the same set of task managers. In that case it would be necessary to
> isolate the SDK workers because they can only serve a single job (unless
> what you have listed under static information is identical).
>
> Looking at the current runner code there will be some work in the
> JobResourceManager/SingletonSdkHarnessManager neighborhood that I can pick
> up once we have the basics working in master. Currently SDK workers can
> only be distinguished by the port they connect to; the runner does not look
> at the worker ID or make it available in any way. So the support to
> multiplex has to be added. Perhaps Ben/Alex can comment on this?
>
> Thanks,
> Thomas
>
>
> On Fri, Jun 8, 2018 at 10:19 AM, Henning Rohde <he...@google.com> wrote:
>
>> You're right. That is the idea.
>>
>> Two comments on the executable stage not being available yet:
>>   (1) An SDK harness may either retry or fail (exit) if it can't
>> connect/times out/gets an error. If it exits, the runner/environment is
>> responsible for restarting the process/container. So it will effectively
>> always retry. The boot code currently used tries to connect for 2 mins
>> after which it gives up (and in turn is restarted and tries again). The
>> 2min is set a bit arbitrarily, btw, so we can adjust it for the default
>> containers.
>>   (2) The 2 bootstrapping endpoints serve static information (pipeline
>> options, artifacts, and job metadata) that may not require an executable
>> stage -- for example, if the artifact service just serves data from HDFS.
>> The control endpoint is mainly driven by the runner side, so the
>> multiplexer could allow any SDK harness to connect, but it just wouldn't
>> send any actual instructions until the executable stages were ready. So for
>> 2nd jobs or if we have some global hooks into the TM (or deploy a separate
>> process -- provisioning and artifacts are separate services to make this
>> possible), it might be possible to allow the SDK harness to boot in
>> parallel with the TM being fully ready. Disclaimer: I have a limited
>> understanding of the Flink constraints here.
>>
>> Thanks,
>>  Henning
>>
>>
>>
>> On Fri, Jun 8, 2018 at 7:49 AM Thomas Weise <th...@apache.org> wrote:
>>
>>> Yes, it did not occur to me that we have the identifier available for
>>> this. I just took a fresh look at
>>> https://s.apache.org/beam-fn-api-container-contract
>>>
>>> So it should be possible to start a pool of containers with pre-assigned
>>> IDs in the pod, communicate the same set of IDs to the runner (via its
>>> configuration) and then come up with some mechanism to assign executable
>>> stages to worker IDs as part of the Flink operator initialization.
>>>
>>> By the time the SDK boot code calls the provisioning service to fetch
>>> the pipeline options, the runner wouldn't be ready (either since the TM
>>> isn't running or the executable stages were not deployed into it yet). So
>>> will that call just retry until the endpoint becomes available? On the
>>> runner side, the endpoint can only be activated (under the fixed address)
>>> when the task slots are assigned.
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Jun 6, 2018 at 3:19 PM, Henning Rohde <he...@google.com>
>>> wrote:
>>>
>>>> Thanks Thomas. The id provided to the SDK harness must be sent as a
>>>> gRPC header when it connects to the TM. The TM can use a fixed port and
>>>> multiplex requests based on that id - to match the SDK harness with the
>>>> appropriate job/slot/whatnot. The relationship between SDK harness and TM
>>>> is not limited to 1:1, but rather many:1. We'll likely need that for
>>>> cross-language as well. Wouldn't multiplexing on a single port for the
>>>> control plane be the easiest solution for both #1 and #2? The data plane
>>>> can still use various dynamically-allocated ports.
>>>>
>>>> On Kubernetes, we're somewhat constrained by the pod lifetime and
>>>> multi-job TMs might not be as natural to achieve.
>>>>
>>>> Thanks,
>>>>  Henning
>>>>
>>>> On Wed, Jun 6, 2018 at 2:28 PM Thomas Weise <th...@apache.org> wrote:
>>>>
>>>>> Hi Henning,
>>>>>
>>>>> Here is a page that explains the scheduling and overall functioning of
>>>>> the task manager in Flink:
>>>>>
>>>>>
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html
>>>>>
>>>>> Here are the 2 issues:
>>>>>
>>>>> #1 each task manager process gets assigned multiple units of execution
>>>>> into task slots. So when we deploy a Beam pipeline, we can end up with
>>>>> multiple executable stages running in a single TM JVM.
>>>>>
>>>>> This is where a 1-to-1 relationship between TM and SDK harness can lead
>>>>> to a bottleneck (all task slots of a single TM push their work to a single
>>>>> SDK container).
>>>>>
>>>>> #2 in a deployment where multiple pipelines share a Flink cluster, the
>>>>> SDK harness per TM approach wouldn't work logically. We would need to have
>>>>> multiple SDK containers, not just for efficiency reasons.
>>>>>
>>>>> This would not be an issue for the deployment scenario I'm looking at,
>>>>> but it needs to be considered for general Flink runner deployment.
>>>>>
>>>>> Regarding the assignment of fixed endpoints within the TM, that is
>>>>> possible but it doesn't address #1 and #2.
>>>>>
>>>>> I hope this clarifies?
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Wed, Jun 6, 2018 at 12:31 PM, Henning Rohde <he...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Thanks for writing down and explaining the problem, Thomas. Let me
>>>>>> try to tease some of the topics apart.
>>>>>>
>>>>>> First, the basic setup is currently as follows: there are 2 worker
>>>>>> processes (A) "SDK harness" and (B) "Runner harness" that need to
>>>>>> communicate. A connects to B. The fundamental endpoint(s) of B as well as
>>>>>> an id -- logging, provisioning, artifacts and control -- are provided to A
>>>>>> via command line parameters. A is not expected to be able to connect to the
>>>>>> control port without first obtaining pipeline options (from provisioning)
>>>>>> and staged files (from artifacts). As an aside, this is where the separate
>>>>>> boot.go code comes in handy. A can assume it will be restarted, if it
>>>>>> exits. A does not assume the given endpoints are up when started and should
>>>>>> make blocking calls with timeout (but if not and exits, it is restarted
>>>>>> anyway and will retry). Note that the data plane endpoints are part of the
>>>>>> control instructions and need not be known or allocated at startup or even
>>>>>> be served by the same TM.
>>>>>>
>>>>>> Second, whether or not docker is used is rather an implementation
>>>>>> detail, but if we use Kubernetes (or other such options) then some
>>>>>> constraints come into play.
>>>>>>
>>>>>> Either way, two scenarios work well:
>>>>>>    (1) B starts A: The ULR and Flink prototype do this. B will
>>>>>> delay starting A until it has decided which endpoints to use. This approach
>>>>>> requires B to do process/container management, which we'd rather not have
>>>>>> to do at scale. But it's convenient for local runners.
>>>>>>    (2) B has its (local) endpoints configured or fixed: A and B can
>>>>>> be started concurrently. Dataflow does this. Kubernetes lends itself well
>>>>>> to this approach (and handles container management for us).
>>>>>>
>>>>>> The Flink on Kubernetes scenario described above doesn't:
>>>>>>    (3) B must use randomized (local) endpoints _and_ A and B are
>>>>>> started concurrently: A would not know where to connect.
>>>>>>
>>>>>> Perhaps I'm not understanding the constraints of the TM well enough,
>>>>>> but can we really not open a configured/fixed port from the TM --
>>>>>> especially in a network-isolated Kubernetes pod? Adding a third process (C)
>>>>>> "proxy" to the pod might be an alternative option and morph (3) into (2). B
>>>>>> would configure C when it's ready. A would connect to C, but be blocked
>>>>>> until B has configured it. C could perhaps even serve logging,
>>>>>> provisioning, and artifacts without B. And the data plane would not go over
>>>>>> C anyway. If control proxy'ing is a concern, then alternatively we would
>>>>>> add an indirection to the container contract and provide the control
>>>>>> endpoint in the provisioning api, say, or even a new discovery service.
>>>>>>
>>>>>> There are of course other options and tradeoffs, but having Flink
>>>>>> work on Kubernetes and not go against the grain seems desirable to me.
>>>>>>
>>>>>> Thanks,
>>>>>>  Henning
>>>>>>
>>>>>>

Re: SDK Harness Deployment

Posted by Thomas Weise <th...@apache.org>.
Sounds good.

Regarding (2), the bootstrapping endpoints serve job specific info, but a
Flink task manager lifecycle isn't tied to a single job (it is a worker
that exists even before the pipeline was submitted). Therefore we will need
to fall back to (1). Boot code would fail to connect until the job was
deployed and the Flink runner has established the endpoint. This should
work fine, as long as the boot code is retried without causing the entire
container to exit; it may just be some noise in the logs?

In my scenario there won't be a second job that runs on the same task
manager, since we are planning to deploy Flink along with the application.
But Flink in general also supports a "session" mode where multiple jobs can
share the same set of task managers. In that case it would be necessary to
isolate the SDK workers because they can only serve a single job (unless
what you have listed under static information is identical).

Looking at the current runner code there will be some work in the
JobResourceManager/SingletonSdkHarnessManager neighborhood that I can pick
up once we have the basics working in master. Currently SDK workers can
only be distinguished by the port they connect to; the runner does not look
at the worker ID or make it available in any way. So the support to
multiplex has to be added. Perhaps Ben/Alex can comment on this?
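
As a rough sketch of what that multiplexing could look like on the runner
side, assuming the worker ID arrives as a metadata pair on each connection:
the class ControlMultiplexer, the header key and the (job, slot) tuple are
hypothetical and do not correspond to existing runner code.

```python
class ControlMultiplexer:
    """Routes SDK harness connections by worker ID (hypothetical sketch)."""

    HEADER = "worker_id"  # assumed metadata key carrying the worker ID

    def __init__(self):
        self._routes = {}  # worker id -> (job id, slot index)

    def register(self, worker_id, job_id, slot):
        """Record which job and task slot a pre-assigned worker ID serves."""
        self._routes[worker_id] = (job_id, slot)

    def route(self, metadata):
        """Return (job id, slot) for a connection's metadata pairs.

        Returns None when the worker ID is missing or unknown.
        """
        headers = dict(metadata)
        worker_id = headers.get(self.HEADER)
        return self._routes.get(worker_id)
```

With this in place all workers could connect to one fixed control port, and
the port would no longer be the only way to tell them apart.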

Thanks,
Thomas



Re: SDK Harness Deployment

Posted by Henning Rohde <he...@google.com>.
You're right. That is the idea.

Two comments on the executable stage not being available yet:
  (1) An SDK harness may either retry or fail (exit) if it can't
connect/times out/gets an error. If it exits, the runner/environment is
responsible for restarting the process/container. So it will effectively
always retry. The boot code currently used tries to connect for 2 mins
after which it gives up (and in turn is restarted and tries again). The
2min is set a bit arbitrarily, btw, so we can adjust it for the default
containers.
  (2) The 2 bootstrapping endpoints serve static information (pipeline
options, artifacts, and job metadata) that may not require an executable
stage -- for example, if the artifact service just serves data from HDFS.
The control endpoint is mainly driven by the runner side, so the
multiplexer could allow any SDK harness to connect, but it just wouldn't
send any actual instructions until the executable stages were ready. So for
2nd jobs or if we have some global hooks into the TM (or deploy a separate
process -- provisioning and artifacts are separate services to make this
possible), it might be possible to allow the SDK harness to boot in
parallel with the TM being fully ready. Disclaimer: I have a limited
understanding of the Flink constraints here.
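
To illustrate the idea in (2) of letting any SDK harness connect while
holding back instructions, here is a small sketch, assuming one instruction
queue per worker ID. The class ControlGate and its methods are hypothetical
and the instruction values are placeholder strings.

```python
import queue


class ControlGate:
    """Accepts harness connections early, releases work later (sketch)."""

    def __init__(self):
        self._pending = {}  # worker id -> queue of instructions

    def connect(self, worker_id):
        """Accept a harness connection before any executable stage exists."""
        self._pending.setdefault(worker_id, queue.Queue())

    def stage_ready(self, worker_id, instructions):
        """Called by the runner once the stage is deployed into a slot."""
        pending = self._pending.setdefault(worker_id, queue.Queue())
        for instruction in instructions:
            pending.put(instruction)

    def next_instruction(self, worker_id, timeout_s=0.0):
        """Return the next instruction, or None if nothing is ready in time."""
        pending = self._pending.get(worker_id)
        if pending is None:
            return None
        try:
            return pending.get(timeout=timeout_s)
        except queue.Empty:
            return None
```

The harness side simply sees an idle control stream until stage_ready() is
called, so it can boot in parallel with the TM.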

Thanks,
 Henning



On Fri, Jun 8, 2018 at 7:49 AM Thomas Weise <th...@apache.org> wrote:

> Yes, it did not occur to me that we have the identifier available for
> this. I just took a fresh look at
> https://s.apache.org/beam-fn-api-container-contract
>
> So it should be possible to start a pool of containers with pre-assigned
> IDs in the pod, communicate the same set of IDs to the runner (via its
> configuration) and then come up with some mechanism to assign executable
> stages to worker IDs as part of the Flink operator initialization.
>
> By the time the SDK boot code calls the provisioning service to fetch the
> pipeline options, the runner wouldn't be ready (either since the TM isn't
> running or the executable stages were not deployed into it yet). So will
> that call just retry until the endpoint becomes available? On the runner
> side, the endpoint can only be activated (under the fixed address) when the
> task slots are assigned.
>
> Thanks,
> Thomas
>
>
>
>
>
> On Wed, Jun 6, 2018 at 3:19 PM, Henning Rohde <he...@google.com> wrote:
>
>> Thanks Thomas. The id provided to the SDK harness must be sent as a gRPC
>> header when it connects to the TM. The TM can use a fixed port and
>> multiplex requests based on that id - to match the SDK harness with the
>> appropriate job/slot/whatnot. The relationship between SDK harness and TM
>> is not limited to 1:1, but rather many:1. We'll likely need that for
>> cross-language as well. Wouldn't multiplexing on a single port for the
>> control plane be the easiest solution for both #1 and #2? The data plane
>> can still use various dynamically-allocated ports.
>>
>> On Kubernetes, we're somewhat constrained by the pod lifetime and
>> multi-job TMs might not be as natural to achieve.
>>
>> Thanks,
>>  Henning
>>
>> On Wed, Jun 6, 2018 at 2:28 PM Thomas Weise <th...@apache.org> wrote:
>>
>>> Hi Henning,
>>>
>>> Here is a page that explains the scheduling and overall functioning of
>>> the task manager in Flink:
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html
>>>
>>> Here are the 2 issues:
>>>
>>> #1 each task manager process gets assigned multiple units of execution
>>> into task slots. So when we deploy a Beam pipeline, we can end up with
>>> multiple executable stages running in a single TM JVM.
>>>
>>> This is where a 1-to-1 relationship between TM and SDK harness can lead to
>>> a bottleneck (all task slots of a single TM push their work to a single SDK
>>> container).
>>>
>>> #2 in a deployment where multiple pipelines share a Flink cluster, the
>>> SDK harness per TM approach wouldn't work logically. We would need to have
>>> multiple SDK containers, not just for efficiency reasons.
>>>
>>> This would not be an issue for the deployment scenario I'm looking at,
>>> but it needs to be considered for general Flink runner deployment.
>>>
>>> Regarding the assignment of fixed endpoints within the TM, that is
>>> possible but it doesn't address #1 and #2.
>>>
>>> I hope this clarifies?
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Wed, Jun 6, 2018 at 12:31 PM, Henning Rohde <he...@google.com>
>>> wrote:
>>>
>>>> Thanks for writing down and explaining the problem, Thomas. Let me try
>>>> to tease some of the topics apart.
>>>>
>>>> First, the basic setup is currently as follows: there are 2 worker
>>>> processes (A) "SDK harness" and (B) "Runner harness" that needs to
>>>> communicate. A connects to B. The fundamental endpoint(s) of B as well as
>>>> an id -- logging, provisioning, artifacts and control -- are provided to A
>>>> via command line parameters. A is not expected to be able to connect to the
>>>> control port without first obtaining pipeline options (from provisioning)
>>>> and staged files (from artifacts). As an side, this is where the separate
>>>> boot.go code comes in handy. A can assume it will be restarted, if it
>>>> exits. A does not assume the given endpoints are up when started and should
>>>> make blocking calls with timeout (but if not and exits, it is restarted
>>>> anyway and will retry). Note that the data plane endpoints are part of the
>>>> control instructions and need not be known or allocated at startup or even
>>>> be served by the same TM.
>>>>
>>>> Second, whether or not docker is used is rather an implementation
>>>> detail, but if we use Kubernetes (or other such options) then some
>>>> constraints come into play.
>>>>
>>>> Either way, two scenarios work well:
>>>>    (1) B starts A: The ULR and Flink prototype does this. B will delay
>>>> starting A until it has decided which endpoints to use. This approach
>>>> requires B to do process/container management, which we'd rather not have
>>>> to do at scale. But it's convenient for local runners.
>>>>    (2) B has its (local) endpoints configured or fixed: A and B can be
>>>> started concurrently. Dataflow does this. Kubernetes lends itself well to
>>>> this approach (and handles container management for us).
>>>>
>>>> The Flink on Kubernetes scenario described above doesn't:
>>>>    (3) B must use randomized (local) endpoints _and_ A and B are
>>>> started concurrently: A would not know where to connect.
>>>>
>>>> Perhaps I'm not understanding the constraints of the TM well enough,
>>>> but can we really not open a configured/fixed port from the TM --
>>>> especially in a network-isolated Kubernetes pod? Adding a third process (C)
>>>> "proxy" to the pod might by an alternative option and morph (3) into (2). B
>>>> would configure C when it's ready. A would connect to C, but be blocked
>>>> until B has configured it. C could perhaps even serve logging,
>>>> provisioning, and artifacts without B. And the data plane would not go over
>>>> C anyway. If control proxy'ing is a concern, then alternatively we would
>>>> add an indirection to the container contract and provide the control
>>>> endpoint in the provisioning api, say, or even a new discovery service.
>>>>
>>>> There are of course other options and tradeoffs, but having Flink work
>>>> on Kubernetes and not go against the grain seems desirable to me.
>>>>
>>>> Thanks,
>>>>  Henning
>>>>
>>>>
>>>> On Wed, Jun 6, 2018 at 10:11 AM Thomas Weise <th...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> The current plan for running the SDK harness is to execute docker to
>>>>> launch SDK containers with service endpoints provided by the runner in the
>>>>> docker command line.
>>>>>
>>>>> In the case of Flink runner (prototype), the service endpoints are
>>>>> dynamically allocated per executable stage. There is typically one Flink
>>>>> task manager running per machine. Each TM has multiple task slots. A subset
>>>>> of these task slots will run the Beam executable stages. Flink allows
>>>>> multiple jobs in one TM, so we could have executable stages of different
>>>>> pipelines running in a single TM, depending on how users deploy. The
>>>>> prototype also has no cleanup for the SDK containers, they remain running
>>>>> and orphaned once the runner is gone.
>>>>>
>>>>> I'm trying to find out how this approach can be augmented for
>>>>> deployment on Kubernetes. Our deployments won't allow multiple jobs per
>>>>> task manager, so all task slots will belong to the same pipeline context.
>>>>> The intent is to deploy SDK harness containers along with TMs in the same
>>>>> pod. No assumption can be made about the order in which the containers are
>>>>> started, and the SDK container wouldn't know the connect address at startup
>>>>> (it can only be discovered after the pipeline gets deployed into the TMs).
>>>>>
>>>>> I talked about that a while ago with Henning and one idea was to set a
>>>>> fixed endpoint address so that the boot code in the SDK container knows
>>>>> upfront where to connect to, even when that endpoint isn't available yet.
>>>>> This approach may work with minimal changes to runner and little or no
>>>>> change to SDK container (as long as the SDK is prepared to retry). The
>>>>> downside is that all (parallel) task slots of the TM will use the same SDK
>>>>> worker, which will likely lead to performance issues, at least with the
>>>>> Python SDK that we are planning to use.
>>>>>
>>>>> An alternative may be to define an SDK worker pool per pod, with a
>>>>> discovery mechanism for workers to find the runner endpoints and a
>>>>> coordination mechanism that distributes the dynamically allocated endpoints
>>>>> that are provided by the executable stage task slots over the available
>>>>> workers.
>>>>>
>>>>> Any thoughts on this? Is anyone else looking at a docker free
>>>>> deployment?
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>>
>>>
>

Re: SDK Harness Deployment

Posted by Thomas Weise <th...@apache.org>.
Yes, it did not occur to me that we have the identifier available for this.
I just took a fresh look at
https://s.apache.org/beam-fn-api-container-contract

So it should be possible to start a pool of containers with pre-assigned
IDs in the pod, communicate the same set of IDs to the runner (via its
configuration) and then come up with some mechanism to assign executable
stages to worker IDs as part of the Flink operator initialization.
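To make the assignment step concrete, here is a rough sketch of one possible
mechanism: a plain round-robin mapping from executable stages to the
pre-assigned worker IDs. The function name, the stage names, and the worker
ID strings are all made up for illustration; nothing like this exists in the
runner today, and a real implementation might balance by load instead.

```python
from itertools import cycle


def assign_stages(stage_ids, worker_ids):
    """Map each executable stage to a pre-assigned worker ID, round-robin.

    Hypothetical helper: both the name and the policy are assumptions.
    """
    if not worker_ids:
        raise ValueError("need at least one worker id")
    workers = cycle(worker_ids)
    # Stages are visited in order, so the mapping is deterministic.
    return {stage: next(workers) for stage in stage_ids}


# Three stages spread over a pool of two SDK workers in the pod.
assignment = assign_stages(
    ["stage-0", "stage-1", "stage-2"],
    ["worker-1", "worker-2"],
)
```

With more stages than workers, several stages share a worker, which is the
many:1 case the pool is meant to keep under control.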

By the time the SDK boot code calls the provisioning service to fetch the
pipeline options, the runner wouldn't be ready (either since the TM isn't
running or the executable stages were not deployed into it yet). So will
that call just retry until the endpoint becomes available? On the runner
side, the endpoint can only be activated (under the fixed address) when the
task slots are assigned.
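For illustration, the retry behavior I have in mind would look roughly like
the sketch below. It is not the actual boot code: the function name, the
backoff values, and the use of ConnectionError to signal "endpoint not up
yet" are assumptions, and the provisioning call is replaced by a stand-in.

```python
import time


def call_with_retry(fetch, attempts=5, initial_delay=0.01, sleep=time.sleep):
    """Call fetch() until it succeeds, backing off between attempts.

    Hypothetical helper; ConnectionError stands in for whatever error the
    real client raises while the endpoint is unavailable.
    """
    delay = initial_delay
    for attempt in range(1, attempts + 1):
        try:
            return fetch()
        except ConnectionError:
            if attempt == attempts:
                raise  # give up and let the container be restarted
            sleep(delay)
            delay *= 2  # exponential backoff


# Stand-in for the provisioning call: fails twice, then succeeds.
calls = {"n": 0}


def fake_fetch_options():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("endpoint not up yet")
    return {"job_name": "example"}


options = call_with_retry(fake_fetch_options, sleep=lambda _: None)
```

If the attempts run out, the error propagates and the process exits, which
relies on the container being restarted by whatever manages the pod.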

Thanks,
Thomas

On Wed, Jun 6, 2018 at 3:19 PM, Henning Rohde <he...@google.com> wrote:

> Thanks Thomas. The id provided to the SDK harness must be sent as a gRPC
> header when it connects to the TM. The TM can use a fixed port and
> multiplex requests based on that id - to match the SDK harness with the
> appropriate job/slot/whatnot. The relationship between SDK harness and TM
> is not limited to 1:1, but rather many:1. We'll likely need that for
> cross-language as well. Wouldn't multiplexing on a single port for the
> control plane be the easiest solution for both #1 and #2? The data plane
> can still use various dynamically-allocated ports.
>
> On Kubernetes, we're somewhat constrained by the pod lifetime and
> multi-job TMs might not be as natural to achieve.
>
> Thanks,
>  Henning

Re: SDK Harness Deployment

Posted by Henning Rohde <he...@google.com>.
Thanks Thomas. The id provided to the SDK harness must be sent as a gRPC
header when it connects to the TM. The TM can use a fixed port and
multiplex requests based on that id - to match the SDK harness with the
appropriate job/slot/whatnot. The relationship between SDK harness and TM
is not limited to 1:1, but rather many:1. We'll likely need that for
cross-language as well. Wouldn't multiplexing on a single port for the
control plane be the easiest solution for both #1 and #2? The data plane
can still use various dynamically-allocated ports.
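As a rough sketch of the multiplexing idea (not the actual TM code): the
server side keeps a table from id to slot and looks up each incoming
connection by the id found in its headers. The class name, the "worker_id"
header key, and the slot strings are assumptions for illustration, and a
plain dict stands in for the gRPC metadata.

```python
class ControlMultiplexer:
    """Route connections arriving on one fixed port to a slot, keyed by id.

    Hypothetical sketch; a dict of headers stands in for gRPC metadata.
    """

    def __init__(self):
        self._slots = {}  # worker id -> job/slot handle

    def register(self, worker_id, slot):
        # Called by the runner once a slot has been assigned to a worker id.
        self._slots[worker_id] = slot

    def route(self, headers):
        # Raises KeyError if the header is missing or the id is unknown.
        return self._slots[headers["worker_id"]]


mux = ControlMultiplexer()
mux.register("worker-1", "job-a/slot-0")
mux.register("worker-2", "job-a/slot-1")

# Two SDK harnesses connect to the same port with different ids.
first = mux.route({"worker_id": "worker-1"})
second = mux.route({"worker_id": "worker-2"})
```

Because the table is keyed by id rather than by port, many SDK harnesses can
share one TM endpoint, and slots from different jobs can coexist in it.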

On Kubernetes, we're somewhat constrained by the pod lifetime and multi-job
TMs might not be as natural to achieve.

Thanks,
 Henning

On Wed, Jun 6, 2018 at 2:28 PM Thomas Weise <th...@apache.org> wrote:

> Hi Henning,
>
> Here is a page that explains the scheduling and overall functioning of the
> task manager in Flink:
>
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html
>
> Here are the 2 issues:
>
> #1 Each task manager process gets assigned multiple units of execution into
> task slots. So when we deploy a Beam pipeline, we can end up with multiple
> executable stages running in a single TM JVM.
>
> This is where a 1-to-1 relationship between TM and SDK harness can lead to a
> bottleneck (all task slots of a single TM push their work to a single SDK
> container).
>
> #2 In a deployment where multiple pipelines share a Flink cluster, the SDK
> harness per TM approach wouldn't work logically. We would need to have
> multiple SDK containers, not just for efficiency reasons.
>
> This would not be an issue for the deployment scenario I'm looking at, but
> it needs to be considered for general Flink runner deployment.
>
> Regarding the assignment of fixed endpoints within the TM, that is
> possible but it doesn't address #1 and #2.
>
> I hope this clarifies?
>
> Thanks,
> Thomas

Re: SDK Harness Deployment

Posted by Thomas Weise <th...@apache.org>.
Hi Henning,

Here is a page that explains the scheduling and overall functioning of the
task manager in Flink:

https://ci.apache.org/projects/flink/flink-docs-release-1.5/internals/job_scheduling.html

Here are the 2 issues:

#1 Each task manager process gets assigned multiple units of execution into
task slots. So when we deploy a Beam pipeline, we can end up with multiple
executable stages running in a single TM JVM.

This is where a 1-to-1 relationship between TM and SDK harness can lead to a
bottleneck (all task slots of a single TM push their work to a single SDK
container).

#2 In a deployment where multiple pipelines share a Flink cluster, the SDK
harness per TM approach wouldn't work logically. We would need to have
multiple SDK containers, not just for efficiency reasons.

This would not be an issue for the deployment scenario I'm looking at, but
it needs to be considered for general Flink runner deployment.

Regarding the assignment of fixed endpoints within the TM, that is possible
but it doesn't address #1 and #2.

I hope this clarifies things.

Thanks,
Thomas


On Wed, Jun 6, 2018 at 12:31 PM, Henning Rohde <he...@google.com> wrote:

> Thanks for writing down and explaining the problem, Thomas. Let me try to
> tease some of the topics apart.
>
> First, the basic setup is currently as follows: there are 2 worker
> processes (A) "SDK harness" and (B) "Runner harness" that need to
> communicate. A connects to B. The fundamental endpoint(s) of B -- logging,
> provisioning, artifacts and control -- as well as an id are provided to A
> via command line parameters. A is not expected to be able to connect to the
> control port without first obtaining pipeline options (from provisioning)
> and staged files (from artifacts). As an aside, this is where the separate
> boot.go code comes in handy. A can assume it will be restarted if it
> exits. A does not assume the given endpoints are up when started and should
> make blocking calls with timeout (but if it does not and exits, it is
> restarted anyway and will retry). Note that the data plane endpoints are
> part of the control instructions and need not be known or allocated at
> startup or even be served by the same TM.
>
> Second, whether or not docker is used is rather an implementation detail,
> but if we use Kubernetes (or other such options) then some constraints come
> into play.
>
> Either way, two scenarios work well:
>    (1) B starts A: The ULR and Flink prototype do this. B will delay
> starting A until it has decided which endpoints to use. This approach
> requires B to do process/container management, which we'd rather not have
> to do at scale. But it's convenient for local runners.
>    (2) B has its (local) endpoints configured or fixed: A and B can be
> started concurrently. Dataflow does this. Kubernetes lends itself well to
> this approach (and handles container management for us).
>
> The Flink on Kubernetes scenario described above doesn't:
>    (3) B must use randomized (local) endpoints _and_ A and B are started
> concurrently: A would not know where to connect.
>
> Perhaps I'm not understanding the constraints of the TM well enough, but
> can we really not open a configured/fixed port from the TM -- especially in
> a network-isolated Kubernetes pod? Adding a third process (C) "proxy" to
> the pod might be an alternative option and morph (3) into (2). B would
> configure C when it's ready. A would connect to C, but be blocked until B
> has configured it. C could perhaps even serve logging, provisioning, and
> artifacts without B. And the data plane would not go over C anyway. If
> control proxying is a concern, then alternatively we could add an
> indirection to the container contract and provide the control endpoint in
> the provisioning API, say, or even a new discovery service.
>
> There are of course other options and tradeoffs, but having Flink work on
> Kubernetes and not go against the grain seems desirable to me.
>
> Thanks,
>  Henning

Re: SDK Harness Deployment

Posted by Henning Rohde <he...@google.com>.
Thanks for writing down and explaining the problem, Thomas. Let me try to
tease some of the topics apart.

First, the basic setup is currently as follows: there are 2 worker
processes (A) "SDK harness" and (B) "Runner harness" that need to
communicate. A connects to B. The fundamental endpoint(s) of B -- logging,
provisioning, artifacts and control -- as well as an id are provided to A
via command line parameters. A is not expected to be able to connect to the
control port without first obtaining pipeline options (from provisioning)
and staged files (from artifacts). As an aside, this is where the separate
boot.go code comes in handy. A can assume it will be restarted if it
exits. A does not assume the given endpoints are up when started and should
make blocking calls with timeout (but if it does not and exits, it is
restarted anyway and will retry). Note that the data plane endpoints are
part of the control instructions and need not be known or allocated at
startup or even be served by the same TM.
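The startup order described above can be sketched as follows. This is only
an illustration, not the boot.go code: the function names, the dictionary
keys, and the port numbers are invented, and the three service calls are
replaced by stand-ins that merely record the order in which they run.

```python
def boot(endpoints, worker_id, fetch_options, fetch_artifacts, connect_control):
    """Run the startup steps in the order the setup above implies."""
    # 1. Pipeline options come from the provisioning endpoint.
    options = fetch_options(endpoints["provisioning"], worker_id)
    # 2. Staged files come from the artifact endpoint.
    files = fetch_artifacts(endpoints["artifact"], worker_id)
    # 3. Only now does the SDK harness connect to the control endpoint.
    return connect_control(endpoints["control"], worker_id, options, files)


# Hypothetical stand-ins that only record the order of the calls.
order = []


def fake_options(endpoint, worker_id):
    order.append("provisioning")
    return {"job_name": "example"}


def fake_artifacts(endpoint, worker_id):
    order.append("artifact")
    return ["main.py"]


def fake_control(endpoint, worker_id, options, files):
    order.append("control")
    return (endpoint, worker_id)


# Endpoints and id as they might arrive via command line parameters.
endpoints = {
    "logging": "localhost:50051",
    "provisioning": "localhost:50052",
    "artifact": "localhost:50053",
    "control": "localhost:50054",
}
session = boot(endpoints, "worker-1", fake_options, fake_artifacts, fake_control)
```

Note that no data plane endpoint appears in the inputs; it only becomes
known later, through the control instructions.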

Second, whether or not docker is used is rather an implementation detail,
but if we use Kubernetes (or other such options) then some constraints come
into play.

Either way, two scenarios work well:
   (1) B starts A: The ULR and Flink prototype do this. B will delay
starting A until it has decided which endpoints to use. This approach
requires B to do process/container management, which we'd rather not have
to do at scale. But it's convenient for local runners.
   (2) B has its (local) endpoints configured or fixed: A and B can be
started concurrently. Dataflow does this. Kubernetes lends itself well to
this approach (and handles container management for us).

The Flink on Kubernetes scenario described above doesn't:
   (3) B must use randomized (local) endpoints _and_ A and B are started
concurrently: A would not know where to connect.

Perhaps I'm not understanding the constraints of the TM well enough, but
can we really not open a configured/fixed port from the TM -- especially in
a network-isolated Kubernetes pod? Adding a third process (C) "proxy" to
the pod might be an alternative option and morph (3) into (2). B would
configure C when it's ready. A would connect to C, but be blocked until B
has configured it. C could perhaps even serve logging, provisioning, and
artifacts without B. And the data plane would not go over C anyway. If
control proxying is a concern, then alternatively we could add an
indirection to the container contract and provide the control endpoint in
the provisioning API, say, or even a new discovery service.
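To illustrate the blocking behavior of the proxy (C), here is a minimal
sketch, assuming an in-process stand-in rather than a real network proxy.
The class and method names and the address are hypothetical; the point is
only that A's request waits until B has told C where the control endpoint is.

```python
import threading


class ControlProxy:
    """Stand-in for process C: holds A back until B has configured it."""

    def __init__(self):
        self._configured = threading.Event()
        self._target = None

    def configure(self, target):
        # Called by B (the runner) once it knows its control endpoint.
        self._target = target
        self._configured.set()

    def resolve(self, timeout=None):
        # Called by A (the SDK harness); blocks until B has configured C.
        if not self._configured.wait(timeout):
            raise TimeoutError("runner has not configured the proxy yet")
        return self._target


proxy = ControlProxy()
result = {}

# A starts first and blocks inside resolve().
a = threading.Thread(
    target=lambda: result.update(target=proxy.resolve(timeout=5))
)
a.start()

# B becomes ready later and configures the proxy, which releases A.
proxy.configure("localhost:12345")
a.join()
```

A real proxy would forward the control stream instead of returning an
address, but the ordering guarantee would be the same.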

There are of course other options and tradeoffs, but having Flink work on
Kubernetes and not go against the grain seems desirable to me.

Thanks,
 Henning


On Wed, Jun 6, 2018 at 10:11 AM Thomas Weise <th...@apache.org> wrote:

> Hi,
>
> The current plan for running the SDK harness is to execute docker to
> launch SDK containers with service endpoints provided by the runner in the
> docker command line.
>
> In the case of Flink runner (prototype), the service endpoints are
> dynamically allocated per executable stage. There is typically one Flink
> task manager running per machine. Each TM has multiple task slots. A subset
> of these task slots will run the Beam executable stages. Flink allows
> multiple jobs in one TM, so we could have executable stages of different
> pipelines running in a single TM, depending on how users deploy. The
> prototype also has no cleanup for the SDK containers, they remain running
> and orphaned once the runner is gone.
>
> I'm trying to find out how this approach can be augmented for deployment
> on Kubernetes. Our deployments won't allow multiple jobs per task manager,
> so all task slots will belong to the same pipeline context. The intent is
> to deploy SDK harness containers along with TMs in the same pod. No
> assumption can be made about the order in which the containers are started,
> and the SDK container wouldn't know the connect address at startup (it can
> only be discovered after the pipeline gets deployed into the TMs).
>
> I talked about that a while ago with Henning and one idea was to set a
> fixed endpoint address so that the boot code in the SDK container knows
> upfront where to connect to, even when that endpoint isn't available yet.
> This approach may work with minimal changes to runner and little or no
> change to SDK container (as long as the SDK is prepared to retry). The
> downside is that all (parallel) task slots of the TM will use the same SDK
> worker, which will likely lead to performance issues, at least with the
> Python SDK that we are planning to use.
>
> An alternative may be to define an SDK worker pool per pod, with a
> discovery mechanism for workers to find the runner endpoints and a
> coordination mechanism that distributes the dynamically allocated endpoints
> that are provided by the executable stage task slots over the available
> workers.
>
> Any thoughts on this? Is anyone else looking at a docker free deployment?
>
> Thanks,
> Thomas
>
>