You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/10/10 09:18:00 UTC

[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

    [ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16198404#comment-16198404 ] 

ASF GitHub Bot commented on FLINK-7416:
---------------------------------------

Github user zhijiangW commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4533#discussion_r143671614
  
    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
    @@ -37,20 +43,31 @@
     
     import java.io.IOException;
     import java.net.SocketAddress;
    +import java.util.ArrayDeque;
     import java.util.concurrent.ConcurrentHashMap;
     import java.util.concurrent.ConcurrentMap;
     import java.util.concurrent.atomic.AtomicReference;
     
    +/**
    + * Channel handler to read {@link BufferResponse} and {@link ErrorResponse} messages from the
    + * producer, to write and flush {@link AddCredit} message for the producer.
    + */
     class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
     
     	private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class);
     
    +	/** Channels, which already requested partitions from the producers. */
     	private final ConcurrentMap<InputChannelID, RemoteInputChannel> inputChannels = new ConcurrentHashMap<>();
     
    +	/** Channels, which will notify the producers about unannounced credit. */
    +	private final ArrayDeque<RemoteInputChannel> inputChannelsWithCredit = new ArrayDeque<>();
    +
     	private final AtomicReference<Throwable> channelError = new AtomicReference<>();
     
    +	private final ChannelFutureListener writeListener = new WriteAndFlushNextMessageIfPossibleListener();
    +
     	/**
    -	 * Set of cancelled partition requests. A request is cancelled iff an input channel is cleared
    +	 * Set of cancelled partition requests. A request is cancelled if an input channel is cleared
    --- End diff --
    
    got it, already restored the original writing.


> Implement Netty receiver outgoing pipeline for credit-based
> -----------------------------------------------------------
>
>                 Key: FLINK-7416
>                 URL: https://issues.apache.org/jira/browse/FLINK-7416
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Network
>            Reporter: zhijiang
>            Assignee: zhijiang
>             Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} and sends its unannounced credit. The credit is reset to zero after each sent.
> * That way, messages are sent as often as the network has capacity and contain as much credit as available for the channel at that point in time. Otherwise, it would only add latency to the announcements and not increase throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)