Posted to issues@flink.apache.org by pnowojski <gi...@git.apache.org> on 2018/02/23 15:28:34 UTC

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/5572

    [FLINK-8694][runtime] Fix notifyDataAvailable race condition

    This fixes two bugs in the network stack:
    https://issues.apache.org/jira/browse/FLINK-8760
    https://issues.apache.org/jira/browse/FLINK-8694
    
    ## Brief change log
    
    Please check individual commit messages.
    
    ## Verifying this change
    
    This PR adds new tests covering the previously bugged cases.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink f8694-proper-fix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5572.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5572
    
----
commit 388d16118763dddff7d4c3593572169ad3e65c0d
Author: Piotr Nowojski <pi...@...>
Date:   2018-02-23T10:37:37Z

    [hotfix][tests] Deduplicate code in SingleInputGateTest

commit e22a44b24ab1e9f02c236440f899a1f4dfdfc873
Author: Piotr Nowojski <pi...@...>
Date:   2018-02-23T11:11:14Z

    [hotfix][runtime] Remove duplicated check

commit 5c16e565c4a7f0ffdaec888696d98e3c2c221d99
Author: Piotr Nowojski <pi...@...>
Date:   2018-02-23T10:20:21Z

    [FLINK-8760][runtime] Correctly propagate moreAvailable flag through SingleInputGate
    
    Previously, if SingleInputGate was re-enqueuing an input channel, isMoreAvailable
    might incorrectly return false. This might have caused some deadlocks.

commit a451006fd2e38e478ef745fd9de0ebc5fb2fd5c2
Author: Piotr Nowojski <pi...@...>
Date:   2018-02-23T10:27:54Z

    [hotfix][tests] Do not hide original exception in SuccessAfterNetworkBuffersFailureITCase

commit e70cd04424f0f92b9d5127e7c4a351d3823d20bd
Author: Piotr Nowojski <pi...@...>
Date:   2018-02-23T10:28:20Z

    [FLINK-8694][runtime] Fix notifyDataAvailable race condition
    
    Previously there was a race condition that might have resulted in ignoring some notifyDataAvailable calls.
    This fixes the problem by moving buffersAvailable handling to Subpartitions and adds a stress test
    for flushAlways (without this fix the test deadlocks).

----


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5572#discussion_r170589597
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
    @@ -332,8 +332,8 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
     		//TODO: re-enable this?
     //		assertEquals(BUFFER_DATA_SIZE * 4 + 4, partition.getTotalNumberOfBytes());
     
    -		listener.awaitNotifications(3, 30_000);
    -		assertEquals(3, listener.getNumNotifications());
    +		listener.awaitNotifications(2, 30_000);
    +		assertEquals(2, listener.getNumNotifications());
    --- End diff --
    
    note that we will be notified once the spilled writer completes


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5572#discussion_r170560655
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
    @@ -48,6 +48,8 @@
     	/** Flag indicating whether the subpartition has been finished. */
     	private boolean isFinished;
     
    +	private boolean flushRequested;
    --- End diff --
    
    add `@GuardedBy("buffers")`?
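
A minimal sketch of what the suggested annotation would document, assuming a simplified subpartition. `GuardedFlagSketch` and its methods are hypothetical and not the actual `PipelinedSubpartition`; the `GuardedBy` annotation is declared locally as a stand-in for the JSR-305 one so that the example compiles without extra dependencies.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayDeque;

// Local stand-in for a GuardedBy annotation (assumption: declared here only
// to keep the sketch self-contained).
@Retention(RetentionPolicy.CLASS)
@Target({ElementType.FIELD, ElementType.METHOD})
@interface GuardedBy {
	String value();
}

// Hypothetical, simplified subpartition; not the real Flink class.
class GuardedFlagSketch {
	private final ArrayDeque<String> buffers = new ArrayDeque<>();

	// Documents that every read or write must hold the monitor of 'buffers'.
	@GuardedBy("buffers")
	private boolean flushRequested;

	void add(String buffer) {
		synchronized (buffers) {
			buffers.add(buffer);
		}
	}

	void flush() {
		synchronized (buffers) {
			flushRequested = !buffers.isEmpty();
		}
	}

	boolean isFlushRequested() {
		synchronized (buffers) {
			return flushRequested;
		}
	}
}
```

The annotation itself enforces nothing at runtime; it records the locking contract for readers and for static analysis tools that understand it.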


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5572#discussion_r170590385
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java ---
    @@ -52,6 +52,14 @@ public void largeRemoteMode() throws Exception {
     		env.tearDown();
     	}
     
    +	@Test
    +	public void largeRemoteAlwaysFlush() throws Exception {
    +		StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
    +		env.setUp(1, 1, 0, false);
    +		env.executeBenchmark(1_000_000);
    +		env.tearDown();
    +	}
    --- End diff --
    
    Is this not tested via your non-IT tests now?
    (we should be careful about adding integration/full stack tests because of their added time)


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5572#discussion_r170873853
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
    @@ -187,7 +185,6 @@ public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException
     
     	@Override
     	public void notifyDataAvailable() {
    -		// We do the availability listener notification one by one
    --- End diff --
    
    This was a remnant of a different approach to fixing the problem. Reverted.


---

[GitHub] flink issue #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race condit...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/5572
  
    Rebased. Let's merge once Travis is green.


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5572#discussion_r170875167
  
    --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/benchmark/StreamNetworkThroughputBenchmarkTests.java ---
    @@ -52,6 +52,14 @@ public void largeRemoteMode() throws Exception {
     		env.tearDown();
     	}
     
    +	@Test
    +	public void largeRemoteAlwaysFlush() throws Exception {
    +		StreamNetworkThroughputBenchmark env = new StreamNetworkThroughputBenchmark();
    +		env.setUp(1, 1, 0, false);
    +		env.executeBenchmark(1_000_000);
    +		env.tearDown();
    +	}
    --- End diff --
    
    This is a smaller-scope test compared to an ITCase, and it covers a higher load. It's a very good stress test for investigating and detecting deadlocks/race conditions in the network stack.


---

[GitHub] flink issue #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race condit...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/5572
  
    Thanks for merging :)


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5572#discussion_r170574788
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
    @@ -242,6 +239,22 @@ public boolean nextBufferIsEvent() {
     		return spilledView.nextBufferIsEvent();
     	}
     
    +	@Override
    +	public boolean isAvailable() {
    +		synchronized (buffers) {
    +			if (nextBuffer != null) {
    +				return true;
    +			}
    +			else if (spilledView == null) {
    +				return false;
    +			}
    +		} // else: spilled
    +
    +		checkState(spilledView != null, "No in-memory buffers available, but also nothing spilled.");
    --- End diff --
    
    Maybe I am misreading this, but isn't `spilledView` always non-`null` here? In that case, please remove the check.


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5572#discussion_r170562983
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartition.java ---
    @@ -208,7 +208,7 @@ public ResultSubpartitionView createReadView(BufferAvailabilityListener availabi
     					parent.getBufferProvider().getMemorySegmentSize(),
     					availabilityListener);
     			}
    -
    +			readView.notifyDataAvailable();
    --- End diff --
    
    I know that both read views' implementations of this function are no-ops, but both constructors already do their own notification handling, so this should probably be removed again.


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5572#discussion_r170585362
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadTask.java ---
    @@ -447,6 +447,8 @@ private void sendEndOfSuperstepToAllIterationOutputs() throws IOException, Inter
     
     		for (RecordWriter<?> eventualOutput : this.eventualOutputs) {
     			eventualOutput.broadcastEvent(EndOfSuperstepEvent.INSTANCE);
    +			eventualOutput.clearBuffers();
    +			eventualOutput.flushAll();
    --- End diff --
    
    let's remove those two - they should be unnecessary


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5572#discussion_r170600612
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedSequenceNumberingViewReader.java ---
    @@ -206,7 +201,7 @@ public String toString() {
     		return "CreditBasedSequenceNumberingViewReader{" +
     			"requestLock=" + requestLock +
     			", receiverId=" + receiverId +
    -			", buffersAvailable=" + buffersAvailable.get() +
    +			", buffersAvailable=" + hasBuffersAvailable() +
    --- End diff --
    
    This line should now be removed: the property is no longer part of this object, and the call goes through a synchronized block, which may be nasty during debugging sessions.


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski closed the pull request at:

    https://github.com/apache/flink/pull/5572


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5572#discussion_r170557786
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java ---
    @@ -428,13 +455,15 @@ private static SingleInputGate createInputGate(int numberOfInputChannels) {
     	}
     
     	static void verifyBufferOrEvent(
    -		InputGate inputGate,
    -		boolean isBuffer,
    -		int channelIndex) throws IOException, InterruptedException {
    +			InputGate inputGate,
    +			boolean expectedIsBuffer,
    +			int expectedChannelIndex,
    +			boolean expectedMoreAvailable) throws IOException, InterruptedException {
     
     		final Optional<BufferOrEvent> bufferOrEvent = inputGate.getNextBufferOrEvent();
     		assertTrue(bufferOrEvent.isPresent());
    -		assertEquals(isBuffer, bufferOrEvent.get().isBuffer());
    -		assertEquals(channelIndex, bufferOrEvent.get().getChannelIndex());
    +		assertEquals(expectedIsBuffer, bufferOrEvent.get().isBuffer());
    +		assertEquals(expectedChannelIndex, bufferOrEvent.get().getChannelIndex());
    +		assertEquals(expectedMoreAvailable, bufferOrEvent.get().moreAvailable());
    --- End diff --
    
    Can we also verify `assertFalse(inputGate.pollNextBufferOrEvent().isPresent());` for `SingleInputGate` instances here? (Ideally also for any other input gate, once `UnionInputGate` implements this.)


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5572#discussion_r170560833
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java ---
    @@ -65,9 +67,8 @@ public boolean add(BufferConsumer bufferConsumer) {
     	@Override
     	public void flush() {
     		synchronized (buffers) {
    -			if (readView != null) {
    -				readView.notifyDataAvailable();
    -			}
    +			flushRequested = !buffers.isEmpty();
    +			notifyDataAvailable();
    --- End diff --
    
    Maybe don't flush at all if `buffers` is empty.
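
One way to read this suggestion, sketched under assumptions: `FlushSketch` is a hypothetical, simplified model rather than the actual `PipelinedSubpartition`, and a counter stands in for the real `notifyDataAvailable()` call. The early return skips both the flag update and the notification when nothing is queued.

```java
import java.util.ArrayDeque;

// Hypothetical, simplified model of a pipelined subpartition.
class FlushSketch {
	private final ArrayDeque<String> buffers = new ArrayDeque<>();
	private boolean flushRequested;
	private int notifications;

	void add(String buffer) {
		synchronized (buffers) {
			buffers.add(buffer);
		}
	}

	void flush() {
		synchronized (buffers) {
			if (buffers.isEmpty()) {
				// Nothing to flush: leave the flag alone and do not notify.
				return;
			}
			flushRequested = true;
			// Stands in for notifyDataAvailable() in the real code.
			notifications++;
		}
	}

	boolean isFlushRequested() {
		synchronized (buffers) {
			return flushRequested;
		}
	}

	int getNotifications() {
		synchronized (buffers) {
			return notifications;
		}
	}
}
```

Whether skipping the notification is safe in the real class depends on how readers are woken up elsewhere, which this sketch does not model.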


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5572#discussion_r170589453
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionTest.java ---
    @@ -319,7 +319,7 @@ public void testConsumeSpillablePartitionSpilledDuringConsume() throws Exception
     		assertEquals(2, partition.getBuffersInBacklog());
     		assertEquals(partition.getBuffersInBacklog(), read.buffersInBacklog());
     		read.buffer().recycleBuffer();
    -		assertEquals(2, listener.getNumNotifications());
    +		assertEquals(1, listener.getNumNotifications());
    --- End diff --
    
    maybe add a small comment that we don't get a new notification (anymore!) since the buffer is already available at the `getNextBuffer()` call?


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5572#discussion_r170631511
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java ---
    @@ -219,6 +219,14 @@ public boolean nextBufferIsEvent() {
     		}
     	}
     
    +	@Override
    +	public synchronized boolean isAvailable() {
    +		if (nextBuffer != null) {
    +			return true;
    +		}
    +		return fileReader.hasReachedEndOfFile();
    --- End diff --
    
    should be `!fileReader.hasReachedEndOfFile();` - I will add tests in a later PR for this
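
The corrected condition can be illustrated with a small sketch. `EndOfFileAware` and `SpilledViewSketch` are hypothetical stand-ins for the file reader and the view, and a `String` replaces the real buffer type; only the availability logic mirrors the diff above.

```java
// Hypothetical stand-in for the file reader; only the one method used here.
interface EndOfFileAware {
	boolean hasReachedEndOfFile();
}

// Hypothetical, simplified view; not the real SpilledSubpartitionView.
class SpilledViewSketch {
	private final EndOfFileAware fileReader;
	private final String nextBuffer;

	SpilledViewSketch(EndOfFileAware fileReader, String nextBuffer) {
		this.fileReader = fileReader;
		this.nextBuffer = nextBuffer;
	}

	synchronized boolean isAvailable() {
		if (nextBuffer != null) {
			// A buffer has already been read and is waiting to be consumed.
			return true;
		}
		// More data can only follow while the file has NOT been fully read.
		return !fileReader.hasReachedEndOfFile();
	}
}
```

With the negation in place, a view with no pending buffer reports data as available only while the reader has not yet reached the end of the file.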


---

[GitHub] flink pull request #5572: [FLINK-8694][runtime] Fix notifyDataAvailable race...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5572#discussion_r170573898
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpillableSubpartitionView.java ---
    @@ -187,7 +185,6 @@ public BufferAndBacklog getNextBuffer() throws IOException, InterruptedException
     
     	@Override
     	public void notifyDataAvailable() {
    -		// We do the availability listener notification one by one
    --- End diff --
    
    Well, this is still true for the `SpilledSubpartitionView` and without the (potentially extended) comment here, a reader might wonder why we don't do anything here, not even forwarding it to the spilled view.


---