Posted to user@flink.apache.org by Nirmalya Sengupta <se...@gmail.com> on 2015/12/26 18:01:45 UTC

Explanation of the output of timeWindowAll(Time.milliseconds(3))

Hello Fabian <fh...@gmail.com>

Merry Christmas to you and everyone else in this forum.

Another neophyte's question, patience please.

I have the following code:

    val env = StreamExecutionEnvironment.createLocalEnvironment(1)

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val readings =
      readIncomingReadings(env,"./sampleIOT.csv")
      .map(e => (e.timeStamp,e.ambientTemperature))
      .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
      .timeWindowAll(Time.milliseconds(3))
      .maxBy(1)


In the data file, the timestamp is the second field from the right (first few
records only):

probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
probe-24444323,197,816.06,84.0816,1448028161,4.405
probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
probe-20c609fb,204,804.37,84.5243,1448028161,22.87
probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
probe-960906ca,197,797.63,77.4359,1448028162,27.62
probe-16226f9e,199,835.5,81.2027,1448028162,18.82
probe-4de4e64b,200,851.04,80.5296,1448028162,27.43
.......


The output is:

(1448028163,27.83)
(1448028166,32.06)
(1448028160,30.02)
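For reference, the grouping itself can be reproduced without Flink: a tumbling window of size 3 is aligned to the epoch, so a record with timestamp ts falls into the window starting at ts - (ts % 3), and maxBy(1) keeps the whole tuple of the element with the largest temperature. This is a minimal sketch, not the actual pipeline; the tuples below are illustrative values in the same shape as the CSV fields, not the real readIncomingReadings output.

```scala
// Sketch (no Flink): reproduce tumbling-window grouping and maxBy by hand.
// A tumbling window of size 3 aligned to the epoch starts at ts - (ts % 3).
object WindowSketch {
  val size = 3L

  // (timestamp, temperature) pairs -- illustrative values only
  val readings = Seq(
    (1448028161L, 22.07), (1448028161L, 29.43),
    (1448028162L, 27.62), (1448028163L, 27.83),
    (1448028166L, 32.06)
  )

  // Group by window start, then keep the element with the max temperature,
  // mirroring timeWindowAll(Time.milliseconds(3)).maxBy(1).
  def maxPerWindow: Map[Long, (Long, Double)] =
    readings.groupBy { case (ts, _) => ts - (ts % size) }
      .map { case (start, elems) => start -> elems.maxBy(_._2) }

  def main(args: Array[String]): Unit =
    maxPerWindow.toSeq.sortBy(_._1).foreach(println)
}
```

Note that timestamps 1448028161 and 1448028162 land in different windows ([1448028159, 1448028162) and [1448028162, 1448028165)), which is why the emitted tuples carry timestamps that are not window boundaries: maxBy returns an element of the window, not an aggregate.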

The contents are correct, but I am not sure about the *order in which they
appear*. Because I am using

val env = StreamExecutionEnvironment.createLocalEnvironment(1)  // only one
thread anyway


and the timestamps are guaranteed to be in *ascending order* (I have
sorted the CSV before using it), my expectation is that Flink should
print the output as:

(1448028160,30.02)

(1448028163,27.83)

(1448028166,32.06)

How do I explain the randomness?

-- Nirmalya

-- 
Software Technologist
http://www.linkedin.com/in/nirmalyasengupta
"If you have built castles in the air, your work need not be lost. That is
where they should be.
Now put the foundation under them."

Re: Explanation of the output of timeWindowAll(Time.milliseconds(3))

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Nirmalya,

an event time trigger (such as the one that computes a window) fires when a
watermark is received that is larger than the trigger's timestamp. By
default, watermarks are emitted at a fixed time interval, i.e., every x
milliseconds. When a new watermark is due, Flink asks for the currently
valid watermark value. If a window operator receives a watermark that
closes multiple windows, the order in which those windows are computed is
random.

In your case, you are reading data from a file, which is very fast, and
several windows are already complete when the first watermark is received.
The order in which these windows are computed and their results emitted is
random.

You can configure the watermark interval
with ExecutionConfig.setAutoWatermarkInterval(long milliseconds).
Alternatively, you can implement a source function that emits watermarks
itself.
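The effect can be mimicked outside Flink: if several complete windows sit in an unordered pending set when a single watermark arrives, the firing loop emits them in whatever order that set happens to iterate. This is a sketch of the mechanism only; the HashMap standing in for the operator's internal window state is an assumption for illustration, not Flink's actual implementation.

```scala
import scala.collection.mutable

// Sketch (not Flink internals): every window whose end <= watermark fires
// at once; held in an unordered structure, the emission order is simply
// the structure's iteration order, not timestamp order.
object WatermarkSketch {
  // Pending windows keyed by window start (size 3); values are the
  // per-window results, matching the output tuples in the thread.
  val pending = mutable.HashMap[Long, (Long, Double)](
    1448028159L -> (1448028160L, 30.02),
    1448028162L -> (1448028163L, 27.83),
    1448028165L -> (1448028166L, 32.06)
  )

  // Fire every window that the given watermark closes (end = start + 3).
  def onWatermark(watermark: Long): Seq[(Long, Double)] =
    pending.collect {
      case (start, result) if start + 3 <= watermark => result
    }.toSeq // HashMap iteration order, not timestamp order

  def main(args: Array[String]): Unit =
    // A single late watermark closes all three windows at once.
    onWatermark(1448028170L).foreach(println)
}
```

The set of fired windows is deterministic; only their relative emission order is not, which matches the observed output.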

Best,
Fabian



2015-12-26 18:01 GMT+01:00 Nirmalya Sengupta <se...@gmail.com>:

> Hello Fabian <fh...@gmail.com>
>
> Merry Christmas to you and everyone else in this forum.
>
> Another neophyte's question, patience please.
>
> I have following code:
>
>     val env = StreamExecutionEnvironment.createLocalEnvironment(1)
>
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>     val readings =
>       readIncomingReadings(env,"./sampleIOT.csv")
>       .map(e => (e.timeStamp,e.ambientTemperature))
>       .assignAscendingTimestamps(pairOfReadings => pairOfReadings._1)
>       .timeWindowAll(Time.milliseconds(3))
>       .maxBy(1)
>
>
> In the datafile, timestamps are 2nd from the right field (first few
> records only):
>
> probe-42a9ddca,193,819.12,74.3712,1448028161,22.07
> probe-252a5bbd,197,710.32,80.6072,1448028161,14.64
> probe-987f2cb6,200,750.4,76.0533,1448028161,14.72
> probe-24444323,197,816.06,84.0816,1448028161,4.405
> probe-6dd6fdc4,201,717.64,78.4031,1448028161,29.43
> probe-20c609fb,204,804.37,84.5243,1448028161,22.87
> probe-c027fdc9,195,858.61,81.7682,1448028161,24.47
> probe-2c6cd3de,198,826.96,85.26,1448028162,18.99
> probe-960906ca,197,797.63,77.4359,1448028162,27.62
> probe-16226f9e,199,835.5,81.2027,1448028162,18.82
> probe-4de4e64b,200,851.04,80.5296,1448028162,27.43
> .......
>
>
> The output is:
>
> (1448028163,27.83)
> (1448028166,32.06)
> (1448028160,30.02)
>
> The contents are correct, but I am not sure about the *order in which
> they appear*. Because I am using
>
> val env = StreamExecutionEnvironment.createLocalEnvironment(1)  // only
> one thread anyway
>
>
> and the timestamps are guaranteed to be in the *ascending order* (I have
> sorted the CSV before using it), my expectation is that the Flink should
> print the output as:
>
> (1448028160,30.02)
>
> (1448028163,27.83)
>
> (1448028166,32.06)
>
> How do I explain the randomness?
>
> -- Nirmalya
>
> --
> Software Technologist
> http://www.linkedin.com/in/nirmalyasengupta
> "If you have built castles in the air, your work need not be lost. That is
> where they should be.
> Now put the foundation under them."
>