You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zhijiangW <gi...@git.apache.org> on 2017/09/27 14:47:06 UTC

[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

GitHub user zhijiangW opened a pull request:

    https://github.com/apache/flink/pull/4735

    [FLINK-7699][core] Define the BufferListener interface to replace EventlListener in BufferProvider

    ## What is the purpose of the change
    
    *Currently the EventListener is used in BufferProvider to be notified of buffer available or destroyed pool.*
    
    *To be semantic clearly, we define a new BufferListener interface which can opt for a one-time only notification or to be notified repeatedly. And we can also notify the pool destroyed via explicitly method notifyBufferDestroyed.*
    
    This RP is based on #4499 whose commit is also included for passing travis. Review the third commit for this PR change.
    
    ## Brief change log
    
      - *The `RemoteInputChannel` will implement the `BufferListener` to wait for floating buffers from `BufferProvider`.*
    
    ## Verifying this change
    
    This change is already covered by existing tests, such as *LocalBufferPoolTest*.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhijiangW/flink FLINK-7699

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4735.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4735
    
----

----


---

[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4735#discussion_r141393650
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -87,6 +88,12 @@
     	/** The number of available buffers that have not been announced to the producer yet. */
     	private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
     
    +	/** The number of unsent buffers in the producer's sub partition. */
    +	private final AtomicInteger senderBacklog = new AtomicInteger(0);
    +
    +	/** The tag indicates whether this channel is waiting for additional floating buffers from the buffer pool. */
    +	private final AtomicBoolean isWaitingFloatingBuffers = new AtomicBoolean(false);
    --- End diff --
    
    but in general, do we need this field? For now, I only see this being used to verify correct use and I'm not sure whether the costs verify this. Also, nothing in this PR sets it to `true`.


---

[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4735#discussion_r141387553
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -313,6 +320,49 @@ public int getNumberOfAvailableBuffers() {
     		}
     	}
     
    +	/**
    +	 * Buffer pool notifies this channel of an available floating buffer. If the channel is released or not
    +	 * needing extra buffers currently, the buffer should be recycled to buffer pool. Otherwise, the buffer
    +	 * will be added into the available queue and the unannounced credit is increased by one.
    +	 *
    +	 * @param buffer Buffer that becomes available in buffer pool.
    +	 * @return True when this channel is waiting for more floating buffers, otherwise false.
    +	 */
    +	@Override
    +	public boolean notifyBufferAvailable(Buffer buffer) {
    +		checkState(isWaitingFloatingBuffers.get(), "This channel should be waiting for floating buffers currently.");
    --- End diff --
    
    `"This channel should be waiting for floating buffers."`


---

[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4735#discussion_r141659141
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java ---
    @@ -209,31 +209,30 @@ public void testSetLessThanRequiredNumBuffers() throws IOException {
     	// ------------------------------------------------------------------------
     
     	@Test
    -	public void testPendingRequestWithListenerAfterRecycle() throws Exception {
    -		BufferListener listener = spy(new BufferListener() {
    -			@Override
    -			public boolean notifyBufferAvailable(Buffer buffer) {
    -				buffer.recycle();
    -				return false;
    -			}
    +	public void testPendingRequestWithListenersAfterRecycle() throws Exception {
    +		BufferListener twoTimesListener = createBufferListener(2);
    +		BufferListener oneTimeListener = createBufferListener(1);
     
    -			@Override
    -			public void notifyBufferDestroyed() {
    -			}
    -		});
    +		localBufferPool.setNumBuffers(2);
     
    -		localBufferPool.setNumBuffers(1);
    +		Buffer available1 = localBufferPool.requestBuffer();
    +		Buffer available2 = localBufferPool.requestBuffer();
     
    -		Buffer available = localBufferPool.requestBuffer();
    -		Buffer unavailable = localBufferPool.requestBuffer();
    +		assertNull(localBufferPool.requestBuffer());
     
    -		assertNull(unavailable);
    +		assertTrue(localBufferPool.addBufferListener(twoTimesListener));
    +		assertTrue(localBufferPool.addBufferListener(oneTimeListener));
     
    -		assertTrue(localBufferPool.addBufferListener(listener));
    +		// Recycle the first buffer to notify both of the above listeners and the
    +		// <<tt>twoTimesListener</tt> will be added into the <<tt>registeredListeners</tt>
    --- End diff --
    
    you don't need the HTML tags here (I suggested them only for the Javadoc - some use `{@code }` instead), please remove them here


---

[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4735#discussion_r141388702
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java ---
    @@ -211,10 +210,15 @@ public void testSetLessThanRequiredNumBuffers() throws IOException {
     
     	@Test
     	public void testPendingRequestWithListenerAfterRecycle() throws Exception {
    -		EventListener<Buffer> listener = spy(new EventListener<Buffer>() {
    +		BufferListener listener = spy(new BufferListener() {
     			@Override
    -			public void onEvent(Buffer buffer) {
    +			public boolean notifyBufferAvailable(Buffer buffer) {
     				buffer.recycle();
    +				return false;
    --- End diff --
    
    can you also add a test which verifies that requests for multiple buffers, i.e. returning `true` here, works?


---

[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4735#discussion_r141660332
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java ---
    @@ -209,31 +209,30 @@ public void testSetLessThanRequiredNumBuffers() throws IOException {
     	// ------------------------------------------------------------------------
     
     	@Test
    -	public void testPendingRequestWithListenerAfterRecycle() throws Exception {
    -		BufferListener listener = spy(new BufferListener() {
    -			@Override
    -			public boolean notifyBufferAvailable(Buffer buffer) {
    -				buffer.recycle();
    -				return false;
    -			}
    +	public void testPendingRequestWithListenersAfterRecycle() throws Exception {
    +		BufferListener twoTimesListener = createBufferListener(2);
    +		BufferListener oneTimeListener = createBufferListener(1);
     
    -			@Override
    -			public void notifyBufferDestroyed() {
    -			}
    -		});
    +		localBufferPool.setNumBuffers(2);
     
    -		localBufferPool.setNumBuffers(1);
    +		Buffer available1 = localBufferPool.requestBuffer();
    +		Buffer available2 = localBufferPool.requestBuffer();
     
    -		Buffer available = localBufferPool.requestBuffer();
    -		Buffer unavailable = localBufferPool.requestBuffer();
    +		assertNull(localBufferPool.requestBuffer());
     
    -		assertNull(unavailable);
    +		assertTrue(localBufferPool.addBufferListener(twoTimesListener));
    +		assertTrue(localBufferPool.addBufferListener(oneTimeListener));
     
    -		assertTrue(localBufferPool.addBufferListener(listener));
    +		// Recycle the first buffer to notify both of the above listeners and the
    +		// <<tt>twoTimesListener</tt> will be added into the <<tt>registeredListeners</tt>
    +		// queue of buffer pool again
    +		available1.recycle();
    --- End diff --
    
    can you also verify the notification right after this `recycle()` call? By adding:
    `verify(oneTimeListener, times(1)).notifyBufferAvailable(any(Buffer.class));`


---

[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4735#discussion_r141657809
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -87,6 +88,12 @@
     	/** The number of available buffers that have not been announced to the producer yet. */
     	private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
     
    +	/** The number of unsent buffers in the producer's sub partition. */
    +	private final AtomicInteger senderBacklog = new AtomicInteger(0);
    +
    +	/** The tag indicates whether this channel is waiting for additional floating buffers from the buffer pool. */
    +	private final AtomicBoolean isWaitingFloatingBuffers = new AtomicBoolean(false);
    --- End diff --
    
    ok, fine - so I'll see more details in the next PR and we get this in here


---

[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4735#discussion_r141392162
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -313,6 +320,49 @@ public int getNumberOfAvailableBuffers() {
     		}
     	}
     
    +	/**
    +	 * Buffer pool notifies this channel of an available floating buffer. If the channel is released or not
    +	 * needing extra buffers currently, the buffer should be recycled to buffer pool. Otherwise, the buffer
    +	 * will be added into the available queue and the unannounced credit is increased by one.
    --- End diff --
    
    `The Buffer pool notifies this channel of an available floating buffer. If the channel is released or currently does not need extra buffers, the buffer should be recycled to the buffer pool. Otherwise, the buffer will be added into the <tt>availableBuffers</tt> queue and the unannounced credit is increased by one.`


---

[GitHub] flink issue #4735: [FLINK-7699][core] Define the BufferListener interface to...

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4735
  
    merging.


---

[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4735#discussion_r141660623
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java ---
    @@ -400,6 +399,23 @@ private int getNumRequestedFromMemorySegmentPool() {
     		return networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments();
     	}
     
    +	private BufferListener createBufferListener(int notificationTimes) {
    +		return spy(new BufferListener() {
    +			int times = 0;
    +
    +			@Override
    +			public boolean notifyBufferAvailable(Buffer buffer) {
    +				times++;
    +				buffer.recycle();
    --- End diff --
    
    nice trick with the `recycle()` call here leading to another invocation or the release of the buffer


---

[GitHub] flink issue #4735: [FLINK-7699][core] Define the BufferListener interface to...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4735
  
    @NicoK , I have submitted the updates based on your comments!


---

[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4735


---

[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4735#discussion_r141392529
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -87,6 +88,12 @@
     	/** The number of available buffers that have not been announced to the producer yet. */
     	private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
     
    +	/** The number of unsent buffers in the producer's sub partition. */
    +	private final AtomicInteger senderBacklog = new AtomicInteger(0);
    +
    +	/** The tag indicates whether this channel is waiting for additional floating buffers from the buffer pool. */
    +	private final AtomicBoolean isWaitingFloatingBuffers = new AtomicBoolean(false);
    --- End diff --
    
    better: `isWaitingForFloatingBuffers`


---

[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4735#discussion_r141398906
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java ---
    @@ -87,6 +88,12 @@
     	/** The number of available buffers that have not been announced to the producer yet. */
     	private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
     
    +	/** The number of unsent buffers in the producer's sub partition. */
    +	private final AtomicInteger senderBacklog = new AtomicInteger(0);
    +
    +	/** The tag indicates whether this channel is waiting for additional floating buffers from the buffer pool. */
    +	private final AtomicBoolean isWaitingFloatingBuffers = new AtomicBoolean(false);
    --- End diff --
    
    These two fields are currently used in `notifyBufferAvailable()` logic, so we have to define them in this PR.
    In next PR #4509 , `isWaitingForFloatingBuffers` field will be set  true by two conditions:
    1. The number of current available buffers is less than sender backlog
    2. There are no available floating buffers in `BufferProvider`
    And this field also used to avoid register listener in `BufferProvider` multi times.
    
    It may confuse you to only see this PR change, sorry for that.


---

[GitHub] flink pull request #4735: [FLINK-7699][core] Define the BufferListener inter...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4735#discussion_r141400676
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java ---
    @@ -211,10 +210,15 @@ public void testSetLessThanRequiredNumBuffers() throws IOException {
     
     	@Test
     	public void testPendingRequestWithListenerAfterRecycle() throws Exception {
    -		EventListener<Buffer> listener = spy(new EventListener<Buffer>() {
    +		BufferListener listener = spy(new BufferListener() {
     			@Override
    -			public void onEvent(Buffer buffer) {
    +			public boolean notifyBufferAvailable(Buffer buffer) {
     				buffer.recycle();
    +				return false;
    --- End diff --
    
    Yes, I also consider the issue of verifying notification multi times. 
    And I guess you may point out it. :)
    It can not work to return true for that, and I will think of proper way of verifying it.


---

[GitHub] flink issue #4735: [FLINK-7699][core] Define the BufferListener interface to...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4735
  
    Already removed the tags and added the `verify`


---