Posted to issues@flink.apache.org by NicoK <gi...@git.apache.org> on 2017/08/29 12:43:30 UTC

[GitHub] flink pull request #4615: [FLINK-7518][network] pass our own NetworkBuffer t...

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/4615

    [FLINK-7518][network] pass our own NetworkBuffer to Netty

    ## What is the purpose of the change
    
    With this PR, based on #4613, we finally pass our own `NetworkBuffer` class to Netty and remove one buffer copy while transferring data. Note that this applies to the sender side only.
    
    ## Brief change log
    
    - extend `NettyMessage#allocateBuffer()` to allow allocation for the header only
    - extend `NettyMessage.BufferResponse` to assemble a composite buffer based on a (pooled) header buffer and our `NetworkBuffer` instance.
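
The sketch below makes the zero-copy idea concrete. It builds a frame from a small, freshly allocated header and an existing content buffer without copying the content. Flink itself uses Netty's composite buffer for this. Here, plain `java.nio.ByteBuffer`s stand in for it. The class name `CompositeFrame` and the header layout (frame length, magic number, message id) are assumptions for illustration.

```java
import java.nio.ByteBuffer;

// Stdlib-only stand-in for a Netty composite buffer: a header plus a view of the content.
final class CompositeFrame {
    // Hypothetical header layout: frame length (int), magic number (int), message id (byte).
    static final int HEADER_LENGTH = 4 + 4 + 1;
    static final int MAGIC_NUMBER = 0xBADC0FFE;

    private final ByteBuffer[] components;

    private CompositeFrame(ByteBuffer header, ByteBuffer content) {
        // duplicate() shares the underlying memory, so the content bytes are not copied.
        this.components = new ByteBuffer[] {header, content.duplicate()};
    }

    // Allocates only the small header and references the existing (e.g. off-heap) content.
    static CompositeFrame of(byte msgId, ByteBuffer content) {
        ByteBuffer header = ByteBuffer.allocate(HEADER_LENGTH);
        header.putInt(HEADER_LENGTH + content.remaining()); // hypothetical: length includes header
        header.putInt(MAGIC_NUMBER);
        header.put(msgId);
        header.flip();
        return new CompositeFrame(header, content);
    }

    int readableBytes() {
        int total = 0;
        for (ByteBuffer component : components) {
            total += component.remaining();
        }
        return total;
    }

    // Absolute read at a logical index across all components; positions are not modified.
    byte getByte(int index) {
        int remainingIndex = index;
        for (ByteBuffer component : components) {
            if (remainingIndex < component.remaining()) {
                return component.get(component.position() + remainingIndex);
            }
            remainingIndex -= component.remaining();
        }
        throw new IndexOutOfBoundsException("index: " + index);
    }
}
```

`duplicate()` only creates a new view with its own position. A later write into the content buffer is therefore visible through the frame, because the content was never copied in the first place.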
    
    ## Verifying this change
    
    This change is already covered by existing tests and can be verified as follows:
    
    - existing `NettyMessageSerializationTest` for the immediate encode-decode path of the new buffer
    - any other (integration) test that exercises the full network stack with something other than an `EmbeddedChannel`
    - manually verified a streaming program (WordCount) on a 4-node local setup (separate processes) with 1 JobManager and 4 TaskManagers
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (yes)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-7518

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4615.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4615
    
----
commit 2ae08d79712235a965db45ee739076cd6a3601fa
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-07-31T10:06:14Z

    [hotfix] fix some typos

commit cda26a0d8e6d07c48ac03ee4aab74c8699a04428
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-02T09:35:16Z

    [hotfix][tests] add missing test descriptions

commit 3b921d60c1ff969874363c75916a1d40fcc99847
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-02T09:34:54Z

    [FLINK-7310][core] always use the HybridMemorySegment
    
    Since we'd like to use our own off-heap buffers for network communication, we
    cannot use HeapMemorySegment anymore and need to rely on HybridMemorySegment.
    We thus drop any code that loads the HeapMemorySegment (it is still available
    if needed) in favour of the HybridMemorySegment which is able to work on both
    heap and off-heap memory.
    
    For the performance penalty of this change compared to using HeapMemorySegment
    alone, see this interesting blog article (from 2015):
    https://flink.apache.org/news/2015/09/16/off-heap-memory.html
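
The following sketch shows one segment class serving both heap and off-heap memory through the same accessors. It is not Flink's `HybridMemorySegment`, which, as we understand it, works on raw addresses via `sun.misc.Unsafe`. Here, a heap or direct `ByteBuffer` provides the same effect, and `SketchSegment` is a hypothetical name.

```java
import java.nio.ByteBuffer;

// Hypothetical single segment type: the same code path for heap and off-heap memory.
final class SketchSegment {
    private final ByteBuffer memory;

    private SketchSegment(ByteBuffer memory) {
        this.memory = memory;
    }

    // Backed by a byte[] on the Java heap.
    static SketchSegment heap(int size) {
        return new SketchSegment(ByteBuffer.allocate(size));
    }

    // Backed by native (off-heap) memory.
    static SketchSegment offHeap(int size) {
        return new SketchSegment(ByteBuffer.allocateDirect(size));
    }

    boolean isOffHeap() {
        return memory.isDirect();
    }

    int size() {
        return memory.capacity();
    }

    // Absolute accessors: callers never need to know where the memory lives.
    void putInt(int index, int value) {
        memory.putInt(index, value);
    }

    int getInt(int index) {
        return memory.getInt(index);
    }
}
```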

commit 3bdd01454dae9eafbd220a5d5554d402e12b8d9f
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-02T09:27:49Z

    [hotfix][core] add additional final methods in final classes
    
    This applies the scheme of HeapMemorySegment to HybridMemorySegment where core
    methods are also marked "final" to be more future-proof.

commit 1f33ec0df5b83135256538132b0de58c3bd86402
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-04T13:15:32Z

    [FLINK-7312][checkstyle] remove trailing whitespace

commit 679793f478a3f79c61dec9d5c424c748e2a5d6ed
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-04T13:20:28Z

    [FLINK-7312][checkstyle] organise imports

commit 6fe487a2e929fe3aaf1d6a1d5ef3070d6263caad
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-04T13:24:16Z

    [FLINK-7312][checkstyle] add, adapt and improve comments

commit d4b77dc006f833b08ebf5e6324cfc53ca754c254
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-04T13:26:40Z

    [FLINK-7312][checkstyle] remove redundant "public" keyword in interfaces

commit 2ce3703c41161a00c7e749f45f11f654e3183e52
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-04T13:27:36Z

    [FLINK-7312][checkstyle] ignore some spurious warnings

commit 987f8a41c034b39d14b5c00d6ecc91ef3c157c62
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-04T13:35:15Z

    [FLINK-7312][checkstyle] enable checkstyle for `flink/core/memory/*`
    
    We deliberately ignore redundant modifiers for now since we want `final`
    modifiers on `final` classes for increased future-proofness.

commit 6ce7b17f6c645a1a1ec136a307ce83f02b21eb7f
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-04T13:35:15Z

    [FLINK-7400][cluster] fix cut-off memory not used for off-heap reserve as intended
    
    + fix description of `containerized.heap-cutoff-ratio`

commit cda9f0b9aab154d12315c09f22f5dbd8da791f72
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-09T14:16:31Z

    [FLINK-7400][yarn] add an integration test for yarn container memory restrictions using off-heap memory

commit c3648e1c7486cb40a7948b7e2449b4fb82524a60
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-01T11:24:00Z

    [FLINK-7316][network] always use off-heap network buffers
    
    This is another step towards using our own (off-heap) buffers for network
    communication that we pass through netty in order to avoid unnecessary buffer
    copies.

commit f68d42dcd6920a7439d9861c40604e5c8755eeba
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-04T13:59:48Z

    [FLINK-7316][docs] add a note of network buffers always being off-heap

commit 228ca0a857b310db4e89e41e73fee57702394524
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-08T15:39:31Z

    [FLINK-7316][network] remove a dead code path and adapt a unit test still relying on it
    
    - remove the code path for `offHeapSize == -1` which does not exist anymore
    - just in case, make sure by adding some additional checks
    - fix a unit test not accommodating this (introduced by the new off-heap network buffers)

commit 38eec713b1e474f52031751d8423eca2971949cb
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-10T09:49:11Z

    [FLINK-7316][tests] fix `ContaineredTaskManagerParametersTest` to include the cutoff

commit d015293e11b1fc894e7bc665c2e9aa544f0aaaad
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-10T10:42:09Z

    [FLINK-7316][network] partly revert a wrong change to TaskManagerServices#createMemoryManager()
    
    We still need the distinction between on-heap and off-heap there if fractions
    are used because the memory manager's size is based on different values in the
    two cases.

commit 52560d94e3df42499d62b41919b6102a9f57b58b
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-10T12:16:04Z

    [FLINK-7316][tests] further adapt YARNSessionCapacitySchedulerITCase to the changed memory settings

commit 2a43a05f67f987fbc47ca5c73941ff6107033a23
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-10T12:22:15Z

    [hotfix] adapt YARNSessionCapacitySchedulerITCase#testDetachedPerJobYarnCluster***() tests
    
    According to their comments, these tests should also verify that a changed
    `yarn.heap-cutoff-ratio` is passed correctly, but the value used (50%) is too
    low: for a 1024MB TaskManager memory limit, 50% (512MB) does not exceed the
    `containerized.heap-cutoff-min` of 600MB, so the ratio has no effect.
    
    This increases it to 70% and adapts the expected memory settings.
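
The arithmetic behind this fix can be checked with a small sketch. It assumes the cut-off is the larger of the cut-off ratio times the container memory and `containerized.heap-cutoff-min`. `CutoffSketch` is a hypothetical helper, and its argument checks are added assumptions rather than Flink's exact validation.

```java
// Hypothetical helper: cutoff = max(containerMemory * cutoffRatio, cutoffMin).
final class CutoffSketch {
    static long cutoffMB(long containerMemoryMB, double cutoffRatio, long cutoffMinMB) {
        // Assumed sanity checks, not necessarily identical to Flink's.
        if (cutoffRatio < 0.0 || cutoffRatio >= 1.0) {
            throw new IllegalArgumentException("cutoff ratio must be in [0, 1): " + cutoffRatio);
        }
        long byRatio = (long) (containerMemoryMB * cutoffRatio); // truncates towards zero
        long cutoff = Math.max(byRatio, cutoffMinMB);
        if (cutoff >= containerMemoryMB) {
            throw new IllegalArgumentException("cutoff leaves no memory for the JVM heap");
        }
        return cutoff;
    }
}
```

With 1024MB, any ratio below roughly 58.6% (600/1024) yields the same 600MB cut-off. A test at 50% therefore cannot tell whether the ratio was passed at all. At 70%, the ratio decides the result (716MB).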

commit 341acfba84515640d92dc0aa5e6ad1fe3ec4ffc8
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-07T15:38:36Z

    [FLINK-7411][network] minor (performance) improvements in NettyMessage
    
    * use a switch rather than multiple if conditions
    * use static `readFrom` methods to create instances of the message sub types
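
The sketch below shows the decoding pattern described above. A single `switch` on the message id selects the message type, and each type is built by a static `readFrom` factory instead of filling in a mutable instance. The message types, ids, and fields (`Msg`, `ErrorMsg`, `CancelMsg`) are hypothetical and do not reflect Flink's wire format.

```java
import java.nio.ByteBuffer;

// Hypothetical message hierarchy; ids and fields are illustrative only.
abstract class Msg {
    static final byte ID_ERROR = 0;
    static final byte ID_CANCEL = 1;

    // One switch on the id instead of a chain of if conditions.
    static Msg decode(ByteBuffer in) {
        byte id = in.get();
        switch (id) {
            case ID_ERROR:
                return ErrorMsg.readFrom(in);
            case ID_CANCEL:
                return CancelMsg.readFrom(in);
            default:
                throw new IllegalArgumentException("unknown message id: " + id);
        }
    }
}

final class ErrorMsg extends Msg {
    final int code;

    private ErrorMsg(int code) {
        this.code = code;
    }

    // Static factory: reads the fields and constructs a fully initialised instance.
    static ErrorMsg readFrom(ByteBuffer in) {
        return new ErrorMsg(in.getInt());
    }
}

final class CancelMsg extends Msg {
    final long requestId;

    private CancelMsg(long requestId) {
        this.requestId = requestId;
    }

    static CancelMsg readFrom(ByteBuffer in) {
        return new CancelMsg(in.getLong());
    }
}
```

A side effect of static factories is that the message classes can keep their fields `final`.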

commit 5b16d3c36f6b6073af5b90d221089ddf29e17f5a
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-07T16:12:28Z

    [FLINK-7412][network] optimise NettyMessage.TaskEventRequest#readFrom() to read from netty buffers directly

commit dc1d29871040828108bc793ee90f7a867d9f40de
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-10T14:58:19Z

    [FLINK-7427][network] integrate PartitionRequestProtocol into NettyProtocol
    
    - removes one level of (unneeded) abstraction for clarity

commit 2ce22de5e288dab534a6cb5e0eb8edd3cb163619
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-24T10:17:08Z

    [FLINK-7499][io] also let AsynchronousBufferFileWriter#writeBlock() recycle the buffer in case of failures
    
    This fixes a double-recycle in SpillableSubpartitionView and also makes sure
    that even if adding the (asynchronous) write operation fails, the buffer is
    properly freed in code that did not perform this cleanup. It avoids code
    duplication of this cleanup and it is also more consistent to take over
    responsibility of the given buffer even if an exception is thrown.
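
The ownership rule can be sketched as follows. Once `writeBlock()` is called, the writer owns the buffer and therefore also recycles it if enqueuing the write fails. The classes, the queue-based request handling, and the failure conditions are assumptions for illustration. Flink's asynchronous writer is considerably more involved.

```java
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical minimal buffer abstraction.
interface RecyclableBuffer {
    void recycle();
}

// Hypothetical writer: writeBlock() takes over responsibility for the buffer in all cases.
final class SketchBufferFileWriter {
    private final BlockingQueue<RecyclableBuffer> requests;
    private volatile boolean closed;

    SketchBufferFileWriter(int capacity) {
        this.requests = new LinkedBlockingQueue<>(capacity);
    }

    void close() {
        closed = true;
    }

    void writeBlock(RecyclableBuffer buffer) throws IOException {
        Objects.requireNonNull(buffer, "buffer");
        try {
            if (closed) {
                throw new IOException("writer is closed");
            }
            if (!requests.offer(buffer)) {
                throw new IOException("write queue is full");
            }
        } catch (IOException | RuntimeException e) {
            // We own the buffer now: free it here so callers need no cleanup of their own.
            buffer.recycle();
            throw e;
        }
    }
}
```

Callers then must not recycle the buffer again in their own error handling, which is how a double-recycle like the one fixed in `SpillableSubpartitionView` is avoided.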

commit 703c11bf5e8a5cfd53b2b878f5b66768ba718902
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-24T14:49:46Z

    [FLINK-7513][tests] remove TestBufferFactory#MOCK_BUFFER
    
    This static buffer did not allow proper reference counting; instead, tests
    should create their own buffers, which can then also be released afterwards.
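
A minimal sketch shows why a single static buffer does not fit reference counting. Once any user releases the last reference, the buffer is recycled for everyone, and the next release fails. `RefCountedBuffer` is hypothetical and only loosely modelled on Netty's `ReferenceCounted` contract (`retain()`/`release()`).

```java
// Hypothetical reference-counted buffer, loosely following Netty's retain()/release() contract.
final class RefCountedBuffer {
    private int refCnt = 1;
    private boolean recycled;

    synchronized RefCountedBuffer retain() {
        if (refCnt <= 0) {
            throw new IllegalStateException("buffer already released");
        }
        refCnt++;
        return this;
    }

    // Returns true if this call released the last reference and recycled the buffer.
    synchronized boolean release() {
        if (refCnt <= 0) {
            throw new IllegalStateException("buffer released twice");
        }
        if (--refCnt == 0) {
            recycled = true; // e.g. hand the memory segment back to its pool
            return true;
        }
        return false;
    }

    synchronized boolean isRecycled() {
        return recycled;
    }
}
```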

commit a9e85f4c1c4a2e64d7209c42d6aba73d78026de4
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-24T15:14:38Z

    [FLINK-7515][network] allow actual 0-length content in NettyMessage#allocateBuffer()
    
    Previously, length "0" meant "unknown content length" but there are cases where
    the actual length is 0 and so we use -1 for tagging the special case now.
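
The sketch below illustrates the length convention. `-1` marks an unknown content length, while `0` is a valid, empty content. The `headerOnly` flag mirrors the header-only allocation mentioned in the PR description. The method signature, header layout, and default capacity are hypothetical, and `java.nio.ByteBuffer` stands in for Netty's buffers.

```java
import java.nio.ByteBuffer;

// Hypothetical frame allocation helper illustrating the -1 / 0 distinction.
final class FrameAllocSketch {
    // Assumed header layout: frame length (int), magic number (int), message id (byte).
    static final int FRAME_HEADER_LENGTH = 4 + 4 + 1;
    static final int MAGIC_NUMBER = 0xBADC0FFE;
    static final int UNKNOWN_LENGTH = -1;
    // Hypothetical initial capacity when the content length is not known up front.
    static final int DEFAULT_CAPACITY = 256;

    static ByteBuffer allocateBuffer(byte id, int contentLength, boolean headerOnly) {
        if (contentLength < UNKNOWN_LENGTH) {
            throw new IllegalArgumentException("invalid content length: " + contentLength);
        }
        if (headerOnly && contentLength == UNKNOWN_LENGTH) {
            throw new IllegalArgumentException("header-only allocation needs the content length");
        }
        final int capacity;
        if (headerOnly) {
            capacity = FRAME_HEADER_LENGTH; // content travels in a separate buffer
        } else if (contentLength == UNKNOWN_LENGTH) {
            capacity = DEFAULT_CAPACITY;
        } else {
            capacity = FRAME_HEADER_LENGTH + contentLength; // 0 is a real, empty content
        }
        ByteBuffer buffer = ByteBuffer.allocate(capacity);
        // For an unknown length, 0 is a placeholder to be patched once the content is written.
        buffer.putInt(contentLength == UNKNOWN_LENGTH ? 0 : FRAME_HEADER_LENGTH + contentLength);
        buffer.putInt(MAGIC_NUMBER);
        buffer.put(id);
        return buffer;
    }
}
```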

commit b82cee16ccb36c29e0673a2b3c00dbc412ed08e8
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-25T07:49:44Z

    [FLINK-7514][tests] fix BackPressureStatsTrackerITCase releasing buffers twice

commit 7a970978aa3f02ca7b557fb4536dfa29374f0a09
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-22T16:33:55Z

    [FLINK-7516][memory] do not allow copies into a read-only ByteBuffer
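
The sketch below shows such a guard for a bulk copy from a segment into a `ByteBuffer`. With a plain `ByteBuffer.put()`, Java would already throw on a read-only target. An explicit check matters when the copy bypasses that API, presumably as with `sun.misc.Unsafe`-based copies in Flink's segments. `SegmentCopySketch` and its heap-only backing array are assumptions for illustration.

```java
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ReadOnlyBufferException;

// Hypothetical segment with a bulk copy into a ByteBuffer that rejects read-only targets.
final class SegmentCopySketch {
    private final byte[] memory;

    SegmentCopySketch(byte[] memory) {
        this.memory = memory;
    }

    // Copies numBytes starting at offset into target and advances target's position.
    void get(int offset, ByteBuffer target, int numBytes) {
        // Fail early and explicitly, independent of how the actual copy is performed.
        if (target.isReadOnly()) {
            throw new ReadOnlyBufferException();
        }
        if (offset < 0 || numBytes < 0 || offset > memory.length - numBytes) {
            throw new IndexOutOfBoundsException("offset=" + offset + ", numBytes=" + numBytes);
        }
        if (target.remaining() < numBytes) {
            throw new BufferOverflowException();
        }
        target.put(memory, offset, numBytes);
    }
}
```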

commit add24bcab9bfde31a6d4e03483188a48196b321b
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-23T10:04:28Z

    [FLINK-7517][network] let NettyBufferPool extend PooledByteBufAllocator
    
    Previously, NettyBufferPool only wrapped PooledByteBufAllocator, so any
    allocated buffer's alloc() method returned the wrapped
    PooledByteBufAllocator, which allowed heap buffers again. By extending
    PooledByteBufAllocator, we close this loophole.
    
    This also fixes the invariant that a copy of a buffer should have the same
    allocator.
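
A stdlib analog can show the difference between wrapping and extending. In it, every buffer remembers the allocator that created it, similar to Netty's `ByteBuf#alloc()`. All class names are hypothetical. In particular, whether Flink's pool redirects or rejects heap requests is not stated here. This sketch simply redirects them to direct memory.

```java
import java.nio.ByteBuffer;

// Hypothetical buffer that remembers its allocator, like ByteBuf#alloc().
final class Buf {
    final ByteBuffer memory;
    final Allocator alloc;

    Buf(ByteBuffer memory, Allocator alloc) {
        this.memory = memory;
        this.alloc = alloc;
    }
}

// Hypothetical generic allocator that happily hands out heap memory.
class Allocator {
    Buf directBuffer(int size) {
        return new Buf(ByteBuffer.allocateDirect(size), this);
    }

    Buf heapBuffer(int size) {
        return new Buf(ByteBuffer.allocate(size), this);
    }
}

// Wrapping: buffers point back at the wrapped allocator, so heap buffers leak in via alloc().
final class WrappingDirectPool {
    private final Allocator delegate = new Allocator();

    Buf buffer(int size) {
        return delegate.directBuffer(size);
    }
}

// Extending: buffers point at this class, so heap requests through alloc() are redirected too.
final class ExtendingDirectPool extends Allocator {
    @Override
    Buf heapBuffer(int size) {
        return directBuffer(size);
    }
}
```

Extending also preserves the invariant from the commit message: a buffer obtained via `alloc()` has the same allocator as the original.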

commit 1e3511f2a921c70749c04b54a591a435cce5c476
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-22T15:45:45Z

    [FLINK-7520][network] let our Buffer class extend from netty's buffer class
    
    For this, use a common (flink) Buffer interface and an implementation
    (NetworkBuffer) that implements netty's buffer methods as well. In the future,
    this allows us to avoid unnecessary buffer copies when handing buffers over
    to netty while keeping our MemorySegment logic and configuration.
    
    For the netty-specific part, the NetworkBuffer also requires a ByteBuf allocator,
    which is otherwise not needed in our use cases; hence, a byte buffer allocator
    must be set before the buffer is handed over to netty.
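
The split between a Flink-side interface and one implementation that also serves the Netty side might look like the sketch below. It is reduced to the allocator requirement described above. `SketchBuffer`, `SketchAllocator`, and `SketchNetworkBuffer` are hypothetical stand-ins, not Flink's `Buffer`/`NetworkBuffer` API or Netty's `ByteBufAllocator`.

```java
import java.nio.ByteBuffer;

// Hypothetical, reduced Flink-side view of a buffer.
interface SketchBuffer {
    ByteBuffer getMemorySegment();

    void recycleBuffer();
}

// Hypothetical stand-in for a Netty-side allocator.
interface SketchAllocator {
    ByteBuffer directBuffer(int size);
}

// One object for both worlds; the allocator is only needed once netty gets the buffer.
final class SketchNetworkBuffer implements SketchBuffer {
    private final ByteBuffer segment;
    private final Runnable recycler;
    private SketchAllocator allocator; // null until the buffer is prepared for netty

    SketchNetworkBuffer(ByteBuffer segment, Runnable recycler) {
        this.segment = segment;
        this.recycler = recycler;
    }

    @Override
    public ByteBuffer getMemorySegment() {
        return segment;
    }

    @Override
    public void recycleBuffer() {
        recycler.run();
    }

    void setAllocator(SketchAllocator allocator) {
        this.allocator = allocator;
    }

    // Netty-style alloc(): fails fast if no allocator was set before the hand-over.
    SketchAllocator alloc() {
        if (allocator == null) {
            throw new IllegalStateException("allocator must be set before handing the buffer to netty");
        }
        return allocator;
    }
}
```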

commit 626fbb4d8ef3287f0442e4652bcf2d0593145945
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-25T10:13:54Z

    [FLINK-7518][network] pass our own NetworkBuffer to netty
    
    This is using a composite buffer to assemble header+content and avoids an
    unnecessary buffer copy from our (Network)Buffer class backed by a MemorySegment
    to Netty's ByteBuf class.

----



[GitHub] flink pull request #4615: [FLINK-7518][network] pass our own NetworkBuffer t...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4615


---