Posted to user@flink.apache.org by Rong Rong <wa...@gmail.com> on 2019/11/01 03:01:50 UTC

Re: [DISCUSS] Semantic and implementation of per-job mode

Hi All,

Thanks @Tison for starting the discussion. I think we have a very similar
scenario to Theo's use cases.
In our case we also generate the job graph using a client service (which
serves job graph generation for multiple pieces of user code), and we've
found managing the upload/download between the cluster and the DFS to
be tricky and error-prone. In addition, managing the different
environments and requirements of different users in a single service poses
even more trouble for us.

However, shifting the job graph generation towards the cluster side also
requires some thought on how to manage the driver-job as well as
dependency conflicts: if the job graph generation is shipped to the
cluster, some dependencies that are unnecessary at runtime will be pulled
in by the driver-job (correct me if I'm wrong, Theo).

I think in general I agree with @Gyula's main point: unless there is a very
strong reason, it is better to make the driver mode opt-in (at
least at the beginning).
I left some comments on the document as well. Please kindly take a look.

Thanks,
Rong

On Thu, Oct 31, 2019 at 9:26 AM Chan, Regina <Re...@gs.com> wrote:

> Yeah, just chiming in on this conversation as well. We heavily use multiple
> job graphs to get isolation around retry logic and resource allocation
> across the job graphs. Putting all these parallel flows into a single graph
> would mean sharing TaskManagers across what were meant to be truly
> independent jobs.
>
>
>
> We also build our job graphs dynamically based off of the state of the
> world at the start of the job. While we’ve had our share of the pain
> described, my understanding is that there would be a tradeoff in the number of
> jobs being submitted to the cluster and the corresponding resource allocation
> requests. In the model with multiple jobs in a program, there’s at least
> the opportunity to reuse idle TaskManagers.
>
>
> *From:* Theo Diefenthal <th...@scoop-software.de>
> *Sent:* Thursday, October 31, 2019 10:56 AM
> *To:* user@flink.apache.org
> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>
>
>
> I agree with Gyula Fora,
>
>
>
> In our case, we have a client machine in the middle between our YARN
> cluster and some backend services which cannot be reached directly from
> the cluster nodes. On application startup, we connect to some external
> systems, get some information crucial for the job runtime, and finally build
> up the job graph to be submitted.
>
>
>
> It is true that we could work around this, but it would be pretty annoying
> to connect to the remote services, collect the data, upload it to HDFS,
> start the job, and make sure housekeeping of those files is also done at
> some later time.
>
>
>
> The current behavior also corresponds to the behavior of Spark's driver
> mode, which made the transition from Spark to Flink easier for us.
>
>
>
> But I see the point, especially in terms of Kubernetes, and would thus also
> vote for an opt-in solution, with client-side compilation as the default and
> the per-program mode available as an option.
>
>
>
> Best regards
>
>
> ------------------------------
>
> *From: *"Flavio Pompermaier" <po...@okkam.it>
> *To: *"Yang Wang" <da...@gmail.com>
> *CC: *"tison" <wa...@gmail.com>, "Newport, Billy" <
> Billy.Newport@gs.com>, "Paul Lam" <pa...@gmail.com>, "SHI Xiaogang"
> <sh...@gmail.com>, "dev" <de...@flink.apache.org>, "user" <
> user@flink.apache.org>
> *Sent: *Thursday, October 31, 2019 10:45:36
> *Subject: *Re: [DISCUSS] Semantic and implementation of per-job mode
>
>
>
> Hi all,
>
> we use multiple jobs in one program a lot, and this is why: you fetch
> data from a huge number of sources and, for each source, you do
> some transformation, and then you want to write the union of all outputs
> into a single directory (this assumes you're doing batch). When the number of
> sources is large, if you want to do this in a single job, the graph becomes
> very big, and this is a problem for several reasons:
>
>    - too many subtasks/threads per slot
>    - increased backpressure
>    - if a single "sub-job" fails, the whole job fails; this is very annoying
>    if it happens after half a day, for example
>    - in our use case, the big-graph mode takes much longer than running
>    each job separately (but maybe this is true only if you don't have many
>    hardware resources)
>    - debugging the cause of a failure can become a daunting task if the
>    job graph is too large
>    - we faced many strange errors when trying to run the single big-job
>    mode (due to serialization corruption)
>
> So, summarizing, our overall experience with Flink batch is: the simpler
> the job graph, the better!
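The per-source pattern described above can be sketched in plain Java (no Flink API involved; `runJob` and the source names are placeholders standing in for building and executing one small job graph per source): a failure in one source's job loses only that source's work, and only that job needs a retry.

```java
import java.util.ArrayList;
import java.util.List;

public class PerSourceSubmission {
    // Placeholder for building and executing one small Flink job for a source.
    static void runJob(String source) {
        if (source.equals("source-2")) {
            throw new RuntimeException("corrupt input in " + source);
        }
    }

    public static void main(String[] args) {
        List<String> sources = List.of("source-1", "source-2", "source-3");
        List<String> failed = new ArrayList<>();
        for (String s : sources) {
            try {
                runJob(s); // one small job graph per source
            } catch (RuntimeException e) {
                failed.add(s); // only this source needs a retry
            }
        }
        System.out.println("failed sources: " + failed);
    }
}
```

In the single-big-graph alternative, the failure in source-2 would abort the whole union job, including the work already done for the other sources.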
>
>
>
> Best,
>
> Flavio
>
>
>
>
>
> On Thu, Oct 31, 2019 at 10:14 AM Yang Wang <da...@gmail.com> wrote:
>
> Thanks to tison for starting this exciting discussion. We also suffer a lot
> from the per-job mode.
>
> I think the per-job cluster is a dedicated cluster for only one job and
> will not accept other jobs. It has the advantage of one-step submission: no
> need to start a dispatcher first and then submit the job. And it does not
> matter where the job graph is generated and the job is submitted.
>
> Now we have two cases.
>
>
> (1) The current YARN detached cluster. The job graph is generated on the
> client and then shipped via distributed cache to the Flink master container,
> where the MiniDispatcher uses `FileJobGraphRetriever` to get it. The job is
> submitted on the Flink master side.
>
>
> (2) The standalone per-job cluster. User jars are already built into the
> image, so the job graph is generated on the Flink master side and
> `ClassPathJobGraphRetriever` is used to get it. The job is also submitted on
> the Flink master side.
>
>
> For both (1) and (2), only one job per user program is supported. "Per job"
> means per job graph, so it works just as expected.
>
>
>
> Tison suggests adding a new mode, "per-program”. The user jar will be
> transferred to the Flink master container, and a local client will be started
> there to generate the job graph and submit the job. I think it could cover
> all the functionality of the current per-job mode, both (1) and (2). Also,
> detached and attached mode could be unified, and we would not need to start
> a session cluster to simulate per-job for multi-part programs.
>
>
> I am in favor of the “per-program” mode. Just two concerns:
> 1. How many users are using multiple jobs in one program?
> 2. Why not always use a session cluster to simulate per-job? Maybe
> one-step submission is a convincing reason.
>
> Best,
>
> Yang
>
>
>
> tison <wa...@gmail.com> wrote on Thursday, October 31, 2019 at 9:18 AM:
>
> Thanks for your attentions!
>
>
>
> @shixiaogangg@gmail.com <sh...@gmail.com>
>
>
>
> Yes, correct. We try to avoid jobs affecting one another. Also, a local
> ClusterClient in this case saves the overhead of retrying before a leader is
> elected and of persisting the JobGraph before submission in RestClusterClient,
> as well as the network cost.
>
>
>
> @Paul Lam <pa...@gmail.com>
>
>
>
> 1. There is already a note[1] about multiple-part jobs. I was also a bit
> confused by this concept at first :-) Things work in a similar way if your
> program contains only one JobGraph, so I think per-program acts like
> per-job-graph in this case, which provides compatibility for the many
> single-job-graph programs.
>
>
>
> Besides, we have to respect the user program, which the current
> implementation doesn't, because we return abruptly when env#execute is
> called. This hijacks user control so that the program cannot deal with the
> job result or its future. I think this is why we have to add a
> detach/attach option.
>
>
>
> 2. For the compilation part, a workaround could be to upload those resources
> to a commonly known address such as HDFS, so that compilation can read them
> from either the client or the cluster.
>
>
>
> Best,
>
> tison.
>
>
>
> [1]
> https://issues.apache.org/jira/browse/FLINK-14051?focusedCommentId=16927430&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16927430
>
>
>
>
>
> Newport, Billy <Bi...@gs.com> wrote on Wednesday, October 30, 2019 at 10:41 PM:
>
> We execute multiple job graphs routinely because we cannot submit a single
> graph without it blowing up. I believe Regina spoke to this in Berlin
> during her talk. If we are processing a database ingestion with
> 200 tables, we instead do a job graph per table rather than a single job
> graph that handles all tables. A single job graph can be in the tens
> of thousands of nodes in our largest cases, and we have found Flink (as of
> 1.3/1.6.4) cannot handle graphs of that size. We’re currently testing 1.9.1
> but have not retested the large-graph scenario.
>
>
>
> Billy
>
>
>
>
>
> *From:* Paul Lam [mailto:paullin3280@gmail.com]
> *Sent:* Wednesday, October 30, 2019 8:41 AM
> *To:* SHI Xiaogang
> *Cc:* tison; dev; user
> *Subject:* Re: [DISCUSS] Semantic and implementation of per-job mode
>
>
>
> Hi,
>
>
>
> Thanks for starting the discussion.
>
>
>
> WRT the per-job semantics, it looks natural to me that per-job means
> per-job-graph, because in my understanding a JobGraph is the representation
> of a job. Could you share some use cases in which a user program should
> contain multiple job graphs?
>
>
>
> WRT the per-program mode, I’m also in favor of a unified cluster-side
> execution for user programs, so +1 from my side.
>
>
>
> But I think there may be some value in the current per-job mode: we now have
> some common resources available on the client machine that are read by the
> main methods of user programs. If migrated to per-program mode, we would
> have to explicitly set the specific resources for each user program and ship
> them to the cluster, which would be a bit inconvenient. Also, as the job
> graph is compiled at the client, we can recognize errors caused by user code
> before starting the cluster, and easily get access to the logs.
>
>
>
> Best,
>
> Paul Lam
>
>
>
> On Oct 30, 2019, at 16:22, SHI Xiaogang <sh...@gmail.com> wrote:
>
>
>
> Hi
>
>
>
> Thanks for bringing this.
>
>
>
> The design looks very nice to me in that
>
> 1. In the new per-job mode, we don't need to compile user programs on the
> client and can directly run user programs with user jars. That way, it's
> easier to achieve resource isolation in multi-tenant platforms, and it is much safer.
>
> 2. The execution of user programs can be unified across session and per-job
> modes. In session mode, user jobs are submitted via a remote ClusterClient,
> while in per-job mode user jobs are submitted via a local ClusterClient.
>
>
>
> Regards,
>
> Xiaogang
>
>
>
> tison <wa...@gmail.com> wrote on Wednesday, October 30, 2019 at 3:30 PM:
>
> (CC'ing the user list because I think users may have ideas on how per-job
> mode should look)
>
>
>
> Hi all,
>
> In the discussion about Flink on K8s[1] we encountered a problem: opinions
> diverge on how the so-called per-job mode should work. This thread is aimed
> at starting a dedicated discussion about per-job semantics and how to
> implement them.
>
> **The AS IS per-job mode**
>
> * in standalone deployment, we bundle the user jar with the Flink jar,
> retrieve the JobGraph (the very first JobGraph from the user program on the
> classpath), and then start a Dispatcher with this JobGraph preconfigured,
> which launches it as a "recovered" job.
>
> * in YARN deployment, we accept submission via CliFrontend, extract the
> JobGraph (the very first JobGraph from the submitted user program),
> serialize the JobGraph and upload it to YARN as a resource, and then, when
> the AM starts, retrieve the JobGraph as a resource and start a Dispatcher
> with this JobGraph preconfigured; the rest is the same.
>
> Specifically, in order to support multi-part jobs, if the YARN deployment is
> configured as "attached", it starts a SessionCluster, runs the program, and
> shuts down the cluster when the job finishes.
>
> **Motivation**
>
> The implementation mentioned above, however, suffers from problems. The
> major two are: 1. only the very first JobGraph from the user program is
> respected; 2. the job is compiled on the client side.
>
> 1. Only the very first JobGraph from the user program is respected
>
> There is already an issue about this topic[2]. As we extract the JobGraph
> from the user program by hijacking Environment#execute, we actually abort
> any execution after the first call to #execute. Besides, it has surprised
> users many times that logic they write in the program is possibly never
> executed. The underlying problem is the semantics of "job" from Flink's
> perspective. I'd like to say that in the current implementation "per-job" is
> actually "per-job-graph". However, in practice, since we support jar
> submission, it is "per-program" semantics that users want.
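The hijacking described above can be illustrated with a minimal stand-in (plain Java; `CapturingEnvironment` and `ProgramAbortException` are simplified models, not Flink's actual extraction classes, though Flink's extractor works in this spirit): the first call to execute captures the plan and aborts the program, so any code after it never runs.

```java
import java.util.ArrayList;
import java.util.List;

// Thrown by the capturing environment to abort the user main early.
class ProgramAbortException extends RuntimeException {}

// Simplified model of an Environment whose execute() is hijacked for
// job-graph extraction: it records the plan and aborts instead of running.
class CapturingEnvironment {
    String capturedPlan;
    void execute(String plan) {
        capturedPlan = plan;
        throw new ProgramAbortException(); // aborts user main after first job
    }
}

public class HijackDemo {
    static final List<String> executedStages = new ArrayList<>();

    // A user program that intends to run two jobs in sequence.
    static void userMain(CapturingEnvironment env) {
        executedStages.add("job-1 built");
        env.execute("job-1");
        executedStages.add("job-2 built"); // never reached
        env.execute("job-2");
    }

    public static void main(String[] args) {
        CapturingEnvironment env = new CapturingEnvironment();
        try {
            userMain(env);
        } catch (ProgramAbortException expected) {
            // the extractor swallows this and keeps only the first JobGraph
        }
        System.out.println("captured: " + env.capturedPlan);
        System.out.println("stages run: " + executedStages);
    }
}
```

Only "job-1" is captured and only the first stage of the user main ever executes, which is exactly the surprise users report.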
>
> 2. Compile job in client side
>
> Well, standalone deployment is not affected. But in YARN deployment, we
> compile the job and get the JobGraph on the client side, and then upload it
> to YARN. This approach, however, somewhat breaks isolation. We have observed
> user programs containing exception handling logic that calls System.exit in
> the main method, which causes a failed compilation of the job to exit the
> whole client at once. This is a critical problem if we manage multiple Flink
> jobs on a unified platform: in that case, it shuts down the whole service.
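This hazard is easy to reproduce without Flink at all. In the sketch below (plain Java; `UserJob` is a made-up user program), the user main is launched in a separate JVM, so its System.exit only kills that JVM; invoked in-process, the same call would take down the whole client service.

```java
import java.nio.file.Files;
import java.nio.file.Path;

public class ExitIsolationDemo {
    // A made-up "user program" whose main calls System.exit in its
    // exception handling -- the pattern observed on real platforms.
    static final String USER_PROGRAM =
        "public class UserJob {\n"
      + "    public static void main(String[] args) {\n"
      + "        try { throw new RuntimeException(\"bad config\"); }\n"
      + "        catch (Exception e) { System.exit(42); }\n"
      + "    }\n"
      + "}\n";

    public static void main(String[] args) throws Exception {
        Path src = Files.createTempDirectory("exit-demo").resolve("UserJob.java");
        Files.writeString(src, USER_PROGRAM);
        // Launch the user main in a separate JVM (Java 11+ single-file mode).
        // Its System.exit(42) kills only that JVM; invoked in-process,
        // the same call would terminate this "client service" as well.
        int code = new ProcessBuilder("java", src.toString()).start().waitFor();
        System.out.println("user program exit code: " + code);
        System.out.println("client service still alive");
    }
}
```

Process-level isolation is the same effect that moving compilation from a shared client service onto a dedicated cluster entrypoint would achieve.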
>
> Besides, I have been asked many times why per-job mode doesn't run
> "just like" session mode but with a dedicated cluster. It might imply that
> the current implementation mismatches users' demands.
>
> **Proposal**
>
> In order to provide a "per-program" semantic mode which acts "just like"
> session mode but with a dedicated cluster, I propose the workflow below. It
> acts like starting a driver on the cluster, but is not a general driver
> solution as proposed here[3]; the main purpose of the workflow below is to
> provide a "per-program" semantic mode.
>
> *From CliFrontend*
>
> 1. CliFrontend receives the submission, gathers all configuration, and
> starts a corresponding ClusterDescriptor.
>
> 2. The ClusterDescriptor deploys a cluster with main class
> ProgramClusterEntrypoint, shipping resources including the user program.
>
> 3. ProgramClusterEntrypoint#main contains the logic to start components,
> including a Standalone Dispatcher, configure the user program to use a
> RpcClusterClient, and then invoke the main method of the user program.
>
> 4. RpcClusterClient acts like MiniClusterClient, which is able to submit the
> JobGraph after the leader is elected, so that we don't fall back to
> round-robin or fail the submission due to there being no leader.
>
> 5. Whether or not the job result is delivered depends on the user program
> logic, since we can already get a JobClient from execute.
> ProgramClusterEntrypoint exits when the user program exits and all globally
> submitted jobs terminate.
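The attach/detach point in step 5 can be modeled with plain futures (not Flink's actual JobClient API; `JobHandle` and `submit` are stand-ins): submission returns a handle immediately, and "attached" simply means the program chooses to block on that handle, so no separate cluster-side mode is needed for the distinction.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class AttachDetachModel {
    // Stand-in for a job client: a handle to a running job.
    static class JobHandle {
        final CompletableFuture<String> result;
        JobHandle(CompletableFuture<String> result) { this.result = result; }
    }

    // Stand-in for submission via a (local) cluster client: returns at once.
    static JobHandle submit(String jobName) {
        return new JobHandle(CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(50); // simulated job runtime
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return jobName + ": FINISHED";
        }));
    }

    public static void main(String[] args) {
        JobHandle handle = submit("my-job");
        // "Detached": the program could return here and let the job run on.
        // "Attached": the program blocks on the result itself, as below --
        // the choice lives in user code, not in a cluster-side mode.
        System.out.println(handle.result.join());
    }
}
```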
>
> This fits in with the direction of FLIP-73, because the strategy of starting
> a RpcClusterClient can be regarded as a special Executor. After
> ProgramClusterEntrypoint#main starts a cluster, it generates and passes
> configuration to the user program, so that the generated Executor knows to
> use a RpcClusterClient for submission, as well as the address of the
> Dispatcher.
>
> **Compatibility**
>
> In my opinion this mode can be purely an add-on to the current codebase. We
> don't actually replace the current per-job mode with the so-called
> "per-program" mode. It may happen that the current per-job mode becomes
> useless once we have such a "per-program" mode, so that we possibly
> deprecate it in favor of the other.
>
> I'm glad to discuss the details further if you're interested, but let's say
> we'd better first reach a consensus on the overall design :-)
>
> Looking forward to your reply!
>
> Best,
> tison.
>
> [1] https://issues.apache.org/jira/browse/FLINK-9953
> [2] https://issues.apache.org/jira/browse/FLINK-10879
> [3]
> https://docs.google.com/document/d/1dJnDOgRae9FzoBzXcsGoutGB1YjTi1iqG6d1Ht61EnY/edit#
>
>
>
>
> ------------------------------
>
>
> Your Personal Data: We may collect and process information about you that
> may be subject to data protection laws. For more information about how we
> use and disclose your personal data, how we protect your information, our
> legal basis to use your information, your rights and who you can contact,
> please refer to: www.gs.com/privacy-notices
>