Posted to issues@flink.apache.org by "Stephan Ewen (JIRA)" <ji...@apache.org> on 2017/04/28 14:45:04 UTC

[jira] [Commented] (FLINK-6413) Add stream operator callback to notify about consumed network buffer

    [ https://issues.apache.org/jira/browse/FLINK-6413?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15988933#comment-15988933 ] 

Stephan Ewen commented on FLINK-6413:
-------------------------------------

Not sure about this one. Seems quite complex to model a mechanism that is not needed by Flink (and exists in Beam because some other runtimes may need it).
This sounds to me like making the runtime more complex to match an interface that was designed for a different runtime.

An alternative thought: a while back we considered offering operators an interface to {{processBuffer}} rather than {{processElement}}.
I think that would be more natural than a buffer consumption listener.
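
To make the {{processBuffer}} idea a bit more concrete, here is a minimal, self-contained sketch. All names in it ({{ElementProcessor}}, {{BufferProcessor}}, {{BatchingSink}}, {{ProcessBufferSketch}}) are hypothetical and are not existing Flink APIs; a buffer is modeled as a plain list of already-deserialized records, which glosses over records that span buffers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical per-element interface, standing in for today's processElement contract.
interface ElementProcessor<T> {
    void processElement(T element);
}

// Hypothetical extension: operators may handle a whole buffer at once.
interface BufferProcessor<T> extends ElementProcessor<T> {
    // Default keeps existing operators working: fall back to per-element processing.
    default void processBuffer(List<T> elements) {
        for (T element : elements) {
            processElement(element);
        }
    }
}

// Example operator that groups all records of one buffer into a single batch,
// e.g. to issue one write to an external system per buffer.
class BatchingSink implements BufferProcessor<String> {
    final List<List<String>> flushedBatches = new ArrayList<>();

    @Override
    public void processElement(String element) {
        // Without buffer awareness, every record becomes its own batch.
        flushedBatches.add(Arrays.asList(element));
    }

    @Override
    public void processBuffer(List<String> elements) {
        // With buffer awareness, one batch per buffer.
        flushedBatches.add(new ArrayList<>(elements));
    }
}

class ProcessBufferSketch {
    static List<List<String>> run() {
        BatchingSink sink = new BatchingSink();
        sink.processBuffer(Arrays.asList("a", "b", "c"));
        sink.processBuffer(Arrays.asList("d"));
        return sink.flushedBatches;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With this shape the bundle boundary is implicit in the call itself, so no separate listener callback is needed.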

Also, given that the network stack needs some quite crucial other fixes around latency and stream alignment, I am very skeptical about introducing such added complexity unless we have a strong case that some users need it.

> Add stream operator callback to notify about consumed network buffer 
> ---------------------------------------------------------------------
>
>                 Key: FLINK-6413
>                 URL: https://issues.apache.org/jira/browse/FLINK-6413
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>
> This is originally motivated by BEAM-1612. Beam has the notion of bundles and allows users to do work at the start/end of each bundle. This could be used for setting up some expensive connection or for batching accesses to some external system. There is also internal optimisation potential because accesses/updates to state could be kept in-memory per bundle/buffer and only afterwards be written to fault-tolerant state.
> The bundling induced by the Flink network stack (which depends on the network buffer size and the buffer timeout) seems like a natural fit for this. I propose to add an _experimental_ interface {{BufferConsumedListener}} (or some such name):
> {code}
> interface BufferConsumedListener {
>   void notifyBufferConsumed();
> }
> {code}
> that is invoked in the input processor whenever a network buffer is exhausted: https://github.com/apache/flink/blob/922352ac35f3753334e834632e3e361fbd36336e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L178-L178
> The change is very simple; three lines of code would be added:
> {code}
> if (result.isBufferConsumed()) {
>   currentRecordDeserializer.getCurrentBuffer().recycle();
>   currentRecordDeserializer = null;
>   if (streamOperator instanceof BufferConsumedListener) {
>     ((BufferConsumedListener) streamOperator).notifyBufferConsumed();
>   }
> }
> {code}
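
As an illustration of the per-bundle state optimisation described in the issue, the following is a minimal sketch, not Flink code. {{BufferConsumedListener}} is the interface proposed in the issue text; {{BundlingCounter}} and {{BufferListenerSketch}} are hypothetical names, the "durable" map merely stands in for fault-tolerant state, and each inner list stands in for one network buffer (records spanning buffers are ignored here).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Interface as proposed in the issue; it does not exist in Flink.
interface BufferConsumedListener {
    void notifyBufferConsumed();
}

// Hypothetical operator that counts keys in memory and only writes
// to (simulated) fault-tolerant state at the end of each buffer.
class BundlingCounter implements BufferConsumedListener {
    private final Map<String, Integer> pending = new HashMap<>(); // in-memory, per bundle
    final Map<String, Integer> durable = new HashMap<>();         // stand-in for fault-tolerant state
    int flushes = 0;

    void processElement(String key) {
        pending.merge(key, 1, Integer::sum);
    }

    @Override
    public void notifyBufferConsumed() {
        // Fold the per-bundle counts into the durable state, then reset the bundle.
        pending.forEach((key, count) -> durable.merge(key, count, Integer::sum));
        pending.clear();
        flushes++;
    }
}

class BufferListenerSketch {
    // Stand-in for the input processor loop: consume each buffer, then notify.
    static BundlingCounter run(List<List<String>> buffers) {
        BundlingCounter operator = new BundlingCounter();
        for (List<String> buffer : buffers) {
            for (String record : buffer) {
                operator.processElement(record);
            }
            Object streamOperator = operator;
            if (streamOperator instanceof BufferConsumedListener) {
                ((BufferConsumedListener) streamOperator).notifyBufferConsumed();
            }
        }
        return operator;
    }
}
```

In this sketch the durable state is touched once per buffer rather than once per record, which is the saving the issue alludes to.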



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)