Posted to user@flink.apache.org by Adam Warski <ad...@warski.org> on 2016/08/06 10:33:18 UTC
Flink 1.1 event-time windowing changes from 1.0.3
Hello,
I have a very simple stream where I window data using event-time.
As a data source I’m using a CSV file, sorted by increasing timestamps.
Here’s the source:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val lines = env.readTextFile(csvFileName)

lines
  .flatMap { l => parseLine(l) }
  .assignAscendingTimestamps(t => t.timestampSeconds * 1000L)
  .keyBy(t => t.key)
  .timeWindow(Time.minutes(30), Time.minutes(5))
  .fold(0)((c, _) => c + 1)
  .addSink { c =>
    println(c)
  }

env.execute()
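[For reference, timeWindow(Time.minutes(30), Time.minutes(5)) assigns each element to size/slide = 6 overlapping windows. A minimal pure-Scala sketch of that assignment arithmetic — my own illustration of the calculation, not Flink's code:]

```scala
// Sketch of sliding-window assignment: an element with the given
// timestamp (ms) belongs to every window [start, start + size) whose
// start is a multiple of `slide` and satisfies start > timestamp - size.
object SlidingWindows {
  def assign(timestamp: Long, size: Long, slide: Long): Seq[(Long, Long)] = {
    // Start of the latest window that contains `timestamp`
    val lastStart = timestamp - (((timestamp % slide) + slide) % slide)
    (lastStart to (timestamp - size + 1) by -slide).map(s => (s, s + size))
  }
}
```

[With a 30-minute size and a 5-minute slide, every element lands in 6 windows, which is why each record contributes to several reported counts.]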
This used to work fine in 1.0.3: the aggregate counts were printed to stdout.
However, after updating to 1.1, nothing happens. I can see the stages being initialized (switching state from SCHEDULED to DEPLOYING to RUNNING), but then they immediately go to FINISHED without printing anything out.
If I add a .map { x => println(x); x } after .assignAscendingTimestamps I can see the data flowing - so data *is* being read; somehow the windowing causes it to be lost?
Any ideas on where to look for possible causes?
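[One thing worth ruling out first — plain Scala, no Flink, names illustrative: assignAscendingTimestamps assumes the extracted timestamps never decrease, so a quick pre-check that the parsed file really is sorted can rule that out:]

```scala
// Hypothetical pre-check: verify the extracted timestamps are
// monotonically non-decreasing, as assignAscendingTimestamps requires.
def isAscending(timestamps: Seq[Long]): Boolean =
  timestamps.sliding(2).forall {
    case Seq(a, b) => a <= b
    case _         => true // zero or one element: trivially sorted
  }
```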
Thanks!
--
Adam Warski
http://twitter.com/#!/adamwarski
http://www.softwaremill.com
http://www.warski.org
Re: Flink 1.1 event-time windowing changes from 1.0.3
Posted by Adam Warski <ad...@warski.org>.
Thanks! I’ll be watching that issue, then.
Adam
> On 08 Aug 2016, at 05:01, Aljoscha Krettek <al...@apache.org> wrote:
>
> Hi Adam,
> sorry for the inconvenience. This is caused by a new file read operator, specifically how it treats watermarks/timestamps. I opened an issue here that describes the situation: https://issues.apache.org/jira/browse/FLINK-4329.
>
> I think this should be fixed for an upcoming 1.1.1 bug fixing release.
>
> Cheers,
> Aljoscha
Re: Flink 1.1 event-time windowing changes from 1.0.3
Posted by Aljoscha Krettek <al...@apache.org>.
Hi Adam,
sorry for the inconvenience. This is caused by a new file read operator,
specifically how it treats watermarks/timestamps. I opened an issue here
that describes the situation:
https://issues.apache.org/jira/browse/FLINK-4329.
I think this should be fixed in an upcoming 1.1.1 bugfix release.
Cheers,
Aljoscha
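[To illustrate the failure mode described above — a toy model of event-time firing, not Flink internals: a window fires once the watermark passes its end timestamp, and a finite source normally closes with a final Long.MaxValue watermark that flushes everything still pending. If the file read operator never emits that final watermark, pending windows are silently dropped when the job finishes:]

```scala
// Toy event-time model: windows fire once the watermark reaches their end.
final case class Window(start: Long, end: Long)

final class PendingWindows {
  private var pending = List.empty[Window]

  def add(w: Window): Unit = pending ::= w

  // Returns the windows fired by this watermark; the rest stay pending.
  def onWatermark(wm: Long): List[Window] = {
    val (fired, rest) = pending.partition(_.end <= wm)
    pending = rest
    fired
  }

  def stillPending: List[Window] = pending
}
```

[If the last watermark an operator ever sees is below a window's end timestamp, that window sits in stillPending forever — matching the symptom of the job going straight to FINISHED without printing anything.]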