Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/06/11 12:13:00 UTC

[jira] [Commented] (FLINK-9280) Extend JobSubmitHandler to accept jar files

    [ https://issues.apache.org/jira/browse/FLINK-9280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16507950#comment-16507950 ] 

ASF GitHub Bot commented on FLINK-9280:
---------------------------------------

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/6147

    [FLINK-9280][rest] Rework JobSubmitHandler to accept jar/artifact files

    ## What is the purpose of the change
    
    This PR reworks the `JobSubmitHandler` to also accept jar/artifact files. Previously these files had to be uploaded preemptively to the blob-service by the client. With this change the entire job submission goes through REST.
    
    This PR addresses 3 JIRAs in total:
    
    **FLINK-9382**
    Directories given to the blob-service (primarily a use-case for the distributed cache) are currently silently zipped, and later unzipped by the `FileCache`. This tightly couples the zipping logic in the blob-service to the unzipping logic of the `FileCache`. The blob-service neither unzips the directory when the blob is requested, nor provides any means of doing so manually, nor informs the user whether the requested blob is a zip or not.
    
    My conclusion in `FLINK-9382` is that the blob-service should not support directories _for now_, and that instead directories for the `distributed cache` should be explicitly zipped beforehand, given that this is the only use-case we have at the moment.
    
    This JIRA is related to FLINK-9280 because the zipping logic was necessary for uploading directories from the client via REST. Since the server thus receives all artifacts already in zipped form, we can pass them to the blob-service in zipped form, making the blob-service's support for directories obsolete.
    
    The zipping is now done in `JobGraph#uploadUserArtifacts` with utilities provided by the `FileCache` class. 
    The unzipping is still done by the `FileCache`. Furthermore, we no longer delete the zip after processing, as this file is managed by the blob-service.
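
    The client-side half of this (explicitly zipping a directory before it is handed on, and marking the entry as zipped) can be sketched with plain `java.util.zip`. This is a minimal sketch under stated assumptions: `CacheEntry`, `zipDirectory` and `prepareForUpload` are hypothetical names, not the `FileCache` utilities or the real `DistributedCacheEntry` class.

    ```java
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.Stream;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipOutputStream;

    public class DcZipSketch {

        /** Hypothetical stand-in for a distributed-cache entry (not Flink's DistributedCacheEntry). */
        public static final class CacheEntry {
            public final String filePath;
            public final boolean isExecutable;
            public final boolean isZipped;

            public CacheEntry(String filePath, boolean isExecutable, boolean isZipped) {
                this.filePath = filePath;
                this.isExecutable = isExecutable;
                this.isZipped = isZipped;
            }
        }

        /** Zips every regular file below {@code dir} into {@code target}, using paths relative to dir. */
        public static void zipDirectory(Path dir, Path target) {
            try (Stream<Path> walk = Files.walk(dir);
                 ZipOutputStream zip = new ZipOutputStream(Files.newOutputStream(target))) {
                List<Path> files = walk.filter(Files::isRegularFile).sorted().collect(Collectors.toList());
                for (Path file : files) {
                    // Zip entry names always use '/' as the separator.
                    String name = dir.relativize(file).toString().replace('\\', '/');
                    zip.putNextEntry(new ZipEntry(name));
                    Files.copy(file, zip);
                    zip.closeEntry();
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        /** Returns an entry pointing at a zip of the directory, or the unchanged entry for plain files. */
        public static CacheEntry prepareForUpload(CacheEntry entry, Path tmpDir) {
            Path source = Paths.get(entry.filePath);
            if (!Files.isDirectory(source)) {
                return entry; // plain files are passed on as-is
            }
            Path archive = tmpDir.resolve(source.getFileName() + ".zip");
            zipDirectory(source, archive);
            // The rewritten entry points at the archive and records that it is zipped,
            // so the consumer knows it has to unzip it again.
            return new CacheEntry(archive.toString(), entry.isExecutable, true);
        }
    }
    ```

    Because only archives and plain files leave the client, the blob-service never has to know about directories.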
    
    **FLINK-9500**
    In some cases (I don't know exactly when) an empty `LastHttpContent` is sent at the end of a FileUpload. This currently leads to an exception in the `FileUploadHandler` when calling `currentHttpPostRequestDecoder.hasNext()`.
    
    The `LastHttpContent` message is fortunately a singleton, which allows us to easily check for it in the `FileUploadHandler`. If detected we skip the payload processing.
    Note that we still `offer` this content to the decoder, as this part is handled without exception and appears to follow an expected life-cycle.
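
    The idea behind the fix can be shown with a self-contained model. The types below are simplified, hypothetical stand-ins for Netty's `HttpContent` and the request decoder, not the actual `FileUploadHandler` code. Because the empty last chunk is a singleton (analogous to Netty's `LastHttpContent.EMPTY_LAST_CONTENT`), an identity comparison is enough to detect it; the chunk is still offered to the decoder, but payload processing is skipped.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    public class EmptyLastContentSketch {

        /** Simplified stand-in for an HTTP content chunk. */
        public static class Content {
            public final String payload;

            public Content(String payload) {
                this.payload = payload;
            }
        }

        /** Singleton marking an empty final chunk. */
        public static final Content EMPTY_LAST_CONTENT = new Content("");

        /** Hypothetical handler that records what it offers to the decoder and what it processes. */
        public static class Handler {
            public final List<Content> offeredToDecoder = new ArrayList<>();
            public final List<String> processedPayloads = new ArrayList<>();

            public void channelRead(Content content) {
                // The chunk is always offered to the decoder; this step copes with the empty last chunk.
                offeredToDecoder.add(content);
                // Identity comparison suffices because the empty last content is a singleton.
                if (content == EMPTY_LAST_CONTENT) {
                    return; // skip payload processing, where the exception used to occur
                }
                processedPayloads.add(content.payload);
            }
        }
    }
    ```

    Note that a different, merely empty chunk is not the singleton and is therefore still processed normally.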
    
    This issue was also triggered by FLINK-9280, which now serves as verification for the fix.
    
    **FLINK-9280**
    
    This issue is addressed in 5 commits that must be squashed before a merge.
    
    The commit `Move channel setup into utility method` is a simple refactoring to allow re-using code.
    The commit `Remove BlobServer port handler` removes various classes related to requesting the blobserver port via REST, which is now obsolete.
    The commit `add new constructor for DCEntry` adds another constructor to the `DistributedCacheEntry` class for setting the `isZipped` flag on the client side. The documentation was also extended to cover the life-cycle of entries for directories.
    
    The last 2 commits contain the actual implementation and are separated by client/server.
    
    The following is an outline of the events after `RestClusterClient#submitJob` has been called:
    * directories registered for the distributed cache are zipped, and dc entries are updated using the newly added constructor
    * the jobgraph, jars and **local** artifacts (dc files) are sent to the Dispatcher by the `RestClient` as a multi-part request
      * the jobgraph is contained in a `JobSubmitRequestBody` and stored as an Attribute
      * each jar/artifact is stored as a separate `FileUpload`
    * the `FileUploadHandler` receives the request and stores the received parts in a `JobSubmitRequestBodyBuffer`. Once the request is fully read, the buffer is converted into a proper `JobSubmitRequestBody` and passed to the rest of the pipeline as an attribute. In other words, we inject the paths to uploaded jars/artifacts into the submitted `JobSubmitRequestBody`. Unfortunately, this means the original JSON payload is also parsed here, which for consistency should ideally be done by the handler.
    * the modified `JobSubmitRequestBody` is read in `AbstractHandler#respondAsLeader`, cast, and passed on in place of the original request
    * The `JobSubmitHandler` modifies the JobGraph to no longer refer to client-local jars/artifacts, in preparation for the job-submission. Jar entries are categorically overridden as jars are always uploaded. Artifacts are only overridden if an uploaded file exists, identified by the file name.
    * jars/artifacts are uploaded to the BlobService; this was previously done in the ClusterClient
    * job is submitted, as before
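
    The override rule applied by the `JobSubmitHandler` can be sketched as follows. This is a minimal sketch under stated assumptions: the job graph is reduced to a hypothetical `GraphFiles` holder with a list of jar paths and a map of artifact names to paths, and "identified by the file name" is taken to mean that an uploaded file's name matches the last path segment of the artifact's client-local path. Neither the types nor the method names are Flink's API.

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;

    public class SubmitOverrideSketch {

        /** Hypothetical, heavily simplified view of the file references held by a job graph. */
        public static final class GraphFiles {
            public final List<String> jars;             // jar paths
            public final Map<String, String> artifacts; // artifact name -> path

            public GraphFiles(List<String> jars, Map<String, String> artifacts) {
                this.jars = jars;
                this.artifacts = artifacts;
            }
        }

        /** Returns the last path segment, e.g. "/tmp/a/b.jar" -> "b.jar". */
        static String fileName(String path) {
            return path.substring(path.lastIndexOf('/') + 1);
        }

        public static GraphFiles overrideLocalPaths(
                GraphFiles graph, List<String> uploadedJars, List<String> uploadedArtifacts) {
            // Jars are always uploaded, so the client-local jar paths are replaced wholesale.
            List<String> jars = new ArrayList<>(uploadedJars);

            // Index the uploaded artifacts by file name.
            Map<String, String> uploadedByName = new HashMap<>();
            for (String path : uploadedArtifacts) {
                uploadedByName.put(fileName(path), path);
            }

            // An artifact is overridden only if an uploaded file with the same file name exists;
            // all other entries (e.g. files that were not uploaded because they are not local) are kept.
            Map<String, String> artifacts = new LinkedHashMap<>();
            for (Map.Entry<String, String> entry : graph.artifacts.entrySet()) {
                String uploaded = uploadedByName.get(fileName(entry.getValue()));
                artifacts.put(entry.getKey(), uploaded != null ? uploaded : entry.getValue());
            }
            return new GraphFiles(jars, artifacts);
        }
    }
    ```

    Keying by file name assumes that two local artifacts never share a file name; if that cannot be guaranteed, a sketch like this would need a less ambiguous key.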
    
    ## Verifying this change
    
    FLINK-9382 is covered by added tests (see the relevant commit) and the existing distributed-cache and Python E2E tests.
    
    FLINK-9500 is implicitly tested by FLINK-9280.
    
    FLINK-9280:
    * job-submission as a whole is tested by existing E2E tests and `RestClusterClientTest`.
    * changes to the `JobSubmitHandler` are covered in `JobSubmitHandlerTest`.
    
    ## Brief change log
    * extend `RestClient` to support sending the jobgraph, jars and artifacts as a multipart HTTP request
    * modify `FileUploadHandler` to handle the job-submission-specific multipart request
    * modify `JobSubmitHandler` to override jar/artifact entries pointing to client-local files to instead point to uploaded files
    * move jar/artifact blob-service upload logic from `RestClusterClient` to `JobSubmitHandler`
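
    To illustrate the shape of such a multipart request, here is a minimal sketch that assembles a `multipart/form-data` body by hand: one part without a `filename` carrying the JSON request (which a multipart decoder such as Netty's treats as an attribute), followed by one part with a `filename` per jar/artifact (treated as a file upload). The class name and the part names `request` and `file` are hypothetical and do not describe what `RestClient` actually sends.

    ```java
    import java.io.ByteArrayOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    public class MultipartSketch {

        /** Builds a multipart/form-data body: one JSON attribute part, then one file part per entry. */
        public static byte[] buildBody(String boundary, String jsonRequest, Map<String, byte[]> files) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            // Attribute part (no filename) carrying the serialized request body.
            write(out, "--" + boundary + "\r\n");
            write(out, "Content-Disposition: form-data; name=\"request\"\r\n");
            write(out, "Content-Type: application/json\r\n\r\n");
            write(out, jsonRequest + "\r\n");
            // One file part per jar/artifact; the filename marks it as a file upload.
            for (Map.Entry<String, byte[]> file : files.entrySet()) {
                write(out, "--" + boundary + "\r\n");
                write(out, "Content-Disposition: form-data; name=\"file\"; filename=\"" + file.getKey() + "\"\r\n");
                write(out, "Content-Type: application/octet-stream\r\n\r\n");
                out.write(file.getValue(), 0, file.getValue().length);
                write(out, "\r\n");
            }
            // Closing delimiter ends the multipart body.
            write(out, "--" + boundary + "--\r\n");
            return out.toByteArray();
        }

        private static void write(ByteArrayOutputStream out, String s) {
            byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
        }
    }
    ```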
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink 9280

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6147.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6147
    
----
commit 45bb07aac047b52675857ab0f18848aa3c20d42d
Author: zentol <ch...@...>
Date:   2018-06-04T11:50:44Z

    [FLINK-9382][dc] Consolidate zipping logic in FileCache

commit 9fdc0d9c47abff314068a8f7c04952ff6775dea9
Author: zentol <ch...@...>
Date:   2018-06-04T12:13:35Z

    [FLINK-9500][rest] Properly handle EmptyLastHttpContent

commit 8cb0de747931c10c60bc756925c697c222201e5b
Author: zentol <ch...@...>
Date:   2018-06-04T12:39:53Z

    [FLINK-9280][rest] Move channel setup into utility method

commit 3418eb6aa65249fed6d1530b6a9699b68da1a7f7
Author: zentol <ch...@...>
Date:   2018-06-11T09:45:12Z

    [FLINK-9280][rest] Remove BlobServer port handler

commit 8917004634f074e48af3024633ab6c5e3a294e4f
Author: zentol <ch...@...>
Date:   2018-06-04T12:00:21Z

    [FLINK-9280][rest] add new constructor for DCEntry

commit 1bea8e6dfee1586c9dcc4e1442e8389359eb075a
Author: zentol <ch...@...>
Date:   2018-06-04T12:55:19Z

    [FLINK-9280][rest] client modifications

commit 42db1bffeb7d8d49f0a0c0280f9c3110b8587dc1
Author: zentol <ch...@...>
Date:   2018-06-04T13:43:11Z

    [FLINK-9280][rest] server modifications

----


> Extend JobSubmitHandler to accept jar files
> -------------------------------------------
>
>                 Key: FLINK-9280
>                 URL: https://issues.apache.org/jira/browse/FLINK-9280
>             Project: Flink
>          Issue Type: New Feature
>          Components: Job-Submission, REST
>    Affects Versions: 1.5.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Critical
>             Fix For: 1.6.0, 1.5.1
>
>
> The job submission through the CLI first uploads all required jars to the blob server, sets the blob keys in the jobgraph, and then uploads this graph to the {{JobSubmitHandler}}, which submits it to the Dispatcher.
> This process has the downside that it requires jars to be uploaded to the blob server, which does not happen via REST, before the job graph is submitted.
> I propose an extension to the {{JobSubmitHandler}} to also accept an optional list of jar files that were previously uploaded through the {{JarUploadHandler}}. If present, the handler would upload these jars to the blob server and set the blob keys.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)