Posted to issues@flink.apache.org by zhijiangW <gi...@git.apache.org> on 2017/08/17 16:23:07 UTC

[GitHub] flink pull request #4559: [FLINK-7468][network]Implement Netty sender backlo...

GitHub user zhijiangW opened a pull request:

    https://github.com/apache/flink/pull/4559

    [FLINK-7468][network]Implement Netty sender backlog logic for credit-based

    ## What is the purpose of the change
    
    This PR is based on #4533, whose commits are also included so that Travis passes. Only the last commit contains the changes for this PR, so please review that one.
    
    Receivers need to know how many buffers are queued on the sender side (the backlog) so that they can decide how to distribute floating buffers. The sender therefore attaches the backlog to each `BufferResponse` as an absolute value, computed after the transferred buffer has been taken from the queue.
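A minimal sketch of how a receiver might consume the absolute backlog, assuming a simple policy of holding at least as many floating buffers as the announced backlog. `ReceiverChannelModel` and its policy are hypothetical and are not Flink's actual `RemoteInputChannel` logic; the buffer pool is assumed to grant every request, and buffers are never returned, for brevity.

```java
/**
 * Hypothetical receiver-side model (not Flink's RemoteInputChannel).
 * Assumed policy: keep at least as many floating buffers as the sender's backlog.
 */
final class ReceiverChannelModel {

    /** Floating buffers currently held by this channel. */
    private int floatingBuffers;

    /** Last backlog announced by the sender. */
    private int senderBacklog;

    /**
     * Called for each BufferResponse. The backlog is absolute, so it
     * overwrites the previous value instead of being added to it.
     *
     * @return the number of extra floating buffers to request (assumed granted)
     */
    int onBufferResponse(int backlog) {
        senderBacklog = backlog;
        int toRequest = Math.max(0, senderBacklog - floatingBuffers);
        floatingBuffers += toRequest;
        return toRequest;
    }

    int getFloatingBuffers() {
        return floatingBuffers;
    }

    int getSenderBacklog() {
        return senderBacklog;
    }
}
```

For example, announcements of 3, 2 and 5 lead to requests of 3, 0 and 2 floating buffers. Because every response carries the absolute value, the receiver never has to reconstruct the sender's state from increments; the latest response is always authoritative.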
    
    ## Brief change log
    
      - *Adds the `getBacklog` method in `ResultSubpartitionView`*
      - *`ResultSubpartition` maintains the backlog, increasing it when a buffer is added and decreasing it when a buffer is polled*
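The two change-log items can be sketched as a simplified model, assuming a single lock on the queue (mirroring the `synchronized (buffers)` pattern discussed later in this thread). `SimpleBuffer` and `BacklogSubpartition` are hypothetical stand-ins, not Flink classes; only the `isBuffer()` distinction between data buffers and events matters here.

```java
import java.util.ArrayDeque;

/** Hypothetical stand-in for Flink's Buffer: only isBuffer() is modelled. */
final class SimpleBuffer {
    private final boolean isBuffer;

    SimpleBuffer(boolean isBuffer) {
        this.isBuffer = isBuffer;
    }

    /** true for data buffers, false for events such as end-of-partition. */
    boolean isBuffer() {
        return isBuffer;
    }
}

/** Hypothetical subpartition that counts only non-event buffers as backlog. */
final class BacklogSubpartition {

    private final ArrayDeque<SimpleBuffer> buffers = new ArrayDeque<>();

    /** Number of non-event buffers in the queue; only touched under the lock. */
    private int buffersInBacklog;

    void add(SimpleBuffer buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
            if (buffer.isBuffer()) {
                buffersInBacklog++; // events do not count towards the backlog
            }
        }
    }

    /** Returns the next buffer or null; the decrement happens under the same lock. */
    SimpleBuffer poll() {
        synchronized (buffers) {
            SimpleBuffer buffer = buffers.poll();
            if (buffer != null && buffer.isBuffer()) {
                buffersInBacklog--;
            }
            return buffer;
        }
    }

    int getBuffersInBacklog() {
        synchronized (buffers) {
            return buffersInBacklog;
        }
    }
}
```

With three data buffers and one end-of-partition event queued, the backlog is 3 while the queue holds 4 entries, which matches the statistics asserted in the `SpillableSubpartitionTest` diff in this thread.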
    
    ## Verifying this change
    
    This change will be covered by test cases in the next PR.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)
      - If yes, how is the feature documented? (not applicable)
    


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zhijiangW/flink FLINK-7468

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4559.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4559
    
----
commit e35c1ff8066bf44344495d132a1092b9db3ef182
Author: Zhijiang <wa...@aliyun.com>
Date:   2017-08-07T09:31:17Z

    [FLINK-7378][core]Create a fix size (non rebalancing) buffer pool type for the floating buffers

commit 969c24d3bf80c1ff89ada11e81b9bf4fea14066f
Author: Zhijiang <wa...@aliyun.com>
Date:   2017-08-14T06:30:47Z

    [FLINK-7394][core]Implement basic InputChannel with free buffers,credit and backlog

commit 15fa828449d73f53042c57e9c5494d75ddee575f
Author: Zhijiang <wa...@aliyun.com>
Date:   2017-08-10T05:29:13Z

    [FLINK-7406][network]Implement Netty receiver incoming pipeline for credit-based

commit d0674244f15701863a5dd3f68b7274b3bd49c64d
Author: Zhijiang <wa...@aliyun.com>
Date:   2017-08-12T14:13:25Z

    [FLINK-7416][network] Implement Netty receiver outgoing pipeline for credit-based

commit 102f80916534719465cbbaf788ef09a57ca22879
Author: Zhijiang <wa...@aliyun.com>
Date:   2017-08-17T11:38:45Z

    [FLINK-7468][network]Implement Netty sender backlog logic for credit-based

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4559
  
    Good catch, I completely missed the code path in `SpillableSubpartition`. I agree with putting it under `synchronized (buffers)`.


---

[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4559
  
    @NicoK @pnowojski, regarding the backlog thread-safety issue: the current implementation is not thread-safe. We should restore my previous implementation, which calls `decreaseStatistics` inside `getNextBuffer` so that it runs within the `synchronized` region. What do you think?


---

[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4559
  
    @NicoK, thanks for your reviews. I have updated the code to address the above comments.
    
    Regarding the tests for the backlog statistics: should I add new tests in this PR, or update the tests you added once your commit is merged?


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157548033
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
    @@ -52,6 +54,10 @@
     	/** Flag indicating whether the subpartition has been released. */
     	private volatile boolean isReleased;
     
    +	/** The number of non-event buffers currently in this subpartition */
    +	@GuardedBy("buffers")
    +	private volatile int buffersInBacklog;
    --- End diff --
    
    I briefly thought about relying on `buffers.size()` here to reduce complexity and code, but `ArrayDeque#size()` (for `getBuffersInBacklog()`) may then show race conditions without synchronisation. However, if we picked up the idea again of returning the backlog size together with the buffer itself (which is retrieved under the lock), i.e. similar to `BufferAndAvailability` being returned by the `SequenceNumberingViewReader`, this would work and we would not need the `volatile` here. Since you split the implementations into `PipelinedSubpartition` and `SpillableSubpartition` anyway, this would be a viable approach again.
    What do you think? What would you prefer?
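A minimal sketch of the "return the backlog together with the buffer" idea, assuming data buffers only (events are omitted for brevity). `BufferAndBacklog` and `SnapshotSubpartition` are hypothetical names chosen by analogy with `BufferAndAvailability`, and `String` stands in for `Buffer`. Because the pair is built while holding the lock, the caller receives a consistent backlog value without the counter field needing to be `volatile`.

```java
import java.util.ArrayDeque;

/** Hypothetical result type: a buffer plus the backlog observed when it was dequeued. */
final class BufferAndBacklog {
    final String buffer;        // stand-in for a data Buffer
    final int buffersInBacklog; // backlog remaining after this buffer was removed

    BufferAndBacklog(String buffer, int buffersInBacklog) {
        this.buffer = buffer;
        this.buffersInBacklog = buffersInBacklog;
    }
}

/** Hypothetical subpartition holding data buffers only. */
final class SnapshotSubpartition {

    private final ArrayDeque<String> buffers = new ArrayDeque<>();

    /** Plain int: only read or written while holding the lock on buffers. */
    private int buffersInBacklog;

    void add(String buffer) {
        synchronized (buffers) {
            buffers.add(buffer);
            buffersInBacklog++;
        }
    }

    /** Returns the next buffer with a backlog snapshot, or null if the queue is empty. */
    BufferAndBacklog poll() {
        synchronized (buffers) {
            String next = buffers.poll();
            if (next == null) {
                return null;
            }
            buffersInBacklog--;
            // The snapshot is taken under the lock, so it is consistent with the dequeue.
            return new BufferAndBacklog(next, buffersInBacklog);
        }
    }
}
```

The trade-off is one small allocation per poll in exchange for removing a `volatile` read on the hot path; whether that pays off would need measuring.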


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157706995
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
    @@ -52,6 +54,10 @@
     	/** Flag indicating whether the subpartition has been released. */
     	private volatile boolean isReleased;
     
    +	/** The number of non-event buffers currently in this subpartition */
    +	@GuardedBy("buffers")
    +	private volatile int buffersInBacklog;
    --- End diff --
    
    You're absolutely right about not counting events. Therefore, we cannot use the queue's size as I suggested.
    
    Yes, `BufferAndAvailability` would need to be extended as well.
    
    This integration/split of the spillable/spilled subpartitions and subpartition views, with both of them working on the same structures and requiring the same synchronisation pattern, is imho really not nice and highly fragile. @pnowojski and I are currently redesigning the synchronisation in these parts of the code and are a bit sensitive to it now, so let's bring him into this discussion as well.
    
    I would consider `PipelinedSubpartition` the hot path where we need to optimise most; spillable subpartitions are used in batch mode and have higher tolerances, especially when spilling to disk. If you returned the new backlog counter from `SpillableSubpartition#decreaseBuffersInBacklog()`, however (retrieved under the `synchronized (buffers)` section), you would not need the `volatile` either, since you are already under the lock.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r152193809
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
    @@ -145,6 +145,10 @@ public Buffer getNextBuffer() throws IOException, InterruptedException {
     					listener.notifyBuffersAvailable(1);
     				}
     
    +				if (current.isBuffer()) {
    --- End diff --
    
    That is a good idea, and I already addressed this issue as you suggested.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r154343656
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java ---
    @@ -22,32 +22,57 @@
     
     import java.io.IOException;
     
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
     /**
      * A view to consume a {@link ResultSubpartition} instance.
      */
    -public interface ResultSubpartitionView {
    +public abstract class ResultSubpartitionView {
    +
    +	/** The parent subpartition this view belongs to. */
    +	private final ResultSubpartition parent;
    +
    +	public ResultSubpartitionView(ResultSubpartition parent) {
    +		this.parent = checkNotNull(parent);
    +	}
    +
    +	/**
    +	 * Returns the next {@link Buffer} instance of this queue iterator and also
    +	 * decreases the related statistics.
    +	 */
    +	public Buffer getNextBuffer() throws IOException, InterruptedException {
    +		Buffer buffer = getNextBufferInternal();
    +		if (buffer != null) {
    +			parent.decreaseStatistics(buffer);
    +		}
    +		return buffer;
    +	}
    +
    +	public int getBuffersInBacklog() {
    +		return parent.getBuffersInBacklog();
    +	}
     
     	/**
    -	 * Returns the next {@link Buffer} instance of this queue iterator.
    -	 * <p>
    -	 * If there is currently no instance available, it will return <code>null</code>.
    +	 * The internal method used by {@link ResultSubpartitionView#getNextBuffer()}
    +	 * to return the next {@link Buffer} instance of this queue iterator.
    +	 *
    +	 * <p>If there is currently no instance available, it will return <code>null</code>.
     	 * This might happen for example when a pipelined queue producer is slower
     	 * than the consumer or a spilled queue needs to read in more data.
    -	 * <p>
    -	 * <strong>Important</strong>: The consumer has to make sure that each
    +	 *
    +	 * <p><strong>Important</strong>: The consumer has to make sure that each
     	 * buffer instance will eventually be recycled with {@link Buffer#recycle()}
     	 * after it has been consumed.
     	 */
    -	Buffer getNextBuffer() throws IOException, InterruptedException;
    -
    -	void notifyBuffersAvailable(long buffers) throws IOException;
    +	protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException;
    --- End diff --
    
    `@Nullable`
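The shape of the proposed `ResultSubpartitionView` (a public `getNextBuffer()` wrapping a `protected abstract getNextBufferInternal()` that may return `null`) can be sketched as below. `SubpartitionViewSketch` and `QueueBackedView` are hypothetical, `String` stands in for `Buffer`, and `@Nullable` is written as a comment because annotations such as `javax.annotation.Nullable` come from JSR-305 rather than the JDK. Note that in this shape the statistics update runs after `getNextBufferInternal()` has returned, i.e. outside any lock that method may have held.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Hypothetical template-method sketch of the view discussed in the diff. */
abstract class SubpartitionViewSketch {

    /** Counts statistics updates, standing in for parent.decreaseStatistics(buffer). */
    private int statisticsUpdates;

    /** Returns the next buffer or null (@Nullable); null is passed through unchanged. */
    final String getNextBuffer() {
        String buffer = getNextBufferInternal();
        if (buffer != null) {
            statisticsUpdates++; // runs after the internal call, outside its lock
        }
        return buffer;
    }

    /** May return null when nothing is available yet (@Nullable). */
    protected abstract String getNextBufferInternal();

    int getStatisticsUpdates() {
        return statisticsUpdates;
    }
}

/** Hypothetical view backed by a plain queue. */
final class QueueBackedView extends SubpartitionViewSketch {

    private final Queue<String> queue = new ArrayDeque<>();

    QueueBackedView(String... items) {
        for (String item : items) {
            queue.add(item);
        }
    }

    @Override
    protected String getNextBufferInternal() {
        return queue.poll(); // null when the queue is empty
    }
}
```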


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r153563915
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java ---
    @@ -39,13 +39,15 @@
     	private final AtomicBoolean isReleased;
     
     	PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) {
    +		super(parent);
    +
     		this.parent = checkNotNull(parent);
     		this.availabilityListener = checkNotNull(listener);
     		this.isReleased = new AtomicBoolean();
     	}
     
     	@Override
    -	public Buffer getNextBuffer() {
    +	public Buffer getNextBufferInternal() {
    --- End diff --
    
    make this `protected`


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157544965
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
    @@ -161,6 +172,29 @@ public boolean isReleased() {
     		return isReleased;
     	}
     
    +	@Override
    +	public int getBuffersInBacklog() {
    +		return buffersInBacklog;
    +	}
    +
    +	@Override
    +	public void decreaseBuffersInBacklog(Buffer buffer) {
    +		assert Thread.holdsLock(buffers);
    +
    +		if (buffer != null && buffer.isBuffer()) {
    +			buffersInBacklog--;
    +		}
    +	}
    +
    +	@Override
    +	public void increaseBuffersInBacklog(Buffer buffer) {
    +		assert Thread.holdsLock(buffers);
    +
    +		if (buffer != null && buffer.isBuffer()) {
    +			buffersInBacklog++;
    +		}
    +	}
    --- End diff --
    
    please check the access-level (the latter two could be private)


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r154343510
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java ---
    @@ -39,13 +39,15 @@
     	private final AtomicBoolean isReleased;
     
     	PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) {
    +		super(parent);
    +
     		this.parent = checkNotNull(parent);
     		this.availabilityListener = checkNotNull(listener);
     		this.isReleased = new AtomicBoolean();
     	}
     
     	@Override
    -	public Buffer getNextBuffer() {
    +	protected Buffer getNextBufferInternal() {
    --- End diff --
    
    Actually, a lot of the methods along these calls should probably be marked `@Nullable`. Since you touched `getNextBufferInternal()`, can you at least mark this one (and maybe some of the calls along the call stack, if I say pretty please)? You can do so in a separate `[hotfix]` commit to keep it apart from this change.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157544794
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ---
    @@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
     
     	abstract public boolean isReleased();
     
    +	/**
    +	 * Gets the number of non-event buffers in this subpartition.
    +	 */
    +	abstract public int getBuffersInBacklog();
    +
    +	/**
    +	 * Decreases the number of non-event buffers by one after fetching a non-event
    +	 * buffer from this subpartition.
    +	 */
    +	abstract public void decreaseBuffersInBacklog(Buffer buffer);
    +
    +	/**
    +	 * Increases the number of non-event buffers by one after adding a non-event
    +	 * buffer into this subpartition.
    +	 */
    +	abstract public void increaseBuffersInBacklog(Buffer buffer);
    --- End diff --
    
    I'm not quite sure the latter two methods should be in `ResultSubpartition` now, since they are quite internal. `increaseBuffersInBacklog()` is only called by `PipelinedSubpartition` and `SpillableSubpartition`. `decreaseBuffersInBacklog()` is (additionally) only called by the spilled/spillable subpartition views and could therefore be package-private in `SpillableSubpartition` only.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r154344942
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java ---
    @@ -174,19 +175,21 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T
     
     	// ---------------------------------------------------------------------------------------------
     
    -	static class InfiniteSubpartitionView implements ResultSubpartitionView {
    +	static class InfiniteSubpartitionView extends ResultSubpartitionView {
     
     		private final BufferProvider bufferProvider;
     
     		private final CountDownLatch sync;
     
     		public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sync) {
    +			super(mock(ResultSubpartition.class));
    +
     			this.bufferProvider = checkNotNull(bufferProvider);
     			this.sync = checkNotNull(sync);
     		}
     
     		@Override
    -		public Buffer getNextBuffer() throws IOException, InterruptedException {
    +		protected Buffer getNextBufferInternal() throws IOException, InterruptedException {
    --- End diff --
    
    `@Nullable`


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r154344924
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---
    @@ -114,7 +116,7 @@ public void onNotification() {
     	}
     
     	@Override
    -	public Buffer getNextBuffer() throws IOException, InterruptedException {
    +	protected Buffer getNextBufferInternal() throws IOException, InterruptedException {
    --- End diff --
    
    `@Nullable`


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157693477
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
    @@ -181,10 +182,27 @@ public void testConsumeSpilledPartition() throws Exception {
     		partition.add(buffer);
     		partition.add(buffer);
     
    +		assertEquals(3, partition.getTotalNumberOfBuffers());
    +		assertEquals(3, partition.getBuffersInBacklog());
    +		assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
    +
    +		assertFalse(buffer.isRecycled());
     		assertEquals(3, partition.releaseMemory());
     
    +		// now the buffer may be freed, depending on the timing of the write operation
    +		// -> let's do this check at the end of the test (to save some time)
    +		// still same statistics
    +		assertEquals(3, partition.getTotalNumberOfBuffers());
    +		assertEquals(3, partition.getBuffersInBacklog());
    +		assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
    +
     		partition.finish();
     
    +		// + one EndOfPartitionEvent
    +		assertEquals(4, partition.getTotalNumberOfBuffers());
    +		assertEquals(3, partition.getBuffersInBacklog());
    +		assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
    --- End diff --
    
    sure


---

[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4559
  
    @NicoK, sorry, I missed that synchronization comment. I will probably submit the updates for the latest comments tomorrow.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r153564111
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CancelPartitionRequestTest.java ---
    @@ -174,19 +175,21 @@ public ResultSubpartitionView answer(InvocationOnMock invocationOnMock) throws T
     
     	// ---------------------------------------------------------------------------------------------
     
    -	static class InfiniteSubpartitionView implements ResultSubpartitionView {
    +	static class InfiniteSubpartitionView extends ResultSubpartitionView {
     
     		private final BufferProvider bufferProvider;
     
     		private final CountDownLatch sync;
     
     		public InfiniteSubpartitionView(BufferProvider bufferProvider, CountDownLatch sync) {
    +			super(mock(ResultSubpartition.class));
    +
     			this.bufferProvider = checkNotNull(bufferProvider);
     			this.sync = checkNotNull(sync);
     		}
     
     		@Override
    -		public Buffer getNextBuffer() throws IOException, InterruptedException {
    +		public Buffer getNextBufferInternal() throws IOException, InterruptedException {
    --- End diff --
    
    make this `protected`


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157539147
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
    @@ -239,6 +261,10 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
     
     		// Spill now
     		assertEquals(2, partition.releaseMemory());
    +		// still same statistics:
    +		assertEquals(4, partition.getTotalNumberOfBuffers());
    +		assertEquals(2, partition.getBuffersInBacklog());
    +		assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
    --- End diff --
    
    Same here: please add the checks to the `reader.getNextBuffer()` lines below.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r155458048
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
    @@ -145,6 +145,10 @@ public Buffer getNextBuffer() throws IOException, InterruptedException {
     					listener.notifyBuffersAvailable(1);
     				}
     
    +				if (current.isBuffer()) {
    --- End diff --
    
    I think `decreaseStatistics` should be called inside `getNextBufferInternal`; otherwise the backlog value is not thread-safe. The previous implementation kept `decreaseStatistics` inside the synchronized section.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157709904
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
    @@ -52,6 +54,10 @@
     	/** Flag indicating whether the subpartition has been released. */
     	private volatile boolean isReleased;
     
    +	/** The number of non-event buffers currently in this subpartition */
    +	@GuardedBy("buffers")
    +	private volatile int buffersInBacklog;
    --- End diff --
    
    Yes, I totally agree with your assessment of the current state of the spillable/spilled subpartitions and subpartition views.
    
    I also think that `PipelinedSubpartition` is the most important path and that `SpillableSubpartition` is less performance-sensitive. I think we have already reached a consensus on the approach for `SpillableSubpartition`, and I will implement it later. :)


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157703075
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ---
    @@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
     
     	abstract public boolean isReleased();
     
    +	/**
    +	 * Gets the number of non-event buffers in this subpartition.
    +	 */
    +	abstract public int getBuffersInBacklog();
    +
    +	/**
    +	 * Decreases the number of non-event buffers by one after fetching a non-event
    +	 * buffer from this subpartition.
    +	 */
    +	abstract public void decreaseBuffersInBacklog(Buffer buffer);
    +
    +	/**
    +	 * Increases the number of non-event buffers by one after adding a non-event
    +	 * buffer into this subpartition.
    +	 */
    +	abstract public void increaseBuffersInBacklog(Buffer buffer);
    --- End diff --
    
    Package-private, e.g. `abstract void increaseBuffersInBacklog(Buffer buffer);`, already works without changing anything else, since `SpilledSubpartitionView` is in the same package.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r155454935
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java ---
    @@ -22,32 +22,57 @@
     
     import java.io.IOException;
     
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
     /**
      * A view to consume a {@link ResultSubpartition} instance.
      */
    -public interface ResultSubpartitionView {
    +public abstract class ResultSubpartitionView {
    +
    +	/** The parent subpartition this view belongs to. */
    +	private final ResultSubpartition parent;
    +
    +	public ResultSubpartitionView(ResultSubpartition parent) {
    +		this.parent = checkNotNull(parent);
    +	}
    +
    +	/**
    +	 * Returns the next {@link Buffer} instance of this queue iterator and also
    +	 * decreases the related statistics.
    +	 */
    +	public Buffer getNextBuffer() throws IOException, InterruptedException {
    --- End diff --
    
    I think it makes sense.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157706951
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ---
    @@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
     
     	abstract public boolean isReleased();
     
    +	/**
    +	 * Gets the number of non-event buffers in this subpartition.
    +	 */
    +	abstract public int getBuffersInBacklog();
    +
    +	/**
    +	 * Decreases the number of non-event buffers by one after fetching a non-event
    +	 * buffer from this subpartition.
    +	 */
    +	abstract public void decreaseBuffersInBacklog(Buffer buffer);
    +
    +	/**
    +	 * Increases the number of non-event buffers by one after adding a non-event
    +	 * buffer into this subpartition.
    +	 */
    +	abstract public void increaseBuffersInBacklog(Buffer buffer);
    --- End diff --
    
    Sorry, I phrased that incorrectly above. What I meant is that we no longer need the `decreaseBuffersInBacklog` method in `ResultSubpartition` once the `parent` in `SpilledSubpartitionView` is changed to `SpillableSubpartition`.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157540910
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---
    @@ -47,7 +48,14 @@ public void testAddAfterFinish() throws Exception {
     		try {
     			subpartition.finish();
     
    +			assertEquals(1, subpartition.getTotalNumberOfBuffers());
    +			assertEquals(0, subpartition.getBuffersInBacklog());
    +			assertEquals(4, subpartition.getTotalNumberOfBytes());
    +
     			assertFalse(subpartition.add(mock(Buffer.class)));
    +			assertEquals(1, subpartition.getTotalNumberOfBuffers());
    +			assertEquals(0, subpartition.getBuffersInBacklog());
    --- End diff --
    
    Actually, this never increases the backlog, even if the subpartition is not finished, since `buffer.isBuffer()` for a `mock(Buffer.class)` returns `false`. Can you test with a real `Buffer` instead?
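Mockito's default answer returns `false` for methods with a `boolean` return type, so `isBuffer()` on `mock(Buffer.class)` is `false` and such a "buffer" is treated like an event. The sketch below emulates that default behaviour with `java.lang.reflect.Proxy` (not Mockito) to show the backlog staying at 0 for the mock and counting a real data buffer. `MiniBuffer` and `DefaultAnswerDemo` are hypothetical stand-ins, not Flink or Mockito types.

```java
import java.lang.reflect.Proxy;

/** Hypothetical stand-in for Flink's Buffer interface. */
interface MiniBuffer {
    boolean isBuffer();

    int getSize();
}

final class DefaultAnswerDemo {

    /** Returns Java default values for every call, like a mock with its default answer. */
    static MiniBuffer defaultMock() {
        return (MiniBuffer) Proxy.newProxyInstance(
                MiniBuffer.class.getClassLoader(),
                new Class<?>[] {MiniBuffer.class},
                (proxy, method, args) -> {
                    Class<?> type = method.getReturnType();
                    if (type == boolean.class) {
                        return false; // so isBuffer() reports "event"
                    }
                    if (type == int.class) {
                        return 0;
                    }
                    return null;
                });
    }

    /** A real data buffer for comparison. */
    static MiniBuffer dataBuffer(int size) {
        return new MiniBuffer() {
            public boolean isBuffer() {
                return true;
            }

            public int getSize() {
                return size;
            }
        };
    }

    /** Counts only buffers whose isBuffer() is true, as the backlog logic does. */
    static int backlogAfterAdding(MiniBuffer... added) {
        int backlog = 0;
        for (MiniBuffer b : added) {
            if (b.isBuffer()) {
                backlog++;
            }
        }
        return backlog;
    }
}
```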


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r152008524
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---
    @@ -246,9 +246,9 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
     	@Override
     	public String toString() {
     		return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," +
    -						"finished? %s, read view? %s, spilled? %s]",
    -				getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null,
    -				spillWriter != null);
    +				"%d backlog, finished? %s, read view? %s, spilled? %s]",
    +			getTotalNumberOfBuffers(), getTotalNumberOfBytes(),
    +			backlog, isFinished, readView != null, spillWriter != null);
    --- End diff --
    
    `getBacklog`?


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157686388
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ---
    @@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
     
     	abstract public boolean isReleased();
     
    +	/**
    +	 * Gets the number of non-event buffers in this subpartition.
    +	 */
    +	abstract public int getBuffersInBacklog();
    +
    +	/**
    +	 * Decreases the number of non-event buffers by one after fetching a non-event
    +	 * buffer from this subpartition.
    +	 */
    +	abstract public void decreaseBuffersInBacklog(Buffer buffer);
    +
    +	/**
    +	 * Increases the number of non-event buffers by one after adding a non-event
    +	 * buffer into this subpartition.
    +	 */
    +	abstract public void increaseBuffersInBacklog(Buffer buffer);
    --- End diff --
    
    The `parent` in `SpilledSubpartitionView` is currently a `ResultSubpartition`, not a `SpillableSubpartition`. After changing its type to `SpillableSubpartition`, we can make these methods package-private as you suggest. I will do that.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r152010406
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ---
    @@ -41,12 +41,19 @@
     	/** The total number of bytes (both data and event buffers) */
     	private long totalNumberOfBytes;
     
    +	/** The number of non-event buffers currently in this subpartition */
    +	protected int backlog;
    --- End diff --
    
    Rename `backlog` to `buffersInBacklog`, `accumulatedBuffers`, or `backloggedBuffers`?


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4559


---

[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4559
  
    @zentol, the Travis failure seems unrelated to my code. Do I need to restart Travis so that the tests pass before this PR is merged?


---

[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4559
  
    Regarding b): I just checked with #4552, and it looks as if the `backlog` is increased from the `ResultWriter` / task thread and then both read and decreased from Netty's IO thread. Therefore, we'll have not only a visibility problem here but also a synchronisation problem with the increment/decrement, won't we?
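    A minimal, self-contained sketch of the problem described above, assuming one thread that only increments (standing in for the task thread) and one that only decrements (standing in for the Netty IO thread). `BacklogRaceDemo` and `run` are hypothetical names, not Flink code. `counter++` and `counter--` on a plain `int` are read-modify-write sequences, so concurrent updates can be lost; guarding both updates with the same monitor makes them atomic and visible to each other.

    ```java
    // Hypothetical demo (not Flink code): lost updates on a shared backlog counter.
    class BacklogRaceDemo {

        private final Object lock = new Object();

        // Unsafe: plain int updated from two threads without synchronization.
        private int unsafeBacklog;

        // Safe: every access happens while holding the same monitor.
        private int safeBacklog;

        /** Runs n increments on one thread and n decrements on another; returns {unsafe, safe}. */
        int[] run(int n) throws InterruptedException {
            Thread producer = new Thread(() -> {
                for (int i = 0; i < n; i++) {
                    unsafeBacklog++; // read-modify-write, not atomic
                    synchronized (lock) {
                        safeBacklog++;
                    }
                }
            });
            Thread consumer = new Thread(() -> {
                for (int i = 0; i < n; i++) {
                    unsafeBacklog--; // may interleave with the producer's update
                    synchronized (lock) {
                        safeBacklog--;
                    }
                }
            });
            producer.start();
            consumer.start();
            // join() establishes happens-before, so the final reads below see all writes.
            producer.join();
            consumer.join();
            synchronized (lock) {
                return new int[] {unsafeBacklog, safeBacklog};
            }
        }
    }
    ```

    The synchronized counter always ends at `0`. The unsynchronized one frequently does not, but the outcome depends on thread scheduling, so a single run is not guaranteed to show the lost update.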


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r153564080
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---
    @@ -114,7 +116,7 @@ public void onNotification() {
     	}
     
     	@Override
    -	public Buffer getNextBuffer() throws IOException, InterruptedException {
    +	public Buffer getNextBufferInternal() throws IOException, InterruptedException {
    --- End diff --
    
    make this `protected`


---

[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4559
  
    @pnowojski, I have addressed all the comments. The failed test I checked seems unrelated to my changes.


---

[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4559
  
    @NicoK, thanks for the suggestions.
    
    I understand your point about wrapping the buffer and the backlog together in a new structure returned by `getNextBuffer()`, and it really makes sense for `PipelinedSubpartition`. But for `SpillableSubpartition`, once it starts writing buffers to disk, we cannot derive the total backlog from that structure. We can only keep a precise backlog by decreasing it by one in `getNextBuffer()` and increasing it by one in `add(Buffer)`.
    
    So I think we can call `decreaseStatistics` under the lock, which works for all subpartition types.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r154343587
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java ---
    @@ -22,32 +22,57 @@
     
     import java.io.IOException;
     
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
     /**
      * A view to consume a {@link ResultSubpartition} instance.
      */
    -public interface ResultSubpartitionView {
    +public abstract class ResultSubpartitionView {
    +
    +	/** The parent subpartition this view belongs to. */
    +	private final ResultSubpartition parent;
    +
    +	public ResultSubpartitionView(ResultSubpartition parent) {
    +		this.parent = checkNotNull(parent);
    +	}
    +
    +	/**
    +	 * Returns the next {@link Buffer} instance of this queue iterator and also
    +	 * decreases the related statistics.
    +	 */
    +	public Buffer getNextBuffer() throws IOException, InterruptedException {
    --- End diff --
    
    `@Nullable` and (I'm not sure whether @pnowojski agrees) maybe make this method `final` (any subclass should only override `getNextBufferInternal`)?


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157541024
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---
    @@ -62,7 +70,14 @@ public void testAddAfterRelease() throws Exception {
     		try {
     			subpartition.release();
     
    +			assertEquals(0, subpartition.getTotalNumberOfBuffers());
    +			assertEquals(0, subpartition.getBuffersInBacklog());
    +			assertEquals(0, subpartition.getTotalNumberOfBytes());
    +
     			assertFalse(subpartition.add(mock(Buffer.class)));
    +			assertEquals(0, subpartition.getTotalNumberOfBuffers());
    +			assertEquals(0, subpartition.getBuffersInBacklog());
    --- End diff --
    
    same here - please test with a real `Buffer` instance


---

[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4559
  
    @NicoK, I have submitted two `[hotfix]` commits for the above issues.
    
    One adds the `@Nullable` annotation and tests for the backlog statistics; the other makes the backlog updates thread-safe.
    
    I think the backlog should be updated separately in `PipelinedSubpartition` and `SpillableSubpartition` so that each update happens inside that class's synchronized region, although this seems somewhat redundant. I noticed that `isReleased()` is handled the same way: it is abstract in `ResultSubpartition` and implemented in each subclass.



---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157538818
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
    @@ -181,10 +182,27 @@ public void testConsumeSpilledPartition() throws Exception {
     		partition.add(buffer);
     		partition.add(buffer);
     
    +		assertEquals(3, partition.getTotalNumberOfBuffers());
    +		assertEquals(3, partition.getBuffersInBacklog());
    +		assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
    +
    +		assertFalse(buffer.isRecycled());
     		assertEquals(3, partition.releaseMemory());
     
    +		// now the buffer may be freed, depending on the timing of the write operation
    +		// -> let's do this check at the end of the test (to save some time)
    +		// still same statistics
    +		assertEquals(3, partition.getTotalNumberOfBuffers());
    +		assertEquals(3, partition.getBuffersInBacklog());
    +		assertEquals(4096 * 3, partition.getTotalNumberOfBytes());
    +
     		partition.finish();
     
    +		// + one EndOfPartitionEvent
    +		assertEquals(4, partition.getTotalNumberOfBuffers());
    +		assertEquals(3, partition.getBuffersInBacklog());
    +		assertEquals(4096 * 3 + 4, partition.getTotalNumberOfBytes());
    --- End diff --
    
    Good. Can you also add backlog checks after the `reader.getNextBuffer()` calls below, to make sure the statistics stay correct after buffers are taken out?


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r152008221
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
    @@ -145,6 +145,10 @@ public Buffer getNextBuffer() throws IOException, InterruptedException {
     					listener.notifyBuffersAvailable(1);
     				}
     
    +				if (current.isBuffer()) {
    --- End diff --
    
    This logic is copy/pasted in `SpilledSubpartitionView` and `PipelinedSubpartition`, and it gets even more complicated in the next PR.
    
    How about changing `ResultSubpartitionView` to an abstract class with `ResultSubpartition parent` field and following methods:
    ```
    Buffer getNextBuffer() throws IOException, InterruptedException {
    	Buffer next = getNextBufferInternal();
    	if (next != null) {
    		parent.decreaseStatistics(next);
    	}
    	return next;
    }
    
    protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException;
    ``` 
    And rename all current implementations of `getNextBuffer` to `getNextBufferInternal`.
    
    Thus:
    1. You would handle the decrement in only one place instead of reimplementing it in many.
    2. `protected int backlog;` field from `ResultSubpartition` could be made private.


---

[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4559
  
    Alternatively, we could make the backlog an `AtomicInteger` and keep the current approach; otherwise we may need to call `decreaseStatistics` in several places inside `synchronized (buffers)` regions. What do you think?
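    A sketch of the `AtomicInteger` alternative, assuming the counter is the only state that must be updated from both threads. `AtomicBacklog`, `onAdded`, and `onPolled` are hypothetical names, not Flink code. `incrementAndGet()` and `decrementAndGet()` make each update atomic and visible across threads without holding `buffers`; the trade-off is that the counter is no longer updated atomically together with the queue, so a reader may briefly see a backlog that is ahead of or behind the queue contents.

    ```java
    import java.util.concurrent.atomic.AtomicInteger;

    // Hypothetical lock-free backlog counter (not Flink code).
    class AtomicBacklog {

        private final AtomicInteger buffersInBacklog = new AtomicInteger();

        /** Called by the producer after adding an element; events do not count. */
        void onAdded(boolean isBuffer) {
            if (isBuffer) {
                buffersInBacklog.incrementAndGet();
            }
        }

        /** Called by the consumer after polling an element; events do not count. */
        void onPolled(boolean isBuffer) {
            if (isBuffer) {
                buffersInBacklog.decrementAndGet();
            }
        }

        int get() {
            return buffersInBacklog.get();
        }
    }
    ```

    With equal numbers of `onAdded(true)` and `onPolled(true)` calls from two threads, `get()` returns `0` once both threads have finished, unlike the plain `int` counter.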


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157694294
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SubpartitionTestBase.java ---
    @@ -47,7 +48,14 @@ public void testAddAfterFinish() throws Exception {
     		try {
     			subpartition.finish();
     
    +			assertEquals(1, subpartition.getTotalNumberOfBuffers());
    +			assertEquals(0, subpartition.getBuffersInBacklog());
    +			assertEquals(4, subpartition.getTotalNumberOfBytes());
    +
     			assertFalse(subpartition.add(mock(Buffer.class)));
    +			assertEquals(1, subpartition.getTotalNumberOfBuffers());
    +			assertEquals(0, subpartition.getBuffersInBacklog());
    --- End diff --
    
    sure


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r153564859
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartitionView.java ---
    @@ -22,32 +22,52 @@
     
     import java.io.IOException;
     
    +import static org.apache.flink.util.Preconditions.checkNotNull;
    +
     /**
      * A view to consume a {@link ResultSubpartition} instance.
      */
    -public interface ResultSubpartitionView {
    +public abstract class ResultSubpartitionView {
    +
    +	/** The parent subpartition this view belongs to. */
    +	private final ResultSubpartition parent;
    +
    +	public ResultSubpartitionView(ResultSubpartition parent) {
    +		this.parent = checkNotNull(parent);
    +	}
     
     	/**
     	 * Returns the next {@link Buffer} instance of this queue iterator.
    -	 * <p>
    -	 * If there is currently no instance available, it will return <code>null</code>.
    +	 *
    +	 * <p>If there is currently no instance available, it will return <code>null</code>.
     	 * This might happen for example when a pipelined queue producer is slower
     	 * than the consumer or a spilled queue needs to read in more data.
    -	 * <p>
    -	 * <strong>Important</strong>: The consumer has to make sure that each
    +	 *
    +	 * <p><strong>Important</strong>: The consumer has to make sure that each
     	 * buffer instance will eventually be recycled with {@link Buffer#recycle()}
     	 * after it has been consumed.
     	 */
    -	Buffer getNextBuffer() throws IOException, InterruptedException;
    +	public Buffer getNextBuffer() throws IOException, InterruptedException {
    +		Buffer buffer = getNextBufferInternal();
    +		if (buffer != null) {
    +			parent.decreaseStatistics(buffer);
    +		}
    +		return buffer;
    +	}
    +
    +	public int getBuffersInBacklog() {
    +		return parent.getBuffersInBacklog();
    +	}
     
    -	void notifyBuffersAvailable(long buffers) throws IOException;
    +	protected abstract Buffer getNextBufferInternal() throws IOException, InterruptedException;
    --- End diff --
    
    please add a Javadoc describing its intended relation to `getNextBuffer`


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157545208
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---
    @@ -237,6 +243,29 @@ public boolean isReleased() {
     		return isReleased;
     	}
     
    +	@Override
    +	public int getBuffersInBacklog() {
    +		return buffersInBacklog;
    +	}
    +
    +	@Override
    +	public void decreaseBuffersInBacklog(Buffer buffer) {
    +		if (buffer != null && buffer.isBuffer()) {
    +			synchronized (buffers) {
    +				buffersInBacklog--;
    +			}
    +		}
    +	}
    +
    +	@Override
    +	public void increaseBuffersInBacklog(Buffer buffer) {
    +		assert Thread.holdsLock(buffers);
    +
    +		if (buffer != null && buffer.isBuffer()) {
    +			buffersInBacklog++;
    +		}
    +	}
    --- End diff --
    
    please check the access levels (the latter two could be private)
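    A sketch of the tightened access levels, assuming the increment and decrement are only ever called from inside the subpartition while holding `buffers`. `LockedBacklogSubpartition`, `Element`, and `poll()` are hypothetical, modelled loosely on the diff above. Only `getBuffersInBacklog()` stays non-private, and `assert Thread.holdsLock(buffers)` documents the locking contract (it is only checked when the JVM runs with `-ea`). Here the poll and the decrement share one critical section, so the counter never disagrees with the queue.

    ```java
    import java.util.ArrayDeque;

    // Hypothetical subpartition (not Flink code): the backlog is only touched under "buffers".
    class LockedBacklogSubpartition {

        /** A queued element: a data buffer or an event. */
        static final class Element {
            final boolean isBuffer;

            Element(boolean isBuffer) {
                this.isBuffer = isBuffer;
            }
        }

        private final ArrayDeque<Element> buffers = new ArrayDeque<>();

        // Number of non-event elements in "buffers"; guarded by "buffers".
        private int buffersInBacklog;

        void add(Element element) {
            synchronized (buffers) {
                buffers.add(element);
                increaseBuffersInBacklog(element);
            }
        }

        /** Returns the next element, or null if the queue is empty. */
        Element poll() {
            synchronized (buffers) {
                Element next = buffers.poll();
                decreaseBuffersInBacklog(next);
                return next;
            }
        }

        int getBuffersInBacklog() {
            synchronized (buffers) {
                return buffersInBacklog;
            }
        }

        private void increaseBuffersInBacklog(Element element) {
            assert Thread.holdsLock(buffers);
            if (element != null && element.isBuffer) {
                buffersInBacklog++;
            }
        }

        private void decreaseBuffersInBacklog(Element element) {
            assert Thread.holdsLock(buffers);
            if (element != null && element.isBuffer) {
                buffersInBacklog--;
            }
        }
    }
    ```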


---

[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4559
  
    @NicoK, I have submitted a `hotfix` commit to address the above comments.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157707628
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultSubpartition.java ---
    @@ -99,6 +82,23 @@ protected Throwable getFailureCause() {
     
     	abstract public boolean isReleased();
     
    +	/**
    +	 * Gets the number of non-event buffers in this subpartition.
    +	 */
    +	abstract public int getBuffersInBacklog();
    +
    +	/**
    +	 * Decreases the number of non-event buffers by one after fetching a non-event
    +	 * buffer from this subpartition.
    +	 */
    +	abstract public void decreaseBuffersInBacklog(Buffer buffer);
    +
    +	/**
    +	 * Increases the number of non-event buffers by one after adding a non-event
    +	 * buffer into this subpartition.
    +	 */
    +	abstract public void increaseBuffersInBacklog(Buffer buffer);
    --- End diff --
    
    yes, that would be nice


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r153564062
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
    @@ -133,7 +135,7 @@ int releaseMemory() throws IOException {
     	}
     
     	@Override
    -	public Buffer getNextBuffer() throws IOException, InterruptedException {
    +	public Buffer getNextBufferInternal() throws IOException, InterruptedException {
    --- End diff --
    
    make this `protected`


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157691096
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
    @@ -52,6 +54,10 @@
     	/** Flag indicating whether the subpartition has been released. */
     	private volatile boolean isReleased;
     
    +	/** The number of non-event buffers currently in this subpartition */
    +	@GuardedBy("buffers")
    +	private volatile int buffersInBacklog;
    --- End diff --
    
    Using `ArrayDeque#size()` for `getBuffersInBacklog()` may not be feasible, because we do not know how many events are in the `ArrayDeque`, and events should not count towards the backlog.
    
    For the new API, we may need to modify `ResultSubpartitionView#getNextBuffer` to return a `BufferAndBacklog` wrapper instead of a `Buffer`. Do we also need to extend `BufferAndAvailability` to carry the backlog? This would let `PipelinedSubpartition` avoid the `volatile`, but `SpillableSubpartition` may still need it, because `getNextBuffer` and `decreaseBacklog` happen in different places in `SpillableSubpartitionView`/`SpilledSubpartitionView`.
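    A sketch of the `BufferAndBacklog` idea for a purely in-memory, pipelined queue. `BufferAndBacklog` is the name proposed in this comment; `PipelinedQueueSketch`, `Element`, and their members are hypothetical. Because the counter is only read and written while holding `buffers`, and the snapshot is taken in the same critical section as the `poll()`, the field needs no `volatile`. The counter still has to skip events, since `ArrayDeque#size()` would include them.

    ```java
    import java.util.ArrayDeque;

    // Hypothetical pipelined queue (not Flink code) returning the backlog with each buffer.
    class PipelinedQueueSketch {

        /** A queued element: a data buffer or an event. */
        static final class Element {
            final boolean isBuffer;

            Element(boolean isBuffer) {
                this.isBuffer = isBuffer;
            }
        }

        /** The element handed out plus the number of data buffers still queued behind it. */
        static final class BufferAndBacklog {
            final Element buffer;
            final int buffersInBacklog;

            BufferAndBacklog(Element buffer, int buffersInBacklog) {
                this.buffer = buffer;
                this.buffersInBacklog = buffersInBacklog;
            }
        }

        private final ArrayDeque<Element> buffers = new ArrayDeque<>();

        // Non-event elements in "buffers"; only accessed while holding "buffers", so no volatile.
        private int buffersInBacklog;

        void add(Element element) {
            synchronized (buffers) {
                buffers.add(element);
                if (element.isBuffer) {
                    buffersInBacklog++;
                }
            }
        }

        /** Returns the next element with a consistent backlog snapshot, or null if empty. */
        BufferAndBacklog getNextBuffer() {
            synchronized (buffers) {
                Element next = buffers.poll();
                if (next == null) {
                    return null;
                }
                if (next.isBuffer) {
                    buffersInBacklog--;
                }
                return new BufferAndBacklog(next, buffersInBacklog);
            }
        }
    }
    ```

    For a queue holding buffer, event, buffer, the three calls return backlogs of 1, 1, and 0: the event is handed out but does not change the count.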



---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r155454886
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java ---
    @@ -39,13 +39,15 @@
     	private final AtomicBoolean isReleased;
     
     	PipelinedSubpartitionView(PipelinedSubpartition parent, BufferAvailabilityListener listener) {
    +		super(parent);
    +
     		this.parent = checkNotNull(parent);
     		this.availabilityListener = checkNotNull(listener);
     		this.isReleased = new AtomicBoolean();
     	}
     
     	@Override
    -	public Buffer getNextBuffer() {
    +	protected Buffer getNextBufferInternal() {
    --- End diff --
    
    I will add the hotfix commit for it.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157538061
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionTest.java ---
    @@ -103,16 +104,35 @@ public void testBasicPipelinedProduceConsumeLogic() throws Exception {
     		// Add data to the queue...
     		subpartition.add(createBuffer());
     
    +		assertEquals(1, subpartition.getTotalNumberOfBuffers());
    +		assertEquals(1, subpartition.getBuffersInBacklog());
    +		assertEquals(BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
    +
     		// ...should have resulted in a notification
     		verify(listener, times(1)).notifyBuffersAvailable(eq(1L));
     
     		// ...and one available result
     		assertNotNull(view.getNextBuffer());
     		assertNull(view.getNextBuffer());
    +		assertEquals(0, subpartition.getBuffersInBacklog());
     
     		// Add data to the queue...
     		subpartition.add(createBuffer());
    +
    +		assertEquals(2, subpartition.getTotalNumberOfBuffers());
    +		assertEquals(1, subpartition.getBuffersInBacklog());
    +		assertEquals(2 * BUFFER_SIZE, subpartition.getTotalNumberOfBytes());
     		verify(listener, times(2)).notifyBuffersAvailable(eq(1L));
    +
    +		// Add event to the queue...
    +		Buffer event = createBuffer();
    +		event.tagAsEvent();
    +		subpartition.add(event);
    +
    +		assertEquals(3, subpartition.getTotalNumberOfBuffers());
    +		assertEquals(1, subpartition.getBuffersInBacklog());
    --- End diff --
    
    good catch - the event-adding path was not tested yet


---

[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on the issue:

    https://github.com/apache/flink/pull/4559
  
    I'd opt for decreasing under the lock as well, or (even better?) the following alternative:
    
    We remove the `buffersInBacklog` member and return the size of the current backlog along with the `getNextBuffer()` calls, just like `BufferAndAvailability` is returned by the `SequenceNumberingViewReader`. This `getNextBuffer()` call is already under the lock, and (except for the `toString()` methods in `PipelinedSubpartition` and `SpillableSubpartition`) the backlog is only requested along with the `getNextBuffer()` calls.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r152008483
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---
    @@ -246,9 +246,9 @@ public int unsynchronizedGetNumberOfQueuedBuffers() {
     	@Override
     	public String toString() {
     		return String.format("SpillableSubpartition [%d number of buffers (%d bytes)," +
    -						"finished? %s, read view? %s, spilled? %s]",
    -				getTotalNumberOfBuffers(), getTotalNumberOfBytes(), isFinished, readView != null,
    -				spillWriter != null);
    +				"%d backlog, finished? %s, read view? %s, spilled? %s]",
    --- End diff --
    
    `"%d buffers in backlog, finished (...)"` ? 
    `"backlog = %d, finished (...)"` ? 
    `"%d in backlog, finished (...)"` ? 
    
    `%d backlog` is a little bit cryptic.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r157548895
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---
    @@ -77,6 +78,10 @@
     	/** Flag indicating whether the subpartition has been released. */
     	private volatile boolean isReleased;
     
    +	/** The number of non-event buffers currently in this subpartition */
    +	@GuardedBy("buffers")
    +	private volatile int buffersInBacklog;
    --- End diff --
    
    If the interface of `getNextBuffer()` was changed as suggested above, we could remove the `volatile` here as well.


---

[GitHub] flink pull request #4559: [FLINK-7468][network] Implement sender backlog log...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4559#discussion_r154344889
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
    @@ -133,7 +135,7 @@ int releaseMemory() throws IOException {
     	}
     
     	@Override
    -	public Buffer getNextBuffer() throws IOException, InterruptedException {
    +	protected Buffer getNextBufferInternal() throws IOException, InterruptedException {
    --- End diff --
    
    `@Nullable`


---

[GitHub] flink issue #4559: [FLINK-7468][network] Implement sender backlog logic for ...

Posted by zhijiangW <gi...@git.apache.org>.
Github user zhijiangW commented on the issue:

    https://github.com/apache/flink/pull/4559
  
    @pnowojski , this PR is ready for review. :)


---