Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/04/19 16:25:42 UTC

[jira] [Commented] (FLINK-6046) Add support for oversized messages during deployment

    [ https://issues.apache.org/jira/browse/FLINK-6046?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15975005#comment-15975005 ] 

ASF GitHub Bot commented on FLINK-6046:
---------------------------------------

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/3742

    [FLINK-6046] Add support for oversized messages during deployment

    (builds upon #3512)
    
    This adds offloading of large data from the `TaskDeploymentDescriptor` to the `BlobServer`, i.e. `serializedJobInformation` and `serializedTaskInformation`, if they are larger than the new `akka.rpc.offload.minsize` config parameter. Both fields are offloaded only once for all parallel instances, which not only frees us from the akka message size restriction but should also speed up massive deployments by leveraging any distributed file system used for the blob server.
    
    Future extensions may keep this information stored in the blob store (and not clean up immediately after a job finishes in `BlobLibraryCacheManager`) and use the same blobs for re-deployments, e.g. in case of failures.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-6046

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3742.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3742
    
----
commit 946976d115e604379ec4d048a76108821126dfaf
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2016-12-20T15:49:57Z

    [FLINK-6008][docs] minor improvements in the BlobService docs

commit a726c535dd593f161e460a0a9fb3ade184878c85
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2016-12-20T17:27:13Z

    [FLINK-6008] refactor BlobCache#getURL() for cleaner code

commit 4cebddb234611bcbac48f8a60f5bbf9d19b3f949
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2016-12-20T17:52:19Z

    [FLINK-6008] extend the BlobService to the NAME_ADDRESSABLE blobs
    
    These blobs are referenced by the job ID and a selected name instead of the
    hash sum of the blob's contents. Some code was already prepared but lacked
    the proper additions in further APIs. This commit adds some.

commit 4be247778a81227477889c4067747f6b3c882755
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2016-12-21T15:23:29Z

    [FLINK-6008] promote BlobStore#deleteAll(JobID) to the BlobService

commit 382de74a3bbcc1280f79632d54bd7c67f3eaf36d
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2016-12-21T16:59:27Z

    [FLINK-6008] properly remove NAME_ADDRESSABLE blobs after job/task termination

commit b94556e530dfa361455846b7f9fb5136b9e4eebf
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-01-06T17:42:58Z

    [FLINK-6008][docs] update some config options to the new, non-deprecated ones

commit 0d2e4a77ac43b875d3348186b10188cba6c9b5b6
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-03-09T17:14:02Z

    [FLINK-6008] use Preconditions.checkArgument in BlobClient

commit 84d4970ed36484aec2b28c0dd981c4b06c0efbb7
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-03-09T17:15:08Z

    [FLINK-6008] do not fail the BlobServer when delete fails
    
    This also enables us to reuse some more code between BlobServerConnection and
    BlobServer.

commit b5ed8a4bdaec5b9eb07584a7f3b60bb0efb7deda
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-03-09T17:32:14Z

    [FLINK-6008] refactor BlobCache#deleteGlobal() for cleaner code

commit 3bfa07613aa0f586679f1cfc65356a087b4afbd1
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-03-09T18:14:52Z

    [FLINK-6008] more unit tests for NAME_ADDRESSABLE and BlobService access
    
    NAME_ADDRESSABLE blobs were not that thoroughly tested before, nor were the
    access methods that the BlobService implementations provide. This adds tests
    covering both.

commit e1bf62d311ef0ddc7fa3b1bdafbce3dcd9f7b978
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-03-17T15:21:40Z

    [FLINK-6008] fix concurrent job directory creation
    
    also add according unit tests

commit 78d0948ef0929adabd7fb8208a76ad4bb60843fc
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-03-17T17:27:38Z

    [FLINK-6008] address some of the PR comments by @StephanEwen

commit 123db0e94c2353df037e9bedcb9a3644720165d0
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-18T14:37:37Z

    [FLINK-6008] some comments about BlobLibraryCacheManager cleanup

commit 9dccc0c041149f335f79bc3adcc3a0f7ca811364
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-19T13:39:03Z

    [hotfix] minor typos

commit e7377e2cab3f3e66db1db55c91717ccc570cee65
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-19T13:40:25Z

    [FLINK-6008] add retrieval and proper cleanup of name-addressable blobs at the BlobLibraryCacheManager

commit 2921c747964c2fb335ab9d494c5e42a131eff637
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-19T14:10:16Z

    [FLINK-6008] further cleanup tests for BlobLibraryCacheManager

commit 7616418dd26efe977cbc9cd6914d5cc7d9a95562
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-04-19T14:16:40Z

    [FLINK-6008] remove the exposal of the underlying blob service in LibraryCacheManager
    
    This may actually change in future.

commit 0555d054c131ddf48373d25f82899e1d33e3b89f
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2016-12-21T14:45:07Z

    [hotfix] fix wrong LOG.info call in Execution#transitionState()

commit a1b8c6c8ebf70d1c305658842880357607f01d5f
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2016-12-20T11:34:44Z

    [FLINK-6046] offload large data from the TaskDeploymentDescriptor to the BlobServer
    
    This only includes potentially big parts, i.e. serializedJobInformation and
    serializedTaskInformation, which are both offloaded only once for all parallel
    instances.

commit 4c8707c869491e4506eae00c3808590452e5b0f0
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-03-15T18:08:04Z

    [FLINK-6046] add a configurable akka.rpc.offload.minsize threshold for large data
    
    Larger blobs may be offloaded to the BLOB server, starting with the job and task
    information for the TaskDeploymentDescriptor.

commit 64f9be17e1f00f14bde320d6a8c5d1a090259657
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-03-16T10:21:32Z

    [FLINK-6046] additional tests for deployments with offloaded data

commit adaf83b10095415ef11715e8cd163de0ae937eff
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-03-16T15:44:10Z

    [FLINK-6046] do not overwrite existing offloaded job and task info files
    
    This will speed up recovery since file upload is unnecessary in this case.

commit b9635c994221be2a7d6e62ca53f8400f56060c3e
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-03-16T16:14:10Z

    [FLINK-6046] add tests for BlobServer#putObject()

commit 23047b419b5699b3b9fde5c7d455940f82588595
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-03-17T15:40:54Z

    [FLINK-6046] fix concurrent calls to BlobServer#putObject()
    
    A race condition for the final move of the incomingFile was possible, especially
    since all concurrent calls used the same file to write to. Using a unique
    temporary file solves this race.
    -> also add according unit tests
    
    Note that it is still possible (with any use of com.google.common.io.Files#move)
    that a failing rename may delete the target file since it falls back to copying
    the file and if removing the original file fails, it tries to remove the target,
    too.
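
The race described in this commit message can be illustrated with a minimal sketch. It is not the code from the PR: the class name `UniqueTempFilePut`, the method `put`, and the file names are hypothetical, and it uses `java.nio.file.Files` instead of the Guava `Files#move` mentioned above. Each call writes to its own temporary file and only then moves it to the final name, so concurrent writers never share an incoming file.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical illustration only; not the BlobServer implementation.
final class UniqueTempFilePut {

    /** Stores data under storageDir/name, writing through a per-call temporary file. */
    static Path put(Path storageDir, String name, byte[] data) throws IOException {
        Files.createDirectories(storageDir);
        Path target = storageDir.resolve(name);
        // A unique temporary file per call: concurrent callers never write to the same file.
        Path tmp = Files.createTempFile(storageDir, "incoming-", ".tmp");
        try {
            Files.write(tmp, data);
            try {
                // Whether an existing target is replaced by an atomic move is
                // implementation specific (it is on typical POSIX file systems).
                Files.move(tmp, target, StandardCopyOption.ATOMIC_MOVE);
            } catch (AtomicMoveNotSupportedException e) {
                // Fallback without the atomicity guarantee.
                Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
            }
        } finally {
            // No-op after a successful move; removes the temporary file on failure.
            Files.deleteIfExists(tmp);
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("blob-sketch");
        byte[] data = "serialized job information".getBytes(StandardCharsets.UTF_8);
        Path stored = put(dir, "job-info.bin", data);
        System.out.println("stored " + Files.size(stored) + " bytes at " + stored);
    }
}
```

Unlike the copy-based fallback described in the note above, this sketch never deletes the target when cleanup of the temporary file fails; whether that matches the behaviour wanted for the BlobServer is an open assumption.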

commit 1b5c319a20799490064abc1dc40df440931a9b5e
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-03-17T17:30:44Z

    [FLINK-6046] add a concurrency note/TODO in ExecutionJobVertex#getSerializedTaskInformation()

----


> Add support for oversized messages during deployment
> ----------------------------------------------------
>
>                 Key: FLINK-6046
>                 URL: https://issues.apache.org/jira/browse/FLINK-6046
>             Project: Flink
>          Issue Type: New Feature
>          Components: Distributed Coordination
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>
> This is the non-FLIP6 version of FLINK-4346, restricted to deployment messages:
> Currently, messages larger than the maximum Akka framesize cause an error when being transported. We should add a way to pass messages that are larger than {{akka.framesize}}, as may happen for task deployments via the {{TaskDeploymentDescriptor}}.
> We should use the {{BlobServer}} to offload big data items (if possible) and make use of any distributed file system behind it. This way, we not only avoid the akka framesize restriction but may also be able to speed up deployment.
> I suggest the following changes:
>   - the sender, i.e. the {{Execution}} class, tries to store the serialized job information and serialized task information (if oversized) from the {{TaskDeploymentDescriptor}} (tdd) on the {{BlobServer}} as a single {{NAME_ADDRESSABLE}} blob under its job ID (if this does not work, we send the whole tdd as usual via akka)
>   - if stored in a blob, these data items are removed from the tdd
>   - the receiver, i.e. the {{TaskManager}} class, tries to retrieve any offloaded data after receiving the {{TaskDeploymentDescriptor}} from akka; it re-assembles the original tdd
>   - the stored blob may be deleted after re-assembly of the tdd
> Further (future) changes may include:
>   - separating the serialized job information and serialized task information into two files and re-using the first one for all tasks
>   - not re-deploying these two during job recovery (if possible)
>   - then, like all other {{NAME_ADDRESSABLE}} blobs, these offloaded blobs may be removed when the job enters a final state instead
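
The sender/receiver flow suggested above can be sketched roughly as follows. This is a simplified illustration and not Flink code: `OffloadSketch`, `InMemoryBlobStore`, `Descriptor`, `send` and `receive` are hypothetical names, the blob store is an in-memory map standing in for the {{BlobServer}}, and the threshold parameter only plays the role that a setting such as `akka.rpc.offload.minsize` would play.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of threshold-based offloading; not Flink's classes.
final class OffloadSketch {

    /** Stand-in for the BlobServer: blobs addressed by job ID plus a chosen name. */
    static final class InMemoryBlobStore {
        private final Map<String, byte[]> blobs = new HashMap<>();

        void put(String jobId, String name, byte[] data) {
            blobs.put(jobId + "/" + name, data.clone());
        }

        byte[] get(String jobId, String name) {
            byte[] data = blobs.get(jobId + "/" + name);
            if (data == null) {
                throw new IllegalStateException("no blob " + name + " for job " + jobId);
            }
            return data.clone();
        }

        void delete(String jobId, String name) {
            blobs.remove(jobId + "/" + name);
        }
    }

    /** Simplified descriptor: carries the payload inline or only the name of the offloaded blob. */
    static final class Descriptor {
        final String jobId;
        final byte[] inlinePayload; // null if the payload was offloaded
        final String blobName;      // null if the payload is sent inline

        Descriptor(String jobId, byte[] inlinePayload, String blobName) {
            this.jobId = jobId;
            this.inlinePayload = inlinePayload;
            this.blobName = blobName;
        }
    }

    /** Sender side: offload payloads larger than the threshold, otherwise send them inline. */
    static Descriptor send(String jobId, String name, byte[] payload,
                           int minOffloadSize, InMemoryBlobStore store) {
        if (payload.length > minOffloadSize) {
            try {
                store.put(jobId, name, payload);
                return new Descriptor(jobId, null, name);
            } catch (RuntimeException e) {
                // Offloading failed: fall back to sending the payload inline as usual.
            }
        }
        return new Descriptor(jobId, payload, null);
    }

    /** Receiver side: re-assemble the payload, fetching it from the store if it was offloaded. */
    static byte[] receive(Descriptor descriptor, InMemoryBlobStore store) {
        if (descriptor.blobName != null) {
            return store.get(descriptor.jobId, descriptor.blobName);
        }
        return descriptor.inlinePayload;
    }

    public static void main(String[] args) {
        InMemoryBlobStore store = new InMemoryBlobStore();
        byte[] small = new byte[10];
        byte[] large = new byte[100];
        Arrays.fill(large, (byte) 7);

        Descriptor inline = send("job-1", "task-info", small, 64, store);
        Descriptor offloaded = send("job-1", "job-info", large, 64, store);

        System.out.println("small offloaded: " + (inline.blobName != null));
        System.out.println("large offloaded: " + (offloaded.blobName != null));
        System.out.println("round trip ok: " + Arrays.equals(large, receive(offloaded, store)));

        // Cleanup is left to the caller, e.g. once the job reaches a final state.
        store.delete("job-1", "job-info");
    }
}
```

In this sketch the receiver does not delete the blob after re-assembly, so the same blob can serve several receivers; deleting it right after the first retrieval, as in the first list above, would be a one-line change in `receive`.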


