Posted to issues@flink.apache.org by pnowojski <gi...@git.apache.org> on 2018/02/27 09:49:27 UTC

[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/5588

    [FLINK-8750][runtime] Improve detection of no remaining data after EndOfPartitionEvent

    Because of a race condition between:
      1. releasing the inputChannelsWithData lock in this method and reaching this place
      2. an empty data notification that re-enqueues a channel
    we can end up with the moreAvailable flag set to true while we expect no more data.
        
    This commit detects this situation, makes the correct assertion and turns off the moreAvailable flag.
    
    ## Verifying this change
    
    This bug could be reproduced by running `org.apache.flink.table.runtime.stream.table.JoinITCase` in a loop a couple of thousand times.
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (yes / **no**)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
      - The serializers: (yes / **no** / don't know)
      - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
      - The S3 file system connector: (yes / **no** / don't know)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (yes / **no**)
      - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink f8750

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5588.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5588
    
----
commit 388d16118763dddff7d4c3593572169ad3e65c0d
Author: Piotr Nowojski <pi...@...>
Date:   2018-02-23T10:37:37Z

    [hotfix][tests] Deduplicate code in SingleInputGateTest

commit e22a44b24ab1e9f02c236440f899a1f4dfdfc873
Author: Piotr Nowojski <pi...@...>
Date:   2018-02-23T11:11:14Z

    [hotfix][runtime] Remove duplicated check

commit 85d98dee9bfc59fee660db934855014d6b73182e
Author: Piotr Nowojski <pi...@...>
Date:   2018-02-23T10:20:21Z

    [FLINK-8760][runtime] Correctly propagate moreAvailable flag through SingleInputGate
    
    Previously, if SingleInputGate was re-enqueuing an input channel, isMoreAvailable
    might incorrectly return false. This might have caused some deadlocks.

commit 20e808053d365e66b3ebd21a10e7acda3a9ebdbd
Author: Piotr Nowojski <pi...@...>
Date:   2018-02-23T10:27:54Z

    [hotfix][tests] Do not hide original exception in SuccessAfterNetworkBuffersFailureITCase

commit 57f83c7747f192dfa1c98902676baedc3ccd1694
Author: Piotr Nowojski <pi...@...>
Date:   2018-02-23T10:28:20Z

    [FLINK-8694][runtime] Fix notifyDataAvailable race condition
    
    Previously there was a race condition that might have resulted in ignoring some notifyDataAvailable calls.
    This fixes the problem by moving buffersAvailable handling to Subpartitions and adds a stress test
    for flushAlways (without this fix the test deadlocks).

commit 065be67f6d862ef35f2caa5a39773816385475b1
Author: Piotr Nowojski <pi...@...>
Date:   2018-02-26T15:13:06Z

    [hotfix][runtime] Optimize EventSerializer.isEvent method
    
    For example, previously if the method was used to check for EndOfPartitionEvent
    and the Buffer contained a huge custom event, the event had to be deserialized before
    performing the actual check. Now we quickly enter the correct if/else branch
    and do the full costly deserialization only if we have to.
    
    Calls to isEvent() other than those checking against EndOfPartitionEvent were not used.

commit 626e10fc8e8b9ae148b82460117c090147961a4f
Author: Piotr Nowojski <pi...@...>
Date:   2018-02-27T09:39:00Z

    [FLINK-8750][runtime] Improve detection of no remaining data after EndOfPartitionEvent
    
    Because of a race condition between:
      1. releasing the inputChannelsWithData lock in this method and reaching this place
      2. an empty data notification that re-enqueues a channel
    we can end up with the moreAvailable flag set to true while we expect no more data.
    
    This commit detects this situation, makes the correct assertion and turns off the moreAvailable flag.

----


---

[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171000868
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java ---
    @@ -107,10 +107,9 @@ else if (eventClass == CancelCheckpointMarker.class) {
     	 *
     	 * @param buffer the buffer to peak into
     	 * @param eventClass the expected class of the event type
    -	 * @param classLoader the class loader to use for custom event classes
     	 * @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
     	 */
    -	private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass, ClassLoader classLoader) throws IOException {
    +	private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass) throws IOException {
    --- End diff --
    
    this change qualifies for a separate JIRA ticket, not just a hotfix


---

[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5588


---

[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171001862
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java ---
    @@ -318,13 +295,9 @@ public static AbstractEvent fromBuffer(Buffer buffer, ClassLoader classLoader) t
     	 *
     	 * @param buffer the buffer to peak into
     	 * @param eventClass the expected class of the event type
    -	 * @param classLoader the class loader to use for custom event classes
     	 * @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
     	 */
    -	public static boolean isEvent(final Buffer buffer,
    -		final Class<?> eventClass,
    -		final ClassLoader classLoader) throws IOException {
    -		return !buffer.isBuffer() &&
    -			isEvent(buffer.getNioBufferReadable(), eventClass, classLoader);
    +	public static boolean isEvent(Buffer buffer, Class<?> eventClass) throws IOException {
    +		return !buffer.isBuffer() && isEvent(buffer.getNioBufferReadable(), eventClass);
    --- End diff --
    
    Similar here: add a comment that checking for custom events is no longer supported.


---

[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171004092
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java ---
    @@ -117,49 +116,55 @@ public void testIsEventPeakOnly() throws Exception {
     	}
     
     	/**
    -	 * Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} returns
    +	 * Tests {@link EventSerializer#isEvent(Buffer, Class)} returns
     	 * the correct answer for various encoded event buffers.
     	 */
     	@Test
     	public void testIsEvent() throws Exception {
     		AbstractEvent[] events = {
     			EndOfPartitionEvent.INSTANCE,
    -			EndOfSuperstepEvent.INSTANCE,
     			new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forCheckpointWithDefaultLocation()),
     			new TestTaskEvent(Math.random(), 12361231273L),
    -			new CancelCheckpointMarker(287087987329842L)
    +			new CancelCheckpointMarker(287087987329842L),
    +			EndOfSuperstepEvent.INSTANCE
    +		};
    +
    +		Class[] expectedClasses = {
    +			EndOfPartitionEvent.class,
    +			CheckpointBarrier.class,
    +			CancelCheckpointMarker.class,
    +			EndOfSuperstepEvent.class
    --- End diff --
    
    This extra array seems a bit error-prone and requires maintenance in case the events are extended - wouldn't it be equally clear if we used your new naming with the original array?
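
One possible reading of this suggestion, as a minimal sketch: keep a single array of events and derive the expected answer from each event's own class, so no parallel array has to be kept in sync. Everything below is hypothetical and simplified. The nested event classes, the `TAGS` map and the toy `isEvent(int, Class)` are stand-ins for illustration, not Flink's `EventSerializer` API, and custom events (such as `TestTaskEvent` in the real test) would still need separate handling.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SingleArrayCheckSketch {
    // Hypothetical event types standing in for the ones used in the test.
    static final class EndOfPartition {}
    static final class Barrier {}
    static final class Cancel {}

    // Illustrative class-to-tag mapping; the tag values are assumptions.
    static final Map<Class<?>, Integer> TAGS = new LinkedHashMap<>();
    static {
        TAGS.put(EndOfPartition.class, 0);
        TAGS.put(Barrier.class, 1);
        TAGS.put(Cancel.class, 2);
    }

    // Toy check: does the encoded tag belong to the given event class?
    static boolean isEvent(int encodedTag, Class<?> eventClass) {
        Integer tag = TAGS.get(eventClass);
        if (tag == null) {
            throw new UnsupportedOperationException("Unsupported eventClass = " + eventClass);
        }
        return tag == encodedTag;
    }

    // Checks every event against every supported class and counts wrong answers.
    // The expected answer comes from the event itself, not from a second array.
    static int countMismatches(Object[] events) {
        int mismatches = 0;
        for (Object event : events) {
            int encoded = TAGS.get(event.getClass());
            for (Class<?> candidate : TAGS.keySet()) {
                boolean expected = candidate.equals(event.getClass());
                if (isEvent(encoded, candidate) != expected) {
                    mismatches++;
                }
            }
        }
        return mismatches;
    }
}
```

With this shape, adding a new supported event only requires extending the one array (and the mapping under test), which is the maintenance concern raised above.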


---

[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171183475
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java ---
    @@ -553,6 +553,12 @@ public void requestPartitions() throws IOException, InterruptedException {
     				channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
     
     				if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
    +					// Because of race condition between:
    +					// 1. releasing inputChannelsWithData lock in this method and reaching this place
    +					// 2. empty data notification that re-enqueues a channel
    +					// we can end up with moreAvailable flag set to true, while we expect no more data.
    +					checkState(!moreAvailable || !pollNextBufferOrEvent().isPresent());
    +					moreAvailable = false;
    --- End diff --
    
    While this certainly fixes the `checkState(!bufferOrEvent.moreAvailable());` in the `UnionInputGate`, it does not improve the detection of additional data after the `EndOfPartitionEvent` too much. How about also adding `checkState(!pollNextBufferOrEvent().isPresent());` here:
    ```
    	private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
    		if (hasReceivedAllEndOfPartitionEvents) {
    			checkState(!pollNextBufferOrEvent().isPresent());
    			return Optional.empty();
    		}
    ```
    In that case, if we ever try to get more data (due to a data notification) there should be no actual data left and only empty buffers.
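
The guard proposed above can be illustrated with a toy model. This is only a sketch under stated assumptions: `ToyInputGate`, its `String` payloads and its method names are hypothetical and much simpler than Flink's `SingleInputGate`, and the explicit `IllegalStateException` stands in for a `checkState`-style precondition.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Hypothetical, heavily simplified stand-in for an input gate.
final class ToyInputGate {
    private final Queue<String> pending = new ArrayDeque<>();
    private boolean hasReceivedAllEndOfPartitionEvents;

    void enqueue(String data) {
        pending.add(data);
    }

    void markAllPartitionsFinished() {
        hasReceivedAllEndOfPartitionEvents = true;
    }

    // Non-blocking poll; empty when nothing is queued.
    private Optional<String> pollNext() {
        return Optional.ofNullable(pending.poll());
    }

    Optional<String> getNext() {
        if (hasReceivedAllEndOfPartitionEvents) {
            // After the final end-of-partition event, a late notification is
            // tolerated only if it carries no data; real data here is a bug.
            if (pollNext().isPresent()) {
                throw new IllegalStateException("Data received after all partitions finished");
            }
            return Optional.empty();
        }
        return pollNext();
    }
}
```

The point of the guard is that a spurious wake-up after the end of input stays harmless, while genuinely unexpected data fails loudly instead of being dropped silently.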


---

[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171002283
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java ---
    @@ -117,49 +116,55 @@ public void testIsEventPeakOnly() throws Exception {
     	}
     
     	/**
    -	 * Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} returns
    +	 * Tests {@link EventSerializer#isEvent(Buffer, Class)} returns
     	 * the correct answer for various encoded event buffers.
     	 */
     	@Test
     	public void testIsEvent() throws Exception {
     		AbstractEvent[] events = {
     			EndOfPartitionEvent.INSTANCE,
    -			EndOfSuperstepEvent.INSTANCE,
     			new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forCheckpointWithDefaultLocation()),
     			new TestTaskEvent(Math.random(), 12361231273L),
    -			new CancelCheckpointMarker(287087987329842L)
    +			new CancelCheckpointMarker(287087987329842L),
    +			EndOfSuperstepEvent.INSTANCE
    +		};
    --- End diff --
    
    I wonder why the order of the events changed?


---

[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171001535
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java ---
    @@ -107,10 +107,9 @@ else if (eventClass == CancelCheckpointMarker.class) {
     	 *
     	 * @param buffer the buffer to peak into
     	 * @param eventClass the expected class of the event type
    -	 * @param classLoader the class loader to use for custom event classes
     	 * @return whether the event class of the <tt>buffer</tt> matches the given <tt>eventClass</tt>
     	 */
    -	private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass, ClassLoader classLoader) throws IOException {
    +	private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass) throws IOException {
    --- End diff --
    
    You should also add a comment that checking for custom events is not supported.


---

[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

Posted by pnowojski <gi...@git.apache.org>.
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171214062
  
    --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializerTest.java ---
    @@ -117,49 +116,55 @@ public void testIsEventPeakOnly() throws Exception {
     	}
     
     	/**
    -	 * Tests {@link EventSerializer#isEvent(Buffer, Class, ClassLoader)} returns
    +	 * Tests {@link EventSerializer#isEvent(Buffer, Class)} returns
     	 * the correct answer for various encoded event buffers.
     	 */
     	@Test
     	public void testIsEvent() throws Exception {
     		AbstractEvent[] events = {
     			EndOfPartitionEvent.INSTANCE,
    -			EndOfSuperstepEvent.INSTANCE,
     			new CheckpointBarrier(1678L, 4623784L, CheckpointOptions.forCheckpointWithDefaultLocation()),
     			new TestTaskEvent(Math.random(), 12361231273L),
    -			new CancelCheckpointMarker(287087987329842L)
    +			new CancelCheckpointMarker(287087987329842L),
    +			EndOfSuperstepEvent.INSTANCE
    +		};
    --- End diff --
    
    Reverted


---

[GitHub] flink pull request #5588: [FLINK-8750][runtime] Improve detection of no rema...

Posted by NicoK <gi...@git.apache.org>.
Github user NicoK commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5588#discussion_r171001301
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/EventSerializer.java ---
    @@ -122,38 +121,16 @@ private static boolean isEvent(ByteBuffer buffer, Class<?> eventClass, ClassLoad
     		try {
     			int type = buffer.getInt();
     
    -			switch (type) {
    -				case END_OF_PARTITION_EVENT:
    -					return eventClass.equals(EndOfPartitionEvent.class);
    -				case CHECKPOINT_BARRIER_EVENT:
    -					return eventClass.equals(CheckpointBarrier.class);
    -				case END_OF_SUPERSTEP_EVENT:
    -					return eventClass.equals(EndOfSuperstepEvent.class);
    -				case CANCEL_CHECKPOINT_MARKER_EVENT:
    -					return eventClass.equals(CancelCheckpointMarker.class);
    -				case OTHER_EVENT:
    -					try {
    -						final DataInputDeserializer deserializer = new DataInputDeserializer(buffer);
    -						final String className = deserializer.readUTF();
    -
    -						final Class<? extends AbstractEvent> clazz;
    -						try {
    -							clazz = classLoader.loadClass(className).asSubclass(AbstractEvent.class);
    -						}
    -						catch (ClassNotFoundException e) {
    -							throw new IOException("Could not load event class '" + className + "'.", e);
    -						}
    -						catch (ClassCastException e) {
    -							throw new IOException("The class '" + className + "' is not a valid subclass of '"
    -								+ AbstractEvent.class.getName() + "'.", e);
    -						}
    -						return eventClass.equals(clazz);
    -					}
    -					catch (Exception e) {
    -						throw new IOException("Error while deserializing or instantiating event.", e);
    -					}
    -				default:
    -					throw new IOException("Corrupt byte stream for event");
    +			if (eventClass.equals(EndOfPartitionEvent.class)) {
    +				return type == END_OF_PARTITION_EVENT;
    +			} else if (eventClass.equals(CheckpointBarrier.class)) {
    +				return type == CHECKPOINT_BARRIER_EVENT;
    +			} else if (eventClass.equals(EndOfSuperstepEvent.class)) {
    +				return type == END_OF_SUPERSTEP_EVENT;
    +			} else if (eventClass.equals(CancelCheckpointMarker.class)) {
    +				return type == CANCEL_CHECKPOINT_MARKER_EVENT;
    +			} else {
    +				throw new IOException("Corrupt byte stream for event or unsupported eventClass = " + eventClass);
    --- End diff --
    
    Actually, this should be an `UnsupportedOperationException` since this is only based on the class being given and not the input stream.
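
A minimal sketch of the class-first check with the exception type suggested above. The stub event classes, the tag values and the `encode` helper are hypothetical and exist only to make the example self-contained; they are not Flink's real classes or constants. The type tag is read with an absolute `getInt(int)` so the buffer position is left untouched.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-ins for event classes; only their identity matters here.
final class EndOfPartitionEventStub {}
final class CheckpointBarrierStub {}
final class CustomEventStub {}

final class IsEventSketch {
    // Illustrative type tags; the actual values in Flink may differ.
    static final int END_OF_PARTITION_EVENT = 0;
    static final int CHECKPOINT_BARRIER_EVENT = 1;

    static boolean isEvent(ByteBuffer buffer, Class<?> eventClass) {
        // Peek at the type tag without moving the buffer position.
        int type = buffer.getInt(buffer.position());

        if (eventClass.equals(EndOfPartitionEventStub.class)) {
            return type == END_OF_PARTITION_EVENT;
        } else if (eventClass.equals(CheckpointBarrierStub.class)) {
            return type == CHECKPOINT_BARRIER_EVENT;
        } else {
            // The rejection depends only on the requested class, not on the
            // bytes in the buffer, so this is not an I/O error.
            throw new UnsupportedOperationException("Unsupported eventClass = " + eventClass);
        }
    }

    // Helper that writes a single type tag into a fresh, readable buffer.
    static ByteBuffer encode(int type) {
        ByteBuffer buffer = ByteBuffer.allocate(4);
        buffer.putInt(type);
        buffer.flip();
        return buffer;
    }
}
```

Because the branch is chosen from `eventClass` alone, a caller asking about an unsupported class gets the same exception regardless of what the buffer contains.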


---