Posted to user@flink.apache.org by Andrea Spina <an...@radicalbit.io> on 2017/03/23 16:56:37 UTC

Cogrouped Stream never triggers tumbling event time window

Dear Community,

I'm really struggling with a co-grouped stream. The workload is the following:

    val firstStream: DataStream[FirstType] = firstRaw
      .assignTimestampsAndWatermarks(new MyCustomFirstExtractor(maxOutOfOrder))

    val secondStream: DataStream[SecondType] = secondRaw
      .assignTimestampsAndWatermarks(new MyCustomSecondExtractor(maxOutOfOrder))
      .map(new toSecondsStreamMapper())

where both extractors extend BoundedOutOfOrdernessTimestampExtractor and
override the extractTimestamp method to return the timestamp carried by the
FirstType and SecondType objects, respectively. For example:

    override def extractTimestamp(first: FirstType): Long = first.timestamp
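
To make the watermark behaviour of such an extractor concrete, here is a minimal sketch in plain Scala. It is not Flink code: the object, class and method names are hypothetical, and it only models the assumption that the watermark trails the largest timestamp seen so far by the configured out-of-orderness bound and never moves backwards.

```scala
// Simplified model of a bounded-out-of-orderness watermark generator.
// Not Flink code: all names here are hypothetical.
object BoundedWatermarkModel {
  final class Generator(maxOutOfOrderMs: Long) {
    // Start low enough that the first watermark does not underflow.
    private var maxSeen: Long = Long.MinValue + maxOutOfOrderMs

    // Only a larger timestamp advances the generator; late events are ignored.
    def onEvent(timestampMs: Long): Unit =
      maxSeen = math.max(maxSeen, timestampMs)

    // The watermark trails the largest timestamp by the allowed lateness.
    def currentWatermark: Long = maxSeen - maxOutOfOrderMs
  }

  def main(args: Array[String]): Unit = {
    val gen = new Generator(maxOutOfOrderMs = 1000L) // assumed bound of 1 s
    Seq(1490280129517L, 1490280131191L, 0L).foreach(gen.onEvent)
    println(gen.currentWatermark)
  }
}
```

Under this model a timestamp of 0 arriving late does not pull the watermark back; it is simply behind the watermark.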

Then I call coGroup as follows:

    val stockDetails = firstStream
      .coGroup(secondStream)
      .where(_.id)
      .equalTo(_.id)
      .window(TumblingEventTimeWindows.of(Time.seconds(1)))
      .apply(new MyCogroupFunction())
      .uid("myCogroup")
      .name("My CoGroup")
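
For reference, a one-second tumbling event-time window can be modelled in plain Scala as below. This is a sketch with hypothetical names, not Flink's implementation; it assumes non-negative timestamps, no window offset, and windows aligned to multiples of the window size.

```scala
// Simplified model of tumbling-window assignment (hypothetical names).
object TumblingWindowModel {
  // Half-open window [start, end), in milliseconds.
  final case class Window(start: Long, end: Long) {
    // Largest timestamp that still belongs to this window.
    def maxTimestamp: Long = end - 1
  }

  // Align the timestamp down to a multiple of the window size.
  def assign(timestampMs: Long, sizeMs: Long): Window = {
    val start = timestampMs - (timestampMs % sizeMs)
    Window(start, start + sizeMs)
  }

  def main(args: Array[String]): Unit = {
    // Two timestamps taken from Workload 1, with 1 s windows.
    println(assign(1490280129517L, 1000L))
    println(assign(1490280131191L, 1000L))
  }
}
```

In this model, elements from both streams with the same id land in the same window only if their timestamps fall within the same aligned second.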

The problem is that the CoGroup function is never triggered. I ran several
tests and was not able to solve it.
The first relevant point is that event time can be seriously out of order; I
can even run into timestamps of 0. I then also faked the timestamps so as to
spread them over a span of two seconds, five seconds, and so forth. These
attempts did not change the behavior at all: no window ever fires.

Another relevant point: I'm running locally, reading from a pre-loaded Kafka
topic, so all the events are read sequentially at startup.

I will give a couple of examples.

Workload 1 (faked timestamps)
fields (id, timestamp)
FirstType(9781783433803 ,1490280129517)
FirstType(9781783433803 ,1490280129517)
FirstType(9781783433803 ,1490280131191)
FirstType(9781783433803 ,1490280131191)
FirstType(9781783433803 ,1490280131214)
FirstType(9781783433803 ,1490280131214)
FirstType(9781783433803 ,1490280131250)
FirstType(9781783433803 ,1490280131250)
FirstType(9781783433803 ,1490280131294)
FirstType(9781783433803 ,1490280131294)
FirstType(9781783433803 ,1490280131328)
FirstType(9781783433803 ,1490280131328)

SecondType(9781783433803,1490280130465)
SecondType(9781783433803,1490280131027)
SecondType(9781783433803,1490280131051)
SecondType(9781783433803,1490280131070)
SecondType(9781783433803,1490280131085)
SecondType(9781783433803,1490280131103)
SecondType(9781783433803,1490280131124)
SecondType(9781783433803,1490280131143)
SecondType(9781783433803,1490280131158)
SecondType(9781783433803,1490280131175)

Workload 2 (real case timestamps)

> FirstType(9781783433803, 1490172958602)
1> FirstType(9781783433803, 1490172958611)
1> FirstType(9781783433803, 1490172958611)
1> FirstType(9781783433803, 1490172958620)
1> FirstType(9781783433803, 1490172958620)
1> FirstType(9781783433803 ,1490196171869)
1> FirstType(9781783433803, 1490196171869)

SecondType(9781783433803 ,0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 1488834670490)
SecondType(9781783433803, 1489577984143)
SecondType(9781783433803, 0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 0)
SecondType(9781783433803, 1488834670490)
SecondType(9781783433803, 1489577984143)
SecondType(9781783433803, 1489689399726)
SecondType(9781783433803, 1489689399726)

I confirm that I have healthy incoming streams at the entrance of the
coGroup operator.
I think I'm likely missing something easy.

Any help will be really appreciated.

Sincerely,

Andrea




--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Cogrouped-Stream-never-triggers-tumbling-event-time-window-tp12373.html
Sent from the Apache Flink User Mailing List archive at Nabble.com.

Re: Cogrouped Stream never triggers tumbling event time window

Posted by Andrea Spina <an...@radicalbit.io>.
Sorry, I forgot to mention the Flink version: 1.1.2.

Thanks, Andrea






Re: Cogrouped Stream never triggers tumbling event time window

Posted by Aljoscha Krettek <al...@apache.org>.
Hi,
thanks for letting us know! And sorry that you didn’t get any response from the community, I myself just got back from vacation so I’m only now catching up on mail.

Best,
Aljoscha
> On 30. Mar 2017, at 18:24, Andrea Spina <an...@radicalbit.io> wrote:
> 
> Dear community,
> 
> I finally solved the issue I had run into.
> Basically, the cause of the problem was the behavior of my input: the
> incoming rates of the two streams were very different (second-type events
> are really late and scarce in event time).
> 
> The solution I employed was to assign timestamps and watermarks to the
> source stream just before splitting it into the streams handling my first
> and second types. I suppose this solved my problem because EventTimeTrigger
> relies on getCurrentWatermark(), which I think returns the minimum
> watermark across the streams scoped by the TriggerContext. So the window
> was hanging because of the incoming rate behavior of the second type stream.
> 
> Hope it could help someone in the future.
> 
> Cheers,
> 
> Andrea


Re: Cogrouped Stream never triggers tumbling event time window

Posted by Andrea Spina <an...@radicalbit.io>.
Dear community,

I finally solved the issue I had run into.
Basically, the cause of the problem was the behavior of my input: the
incoming rates of the two streams were very different (second-type events
are really late and scarce in event time).

The solution I employed was to assign timestamps and watermarks to the
source stream just before splitting it into the streams handling my first
and second types. I suppose this solved my problem because EventTimeTrigger
relies on getCurrentWatermark(), which I think returns the minimum
watermark across the streams scoped by the TriggerContext. So the window
was hanging because of the incoming rate behavior of the second type stream.
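
The reasoning above can be illustrated with a small model in plain Scala. It is only a sketch under stated assumptions, not Flink code: the names are hypothetical, maxOutOfOrder is assumed to be 1000 ms, and the operator watermark is assumed to be the minimum of the two input watermarks. The timestamps are the largest ones of each stream in Workload 2.

```scala
// Simplified model of a two-input operator's event-time clock
// (hypothetical names, not Flink's implementation).
object CoGroupWatermarkModel {
  // Assumption: the operator advances only as far as its slowest input.
  def operatorWatermark(firstInput: Long, secondInput: Long): Long =
    math.min(firstInput, secondInput)

  // A window [start, end) fires once the watermark reaches end - 1.
  def windowFires(windowEndMs: Long, watermark: Long): Boolean =
    watermark >= windowEndMs - 1

  def main(args: Array[String]): Unit = {
    val maxOutOfOrderMs = 1000L                       // assumed value
    val firstWm  = 1490196171869L - maxOutOfOrderMs   // from FirstType events
    val secondWm = 1489689399726L - maxOutOfOrderMs   // from SecondType events
    val combined = operatorWatermark(firstWm, secondWm)

    // End of the 1 s window holding FirstType timestamp 1490172958602.
    val windowEnd = 1490172959000L
    println(s"combined watermark = $combined")
    println(s"window fires = ${windowFires(windowEnd, combined)}")
  }
}
```

In this model the window holding the first-type events stays open as long as the second stream's watermark lags behind it, which matches the hanging behavior described. With a single watermark assigned before the split, both inputs advance together.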

Hope it could help someone in the future.

Cheers,

Andrea


