Posted to user@flink.apache.org by Peter Ertl <pe...@gmx.net> on 2017/09/01 09:29:10 UTC

termination of stream#iterate on finite streams

Hi folks,

I was doing some experiments with DataStream#iterate and what felt strange to me is the fact that #iterate() does not terminate on its own when consuming a _finite_ stream.

I think this is awkward and unexpected. The only thing that "helped" was setting an arbitrary and meaningless timeout on iterate.

Imho this should not be necessary (maybe send an internal "poison message" down the iteration stream to signal shutdown of the streaming task?)

example:

// ---------------------------------------------------
// does terminate by introducing a meaningless timeout
// ---------------------------------------------------
val iterationResult1 = env.generateSequence(1, 4).iterate(it => {
  (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x')) // dump meaningless 'x' chars just to do anything
}, 1000, keepPartitioning = false)

iterationResult1.print()

// ---------------------------------------------------
// does NEVER terminate
// ---------------------------------------------------
val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
  (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump meaningless 'y' chars just to do anything
})
iterationResult2.print()

Can someone elaborate on this - should I file a ticket?

Regards
Peter

Re: termination of stream#iterate on finite streams

Posted by Gábor Gévay <gg...@gmail.com>.
Hello,

There is a Flink Improvement Proposal to redesign the iterations:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132
This will address the termination issue.

Best,
Gábor





On Mon, Sep 4, 2017 at 11:00 AM, Xingcan Cui <xi...@gmail.com> wrote:
> Hi Peter,
>
> That's a good idea, but it may not be applicable to an iteration operator.
> The operator cannot determine when to generate the "end-of-stream message"
> for the feedback stream. The provided function (e.g., filter(_ > 0).map(_ - 1))
> is stateless and has no side-effects.
>
> Best,
> Xingcan
>
>
>
> On Mon, Sep 4, 2017 at 4:40 AM, Peter Ertl <pe...@gmx.net> wrote:
>>
>> Hi Xingcan!
>>
>> if a _finite_ stream emitted, at the end, a special, trailing
>> "End-Of-Stream Message" that flows down the operator stream, wouldn't
>> this enable us to deterministically end the iteration without needing a
>> timeout?
>>
>> Having an arbitrary timeout that must be longer than any iteration step
>> takes seems really awkward.
>>
>> What do you think?
>>
>> Best regards
>> Peter
>>
>>
>> On 02.09.2017 at 17:16, Xingcan Cui <xi...@gmail.com> wrote:
>>
>> Hi Peter,
>>
>> I just omitted the filter part. Sorry for that.
>>
>> Actually, as the javadoc explains, by default a DataStream with an iteration
>> will never terminate. That's because in a
>> streaming environment with an iteration, the operator can never know whether
>> the feedback stream has reached its end
>> (though the data source has terminated, there may be further data it
>> cannot know about), and that's why it needs a
>> timeout value to make the judgement, just like many timeout-based calls
>> in network programming. In other words,
>> you know the feedback stream will be empty in the future, but the operator
>> doesn't, so we provide it with a maximum
>> waiting time for the next record.
>>
>> Internally, this mechanism is implemented via a blocking queue (the
>> related code can be found here).
>>
>> Hope everything is considered this time : )
>>
>> Best,
>> Xingcan
>>
>> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl <pe...@gmx.net> wrote:
>>>
>>>
>>> On 02.09.2017 at 04:45, Xingcan Cui <xi...@gmail.com> wrote:
>>>
>>> In your code, all the long values will be decremented by 1 and sent back
>>> to the iterate operator, endlessly.
>>>
>>>
>>>
>>> Is this true? Shouldn't
>>>
>>>   val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>>>     (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump
>>> meaningless 'y' chars just to do anything
>>>   })
>>>   iterationResult2.print()
>>>
>>>
>>> produce the following _feedback_ streams?
>>>
>>> initial input to #iterate(): [1 2 3 4]
>>>
>>> iteration #1 : [1 2 3]
>>> iteration #2 : [1 2]
>>> iteration #3 : [1]
>>> iteration #4 : []  => empty feedback stream => cause termination? (which
>>> actually only happens when setting a timeout value)
>>>
>>> Best regards
>>> Peter
>>
>

Re: termination of stream#iterate on finite streams

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Peter,

That's a good idea, but it may not be applicable to an iteration operator.
The operator cannot determine when to generate the "end-of-stream message"
for the feedback stream. The provided function (e.g., filter(_ > 0).map(_ - 1))
is stateless and has no side-effects.

Best,
Xingcan



On Mon, Sep 4, 2017 at 4:40 AM, Peter Ertl <pe...@gmx.net> wrote:

> Hi Xingcan!
>
> if a _finite_ stream emitted, at the end, a special, trailing
> "End-Of-Stream Message" that flows down the operator stream, wouldn't
> this enable us to deterministically end the iteration without needing a
> timeout?
>
> Having an arbitrary timeout that must be longer than any iteration step
> takes seems really awkward.
>
> What do you think?
>
> Best regards
> Peter
>
>
> On 02.09.2017 at 17:16, Xingcan Cui <xi...@gmail.com> wrote:
>
> Hi Peter,
>
> I just omitted the filter part. Sorry for that.
>
> Actually, as the javadoc explains, by default a DataStream with an iteration
> will never terminate. That's because in a
> streaming environment with an iteration, the operator can never know whether
> the feedback stream has reached its end
> (though the data source has terminated, *there may
> be further data it cannot know about*), and that's why it needs a
> timeout value to make the judgement, just like many timeout-based calls
> in network programming. In other words,
> you know the feedback stream will be empty in the future, but the operator
> doesn't, so we provide it with a maximum
> waiting time for the next record.
>
> Internally, this mechanism is implemented via a blocking queue (the
> related code can be found here
> <https://github.com/apache/flink/blob/12b4185c6c09101b64e12a84c33dc4d28f95cff9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java#L80>
> ).
>
> Hope everything is considered this time : )
>
> Best,
> Xingcan
>
> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl <pe...@gmx.net> wrote:
>
>>
>> On 02.09.2017 at 04:45, Xingcan Cui <xi...@gmail.com> wrote:
>>
>> In your code, all the long values will be decremented by 1 and sent back
>> to the iterate operator, endlessly.
>>
>>
>>
>> Is this true? Shouldn't
>>
>>   val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>>     (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump meaningless 'y' chars just to do anything
>>   })
>>   iterationResult2.print()
>>
>>
>> produce the following _feedback_ streams?
>>
>> initial input to #iterate(): [1 2 3 4]
>>
>> iteration #1 : [1 2 3]
>> iteration #2 : [1 2]
>> iteration #3 : [1]
>> iteration #4 : []  => empty feedback stream => cause termination? (which
>> actually only happens when setting a timeout value)
>>
>> Best regards
>> Peter
>>
>

Re: termination of stream#iterate on finite streams

Posted by Peter Ertl <pe...@gmx.net>.
Hi Xingcan!

if a _finite_ stream emitted, at the end, a special, trailing "End-Of-Stream Message" that flows down the operator stream, wouldn't this enable us to deterministically end the iteration without needing a timeout?

Having an arbitrary timeout that must be longer than any iteration step takes seems really awkward.

What do you think?

Best regards
Peter


> On 02.09.2017 at 17:16, Xingcan Cui <xingcanc@gmail.com <ma...@gmail.com>> wrote:
> 
> Hi Peter,
> 
> I just omitted the filter part. Sorry for that.
> 
> Actually, as the javadoc explains, by default a DataStream with an iteration will never terminate. That's because in a
> streaming environment with an iteration, the operator can never know whether the feedback stream has reached its end
> (though the data source has terminated, there may be further data it cannot know about), and that's why it needs a
> timeout value to make the judgement, just like many timeout-based calls in network programming. In other words,
> you know the feedback stream will be empty in the future, but the operator doesn't, so we provide it with a maximum
> waiting time for the next record.
> 
> Internally, this mechanism is implemented via a blocking queue (the related code can be found here <https://github.com/apache/flink/blob/12b4185c6c09101b64e12a84c33dc4d28f95cff9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java#L80>).
> 
> Hope everything is considered this time : )
> 
> Best,
> Xingcan
> 
> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl <peter.ertl@gmx.net <ma...@gmx.net>> wrote:
> 
>> On 02.09.2017 at 04:45, Xingcan Cui <xingcanc@gmail.com <ma...@gmail.com>> wrote:
>> 
>> In your code, all the long values will be decremented by 1 and sent back to the iterate operator, endlessly.
> 
> 
> Is this true? Shouldn't
>   val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>     (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump meaningless 'y' chars just to do anything
>   })
>   iterationResult2.print()
> 
> produce the following _feedback_ streams?
> 
> initial input to #iterate(): [1 2 3 4]
> 
> iteration #1 : [1 2 3]
> iteration #2 : [1 2]
> iteration #3 : [1]
> iteration #4 : []  => empty feedback stream => cause termination? (which actually only happens when setting a timeout value)
> 
> Best regards
> Peter





Re: termination of stream#iterate on finite streams

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Peter,

I just omitted the filter part. Sorry for that.

Actually, as the javadoc explains, by default a DataStream with an iteration
will never terminate. That's because in a
streaming environment with an iteration, the operator can never know whether
the feedback stream has reached its end
(though the data source has terminated, *there may be further data it
cannot know about*), and that's why it needs a
timeout value to make the judgement, just like many timeout-based calls in
network programming. In other words,
you know the feedback stream will be empty in the future, but the operator
doesn't, so we provide it with a maximum
waiting time for the next record.

Internally, this mechanism is implemented via a blocking queue (the related
code can be found here
<https://github.com/apache/flink/blob/12b4185c6c09101b64e12a84c33dc4d28f95cff9/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java#L80>
).
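Stripped down to plain Java, the idea looks roughly like this (only a sketch
of the timeout-based polling, not Flink's actual StreamIterationHead; the
names and the record type are made up for illustration):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class IterationHeadSketch {

    // Sketch only: the iteration head keeps polling the feedback queue with a
    // timeout. Since it cannot know that the feedback stream is finished, an
    // expired poll is the only available termination signal.
    static int drainWithTimeout(BlockingQueue<Long> feedback, long timeoutMs) {
        int seen = 0;
        while (true) {
            Long record;
            try {
                record = feedback.poll(timeoutMs, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return seen;
            }
            if (record == null) {
                return seen; // timeout expired -> assume the iteration is done
            }
            seen++; // in Flink this record would be re-emitted into the loop
        }
    }

    public static void main(String[] args) {
        BlockingQueue<Long> feedback = new ArrayBlockingQueue<>(16);
        for (long v = 1; v <= 4; v++) {
            feedback.offer(v);
        }
        // Drains the four records, then blocks for 100 ms before giving up.
        System.out.println(drainWithTimeout(feedback, 100));
    }
}
```

The point of the sketch: once the queue is empty, the poll cannot distinguish
"no more data ever" from "no data yet", so the timeout is the only way out.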

Hope everything is considered this time : )

Best,
Xingcan

On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl <pe...@gmx.net> wrote:

>
> On 02.09.2017 at 04:45, Xingcan Cui <xi...@gmail.com> wrote:
>
> In your code, all the long values will be decremented by 1 and sent back to
> the iterate operator, endlessly.
>
>
>
> Is this true? Shouldn't
>
>   val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>     (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump meaningless 'y' chars just to do anything
>   })
>   iterationResult2.print()
>
>
> produce the following _feedback_ streams?
>
> initial input to #iterate(): [1 2 3 4]
>
> iteration #1 : [1 2 3]
> iteration #2 : [1 2]
> iteration #3 : [1]
> iteration #4 : []  => empty feedback stream => cause termination? (which
> actually only happens when setting a timeout value)
>
> Best regards
> Peter
>
>
>

Re: termination of stream#iterate on finite streams

Posted by Peter Ertl <pe...@gmx.net>.
> On 02.09.2017 at 04:45, Xingcan Cui <xi...@gmail.com> wrote:
> 
> In your code, all the long values will be decremented by 1 and sent back to the iterate operator, endlessly.


Is this true? Shouldn't
  val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
    (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump meaningless 'y' chars just to do anything
  })
  iterationResult2.print()

produce the following _feedback_ streams?

initial input to #iterate(): [1 2 3 4]

iteration #1 : [1 2 3]
iteration #2 : [1 2]
iteration #3 : [1]
iteration #4 : []  => empty feedback stream => cause termination? (which actually only happens when setting a timeout value)
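FWIW, replaying the same per-pass filter/map in plain Java (no Flink
involved, just a sketch of the loop body) shows the feedback keeps a
trailing 0 for one extra pass, but it still runs empty after finitely many
rounds:

```java
import java.util.ArrayList;
import java.util.List;

public class IterateTrace {

    // One pass of the loop body, filter(_ > 0).map(_ - 1), applied
    // repeatedly until the feedback "stream" is empty.
    static List<List<Long>> feedbackTrace(List<Long> input) {
        List<List<Long>> trace = new ArrayList<>();
        List<Long> feedback = input;
        while (!feedback.isEmpty()) {
            List<Long> next = new ArrayList<>();
            for (long v : feedback) {
                if (v > 0) {
                    next.add(v - 1);
                }
            }
            trace.add(next);
            feedback = next;
        }
        return trace;
    }

    public static void main(String[] args) {
        // [[0, 1, 2, 3], [0, 1, 2], [0, 1], [0], []]
        System.out.println(feedbackTrace(List.of(1L, 2L, 3L, 4L)));
    }
}
```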

Best regards
Peter



Re: termination of stream#iterate on finite streams

Posted by Xingcan Cui <xi...@gmail.com>.
Hi Peter,

Let me try to explain this.

As you showed in the examples, the iterate method takes a function which
"splits" the initial stream
into two separate streams, i.e., initialStream => (stream1, stream2).
stream2 works as the output
stream, whose results will be emitted to the successor operators (PrintSink
in your example), while
stream1 works as the feedback stream, whose results will be resent to the
iterate operator.

In your code, all the long values will be decremented by 1 and sent back to
the iterate operator, endlessly.
Try replacing your first map function with (_ + 1) and you'll see the
infinite results. For more information,
you can refer to this
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/datastream_api.html#iterations>
or
read the javadoc.
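To make the split concrete, here is the same semantics as a plain-Java
simulation (this is not the Flink API, just the
initialStream => (stream1, stream2) behaviour described above, with
made-up names):

```java
import java.util.ArrayList;
import java.util.List;

public class IterateSplitSimulation {

    // stream1 = it.filter(_ > 0).map(_ - 1)   -> fed back into the loop
    // stream2 = it.filter(_ > 0).map(_ => 'y') -> emitted downstream
    static List<Character> runIterate(List<Long> input) {
        List<Character> output = new ArrayList<>();
        List<Long> stream = input;
        while (!stream.isEmpty()) {
            List<Long> feedback = new ArrayList<>();
            for (long v : stream) {
                if (v > 0) {
                    feedback.add(v - 1); // stream1: back into the loop
                    output.add('y');     // stream2: what print() receives
                }
            }
            stream = feedback;
        }
        return output;
    }

    public static void main(String[] args) {
        // 4 + 3 + 2 + 1 = 10 'y' records before the feedback runs dry
        System.out.println(runIterate(List.of(1L, 2L, 3L, 4L)).size());
    }
}
```

Note that the simulation terminates as soon as the feedback list is empty;
the real iterate operator cannot observe that condition, which is exactly
why it needs the timeout.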

Hope that helps.

Best,
Xingcan

On Fri, Sep 1, 2017 at 5:29 PM, Peter Ertl <pe...@gmx.net> wrote:

> Hi folks,
>
> I was doing some experiments with DataStream#iterate and what felt strange
> to me is the fact that #iterate() does not terminate on its own when
> consuming a _finite_ stream.
>
> I think this is awkward and unexpected. The only thing that "helped" was
> setting an arbitrary and meaningless timeout on iterate.
>
> Imho this should not be necessary (maybe send an internal "poison message"
> down the iteration stream to signal shutdown of the streaming task?)
>
> example:
>
> // ---------------------------------------------------
>
> // does terminate by introducing a meaningless timeout
> // ---------------------------------------------------
> val iterationResult1 = env.generateSequence(1, 4).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'x')) // dump meaningless 'x' chars just to do anything
> }, 1000, keepPartitioning = false)
>
> iterationResult1.print()
>
> // ---------------------------------------------------
> // does NEVER terminate
> // ---------------------------------------------------
> val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>   (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y')) // dump meaningless 'y' chars just to do anything
> })
> iterationResult2.print()
>
>
> Can someone elaborate on this - should I file a ticket?
>
> Regards
> Peter
>