Posted to issues@flink.apache.org by NicoK <gi...@git.apache.org> on 2017/08/25 13:04:07 UTC

[GitHub] flink pull request #4594: [FLINK-7517][network] let NettyBufferPool extend P...

GitHub user NicoK opened a pull request:

    https://github.com/apache/flink/pull/4594

    [FLINK-7517][network] let NettyBufferPool extend PooledByteBufAllocator

    ## What is the purpose of the change
    
    `NettyBufferPool` wraps `PooledByteBufAllocator`, but as a result, any allocated buffer's `alloc()` method returns the wrapped `PooledByteBufAllocator`, which allows heap buffers again. By extending `PooledByteBufAllocator` instead, we close this loophole and also restore Netty's invariant that a copy of a buffer has the same allocator as the original.
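    To illustrate the loophole, here is a self-contained sketch (no Netty dependency; `PooledAllocator`, `WrappingPool`, `ExtendingPool` and `Buf` are illustrative stand-ins for `PooledByteBufAllocator`, the old and new `NettyBufferPool`, and `ByteBuf`, not Flink's or Netty's actual code):

```java
// Minimal model of the delegation loophole: a Buf remembers which
// allocator created it, mirroring Netty's ByteBuf#alloc().
interface Allocator {
    Buf directBuffer();
    Buf heapBuffer();
}

class Buf {
    final Allocator alloc;
    final boolean direct;
    Buf(Allocator alloc, boolean direct) { this.alloc = alloc; this.direct = direct; }
    Allocator alloc() { return alloc; }
}

class PooledAllocator implements Allocator {
    public Buf directBuffer() { return new Buf(this, true); }
    public Buf heapBuffer()   { return new Buf(this, false); }
}

// Wrapping: forwards to an inner allocator, so buf.alloc() exposes the
// inner allocator and heapBuffer() becomes reachable again.
class WrappingPool implements Allocator {
    private final Allocator inner = new PooledAllocator();
    public Buf directBuffer() { return inner.directBuffer(); }
    public Buf heapBuffer()   { throw new UnsupportedOperationException("heap disallowed"); }
}

// Extending: buf.alloc() is the pool itself, so the override sticks.
class ExtendingPool extends PooledAllocator {
    @Override public Buf heapBuffer() { throw new UnsupportedOperationException("heap disallowed"); }
}

public class LoopholeSketch {
    public static void main(String[] args) {
        // Loophole: the wrapper hands out direct buffers, but the buffer's
        // allocator is the inner pool, which still serves heap buffers.
        Buf viaWrapper = new WrappingPool().directBuffer();
        System.out.println("wrapper leaks heap: " + !viaWrapper.alloc().heapBuffer().direct);

        // Fix: with inheritance, the same call hits the override and fails.
        Buf viaSubclass = new ExtendingPool().directBuffer();
        boolean blocked = false;
        try { viaSubclass.alloc().heapBuffer(); }
        catch (UnsupportedOperationException e) { blocked = true; }
        System.out.println("subclass blocks heap: " + blocked);
    }
}
```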
    
    ## Brief change log
    
    - change `NettyBufferPool` from wrapping `PooledByteBufAllocator` into extending it
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as `NettyBufferPoolTest`, since the behaviour does not change.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (yes)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/NicoK/flink flink-7517

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4594.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4594
    
----
commit ec0db2bedffbe67b9a5e08c577a1b74f74c061ac
Author: Nico Kruber <ni...@data-artisans.com>
Date:   2017-08-23T10:04:28Z

    [FLINK-7517][network] let NettyBufferPool extend PooledByteBufAllocator
    
    Previously, NettyBufferPool only wrapped PooledByteBufAllocator, but then
    any allocated buffer's alloc() method returned the wrapped
    PooledByteBufAllocator, which allowed heap buffers again. By extending
    PooledByteBufAllocator, we close this loophole.
    
    This also fixes the invariant that a copy of a buffer should have the same
    allocator.

----


---

[GitHub] flink issue #4594: [FLINK-7517][network] let NettyBufferPool extend PooledBy...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4594
  
    This improvement is indeed very clean and removes many redundant `Override` methods by extending `PooledByteBufAllocator` directly.
    
    But I am still confused about one thing. In the previous approach, we created the `PooledByteBufAllocator` with the `preferDirect` parameter set to true, and that allocator was used by both the client and the server. So it seems there was no chance for the original allocator to allow heap buffers in the Netty code. Or did I miss or misunderstand some key information?


---

[GitHub] flink issue #4594: [FLINK-7517][network] let NettyBufferPool extend PooledBy...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4594
  
    Yes @zhijiangW, I kept the original logic.
    I guess the reasoning behind using off-heap Netty buffers only was to reduce overhead before transmitting messages over the wire: 1) we reduce GC overhead somewhat, and 2) at some point the memory needs to be off-heap and put into kernel space anyway - depending on Netty, this may be optimised if the memory is already off-heap.
    
    Also, starting with #4481 we will only be using off-heap network buffers anyway.


---

[GitHub] flink pull request #4594: [FLINK-7517][network] let NettyBufferPool extend P...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4594#discussion_r149886365
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java ---
    @@ -52,51 +48,61 @@
     	/** Configured chunk size for the arenas. */
     	private final int chunkSize;
     
    +	/** We strictly prefer direct buffers and disallow heap allocations. */
    +	private static final boolean PREFER_DIRECT = true;
    +
    +	/**
    +	 * Arenas allocate chunks of pageSize << maxOrder bytes. With these defaults, this results in
    +	 * chunks of 16 MB.
    +	 *
    +	 * @see #MAX_ORDER
    +	 */
    +	private static final int PAGE_SIZE = 8192;
    +
    +	/**
    +	 * Arenas allocate chunks of pageSize << maxOrder bytes. With these defaults, this results in
    +	 * chunks of 16 MB.
    +	 *
    +	 * @see #PAGE_SIZE
    +	 */
    +	private static final int MAX_ORDER = 11;
    +
     	/**
     	 * Creates Netty's buffer pool with the specified number of direct arenas.
     	 *
     	 * @param numberOfArenas Number of arenas (recommended: 2 * number of task
     	 *                       slots)
     	 */
     	public NettyBufferPool(int numberOfArenas) {
    +		super(
    +			PREFER_DIRECT,
    +			// No heap arenas, please.
    +			0,
    +			// Number of direct arenas. Each arena allocates a chunk of 16 MB, i.e.
    +			// we allocate numDirectArenas * 16 MB of direct memory. This can grow
    +			// to multiple chunks per arena during runtime, but this should only
    +			// happen with a large amount of connections per task manager. We
    +			// control the memory allocations with low/high watermarks when writing
    +			// to the TCP channels. Chunks are allocated lazily.
    +			numberOfArenas,
    +			PAGE_SIZE,
    +			MAX_ORDER);
    +
     		checkArgument(numberOfArenas >= 1, "Number of arenas");
    --- End diff --
    
    Would it be better to call checkArgument before calling the super method?
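    As a side note on the constants quoted in the diff above, the arithmetic behind the "16 MB chunks" comment can be checked directly (the values are copied from the diff; `numberOfArenas = 4` below is an illustrative value, not a Flink default):

```java
// Arenas allocate chunks of pageSize << maxOrder bytes; with the
// defaults from the diff this works out to 16 MB per chunk.
public class ChunkSizeSketch {
    static final int PAGE_SIZE = 8192;
    static final int MAX_ORDER = 11;

    public static void main(String[] args) {
        int chunkSize = PAGE_SIZE << MAX_ORDER;            // 8192 * 2^11
        System.out.println(chunkSize);                     // 16777216 bytes
        System.out.println(chunkSize / (1024 * 1024));     // 16 (MB)

        // With N direct arenas, the initial direct-memory footprint is
        // N chunks (more chunks may be allocated lazily under load).
        int numberOfArenas = 4;                            // illustrative value
        System.out.println(numberOfArenas * (chunkSize / (1024 * 1024)) + " MB");
    }
}
```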


---

[GitHub] flink pull request #4594: [FLINK-7517][network] let NettyBufferPool extend P...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4594#discussion_r149889426
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java ---
    @@ -52,51 +48,61 @@
     	/** Configured chunk size for the arenas. */
     	private final int chunkSize;
     
    +	/** We strictly prefer direct buffers and disallow heap allocations. */
    +	private static final boolean PREFER_DIRECT = true;
    +
    +	/**
    +	 * Arenas allocate chunks of pageSize << maxOrder bytes. With these defaults, this results in
    +	 * chunks of 16 MB.
    +	 *
    +	 * @see #MAX_ORDER
    +	 */
    +	private static final int PAGE_SIZE = 8192;
    +
    +	/**
    +	 * Arenas allocate chunks of pageSize << maxOrder bytes. With these defaults, this results in
    +	 * chunks of 16 MB.
    +	 *
    +	 * @see #PAGE_SIZE
    +	 */
    +	private static final int MAX_ORDER = 11;
    +
     	/**
     	 * Creates Netty's buffer pool with the specified number of direct arenas.
     	 *
     	 * @param numberOfArenas Number of arenas (recommended: 2 * number of task
     	 *                       slots)
     	 */
     	public NettyBufferPool(int numberOfArenas) {
    +		super(
    +			PREFER_DIRECT,
    +			// No heap arenas, please.
    +			0,
    +			// Number of direct arenas. Each arena allocates a chunk of 16 MB, i.e.
    +			// we allocate numDirectArenas * 16 MB of direct memory. This can grow
    +			// to multiple chunks per arena during runtime, but this should only
    +			// happen with a large amount of connections per task manager. We
    +			// control the memory allocations with low/high watermarks when writing
    +			// to the TCP channels. Chunks are allocated lazily.
    +			numberOfArenas,
    +			PAGE_SIZE,
    +			MAX_ORDER);
    +
     		checkArgument(numberOfArenas >= 1, "Number of arenas");
    --- End diff --
    
    Yes, it would be nice to be able to do so, but since this is a constructor, no statement may precede the `super()` call. I guess `super()` may fail itself on an invalid parameter - if not, we fail shortly afterwards.
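    For completeness: one common Java pattern (illustrative only, not the code merged in this PR) is to run the validation inside a static helper evaluated as part of the `super(...)` argument list, since expressions in those arguments execute before the superclass constructor body:

```java
// Validate-before-super via a static helper in the super() argument list.
// ValidatedPool and BasePool are hypothetical names for this sketch.
class BasePool {
    BasePool(int numberOfArenas) { /* allocate arenas, etc. */ }
}

public class ValidatedPool extends BasePool {
    public ValidatedPool(int numberOfArenas) {
        // checkArenas runs before BasePool's constructor body.
        super(checkArenas(numberOfArenas));
    }

    private static int checkArenas(int numberOfArenas) {
        if (numberOfArenas < 1) {
            throw new IllegalArgumentException("Number of arenas must be >= 1");
        }
        return numberOfArenas;
    }
}
```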


---

[GitHub] flink pull request #4594: [FLINK-7517][network] let NettyBufferPool extend P...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4594


---