Posted to issues@flink.apache.org by zhijiangW <gi...@git.apache.org> on 2017/08/16 16:27:09 UTC

[GitHub] flink pull request #4552: [FLINK-7456][network]Implement Netty sender incomi...

GitHub user zhijiangW opened a pull request:

    https://github.com/apache/flink/pull/4552

    [FLINK-7456][network]Implement Netty sender incoming pipeline for credit-based

    ## What is the purpose of the change
    
    This PR is based on #4533, whose commits are included so that the Travis build passes. Only the last commit belongs to this PR; please review that one.
    
    The sender maintains the credit announced by the receiver through `PartitionRequest` and `AddCredit` messages, and sends buffers according to the available credit and network capacity. This PR mainly covers the incoming pipeline logic for credit-based flow control.
    
    ## Brief change log
    
      - *Each subpartition view maintains its current credit and a boolean field that marks whether it is already registered as available for transfer*
      - *Update the current credit when processing `PartitionRequest` and `AddCredit` messages*
      - *Add the mechanism for enqueueing the subpartition view and updating the registered-status field*
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
      - *Added a test to verify that the current credit is updated correctly and that the subpartition view is enqueued when an `AddCredit` message is received*
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
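
The sender-side bookkeeping listed in the change log above can be pictured with a small model. This is only a sketch under stated assumptions, not the PR's code: `ReaderSketch` and `SenderQueueSketch` are hypothetical names, and Netty, real buffers, and event handling are left out. It illustrates per-reader credit, the registered flag, and enqueueing a reader at most once.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a per-subpartition reader on the sender side. */
class ReaderSketch {
    final String receiverId;
    int credits;          // credit announced by the receiver and not yet spent
    int buffersAvailable; // buffers waiting in the subpartition
    boolean registered;   // true while the reader sits in the send queue

    ReaderSketch(String receiverId, int initialCredit) {
        this.receiverId = receiverId;
        this.credits = initialCredit;
    }

    boolean isAvailable() {
        return buffersAvailable > 0 && credits > 0;
    }
}

/** Hypothetical stand-in for the sender-side queue of readers that may send. */
class SenderQueueSketch {
    private final Map<String, ReaderSketch> allReaders = new HashMap<>();
    private final ArrayDeque<ReaderSketch> availableReaders = new ArrayDeque<>();

    /** Models a PartitionRequest: the reader is registered with its initial credit. */
    void partitionRequest(ReaderSketch reader) {
        allReaders.put(reader.receiverId, reader);
        enqueueIfAvailable(reader);
    }

    /** Models an AddCredit message: credit arrives as a delta. */
    void addCredit(String receiverId, int delta) {
        ReaderSketch reader = allReaders.get(receiverId);
        if (reader != null) {
            reader.credits += delta;
            enqueueIfAvailable(reader);
        }
    }

    /** Models the producer announcing new buffers for a reader. */
    void notifyBuffersAvailable(ReaderSketch reader, int numBuffers) {
        reader.buffersAvailable += numBuffers;
        enqueueIfAvailable(reader);
    }

    /** Enqueues at most once; the flag prevents duplicate queue entries. */
    private void enqueueIfAvailable(ReaderSketch reader) {
        if (!reader.registered && reader.isAvailable()) {
            reader.registered = true;
            availableReaders.add(reader);
        }
    }

    /** Sends one buffer from the head reader and spends one credit; false if idle. */
    boolean sendOne() {
        ReaderSketch reader = availableReaders.poll();
        if (reader == null) {
            return false;
        }
        reader.registered = false;
        reader.buffersAvailable--;
        reader.credits--;
        enqueueIfAvailable(reader); // re-enqueue only if buffers and credit remain
        return true;
    }

    int numAvailableReaders() {
        return availableReaders.size();
    }
}
```

With five buffers and three credits, this model sends three buffers and leaves two pending, which is the same arithmetic the PR's `testEnqueueReaderByNotifyingBufferAndCredit` test asserts.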
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhijiangW/flink FLINK-7456

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4552.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4552
    
----
commit e35c1ff8066bf44344495d132a1092b9db3ef182
Author: Zhijiang <wa...@aliyun.com>
Date:   2017-08-07T09:31:17Z

    [FLINK-7378][core]Create a fix size (non rebalancing) buffer pool type for the floating buffers

commit 969c24d3bf80c1ff89ada11e81b9bf4fea14066f
Author: Zhijiang <wa...@aliyun.com>
Date:   2017-08-14T06:30:47Z

    [FLINK-7394][core]Implement basic InputChannel with free buffers,credit and backlog

commit 15fa828449d73f53042c57e9c5494d75ddee575f
Author: Zhijiang <wa...@aliyun.com>
Date:   2017-08-10T05:29:13Z

    [FLINK-7406][network]Implement Netty receiver incoming pipeline for credit-based

commit d0674244f15701863a5dd3f68b7274b3bd49c64d
Author: Zhijiang <wa...@aliyun.com>
Date:   2017-08-12T14:13:25Z

    [FLINK-7416][network] Implement Netty receiver outgoing pipeline for credit-based

commit 6eaff7877ad43eab674e184153365b50ec8e1559
Author: Zhijiang <wa...@aliyun.com>
Date:   2017-08-16T13:24:53Z

    [FLINK-7456][network]Implement Netty sender incoming pipeline for credit-based

----


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161573976
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
    @@ -238,6 +238,7 @@ public void testConsumeSpilledPartition() throws Exception {
     
     		verify(listener, times(1)).notifyBuffersAvailable(eq(4L));
     
    +		assertFalse(reader.nextBufferIsEvent());
    --- End diff --
    
    also test `read.buffer().isBuffer()`?


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161450876
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
    @@ -199,6 +199,19 @@ public boolean isReleased() {
     		}
     	}
     
    +	@Override
    +	public boolean nextBufferIsEvent() {
    +		if (nextBuffer != null) {
    +			return !nextBuffer.isBuffer();
    +		}
    --- End diff --
    
    agree with integration


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161567331
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
    @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception {
     		NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
     		assertTrue(err.cause instanceof CancelTaskException);
     	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
    +	 * though it has no available credits.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(true);
    +
    +		final ResultPartitionID partitionId = new ResultPartitionID();
    +		final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +		when(partitionProvider.createSubpartitionView(
    +			eq(partitionId),
    +			eq(0),
    +			any(BufferAvailabilityListener.class))).thenReturn(view);
    +
    +		final InputChannelID receiverId = new InputChannelID();
    +		final PartitionRequestQueue queue = new PartitionRequestQueue();
    +		final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
    +		final EmbeddedChannel channel = new EmbeddedChannel(queue);
    +
    +		reader.requestSubpartitionView(partitionProvider, partitionId, 0);
    +
    +		// block the channel so that we see an intermediate state in the test
    +		ByteBuf channelBlockingBuffer = blockChannel(channel);
    +		assertNull(channel.readOutbound());
    +
    +		// Notify an available event buffer to trigger enqueue the reader
    +		reader.notifyBuffersAvailable(1);
    +
    +		channel.runPendingTasks();
    +
    +		// The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available
    +		assertEquals(1, queue.getAvailableReaders().size());
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +
    +		// Flush the buffer to make the channel writable again and see the final results
    +		channel.flush();
    +		assertSame(channelBlockingBuffer, channel.readOutbound());
    +
    +		assertEquals(0, queue.getAvailableReaders().size());
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(false);
    +		when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false));
    +
    +		final ResultPartitionID partitionId = new ResultPartitionID();
    +		final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +		when(partitionProvider.createSubpartitionView(
    +			eq(partitionId),
    +			eq(0),
    +			any(BufferAvailabilityListener.class))).thenReturn(view);
    +
    +		final InputChannelID receiverId = new InputChannelID();
    +		final PartitionRequestQueue queue = new PartitionRequestQueue();
    +		final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
    +		final EmbeddedChannel channel = new EmbeddedChannel(queue);
    +
    +		reader.requestSubpartitionView(partitionProvider, partitionId, 0);
    +		queue.notifyReaderCreated(reader);
    +
    +		// block the channel so that we see an intermediate state in the test
    +		ByteBuf channelBlockingBuffer = blockChannel(channel);
    +		assertNull(channel.readOutbound());
    +
    +		// Notify available buffers to trigger enqueue the reader
    +		final int notifyNumBuffers = 5;
    +		for (int i = 0; i < notifyNumBuffers; i++) {
    +			reader.notifyBuffersAvailable(1);
    +		}
    +
    +		channel.runPendingTasks();
    +
    +		// the reader is not enqueued in the pipeline because no credits are available
    +		// -> it should still have the same number of pending buffers
    +		assertEquals(0, queue.getAvailableReaders().size());
    +		assertEquals(notifyNumBuffers, reader.getNumBuffersAvailable());
    +		assertFalse(reader.isRegisteredAsAvailable());
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +
    +		// Notify available credits to trigger enqueue the reader again
    +		final int notifyNumCredits = 3;
    +		for (int i = 1; i <= notifyNumCredits; i++) {
    +			queue.addCredit(receiverId, 1);
    +
    +			// the reader is enqueued in the pipeline because it has both available buffers and credits
    +			// since the channel is blocked though, we will not process anything and only enqueue the
    +			// reader once
    +			assertTrue(reader.isRegisteredAsAvailable());
    +			assertEquals(1, queue.getAvailableReaders().size());
    +			assertEquals(i, reader.getNumCreditsAvailable());
    +			assertEquals(notifyNumBuffers, reader.getNumBuffersAvailable());
    +		}
    +
    +		// Flush the buffer to make the channel writable again and see the final results
    +		channel.flush();
    +		assertSame(channelBlockingBuffer, channel.readOutbound());
    +
    +		assertEquals(0, queue.getAvailableReaders().size());
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +		assertEquals(notifyNumBuffers - notifyNumCredits, reader.getNumBuffersAvailable());
    +		assertFalse(reader.isRegisteredAsAvailable());
    --- End diff --
    
    let's end with `assertNull(channel.readOutbound());`


---

[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4552
  
    @NicoK, thanks for your reviews!
    I have submitted all the patches you provided offline to address the issues above.
    
    1. Remove `FLINK-8425` from this PR.
    2. Do you think I should add more tests for `nextBufferIsEvent`? I already verify it in the previous related tests.
    3. Regarding adding the switch, I ran into some difficulties and left messages for you offline. We can confirm the details there.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161559926
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
    @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
     		NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
     		assertTrue(err.cause instanceof CancelTaskException);
     	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
    +	 * though it has no available credits.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(true);
    +
    +		final ResultPartitionID partitionId = new ResultPartitionID();
    +		final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +		when(partitionProvider.createSubpartitionView(
    +			eq(partitionId),
    +			eq(0),
    +			any(BufferAvailabilityListener.class))).thenReturn(view);
    +
    +		final InputChannelID receiverId = new InputChannelID();
    +		final PartitionRequestQueue queue = spy(new PartitionRequestQueue());
    +		final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
    +		final EmbeddedChannel channel = new EmbeddedChannel(queue);
    +
    +		reader.requestSubpartitionView(partitionProvider, partitionId, 0);
    +
    +		// Notify an available event buffer to trigger enqueue the reader
    +		reader.notifyBuffersAvailable(1);
    +
    +		channel.runPendingTasks();
    +
    +		verify(queue, times(1)).triggerEnqueueAvailableReader(reader);
    +		// The reader is enqueued in the pipeline because the next buffer is event, even though no available credits
    +		verify(queue, times(1)).enqueueAvailableReader(reader);
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(false);
    +		when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2));
    +
    +		final ResultPartitionID partitionId = new ResultPartitionID();
    +		final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +		when(partitionProvider.createSubpartitionView(
    +			eq(partitionId),
    +			eq(0),
    +			any(BufferAvailabilityListener.class))).thenReturn(view);
    --- End diff --
    
    let's remove that mock ...


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r157760482
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java ---
    @@ -94,6 +98,15 @@ void cancelRequestFor(InputChannelID inputChannelId) {
     		}
     	}
     
    +	void notifyCreditAvailable(final RemoteInputChannel inputChannel) {
    --- End diff --
    
    please re-add the comment here, too


---

[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4552
  
    One thing we talked about offline: as a precaution, we should keep the old implementation around and let users turn the credit-based flow control algorithm on or off. The credit accounting would mostly stay in that case but would simply not be used by the old implementation, which has no flow control.
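
One way to picture such a switch is a single boolean that selects which handler is installed, with both modes sharing an interface. The sketch below is an assumption for illustration only: `ClientHandlerSketch`, `LegacyHandlerSketch`, `CreditBasedHandlerSketch`, `HandlerFactorySketch`, and `creditBasedEnabled` are hypothetical names, not Flink classes or configuration keys.

```java
/** Hypothetical common interface for both flow-control modes. */
interface ClientHandlerSketch {
    /** Called when the receiver has buffers it could announce as credit. */
    void notifyCreditAvailable(int credit);

    /** Number of credits this handler has announced to the sender. */
    int announcedCredit();
}

/** Old mode: credit is still reported by callers, but never announced or used. */
class LegacyHandlerSketch implements ClientHandlerSketch {
    @Override
    public void notifyCreditAvailable(int credit) {
        // Intentionally ignored: this mode has no credit-based flow control.
    }

    @Override
    public int announcedCredit() {
        return 0;
    }
}

/** New mode: credit is accumulated and would be announced to the sender. */
class CreditBasedHandlerSketch implements ClientHandlerSketch {
    private int announced;

    @Override
    public void notifyCreditAvailable(int credit) {
        announced += credit;
    }

    @Override
    public int announcedCredit() {
        return announced;
    }
}

class HandlerFactorySketch {
    /** The boolean plays the role of the user-facing on/off switch. */
    static ClientHandlerSketch create(boolean creditBasedEnabled) {
        return creditBasedEnabled
                ? new CreditBasedHandlerSketch()
                : new LegacyHandlerSketch();
    }
}
```

Because callers only see the interface, the accounting calls can stay in place in both modes, which matches the remark that the credit accounting would mostly remain.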


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161547041
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
    @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
     		NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
     		assertTrue(err.cause instanceof CancelTaskException);
     	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
    --- End diff --
    
    nit: `is an event`


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161578518
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---
    @@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {
     			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
     			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
     			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
    -			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
    +			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
    --- End diff --
    
    Is that also the reason here? I see that otherwise we get into `Insufficient number of network buffers` but it does not look as if it was configured as tightly...
    (just want to rule out some memory leak with the new code)


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160714424
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -250,10 +304,12 @@ private void handleException(Channel channel, Throwable cause) throws IOExceptio
     
     	private void releaseAllResources() throws IOException {
     		SequenceNumberingViewReader reader;
    -		while ((reader = nonEmptyReader.poll()) != null) {
    +		while ((reader = availableReaders.poll()) != null) {
    --- End diff --
    
    Previously, we only had access to the views with data rather than to all views, but shouldn't we release all views instead? That matters especially now that some views have data but not enough credit.
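
The concern can be illustrated with a small model that tracks every reader in a map and only the sendable ones in a queue. This is a hypothetical sketch (`ReleasableReaderSketch` and `ReleaseAllSketch` are made-up names), not the code that was eventually merged.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical reader that only records whether it was released. */
class ReleasableReaderSketch {
    boolean released;

    void releaseAllResources() {
        released = true;
    }
}

/** Hypothetical queue that keeps all readers, not only the available ones. */
class ReleaseAllSketch {
    private final Map<String, ReleasableReaderSketch> allReaders = new LinkedHashMap<>();
    private final ArrayDeque<ReleasableReaderSketch> availableReaders = new ArrayDeque<>();

    void register(String receiverId, ReleasableReaderSketch reader, boolean available) {
        allReaders.put(receiverId, reader);
        if (available) {
            availableReaders.add(reader);
        }
    }

    /** Releases every known reader, including those waiting for credit. */
    void releaseAllResources() {
        for (ReleasableReaderSketch reader : allReaders.values()) {
            reader.releaseAllResources();
        }
        allReaders.clear();
        availableReaders.clear();
    }

    int numTracked() {
        return allReaders.size();
    }
}
```

Polling only `availableReaders` would skip a reader that has data but no credit; iterating over the full map does not.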


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r159399376
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +93,35 @@ public void run() {
     		});
     	}
     
    +	/**
    +	 * Try to enqueue the reader once receiving credit notification form the consumer or receiving
    --- End diff --
    
    `from`


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161247589
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---
    @@ -77,6 +92,46 @@ void requestSubpartitionView(
     		}
     	}
     
    +	/**
    +	 * The credits from consumer are added in incremental way.
    +	 *
    +	 * @param creditDeltas The credit deltas
    +	 */
    +	public void addCredit(int creditDeltas) {
    +		numCreditsAvailable += creditDeltas;
    +	}
    +
    +	/**
    +	 * Updates the value to indicate whether the reader is enqueued in the pipeline or not.
    +	 *
    +	 * @param isRegisteredAvailable True if this reader is already enqueued in the pipeline.
    +	 */
    +	public void notifyAvailabilityChanged(boolean isRegisteredAvailable) {
    +		this.isRegisteredAvailable = isRegisteredAvailable;
    +	}
    +
    +	public boolean isRegisteredAvailable() {
    +		return isRegisteredAvailable;
    +	}
    +
    +	/**
    +	 * Check whether this reader is available or not.
    +	 *
    +	 * <p>Return true only if the next buffer is event or the reader has both available
    +	 * credits and buffers.
    +	 */
    +	public boolean isAvailable() {
    +		if (numBuffersAvailable.get() <= 0) {
    +			return false;
    +		}
    +
    +		if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable > 0) {
    --- End diff --
    
    how about checking `numCreditsAvailable` first since that's the cheaper check?
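
The suggested ordering relies on Java's short-circuit `||`: when the integer comparison is already true, the right-hand operand is not evaluated. A minimal sketch, assuming a hypothetical helper `AvailabilityCheckSketch` in which a `BooleanSupplier` stands in for the potentially more expensive `nextBufferIsEvent()` call:

```java
import java.util.function.BooleanSupplier;

class AvailabilityCheckSketch {
    /**
     * Returns true if there is at least one buffer and either credit is
     * available or the next buffer is an event.
     */
    static boolean isAvailable(
            int numBuffersAvailable,
            int numCreditsAvailable,
            BooleanSupplier nextBufferIsEvent) {
        if (numBuffersAvailable <= 0) {
            return false;
        }
        // Cheap field comparison first; the supplier runs only when no credit is left.
        return numCreditsAvailable > 0 || nextBufferIsEvent.getAsBoolean();
    }
}
```

The result is the same for either operand order; only the number of calls to the event check changes.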


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162259781
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java ---
    @@ -164,11 +165,13 @@ private boolean dispose() {
     		private void handInChannel(Channel channel) {
     			synchronized (connectLock) {
     				try {
    -					PartitionRequestClientHandler requestHandler = channel.pipeline()
    -							.get(PartitionRequestClientHandler.class);
    +					NetworkClientHandler clientHandler = channel.pipeline().get(PartitionRequestClientHandler.class);
    +					if (clientHandler == null) {
    +						clientHandler = channel.pipeline().get(CreditBasedPartitionRequestClientHandler.class);
    +					}
    --- End diff --
    
    good idea!


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160333068
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +93,35 @@ public void run() {
     		});
     	}
     
    +	/**
    +	 * Try to enqueue the reader once receiving credit notification form the consumer or receiving
    +	 * non-empty reader notification from the producer. Only one thread would trigger the actual
    +	 * enqueue after checking the reader's availability, so there is no race condition here.
    +	 */
    +	void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --
    
    This method is not only used for testing; it may also be called via the `addCredit` method.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162104968
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -125,11 +126,11 @@ private void enqueueAvailableReader(final SequenceNumberingViewReader reader) th
     	 * @return readers which are enqueued available for transferring data
     	 */
     	@VisibleForTesting
    -	ArrayDeque<SequenceNumberingViewReader> getAvailableReaders() {
    +	ArrayDeque<NetworkSequenceViewReader> getAvailableReaders() {
     		return availableReaders;
     	}
     
    -	void notifyReaderCreated(final SequenceNumberingViewReader reader) {
    +	public void notifyReaderCreated(final NetworkSequenceViewReader reader) {
    --- End diff --
    
    this could stay `package-private`


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162047076
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---
    @@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {
     			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
     			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
     			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
    -			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
    +			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
    --- End diff --
    
    Do you, by any chance, know why 800 worked and is not enough here anymore? I mean, why/how does this test need 800 buffers?


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4552


---

[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4552
  
    @NicoK, thanks for focusing on the last PR.
    
    I should split this into two separate PRs as you said. However, I am afraid some existing tests may fail if I only modify and enable the credit-based process on the sender side; otherwise I would need to create a temporary handler in parallel with `PartitionRequestQueue`.
    
    As you said, the latter part mainly replaces `PartitionRequestClientHandler` with the contents of `CreditBasedClientHandler`. It also replaces the `CreditBasedClientHandler` class name in the previously added tests and removes the earlier temporary code in `ResultPartitionType` to make the fixed buffer pool work.
    
    If this still makes your review difficult, I will try to split it. : )


---

[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4552
  
    @NicoK, I have rebased onto the latest code. Looking forward to your review!


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160695373
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -98,20 +135,35 @@ public void close() {
     		}
     	}
     
    +	/**
    +	 * Adds to maintain the unannounced credits from the consumer and it may trigger
    +	 * enqueue the corresponding reader for this consumer transferring data.
    +	 *
    +	 * @param receiverId The input channel id to identify consumer.
    +	 * @param credit The unannounced credits of the consumer.
    +	 */
    +	public void addCredit(InputChannelID receiverId, int credit) throws Exception {
    --- End diff --
    
    could be `package-private`


---

[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4552
  
    Ok, I think, I'll manage the review without the split.
    
    Since this is the last of the credit-based PRs though, can you rebase on top of the latest changes (preferably after addressing the comments in the other PRs)? This way, I'll have the full picture of this crucial change.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161296711
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---
    @@ -73,6 +73,9 @@
     	/** Flag indicating whether all resources have been released. */
     	private AtomicBoolean isReleased = new AtomicBoolean();
     
    +	/** The next buffer to hand out. */
    +	private Buffer nextBuffer;
    --- End diff --
    
    We need to protect this against race conditions with respect to `releaseAllResources()` as well.
    
    Actually, I'm surprised that nothing in this class is protected against concurrently releasing it. Although I have created a separate issue for this ([FLINK-8425](https://issues.apache.org/jira/browse/FLINK-8425)), I think we may need to solve this here for the new `nextBuffer` anyway.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161559913
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
    @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
     		NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
     		assertTrue(err.cause instanceof CancelTaskException);
     	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
    +	 * though it has no available credits.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(true);
    +
    +		final ResultPartitionID partitionId = new ResultPartitionID();
    +		final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +		when(partitionProvider.createSubpartitionView(
    +			eq(partitionId),
    +			eq(0),
    +			any(BufferAvailabilityListener.class))).thenReturn(view);
    +
    +		final InputChannelID receiverId = new InputChannelID();
    +		final PartitionRequestQueue queue = spy(new PartitionRequestQueue());
    +		final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
    +		final EmbeddedChannel channel = new EmbeddedChannel(queue);
    +
    +		reader.requestSubpartitionView(partitionProvider, partitionId, 0);
    +
    +		// Notify an available event buffer to trigger enqueue the reader
    +		reader.notifyBuffersAvailable(1);
    +
    +		channel.runPendingTasks();
    +
    +		verify(queue, times(1)).triggerEnqueueAvailableReader(reader);
    +		// The reader is enqueued in the pipeline because the next buffer is event, even though no available credits
    +		verify(queue, times(1)).enqueueAvailableReader(reader);
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(false);
    +		when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2));
    --- End diff --
    
    let's remove that mock ...


---

[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4552
  
    @NicoK , I have submitted the switch for choosing between the old mode and the new credit-based mode.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161249082
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
    @@ -199,6 +199,19 @@ public boolean isReleased() {
     		}
     	}
     
    +	@Override
    +	public boolean nextBufferIsEvent() {
    +		if (nextBuffer != null) {
    +			return !nextBuffer.isBuffer();
    +		}
    --- End diff --
    
    We probably need synchronization here to access `nextBuffer` and also check for `isReleased()` similar to `getNextBuffer`, since we are basically doing the same but without taking the buffer.
    
    Why not integrate this into `getNextBuffer` and the `BufferAndBacklog` returned there? Inside that method, gathering this additional info is basically for free and we may thus speed up some code paths.
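    
    A minimal sketch of this second suggestion, under stated assumptions: `SketchBuffer`, `SketchNextBuffer` and `SketchSubpartitionView` are hypothetical, simplified stand-ins, not Flink's real classes. It only illustrates computing the "next buffer is an event" flag inside `getNextBuffer()`, under the same lock that guards the queue and the released check, so no separate unsynchronized peek is needed.
    
    ```java
    import java.util.ArrayDeque;
    
    /** Hypothetical stand-in for a network buffer; an event has isBuffer == false. */
    final class SketchBuffer {
        final boolean isBuffer;
    
        SketchBuffer(boolean isBuffer) {
            this.isBuffer = isBuffer;
        }
    }
    
    /** Hypothetical analogue of BufferAndBacklog that also carries the peeked flag. */
    final class SketchNextBuffer {
        final SketchBuffer buffer;
        final int remaining;
        final boolean nextBufferIsEvent;
    
        SketchNextBuffer(SketchBuffer buffer, int remaining, boolean nextBufferIsEvent) {
            this.buffer = buffer;
            this.remaining = remaining;
            this.nextBufferIsEvent = nextBufferIsEvent;
        }
    }
    
    /** Hypothetical, simplified view; not Flink's SpillableSubpartitionView. */
    final class SketchSubpartitionView {
        private final Object lock = new Object();
        private final ArrayDeque<SketchBuffer> buffers = new ArrayDeque<>();
        private boolean released;
    
        void add(SketchBuffer buffer) {
            synchronized (lock) {
                if (!released) {
                    buffers.add(buffer);
                }
            }
        }
    
        void release() {
            synchronized (lock) {
                released = true;
                buffers.clear();
            }
        }
    
        /** Takes the head buffer and peeks at its successor while still holding the lock. */
        SketchNextBuffer getNextBuffer() {
            synchronized (lock) {
                if (released || buffers.isEmpty()) {
                    return null;
                }
                SketchBuffer current = buffers.poll();
                SketchBuffer next = buffers.peek();
                boolean nextIsEvent = next != null && !next.isBuffer;
                return new SketchNextBuffer(current, buffers.size(), nextIsEvent);
            }
        }
    }
    ```
    
    The caller would then read the flag from the returned object instead of calling a separate method on the view, which is what makes the extra information essentially free.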


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161295428
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---
    @@ -163,6 +190,15 @@ public boolean isReleased() {
     		return parent.isReleased() || isReleased.get();
     	}
     
    +	@Override
    +	public boolean nextBufferIsEvent() {
    +		if (nextBuffer != null) {
    +			return !nextBuffer.isBuffer();
    +		}
    +
    +		return false;
    --- End diff --
    
    I'm afraid relying on `nextBuffer` won't be enough because it might not have been there during `getNextBuffer()` but may be there now.
    
    Please add a test for this case.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161569135
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---
    @@ -134,13 +135,22 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception {
     		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
     		verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
     
    +		assertFalse(view.nextBufferIsEvent());
    +		read = view.getNextBuffer();
    +		assertNotNull(read);
    +		assertEquals(0, subpartition.getBuffersInBacklog());
    +		assertEquals(subpartition.getBuffersInBacklog(), read.buffersInBacklog());
    +		assertNull(view.getNextBuffer());
    +		assertEquals(0, subpartition.getBuffersInBacklog());
    +
     		// Add event to the queue...
     		Buffer event = createBuffer();
     		event.tagAsEvent();
     		subpartition.add(event);
     
    +		assertTrue(view.nextBufferIsEvent());
     		assertEquals(3, subpartition.getTotalNumberOfBuffers());
    -		assertEquals(1, subpartition.getBuffersInBacklog());
    +		assertEquals(0, subpartition.getBuffersInBacklog());
     		assertEquals(3 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
     		verify(listener, times(3)).notifyBuffersAvailable(eq(1L));
     	}
    --- End diff --
    
    maybe verify that `nextBufferIsEvent()` returns the right thing after adding a real buffer now (with the event being next)


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161282684
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---
    @@ -77,6 +92,46 @@ void requestSubpartitionView(
     		}
     	}
     
    +	/**
    +	 * The credits from consumer are added in incremental way.
    +	 *
    +	 * @param creditDeltas The credit deltas
    +	 */
    +	public void addCredit(int creditDeltas) {
    +		numCreditsAvailable += creditDeltas;
    +	}
    +
    +	/**
    +	 * Updates the value to indicate whether the reader is enqueued in the pipeline or not.
    +	 *
    +	 * @param isRegisteredAvailable True if this reader is already enqueued in the pipeline.
    +	 */
    +	public void notifyAvailabilityChanged(boolean isRegisteredAvailable) {
    +		this.isRegisteredAvailable = isRegisteredAvailable;
    +	}
    +
    +	public boolean isRegisteredAvailable() {
    +		return isRegisteredAvailable;
    +	}
    +
    +	/**
    +	 * Check whether this reader is available or not.
    +	 *
    +	 * <p>Return true only if the next buffer is event or the reader has both available
    +	 * credits and buffers.
    +	 */
    +	public boolean isAvailable() {
    +		if (numBuffersAvailable.get() <= 0) {
    +			return false;
    +		}
    +
    +		if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable > 0) {
    --- End diff --
    
    also, why not simply do the following for this whole method?
    ```
    return numBuffersAvailable.get() > 0 &&
    			(numCreditsAvailable > 0 || subpartitionView.nextBufferIsEvent());
    ```
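    
    A small check that the two shapes agree, written as a sketch: the diff above is cut off after the second `if`, so the `return true` / `return false` tail in `original` is an assumption, and `AvailabilityCheckSketch` is a hypothetical helper that takes plain values instead of reading reader state.
    
    ```java
    /** Hypothetical helper comparing the two forms of the availability check. */
    final class AvailabilityCheckSketch {
    
        /** Assumed shape of the original method: early return, then a second condition. */
        static boolean original(int numBuffers, int numCredits, boolean nextIsEvent) {
            if (numBuffers <= 0) {
                return false;
            }
            if (nextIsEvent || numCredits > 0) {
                return true;
            }
            return false;
        }
    
        /** The single-expression form proposed in the review comment. */
        static boolean simplified(int numBuffers, int numCredits, boolean nextIsEvent) {
            return numBuffers > 0 && (numCredits > 0 || nextIsEvent);
        }
    
        /** Compares both forms over a small grid of inputs. */
        static boolean agreeOnAllSmallInputs() {
            for (int buffers = -1; buffers <= 2; buffers++) {
                for (int credits = -1; credits <= 2; credits++) {
                    for (boolean event : new boolean[] {false, true}) {
                        if (original(buffers, credits, event) != simplified(buffers, credits, event)) {
                            return false;
                        }
                    }
                }
            }
            return true;
        }
    }
    ```
    
    One difference the sketch does not capture: the simplified form tests the credit count first, so in the real method `subpartitionView.nextBufferIsEvent()` would only be called when no credit is available.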


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160694722
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +94,37 @@ public void run() {
     		});
     	}
     
    +	/**
    +	 * Try to enqueue the reader once receiving credit notification from the consumer or receiving
    +	 * non-empty reader notification from the producer. Only one thread would trigger the actual
    +	 * enqueue after checking the reader's availability, so there is no race condition here.
    +	 */
    +	@VisibleForTesting
    +	void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    +		if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
    +			enqueueAvailableReader(reader);
    +		}
    +	}
    +
    +	@VisibleForTesting
    +	void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --
    
    actually, we can inline this into the first one, make this `private`, and reduce `Mockito` use in the tests if we just made the `availableReaders` field accessible to the tests, e.g.
    ```
    	/**
    	 * Accesses internal state to verify reader registration in the unit tests.
    	 *
    	 * <p><strong>Do not use anywhere else!</strong>
    	 *
    	 * @return readers which are enqueued available for transferring data
    	 */
    	@VisibleForTesting
    	ArrayDeque<SequenceNumberingViewReader> getAvailableReaders() {
    		return availableReaders;
    	}
    ```
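    
    To illustrate how such an accessor lets a test assert on state rather than on `Mockito` call counts, here is a rough sketch. `SketchReader` and `SketchRequestQueue` are hypothetical, stripped-down stand-ins (no events, no Netty channel, no write-and-flush); only the guard in `triggerEnqueueAvailableReader` mirrors the diff above.
    
    ```java
    import java.util.ArrayDeque;
    
    /** Hypothetical reader that tracks credits, buffers and its registration flag. */
    final class SketchReader {
        private int credits;
        private int buffers;
        private boolean registeredAvailable = false;
    
        void addCredit(int delta) {
            credits += delta;
        }
    
        void addBuffers(int count) {
            buffers += count;
        }
    
        boolean isAvailable() {
            return buffers > 0 && credits > 0;
        }
    
        boolean isRegisteredAvailable() {
            return registeredAvailable;
        }
    
        void setRegisteredAvailable(boolean registeredAvailable) {
            this.registeredAvailable = registeredAvailable;
        }
    }
    
    /** Hypothetical queue with the enqueue step inlined into the trigger method. */
    final class SketchRequestQueue {
        private final ArrayDeque<SketchReader> availableReaders = new ArrayDeque<>();
    
        void triggerEnqueueAvailableReader(SketchReader reader) {
            // Enqueue only once, and only if the reader can actually send something.
            if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
                availableReaders.add(reader);
                reader.setRegisteredAvailable(true);
            }
        }
    
        /** Test-only accessor, analogous to the getAvailableReaders() proposed above. */
        ArrayDeque<SketchReader> getAvailableReaders() {
            return availableReaders;
        }
    }
    ```
    
    A test can then check `getAvailableReaders().size()` or its contents directly after each notification, without spying on the queue.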


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161559690
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
    @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
     		NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
     		assertTrue(err.cause instanceof CancelTaskException);
     	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
    +	 * though it has no available credits.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(true);
    +
    +		final ResultPartitionID partitionId = new ResultPartitionID();
    +		final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +		when(partitionProvider.createSubpartitionView(
    +			eq(partitionId),
    +			eq(0),
    +			any(BufferAvailabilityListener.class))).thenReturn(view);
    --- End diff --
    
    let's remove that mock ...


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161667234
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java ---
    @@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception {
     			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
     			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
     			config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
    -			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
    +			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16);
    --- End diff --
    
    Yes, I will set it to 9.


---

[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4552
  
    @NicoK , I have submitted all the modifications based on the patches you provided.
    The tests for `nextBufferIsEvent` will be added in a new commit tomorrow.


---

[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4552
  
    @pnowojski , this PR is ready for review.
    
    It covers almost all of the credit-based logic on the sender side.
    In addition, I replaced the current `PartitionRequestClientHandler` with `CreditBasedClientHandler` and removed the previous temporary code that made this feature work on both sides.
    
    One small piece of work remains in this PR, related to `SpilledSubpartitionView#nextBufferIsEvent`, because the existing process in the spilled subpartition cannot get the next buffer directly. However, the current default value for `nextBufferIsEvent` does not affect the core process; it only wastes one credit unnecessarily. I will try to solve it in a lightweight way later.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160371761
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +93,35 @@ public void run() {
     		});
     	}
     
    +	/**
    +	 * Try to enqueue the reader once receiving credit notification form the consumer or receiving
    +	 * non-empty reader notification from the producer. Only one thread would trigger the actual
    +	 * enqueue after checking the reader's availability, so there is no race condition here.
    +	 */
    +	void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --
    
    Thanks for the clarification. I misunderstood this tag from the comments; I will fix it together with the other changes once all the reviews are in.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161567305
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
    @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception {
     		NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
     		assertTrue(err.cause instanceof CancelTaskException);
     	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
    +	 * though it has no available credits.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(true);
    +
    +		final ResultPartitionID partitionId = new ResultPartitionID();
    +		final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +		when(partitionProvider.createSubpartitionView(
    +			eq(partitionId),
    +			eq(0),
    +			any(BufferAvailabilityListener.class))).thenReturn(view);
    +
    +		final InputChannelID receiverId = new InputChannelID();
    +		final PartitionRequestQueue queue = new PartitionRequestQueue();
    +		final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
    +		final EmbeddedChannel channel = new EmbeddedChannel(queue);
    +
    +		reader.requestSubpartitionView(partitionProvider, partitionId, 0);
    +
    +		// block the channel so that we see an intermediate state in the test
    +		ByteBuf channelBlockingBuffer = blockChannel(channel);
    +		assertNull(channel.readOutbound());
    +
    +		// Notify an available event buffer to trigger enqueue the reader
    +		reader.notifyBuffersAvailable(1);
    +
    +		channel.runPendingTasks();
    +
    +		// The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available
    +		assertEquals(1, queue.getAvailableReaders().size());
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +
    +		// Flush the buffer to make the channel writable again and see the final results
    +		channel.flush();
    +		assertSame(channelBlockingBuffer, channel.readOutbound());
    +
    +		assertEquals(0, queue.getAvailableReaders().size());
    +		assertEquals(0, reader.getNumCreditsAvailable());
    --- End diff --
    
    let's end with `assertNull(channel.readOutbound());`


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160361957
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +93,35 @@ public void run() {
     		});
     	}
     
    +	/**
    +	 * Try to enqueue the reader once receiving credit notification form the consumer or receiving
    +	 * non-empty reader notification from the producer. Only one thread would trigger the actual
    +	 * enqueue after checking the reader's availability, so there is no race condition here.
    +	 */
    +	void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --
    
    I interpret this tag actually not only for methods that are *only* used by testing, but for those where we had to increase visibility, e.g. from `private` to `package-private` as here, to be usable in tests.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161545040
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
    @@ -277,23 +277,31 @@ public void testNotifyCreditAvailable() throws Exception {
     			handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse1);
     			handler.channelRead(mock(ChannelHandlerContext.class), bufferResponse2);
     
    -			// The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we
    -			// have to notify credit available in CreditBasedClientHandler explicitly
    -			handler.notifyCreditAvailable(inputChannel1);
    -			handler.notifyCreditAvailable(inputChannel2);
    -
     			assertEquals(2, inputChannel1.getUnannouncedCredit());
     			assertEquals(2, inputChannel2.getUnannouncedCredit());
     
     			channel.runPendingTasks();
     
    -			// The two input channels should notify credits via writable channel
    +			// The two input channels should send partition requests and then notify credits via writable channel
     			assertTrue(channel.isWritable());
     			Object readFromOutbound = channel.readOutbound();
    +			assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
    +			assertEquals(inputChannel1.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId);
    +			assertEquals(2, ((PartitionRequest) readFromOutbound).credit);
    +
    +			readFromOutbound = channel.readOutbound();
    +			assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
    +			assertEquals(inputChannel2.getInputChannelId(), ((PartitionRequest) readFromOutbound).receiverId);
    +			assertEquals(2, ((PartitionRequest) readFromOutbound).credit);
    --- End diff --
    
    Let's verify those two `PartitionRequest` messages above since `inputChannel1.getUnannouncedCredit()` kind of relies on those being sent (if we change the `initialCredit` to be included in the `unannouncedCredit`).


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161576253
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java ---
    @@ -84,7 +84,7 @@ public void testCancelAsyncProducerAndConsumer() throws Exception {
     			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
     			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
     			config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
    -			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 8);
    +			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 16);
    --- End diff --
    
    just a note for the curious: this test can cope with a higher number of network buffers and waits for all of them to be blocked. Increasing this to `9` would have been enough here, though (we now require 2 exclusive buffers per default, while 1 was the minimum per incoming channel)


---

[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4552
  
    1. Thanks for your FLINK-8425.
    2. I had thought the tests for `ResultSubpartition#nextBufferIsEvent` were already covered. The test for `BufferAndBacklog#nextBufferIsEvent()` was not included before; thanks for providing the patch for it.
    3. I will create a separate JIRA for the switch and also include the commit in this PR. I will also check the Travis failure.
    4. I will add the comment for the document you mentioned later.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161667346
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/misc/SuccessAfterNetworkBuffersFailureITCase.java ---
    @@ -59,7 +59,7 @@ public void testSuccessfulProgramAfterFailure() {
     			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
     			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 80L);
     			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 8);
    -			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 800);
    +			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 1024);
    --- End diff --
    
    yes, the same reason as above


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161295951
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
    @@ -199,6 +199,19 @@ public boolean isReleased() {
     		}
     	}
     
    +	@Override
    +	public boolean nextBufferIsEvent() {
    +		if (nextBuffer != null) {
    +			return !nextBuffer.isBuffer();
    +		}
    +
    +		if (spilledView != null) {
    --- End diff --
    
    `checkState(spilledView != null, "No in-memory buffers available, but also nothing spilled.");` just like in `getNextBuffer()`?


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162106371
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java ---
    @@ -82,10 +83,17 @@ protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws
     				LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);
     
     				try {
    -					SequenceNumberingViewReader reader = new SequenceNumberingViewReader(
    -						request.receiverId,
    -						request.credit,
    -						outboundQueue);
    +					NetworkSequenceViewReader reader;
    +					if (request.credit > 0) {
    +						reader = new CreditBasedSequenceNumberingViewReader(
    +							request.receiverId,
    +							request.credit,
    +							outboundQueue);
    +					} else {
    +						reader = new SequenceNumberingViewReader(
    +							request.receiverId,
    +							outboundQueue);
    +					}
    --- End diff --
    
    This seems a bit hacky since it does not rely on the configuration parameter directly but rather on the effect it has, i.e. that a `PartitionRequest` in credit-based flow control always has a non-zero credit (due to `NetworkBufferPool#requestMemorySegments()`).
    Relying on the configuration parameter would be nicer, but alternatively, you could add a comment clarifying this.
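    
    A rough sketch of the "rely on the configuration parameter" alternative, with loudly stated assumptions: `SketchReaderFactory`, its `creditBasedEnabled` flag and the two reader classes are hypothetical names for illustration; the PR does not show how the switch is plumbed into the server handler.
    
    ```java
    /** Hypothetical common type for both reader flavours. */
    interface SketchViewReader {
        String describe();
    }
    
    /** Hypothetical credit-based reader that starts with the credit from the request. */
    final class CreditBasedSketchReader implements SketchViewReader {
        private final int initialCredit;
    
        CreditBasedSketchReader(int initialCredit) {
            this.initialCredit = initialCredit;
        }
    
        @Override
        public String describe() {
            return "credit-based, initial credit " + initialCredit;
        }
    }
    
    /** Hypothetical reader for the old, non-credit-based mode. */
    final class LegacySketchReader implements SketchViewReader {
        @Override
        public String describe() {
            return "legacy";
        }
    }
    
    /** Chooses the reader from an explicit flag instead of inferring the mode from credit > 0. */
    final class SketchReaderFactory {
        private final boolean creditBasedEnabled;
    
        SketchReaderFactory(boolean creditBasedEnabled) {
            this.creditBasedEnabled = creditBasedEnabled;
        }
    
        SketchViewReader create(int requestCredit) {
            if (creditBasedEnabled) {
                return new CreditBasedSketchReader(requestCredit);
            }
            return new LegacySketchReader();
        }
    }
    ```
    
    With an explicit flag, the choice no longer depends on the credit carried by the request, so a request with zero credit cannot silently select the old reader.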


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161568012
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---
    @@ -119,6 +119,7 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception {
     		verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
     
     		// ...and one available result
    +		assertFalse(view.nextBufferIsEvent());
    --- End diff --
    
    we should test this everywhere we access `getNextBuffer()` or add buffers via `add()` - also if `getNextBuffer()` is `null` or before even requesting anything


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161570121
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---
    @@ -119,6 +119,7 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception {
     		verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
     
     		// ...and one available result
    +		assertFalse(view.nextBufferIsEvent());
    --- End diff --
    
    also test `read.buffer().isBuffer()` then?


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161242036
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---
    @@ -49,10 +50,24 @@
     
     	private volatile ResultSubpartitionView subpartitionView;
     
    +	/**
    +	 * The status indicating whether this reader is already enqueued in the pipeline for transferring
    +	 * data or not. It is mainly used for avoid registering this reader to the pipeline repeatedly.
    +	 */
    +	private boolean isRegisteredAvailable;
    --- End diff --
    
    
    
    I know it's default-initialized to `false`, but let's make this explicit



---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161559642
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
    @@ -71,4 +77,95 @@ public void testProducerFailedException() throws Exception {
     		NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
     		assertTrue(err.cause instanceof CancelTaskException);
     	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
    +	 * though it has no available credits.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    --- End diff --
    
    let's remove that mock ...


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161565565
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
    @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception {
     		NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
     		assertTrue(err.cause instanceof CancelTaskException);
     	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
    +	 * though it has no available credits.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(true);
    +
    +		final ResultPartitionID partitionId = new ResultPartitionID();
    +		final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +		when(partitionProvider.createSubpartitionView(
    +			eq(partitionId),
    +			eq(0),
    +			any(BufferAvailabilityListener.class))).thenReturn(view);
    +
    +		final InputChannelID receiverId = new InputChannelID();
    +		final PartitionRequestQueue queue = new PartitionRequestQueue();
    +		final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
    +		final EmbeddedChannel channel = new EmbeddedChannel(queue);
    +
    +		reader.requestSubpartitionView(partitionProvider, partitionId, 0);
    +
    +		// block the channel so that we see an intermediate state in the test
    +		ByteBuf channelBlockingBuffer = blockChannel(channel);
    +		assertNull(channel.readOutbound());
    +
    +		// Notify an available event buffer to trigger enqueue the reader
    +		reader.notifyBuffersAvailable(1);
    +
    +		channel.runPendingTasks();
    +
    +		// The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available
    +		assertEquals(1, queue.getAvailableReaders().size());
    --- End diff --
    
    actually, let's use `assertThat(queue.getAvailableReaders(), contains(reader));` here which gives much nicer output in case something is wrong


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160333139
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +93,35 @@ public void run() {
     		});
     	}
     
    +	/**
    +	 * Try to enqueue the reader once receiving credit notification form the consumer or receiving
    +	 * non-empty reader notification from the producer. Only one thread would trigger the actual
    +	 * enqueue after checking the reader's availability, so there is no race condition here.
    +	 */
    +	void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    +		if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
    +			enqueueAvailableReader(reader);
    +		}
    +	}
    +
    +	void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --
    
    The same as for `triggerEnqueueAvailableReader`.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r157760366
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java ---
    @@ -41,30 +42,33 @@
     import java.io.IOException;
     import java.net.SocketAddress;
     import java.util.ArrayDeque;
    -import java.util.Queue;
     import java.util.concurrent.ConcurrentHashMap;
     import java.util.concurrent.ConcurrentMap;
     import java.util.concurrent.atomic.AtomicReference;
     
    +/**
    + * Channel handler to read the messages of buffer response or error response from the
    + * producer, to write and flush the unannounced credits for the producer.
    + */
     class PartitionRequestClientHandler extends ChannelInboundHandlerAdapter {
     
     	private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClientHandler.class);
     
    -	private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<InputChannelID, RemoteInputChannel>();
    -
    -	private final AtomicReference<Throwable> channelError = new AtomicReference<Throwable>();
    +	/** Channels, which already requested partitions from the producers. */
    +	private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<>();
     
    -	private final BufferListenerTask bufferListener = new BufferListenerTask();
    +	/** Channels, which will notify the producers about unannounced credit. */
    +	private final ArrayDeque<RemoteInputChannel> inputChannelsWithCredit = new ArrayDeque<>();
     
    -	private final Queue<Object> stagedMessages = new ArrayDeque<Object>();
    +	private final AtomicReference<Throwable> channelError = new AtomicReference<>();
     
    -	private final StagedMessagesHandlerTask stagedMessagesHandler = new StagedMessagesHandlerTask();
    +	private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener();
     
     	/**
     	 * Set of cancelled partition requests. A request is cancelled iff an input channel is cleared
     	 * while data is still coming in for this channel.
     	 */
    -	private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = Maps.newConcurrentMap();
    +	private final ConcurrentMap<InputChannelID, InputChannelID> cancelled = new ConcurrentHashMap<>();
     
     	private volatile ChannelHandlerContext ctx;
    --- End diff --
    
    It looks like you missed migrating some of the comments that were present in `CreditBasedClientHandler` but are missing here, e.g. for `ctx`.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161546145
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
    @@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception {
     
     			assertEquals(2, inputChannel.getUnannouncedCredit());
     
    -			// The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we
    -			// have to notify credit available in CreditBasedClientHandler explicitly
    -			handler.notifyCreditAvailable(inputChannel);
    -
     			// Release the input channel
     			inputGate.releaseAllResources();
     
     			channel.runPendingTasks();
     
    -			// It will not notify credits for released input channel
    +			// It should send partition request first, and send close request after releasing input channel,
    +			// but will not notify credits for released input channel.
    +			Object readFromOutbound = channel.readOutbound();
    +			assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
    +			assertEquals(2, ((PartitionRequest) readFromOutbound).credit);
    --- End diff --
    
    similar here: verify `PartitionRequest` after `inputChannel.requestSubpartition`


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r157760652
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java ---
    @@ -318,192 +307,56 @@ else if (bufferProvider.isDestroyed()) {
     				MemorySegment memSeg = MemorySegmentFactory.wrap(byteArray);
     				Buffer buffer = new Buffer(memSeg, FreeingBufferRecycler.INSTANCE, false);
     
    -				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, -1);
    -
    -				return true;
    -			}
    -		}
    -		finally {
    -			if (releaseNettyBuffer) {
    -				bufferOrEvent.releaseBuffer();
    +				inputChannel.onBuffer(buffer, bufferOrEvent.sequenceNumber, bufferOrEvent.backlog);
     			}
    +		} finally {
    +			bufferOrEvent.releaseBuffer();
     		}
     	}
     
    -	/**
    -	 * This class would be replaced by CreditBasedClientHandler in the final,
    -	 * so we only implement this method in CreditBasedClientHandler.
    -	 */
    -	void notifyCreditAvailable(RemoteInputChannel inputChannel) {
    -	}
    -
    -	private class AsyncErrorNotificationTask implements Runnable {
    -
    -		private final Throwable error;
    -
    -		public AsyncErrorNotificationTask(Throwable error) {
    -			this.error = error;
    -		}
    -
    -		@Override
    -		public void run() {
    -			notifyAllChannelsOfErrorAndClose(error);
    -		}
    -	}
    -
    -	/**
    -	 * A buffer availability listener, which subscribes/unsubscribes the NIO
    -	 * read event.
    -	 *
    -	 * <p>If no buffer is available, the channel read event will be unsubscribed
    -	 * until one becomes available again.
    -	 *
    -	 * <p>After a buffer becomes available again, the buffer is handed over by
    -	 * the thread calling {@link #notifyBufferAvailable(Buffer)} to the network I/O
    -	 * thread, which then continues the processing of the staged buffer.
    -	 */
    -	private class BufferListenerTask implements BufferListener, Runnable {
    -
    -		private final AtomicReference<Buffer> availableBuffer = new AtomicReference<Buffer>();
    -
    -		private NettyMessage.BufferResponse stagedBufferResponse;
    -
    -		private boolean waitForBuffer(BufferProvider bufferProvider, NettyMessage.BufferResponse bufferResponse) {
    -
    -			stagedBufferResponse = bufferResponse;
    -
    -			if (bufferProvider.addBufferListener(this)) {
    -				if (ctx.channel().config().isAutoRead()) {
    -					ctx.channel().config().setAutoRead(false);
    -				}
    -
    -				return true;
    -			}
    -			else {
    -				stagedBufferResponse = null;
    -
    -				return false;
    -			}
    -		}
    -
    -		private boolean hasStagedBufferOrEvent() {
    -			return stagedBufferResponse != null;
    -		}
    -
    -		public void notifyBufferDestroyed() {
    -			// The buffer pool has been destroyed
    -			stagedBufferResponse = null;
    -
    -			if (stagedMessages.isEmpty()) {
    -				ctx.channel().config().setAutoRead(true);
    -				ctx.channel().read();
    -			}
    -			else {
    -				ctx.channel().eventLoop().execute(stagedMessagesHandler);
    -			}
    +	private void writeAndFlushNextMessageIfPossible(Channel channel) {
    --- End diff --
    
    please re-add the comment here, too


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161485403
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
    @@ -434,6 +443,29 @@ private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate) t
     			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
     	}
     
    +	/**
    +	 * Creates and returns a remote input channel for the specific input gate with specific partition request client.
    +	 *
    +	 * @param inputGate The input gate owns the created input channel.
    +	 * @param client The client is used to send partition request.
    +	 * @return The new created remote input channel.
    +	 */
    +	private RemoteInputChannel createRemoteInputChannel(SingleInputGate inputGate, PartitionRequestClient client) throws Exception {
    --- End diff --
    
    could you modify `PartitionRequestClientHandlerTest#createRemoteInputChannel(SingleInputGate)` to rely on this method, i.e. `return createRemoteInputChannel(inputGate, mock(PartitionRequestClient.class));`?


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161248402
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java ---
    @@ -52,4 +52,9 @@
     	boolean isReleased();
     
     	Throwable getFailureCause();
    +
    +	/**
    +	 * Returns whether the next buffer is event or not.
    --- End diff --
    
    `is an event`


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162323707
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---
    @@ -31,7 +31,6 @@
     import org.slf4j.LoggerFactory;
     
     import javax.annotation.Nullable;
    -import javax.annotation.concurrent.GuardedBy;
    --- End diff --
    
    no, you don't need to - I can create a separate PR for FLINK-8225


---

[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4552
  
    Yes, I am planning to rebase this onto the latest changes of the previous commits.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162259766
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -125,11 +126,11 @@ private void enqueueAvailableReader(final SequenceNumberingViewReader reader) th
     	 * @return readers which are enqueued available for transferring data
     	 */
     	@VisibleForTesting
    -	ArrayDeque<SequenceNumberingViewReader> getAvailableReaders() {
    +	ArrayDeque<NetworkSequenceViewReader> getAvailableReaders() {
     		return availableReaders;
     	}
     
    -	void notifyReaderCreated(final SequenceNumberingViewReader reader) {
    +	public void notifyReaderCreated(final NetworkSequenceViewReader reader) {
    --- End diff --
    
    Yes, that was careless of me.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161565896
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueueTest.java ---
    @@ -71,4 +78,142 @@ public void testProducerFailedException() throws Exception {
     		NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
     		assertTrue(err.cause instanceof CancelTaskException);
     	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline if the next sending buffer is event, even
    +	 * though it has no available credits.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingEventBuffer() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(true);
    +
    +		final ResultPartitionID partitionId = new ResultPartitionID();
    +		final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +		when(partitionProvider.createSubpartitionView(
    +			eq(partitionId),
    +			eq(0),
    +			any(BufferAvailabilityListener.class))).thenReturn(view);
    +
    +		final InputChannelID receiverId = new InputChannelID();
    +		final PartitionRequestQueue queue = new PartitionRequestQueue();
    +		final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
    +		final EmbeddedChannel channel = new EmbeddedChannel(queue);
    +
    +		reader.requestSubpartitionView(partitionProvider, partitionId, 0);
    +
    +		// block the channel so that we see an intermediate state in the test
    +		ByteBuf channelBlockingBuffer = blockChannel(channel);
    +		assertNull(channel.readOutbound());
    +
    +		// Notify an available event buffer to trigger enqueue the reader
    +		reader.notifyBuffersAvailable(1);
    +
    +		channel.runPendingTasks();
    +
    +		// The reader is enqueued in the pipeline because the next buffer is an event, even though no credits are available
    +		assertEquals(1, queue.getAvailableReaders().size());
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +
    +		// Flush the buffer to make the channel writable again and see the final results
    +		channel.flush();
    +		assertSame(channelBlockingBuffer, channel.readOutbound());
    +
    +		assertEquals(0, queue.getAvailableReaders().size());
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +	}
    +
    +	/**
    +	 * Tests {@link PartitionRequestQueue#enqueueAvailableReader(SequenceNumberingViewReader)},
    +	 * verifying the reader would be enqueued in the pipeline iff it has both available credits and buffers.
    +	 */
    +	@Test
    +	public void testEnqueueReaderByNotifyingBufferAndCredit() throws Exception {
    +		// setup
    +		final ResultSubpartitionView view = mock(ResultSubpartitionView.class);
    +		when(view.nextBufferIsEvent()).thenReturn(false);
    +		when(view.getNextBuffer()).thenReturn(new BufferAndBacklog(TestBufferFactory.createBuffer(), 2, false));
    +
    +		final ResultPartitionID partitionId = new ResultPartitionID();
    +		final ResultPartitionProvider partitionProvider = mock(ResultPartitionProvider.class);
    +		when(partitionProvider.createSubpartitionView(
    +			eq(partitionId),
    +			eq(0),
    +			any(BufferAvailabilityListener.class))).thenReturn(view);
    +
    +		final InputChannelID receiverId = new InputChannelID();
    +		final PartitionRequestQueue queue = new PartitionRequestQueue();
    +		final SequenceNumberingViewReader reader = new SequenceNumberingViewReader(receiverId, 0, queue);
    +		final EmbeddedChannel channel = new EmbeddedChannel(queue);
    +
    +		reader.requestSubpartitionView(partitionProvider, partitionId, 0);
    +		queue.notifyReaderCreated(reader);
    +
    +		// block the channel so that we see an intermediate state in the test
    +		ByteBuf channelBlockingBuffer = blockChannel(channel);
    +		assertNull(channel.readOutbound());
    +
    +		// Notify available buffers to trigger enqueue the reader
    +		final int notifyNumBuffers = 5;
    +		for (int i = 0; i < notifyNumBuffers; i++) {
    +			reader.notifyBuffersAvailable(1);
    +		}
    +
    +		channel.runPendingTasks();
    +
    +		// the reader is not enqueued in the pipeline because no credits are available
    +		// -> it should still have the same number of pending buffers
    +		assertEquals(0, queue.getAvailableReaders().size());
    +		assertEquals(notifyNumBuffers, reader.getNumBuffersAvailable());
    +		assertFalse(reader.isRegisteredAsAvailable());
    +		assertEquals(0, reader.getNumCreditsAvailable());
    +
    +		// Notify available credits to trigger enqueue the reader again
    +		final int notifyNumCredits = 3;
    +		for (int i = 1; i <= notifyNumCredits; i++) {
    +			queue.addCredit(receiverId, 1);
    +
    +			// the reader is enqueued in the pipeline because it has both available buffers and credits
    +			// since the channel is blocked though, we will not process anything and only enqueue the
    +			// reader once
    +			assertTrue(reader.isRegisteredAsAvailable());
    +			assertEquals(1, queue.getAvailableReaders().size());
    --- End diff --
    
    same here: `assertThat(queue.getAvailableReaders(), contains(reader));`


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r152985049
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---
    @@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException {
     				BufferPool bufferPool = null;
     
     				try {
    -					if (gate.getConsumedPartitionType().isCreditBased()) {
    -						// Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly
    -						bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
    -						gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
    -					} else {
    -						int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
    -							gate.getNumberOfInputChannels() * networkBuffersPerChannel +
    -								extraNetworkBuffersPerGate : Integer.MAX_VALUE;
    -						bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
    -							maxNumberOfMemorySegments);
    -					}
    +					// Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly
    +					bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
    +					gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
    --- End diff --
    
    What about the non-bounded partition type that we use for batch processing? Shouldn't we use an unbounded number of floating buffers there, as previously?
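
For reference, the removed `else` branch in the diff above derived the pool's upper bound from the partition type. A minimal sketch of that rule, assuming a hypothetical helper `maxPoolSize` (not a Flink method) with plain `int` parameters standing in for the gate and configuration values:

```java
// Hypothetical helper, not a Flink API: mirrors the sizing rule of the removed else-branch.
final class BufferPoolSizingSketch {

	/**
	 * Returns the upper bound of the gate's buffer pool. Bounded partition types get
	 * numChannels * buffersPerChannel + extraBuffersPerGate; all others are unlimited.
	 */
	static int maxPoolSize(boolean bounded, int numChannels, int buffersPerChannel, int extraBuffersPerGate) {
		return bounded
			? numChannels * buffersPerChannel + extraBuffersPerGate
			: Integer.MAX_VALUE;
	}
}
```

With 4 channels, 2 buffers per channel and 8 extra buffers, a bounded gate would be capped at 16 segments, while a non-bounded one would get `Integer.MAX_VALUE`.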


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160849478
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +94,37 @@ public void run() {
     		});
     	}
     
    +	/**
    +	 * Try to enqueue the reader once receiving credit notification from the consumer or receiving
    +	 * non-empty reader notification from the producer. Only one thread would trigger the actual
    +	 * enqueue after checking the reader's availability, so there is no race condition here.
    +	 */
    +	@VisibleForTesting
    +	void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    +		if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
    +			enqueueAvailableReader(reader);
    +		}
    +	}
    +
    +	@VisibleForTesting
    +	void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --
    
    agree
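
The enqueue rule described in the Javadoc above can be sketched in isolation. This is a simplified stand-in, not the PR's code: `ReaderSketch` and `EnqueueSketch` are hypothetical classes, the fields are plain values instead of the real view and counters, and the actual `enqueueAvailableReader` may do more than add to the queue.

```java
import java.util.ArrayDeque;

// Simplified stand-in for the view reader; fields are illustrative only.
final class ReaderSketch {
	int credits;
	int buffers;
	boolean nextBufferIsEvent;
	boolean registeredAvailable;

	// Available if there is data and either credit exists or the next buffer is an event.
	boolean isAvailable() {
		return buffers > 0 && (nextBufferIsEvent || credits > 0);
	}
}

// Simplified stand-in for the queue of readers that are ready to transfer data.
final class EnqueueSketch {
	final ArrayDeque<ReaderSketch> availableReaders = new ArrayDeque<>();

	// Enqueue only if the reader is not yet registered and is currently available,
	// so repeated notifications never add the same reader twice.
	void triggerEnqueueAvailableReader(ReaderSketch reader) {
		if (!reader.registeredAvailable && reader.isAvailable()) {
			availableReaders.add(reader);
			reader.registeredAvailable = true;
		}
	}
}
```

In this sketch a reader with buffers but no credit stays out of the queue, and once credit arrives it is enqueued exactly once regardless of how many further notifications follow.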


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r160849525
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -250,10 +304,12 @@ private void handleException(Channel channel, Throwable cause) throws IOExceptio
     
     	private void releaseAllResources() throws IOException {
     		SequenceNumberingViewReader reader;
    -		while ((reader = nonEmptyReader.poll()) != null) {
    +		while ((reader = availableReaders.poll()) != null) {
    --- End diff --
    
    yes, it should release all readers here.
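
A rough sketch of what releasing all readers could look like, assuming the queue also keeps every created reader in a map (called `allReaders` here; that name and the `String` key are assumptions for illustration, not taken from the diff):

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the queue; only the release path is modelled.
final class ReleaseSketch {

	static final class Reader {
		boolean released;

		void releaseAllResources() {
			released = true;
		}
	}

	// Every reader that was created, whether or not it is currently enqueued.
	final Map<String, Reader> allReaders = new HashMap<>();

	// Only the readers that are currently enqueued for transfer.
	final ArrayDeque<Reader> availableReaders = new ArrayDeque<>();

	// Polling availableReaders alone would skip readers that are waiting for credit,
	// so the release iterates over all known readers instead.
	void releaseAllResources() {
		for (Reader reader : allReaders.values()) {
			reader.releaseAllResources();
		}
		availableReaders.clear();
		allReaders.clear();
	}
}
```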


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162107348
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---
    @@ -31,7 +31,6 @@
     import org.slf4j.LoggerFactory;
     
     import javax.annotation.Nullable;
    -import javax.annotation.concurrent.GuardedBy;
    --- End diff --
    
    also found that relic of FLINK-8225 (removed in my patch for the previous commit, and it should be inside FLINK-7456, not the hotfix which we will revert later)


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162103769
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java ---
    @@ -164,11 +165,13 @@ private boolean dispose() {
     		private void handInChannel(Channel channel) {
     			synchronized (connectLock) {
     				try {
    -					PartitionRequestClientHandler requestHandler = channel.pipeline()
    -							.get(PartitionRequestClientHandler.class);
    +					NetworkClientHandler clientHandler = channel.pipeline().get(PartitionRequestClientHandler.class);
    +					if (clientHandler == null) {
    +						clientHandler = channel.pipeline().get(CreditBasedPartitionRequestClientHandler.class);
    +					}
    --- End diff --
    
    If you let `NetworkClientHandler` extend `ChannelHandler`, this can be simplified to
    ```
    NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
    ```
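
Why a single lookup by the shared interface would find either handler can be shown with a stand-alone sketch. All types below are hypothetical stand-ins rather than Netty or Flink classes; the assumption is that the pipeline lookup matches any handler assignable to the requested type and that its type parameter is bounded by the handler base type.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-ins for the handler type hierarchy; none of these are Netty or Flink types.
interface ChannelHandlerSketch {}

interface NetworkClientHandlerSketch extends ChannelHandlerSketch {}

final class LegacyHandlerSketch implements NetworkClientHandlerSketch {}

final class CreditBasedHandlerSketch implements NetworkClientHandlerSketch {}

// Minimal pipeline that supports lookup by handler type.
final class PipelineSketch {
	private final List<ChannelHandlerSketch> handlers = new ArrayList<>();

	void addLast(ChannelHandlerSketch handler) {
		handlers.add(handler);
	}

	// Returns the first handler assignable to the requested type, or null if none matches.
	// The bound on T is why the common interface has to extend the handler base type.
	<T extends ChannelHandlerSketch> T get(Class<T> type) {
		for (ChannelHandlerSketch handler : handlers) {
			if (type.isInstance(handler)) {
				return type.cast(handler);
			}
		}
		return null;
	}
}
```

Asking the sketch pipeline for `NetworkClientHandlerSketch.class` returns whichever implementation was installed, which removes the need for the null-check fallback in the diff.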


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r153405989
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---
    @@ -224,17 +224,9 @@ public void registerTask(Task task) throws IOException {
     				BufferPool bufferPool = null;
     
     				try {
    -					if (gate.getConsumedPartitionType().isCreditBased()) {
    -						// Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly
    -						bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
    -						gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
    -					} else {
    -						int maxNumberOfMemorySegments = gate.getConsumedPartitionType().isBounded() ?
    -							gate.getNumberOfInputChannels() * networkBuffersPerChannel +
    -								extraNetworkBuffersPerGate : Integer.MAX_VALUE;
    -						bufferPool = networkBufferPool.createBufferPool(gate.getNumberOfInputChannels(),
    -							maxNumberOfMemorySegments);
    -					}
    +					// Create a fixed-size buffer pool for floating buffers and assign exclusive buffers to input channels directly
    +					bufferPool = networkBufferPool.createBufferPool(extraNetworkBuffersPerGate, extraNetworkBuffersPerGate);
    +					gate.assignExclusiveSegments(networkBufferPool, networkBuffersPerChannel);
    --- End diff --
    
    That makes sense, I will keep the previous behaviour for batch processing.


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r157760560
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java ---
    @@ -158,32 +164,44 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
     	@Override
     	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
     		try {
    -			if (!bufferListener.hasStagedBufferOrEvent() && stagedMessages.isEmpty()) {
    -				decodeMsg(msg, false);
    -			}
    -			else {
    -				stagedMessages.add(msg);
    -			}
    -		}
    -		catch (Throwable t) {
    +			decodeMsg(msg);
    +		} catch (Throwable t) {
     			notifyAllChannelsOfErrorAndClose(t);
     		}
     	}
     
    +	@Override
    +	public void userEventTriggered(ChannelHandlerContext ctx, Object msg) throws Exception {
    +		if (msg instanceof RemoteInputChannel) {
    +			boolean triggerWrite = inputChannelsWithCredit.isEmpty();
    --- End diff --
    
    please re-add the comment here, too


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162266243
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---
    @@ -31,7 +31,6 @@
     import org.slf4j.LoggerFactory;
     
     import javax.annotation.Nullable;
    -import javax.annotation.concurrent.GuardedBy;
    --- End diff --
    
    Should I include FLINK-8225 in this PR?


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r159401061
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +93,35 @@ public void run() {
     		});
     	}
     
    +	/**
    +	 * Try to enqueue the reader once receiving credit notification form the consumer or receiving
    +	 * non-empty reader notification from the producer. Only one thread would trigger the actual
    +	 * enqueue after checking the reader's availability, so there is no race condition here.
    +	 */
    +	void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --
    
    `@VisibleForTesting`?


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161546199
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
    @@ -372,16 +379,18 @@ public void testNotifyCreditAvailableAfterReleased() throws Exception {
     
     			assertEquals(2, inputChannel.getUnannouncedCredit());
     
    -			// The PartitionRequestClient is tied to PartitionRequestClientHandler currently, so we
    -			// have to notify credit available in CreditBasedClientHandler explicitly
    -			handler.notifyCreditAvailable(inputChannel);
    -
     			// Release the input channel
     			inputGate.releaseAllResources();
     
     			channel.runPendingTasks();
     
    -			// It will not notify credits for released input channel
    +			// It should send partition request first, and send close request after releasing input channel,
    +			// but will not notify credits for released input channel.
    +			Object readFromOutbound = channel.readOutbound();
    +			assertThat(readFromOutbound, instanceOf(PartitionRequest.class));
    +			assertEquals(2, ((PartitionRequest) readFromOutbound).credit);
    +			readFromOutbound = channel.readOutbound();
    +			assertThat(readFromOutbound, instanceOf(CloseRequest.class));
    --- End diff --
    
    put these after `inputGate.releaseAllResources()`?


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r159400371
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---
    @@ -77,6 +83,37 @@ void requestSubpartitionView(
     		}
     	}
     
    +	public void addCredit(int credit) {
    +		numCreditsAvailable += credit;
    +	}
    +
    +	public void notifyAvailableChanged(boolean update) {
    --- End diff --
    
    `notifyAvailabilityChanged` or actually rather `setAvailability`?
    also, please add a comment on what this is about / what "availability" means here
      


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r162259728
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java ---
    @@ -82,10 +83,17 @@ protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws
     				LOG.debug("Read channel on {}: {}.", ctx.channel().localAddress(), request);
     
     				try {
    -					SequenceNumberingViewReader reader = new SequenceNumberingViewReader(
    -						request.receiverId,
    -						request.credit,
    -						outboundQueue);
    +					NetworkSequenceViewReader reader;
    +					if (request.credit > 0) {
    +						reader = new CreditBasedSequenceNumberingViewReader(
    +							request.receiverId,
    +							request.credit,
    +							outboundQueue);
    +					} else {
    +						reader = new SequenceNumberingViewReader(
    +							request.receiverId,
    +							outboundQueue);
    +					}
    --- End diff --
    
    Yes, I actually took a hacky shortcut to implement it. :)
    I will check whether it is feasible to get the config here. If so, I will go with the config; otherwise I may add a comment clarifying this.
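
The two options under discussion can be contrasted in a small sketch. `ReaderSelectionSketch`, `ReaderKind` and the `creditBasedEnabled` flag are hypothetical names; how such a flag would actually reach the server handler is exactly the open question above.

```java
// Hypothetical sketch contrasting the two ways of picking a reader implementation.
final class ReaderSelectionSketch {

	enum ReaderKind { CREDIT_BASED, LEGACY }

	// Heuristic from the diff: a positive initial credit implies credit-based mode.
	static ReaderKind selectByCredit(int initialCredit) {
		return initialCredit > 0 ? ReaderKind.CREDIT_BASED : ReaderKind.LEGACY;
	}

	// Alternative: decide from an explicit configuration flag (assumed to be available).
	static ReaderKind selectByConfig(boolean creditBasedEnabled) {
		return creditBasedEnabled ? ReaderKind.CREDIT_BASED : ReaderKind.LEGACY;
	}
}
```

The heuristic classifies any request with zero initial credit as legacy, whatever mode the job runs in, whereas the flag-based variant does not depend on the credit value at all.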


---

[GitHub] flink issue #4552: [FLINK-7456][network] Implement Netty sender incoming pip...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4552
  
    looks good - Let's start some cluster tests and then we're ready to merge


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r159400956
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestQueue.java ---
    @@ -88,6 +93,35 @@ public void run() {
     		});
     	}
     
    +	/**
    +	 * Try to enqueue the reader once receiving credit notification form the consumer or receiving
    +	 * non-empty reader notification from the producer. Only one thread would trigger the actual
    +	 * enqueue after checking the reader's availability, so there is no race condition here.
    +	 */
    +	void triggerEnqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    +		if (!reader.isRegisteredAvailable() && reader.isAvailable()) {
    +			enqueueAvailableReader(reader);
    +		}
    +	}
    +
    +	void enqueueAvailableReader(final SequenceNumberingViewReader reader) throws Exception {
    --- End diff --
    
    `@VisibleForTesting`?
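
The guard in `triggerEnqueueAvailableReader` quoted above could be sketched roughly as follows. This is a simplified illustration, not the Flink code: `EnqueueGuardSketch`, its nested `Reader`, and the plain boolean fields are hypothetical stand-ins, and everything is assumed to be called from a single thread (as the Javadoc in the diff states for the real method).

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical, simplified model of the enqueue guard; not the actual PartitionRequestQueue. */
class EnqueueGuardSketch {

    /** Stand-in for the view reader: only the two flags the guard looks at. */
    static class Reader {
        boolean registeredAvailable; // true while the reader sits in the queue
        boolean available;           // stands in for reader.isAvailable()
    }

    private final Queue<Reader> availableReaders = new ArrayDeque<>();

    /** Enqueues the reader only if it is available and not already enqueued. */
    void triggerEnqueueAvailableReader(Reader reader) {
        if (!reader.registeredAvailable && reader.available) {
            availableReaders.add(reader);
            reader.registeredAvailable = true;
        }
    }

    /** Removes the next reader and clears its flag so it can be enqueued again later. */
    Reader poll() {
        Reader reader = availableReaders.poll();
        if (reader != null) {
            reader.registeredAvailable = false;
        }
        return reader;
    }

    int size() {
        return availableReaders.size();
    }
}
```

With this shape, repeated notifications for the same reader leave a single queue entry until the reader is polled.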


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161287607
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---
    @@ -77,6 +92,46 @@ void requestSubpartitionView(
     		}
     	}
     
    +	/**
    +	 * The credits from consumer are added in incremental way.
    +	 *
    +	 * @param creditDeltas The credit deltas
    +	 */
    +	public void addCredit(int creditDeltas) {
    +		numCreditsAvailable += creditDeltas;
    +	}
    +
    +	/**
    +	 * Updates the value to indicate whether the reader is enqueued in the pipeline or not.
    +	 *
    +	 * @param isRegisteredAvailable True if this reader is already enqueued in the pipeline.
    +	 */
    +	public void notifyAvailabilityChanged(boolean isRegisteredAvailable) {
    +		this.isRegisteredAvailable = isRegisteredAvailable;
    +	}
    +
    +	public boolean isRegisteredAvailable() {
    +		return isRegisteredAvailable;
    +	}
    +
    +	/**
    +	 * Check whether this reader is available or not.
    +	 *
    +	 * <p>Return true only if the next buffer is event or the reader has both available
    +	 * credits and buffers.
    +	 */
    +	public boolean isAvailable() {
    +		if (numBuffersAvailable.get() <= 0) {
    +			return false;
    +		}
    +
    +		if (subpartitionView.nextBufferIsEvent() || numCreditsAvailable > 0) {
    --- End diff --
    
    One more thing: in the special case of calling this inside `getNextBuffer()`, we have already retrieved the `remaining` number of buffers, so we can save one lookup into the atomic integer here by passing that value in.
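
A minimal sketch of that suggestion, assuming an overload of `isAvailable` that takes the already-known count. `ReaderSketch`, `pollAndCheckMoreAvailable`, the `nextBufferIsEvent` field, and the rule that only non-event buffers consume a credit are hypothetical simplifications for illustration, not the code in this PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the view reader in the diff; not the actual Flink class. */
class ReaderSketch {
    private final AtomicInteger numBuffersAvailable = new AtomicInteger();
    private int numCreditsAvailable;
    private boolean nextBufferIsEvent; // stands in for subpartitionView.nextBufferIsEvent()

    void notifyBuffersAvailable(int buffers) {
        numBuffersAvailable.addAndGet(buffers);
    }

    void addCredit(int creditDeltas) {
        numCreditsAvailable += creditDeltas;
    }

    /** General case: has to read the atomic counter. */
    boolean isAvailable() {
        return isAvailable(numBuffersAvailable.get());
    }

    /** Special case: the caller already knows the remaining count, so no atomic read here. */
    boolean isAvailable(int remaining) {
        if (remaining <= 0) {
            return false;
        }
        return nextBufferIsEvent || numCreditsAvailable > 0;
    }

    /** Models the tail of getNextBuffer(): reuses the value returned by the decrement. */
    boolean pollAndCheckMoreAvailable() {
        int remaining = numBuffersAvailable.decrementAndGet();
        if (!nextBufferIsEvent) {
            numCreditsAvailable--; // assumption: only data buffers consume a credit
        }
        return isAvailable(remaining);
    }
}
```

The no-argument variant stays for callers that have no count at hand, so only the hot path inside `getNextBuffer()` skips the extra atomic read.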


---

[GitHub] flink pull request #4552: [FLINK-7456][network] Implement Netty sender incom...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4552#discussion_r161241148
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/SequenceNumberingViewReader.java ---
    @@ -49,10 +50,24 @@
     
     	private volatile ResultSubpartitionView subpartitionView;
     
    +	/**
    +	 * The status indicating whether this reader is already enqueued in the pipeline for transferring
    +	 * data or not. It is mainly used to avoid registering this reader to the pipeline repeatedly.
    +	 */
    +	private boolean isRegisteredAvailable;
    +
    +	/** The number of available buffers for holding data on the consumer side. */
    +	private int numCreditsAvailable;
    --- End diff --
    
    Just a note, since I was wondering whether we need synchronization here (we don't, after verifying the points below):
    
    1) `numCreditsAvailable` is increased via `PartitionRequestServerHandler#channelRead0`, which is a different channel handler from `PartitionRequestQueue` (see `NettyProtocol#getServerChannelHandlers`). According to [Netty's thread model](https://netty.io/wiki/new-and-noteworthy-in-4.0.html#wiki-h2-34), we should be safe though:
    
    > A user can specify an EventExecutor when he or she adds a handler to a ChannelPipeline.
    > - If specified, the handler methods of the ChannelHandler are always invoked by the specified EventExecutor.
    > - If unspecified, the handler methods are always invoked by the EventLoop that its associated Channel is registered to.
    
    2) `numCreditsAvailable` is read from `PartitionRequestQueue#enqueueAvailableReader()` and `SequenceNumberingViewReader#getNextBuffer()` which are both accessed by the channel's IO thread only.
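
The argument in 1) and 2) is thread confinement: if every access to a field runs on the same thread, a plain `int` needs no synchronization. The sketch below illustrates this with a JDK single-threaded executor as a stand-in for the channel's event loop; it does not use Netty, and `EventLoopConfinementSketch` and its method names are hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical illustration of thread confinement; the executor stands in for the event loop. */
class EventLoopConfinementSketch {
    // Plain field: only ever touched by tasks running on the single event-loop thread.
    private int numCreditsAvailable;

    private final ExecutorService eventLoop = Executors.newSingleThreadExecutor();

    /** Models the handler that processes an AddCredit message. */
    void onAddCredit(int creditDeltas) {
        eventLoop.execute(() -> numCreditsAvailable += creditDeltas);
    }

    /** Models the queue handler consuming one credit when a buffer is sent. */
    void onBufferSent() {
        eventLoop.execute(() -> numCreditsAvailable--);
    }

    /** Reads the field on the event-loop thread as well, so the read is confined too. */
    int readCredits() throws Exception {
        Callable<Integer> read = () -> numCreditsAvailable;
        return eventLoop.submit(read).get();
    }

    void close() {
        eventLoop.shutdown();
    }
}
```

Callers may invoke `onAddCredit` and `onBufferSent` from any thread; the updates themselves are serialized because the executor has exactly one worker.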


---