Posted to issues@flink.apache.org by wangzhijiang999 <gi...@git.apache.org> on 2016/06/21 09:17:07 UTC

[GitHub] flink pull request #2141: [flink-4021] Problem of setting autoread for netty...

GitHub user wangzhijiang999 opened a pull request:

    https://github.com/apache/flink/pull/2141

    [flink-4021] Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/wangzhijiang999/flink master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2141.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2141
    
----
commit 7941150cc153d694b543dc92a36cc089eafcdbac
Author: 淘江 <ta...@alibaba-inc.com>
Date:   2016-06-21T09:13:37Z

    flink-4021:Problem of setting autoread for netty channel when more tasks sharing the same Tcp connection

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2141: [FLINK-4021] Problem of setting autoread for netty channe...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2141
  
    Very impressive that you made it through that part of the system. It's very poorly documented and overly complex. The change looks good: it ensures that a staged buffer that cannot be decoded because of a closed buffer pool does not leave the channel with auto read set to false. This is currently not a problem because all tasks of the job fail together, but with partial recovery it will lead to problems if the channel is kept alive by other consuming tasks.
    
    We could add the following as a test for this in `PartitionRequestClientHandlerTest`. What do you think? This covers both branches that you added.
    
    ```java
    /**
     * Tests that an unsuccessful message decode call for a staged message
     * does not leave the channel with auto read set to false.
     */
    @Test
    @SuppressWarnings("unchecked")
    public void testAutoReadAfterUnsuccessfulStagedMessage() throws Exception {
    	PartitionRequestClientHandler handler = new PartitionRequestClientHandler();
    	EmbeddedChannel channel = new EmbeddedChannel(handler);
    
    	final AtomicReference<EventListener<Buffer>> listener = new AtomicReference<>();
    
    	BufferProvider bufferProvider = mock(BufferProvider.class);
    	when(bufferProvider.addListener(any(EventListener.class))).thenAnswer(new Answer<Boolean>() {
    		@Override
    		@SuppressWarnings("unchecked")
    		public Boolean answer(InvocationOnMock invocation) throws Throwable {
    			listener.set((EventListener<Buffer>) invocation.getArguments()[0]);
    			return true;
    		}
    	});
    
    	when(bufferProvider.requestBuffer()).thenReturn(null);
    
    	InputChannelID channelId = new InputChannelID(0, 0);
    	RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
    	when(inputChannel.getInputChannelId()).thenReturn(channelId);
    
    	handler.addInputChannel(inputChannel);
    
    	BufferResponse msg = createBufferResponse(channelId, channel);
    
    	// Write 1st buffer msg. No buffer is available, therefore the buffer
    	// should be staged and auto read should be set to false.
    	assertTrue(channel.config().isAutoRead());
    	channel.writeInbound(msg);
    
    	// No buffer available, auto read false
    	assertFalse(channel.config().isAutoRead());
    
    	// Write more buffers... all staged.
    	msg = createBufferResponse(channelId, channel);
    	channel.writeInbound(msg);
    
    	msg = createBufferResponse(channelId, channel);
    	channel.writeInbound(msg);
    
    	// Notify about buffer => handle 1st msg
    	Buffer availableBuffer = createBuffer(false);
    	listener.get().onEvent(availableBuffer);
    
    	// Start processing of staged buffers (in run pending tasks). Make
    	// sure that the buffer provider acts like it's destroyed.
    	when(bufferProvider.addListener(any(EventListener.class))).thenReturn(false);
    	when(bufferProvider.isDestroyed()).thenReturn(true);
    
    	// The 3rd staged msg has a null buffer provider
    	when(inputChannel.getBufferProvider()).thenReturn(bufferProvider, bufferProvider, null);
    
    	// Execute all tasks that are scheduled in the event loop. Further
    	// eventLoop().execute() calls are directly executed, if they are
    	// called in the scope of this call.
    	channel.runPendingTasks();
    
    	assertTrue(channel.config().isAutoRead());
    }
    ```
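The behaviour discussed above can also be pictured without Netty or Flink on the classpath. The following is only a minimal sketch under stated assumptions: `StagedMessageSketch`, `Pool`, `Message`, `onMessage` and `drainStaged` are hypothetical names invented for illustration, not Flink's `PartitionRequestClientHandler` API. It models a channel that stops reading while messages are staged and shows why a message whose buffer pool has been destroyed should count as handled, so that draining continues and auto read is switched back on.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model; NOT Flink's actual handler.
class StagedMessageSketch {

    // Stand-in for a task's buffer pool, which is destroyed when the task fails.
    static final class Pool {
        boolean destroyed;
        int available;

        // Takes one buffer if the pool is alive and has one; returns whether it did.
        boolean tryTake() {
            if (!destroyed && available > 0) {
                available--;
                return true;
            }
            return false;
        }
    }

    // Stand-in for a received buffer message destined for one task's pool.
    static final class Message {
        final Pool pool;

        Message(Pool pool) {
            this.pool = pool;
        }
    }

    private final Queue<Message> staged = new ArrayDeque<>();
    private boolean autoRead = true;

    boolean isAutoRead() {
        return autoRead;
    }

    // Returns true if the message is done with (decoded or dropped),
    // false if it still has to wait for a buffer.
    private boolean decode(Message msg) {
        if (msg.pool.destroyed) {
            // The receiving task is gone: drop the message instead of waiting forever.
            return true;
        }
        return msg.pool.tryTake();
    }

    // Called for every incoming message.
    void onMessage(Message msg) {
        // Preserve ordering: once anything is staged, later messages are staged too.
        if (!staged.isEmpty() || !decode(msg)) {
            staged.add(msg);
            autoRead = false; // stop reading from the shared connection
        }
    }

    // Called when a buffer becomes available or a pool is destroyed.
    void drainStaged() {
        while (!staged.isEmpty()) {
            if (!decode(staged.peek())) {
                return; // still blocked; auto read stays off
            }
            staged.remove();
        }
        autoRead = true; // nothing staged any more, resume reading
    }

    public static void main(String[] args) {
        StagedMessageSketch channel = new StagedMessageSketch();
        Pool failedTaskPool = new Pool(); // no buffers available

        channel.onMessage(new Message(failedTaskPool));
        channel.onMessage(new Message(failedTaskPool));
        System.out.println("auto read while staged: " + channel.isAutoRead());

        failedTaskPool.destroyed = true; // the consuming task fails
        channel.drainStaged();
        System.out.println("auto read after drain: " + channel.isAutoRead());
    }
}
```

If `decode` instead reported "not handled" for a destroyed pool, `drainStaged` would return early and the sketch's channel would stay with auto read off, which is the situation the proposed test guards against for other tasks sharing the connection.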



[GitHub] flink pull request #2141: [FLINK-4021] Problem of setting autoread for netty...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2141#discussion_r74760105
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java ---
    @@ -292,7 +296,11 @@ else if (bufferListener.waitForBuffer(bufferProvider, bufferOrEvent)) {
     						return false;
     					}
     					else if (bufferProvider.isDestroyed()) {
    -						return false;
    --- End diff --
    
    We usually put whitespace around keywords such as `if` and `else`:
    ```java
    if (isStagedBuffer) {
        return true;
    } else {
        return false;
    }
    ```
    
    In this case, you can simplify the return value to `return isStagedBuffer`. Same for the other place where you use this.
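
As a small illustration of the simplification suggested above, the sketch below compares the two forms side by side. The class and method names (`ReturnSimplification`, `verbose`, `concise`) are hypothetical and exist only for this example; only the variable name `isStagedBuffer` comes from the review comment.

```java
// Hypothetical example class; only `isStagedBuffer` is taken from the review.
class ReturnSimplification {

    // The longer form: branch on the flag and return a literal in each branch.
    static boolean verbose(boolean isStagedBuffer) {
        if (isStagedBuffer) {
            return true;
        } else {
            return false;
        }
    }

    // The simplified form: the flag already is the value to return.
    static boolean concise(boolean isStagedBuffer) {
        return isStagedBuffer;
    }

    public static void main(String[] args) {
        // Check both possible inputs; the two forms must always agree.
        for (boolean flag : new boolean[] {true, false}) {
            if (verbose(flag) != concise(flag)) {
                throw new AssertionError("forms disagree for " + flag);
            }
        }
        System.out.println("both forms agree");
    }
}
```

Because a `boolean` has only two values, checking `true` and `false` covers every case, so the shorter form can replace the longer one without changing behaviour.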



[GitHub] flink issue #2141: [FLINK-4021] Problem of setting autoread for netty channe...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2141
  
    Thank you for this PR. I will try to look into it next week. I think we should wait for the 1.1 release before we merge this though.



[GitHub] flink issue #2141: [FLINK-4021] Problem of setting autoread for netty channe...

Posted by uce <gi...@git.apache.org>.
Github user uce commented on the issue:

    https://github.com/apache/flink/pull/2141
  
    I'm going to address the comments and merge this.



[GitHub] flink pull request #2141: [FLINK-4021] Problem of setting autoread for netty...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/2141

