You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Aljoscha Krettek (JIRA)" <ji...@apache.org> on 2017/05/03 13:27:04 UTC

[jira] [Closed] (FLINK-6413) Add stream operator callback to notify about consumed network buffer

     [ https://issues.apache.org/jira/browse/FLINK-6413?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek closed FLINK-6413.
-----------------------------------
    Resolution: Won't Fix

Closing in favour of a different solution (buffer-level processing)

> Add stream operator callback to notify about consumed network buffer 
> ---------------------------------------------------------------------
>
>                 Key: FLINK-6413
>                 URL: https://issues.apache.org/jira/browse/FLINK-6413
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>
> This is originally motivated by BEAM-1612. Beam has the notion of bundles and allows users to do work at the start/end of each bundle. This could be used for setting up some expensive connection or for batching accesses to some external system. There is also internal optimisation potential because accesses/updates to state could be kept in-memory per bundle/buffer and only afterwards be written to fault-tolerant state.
> The bundling induced by the Flink network stack (which depends on the network buffer size and the buffer timeout) seems like a natural fit for this. I propose to add an _experimental_ interface {{BufferConsumedListener}} (or some such name):
> {code}
> interface BufferConsumedListener {
>   void notifyBufferConsumed():
> }
> {code}
> that is invoked in the input processor whenever a network buffer is exhausted: https://github.com/apache/flink/blob/922352ac35f3753334e834632e3e361fbd36336e/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java#L178-L178
> The change is very simple, three lines of code would be added:
> {code}
> if (result.isBufferConsumed()) {
>   currentRecordDeserializer.getCurrentBuffer().recycle();
>   currentRecordDeserializer = null;
>   if (streamOperator instanceof BufferConsumedListener) {
>     ((BufferConsumedListener) streamOperator).notifyBufferConsumed():
>   }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)