Posted to user@flink.apache.org by Faye Pressly <Fa...@outlook.com> on 2020/08/11 20:55:52 UTC

Using event timestamps, the sink gets them back in the machine timezone

Hello,

I am having an issue with event timestamps and timezones in Flink 1.8 (1.8 because I need it to work on AWS Kinesis).

I have a very simple pipeline that reads events from a stream, transforms them to a Table, does a small windowed aggregation and group-by (tumbling 1 min window), transforms back to a stream, and sinks the result.

I have created a small integration test where I pass a custom source and a custom collecting sink so that I can verify the results.

I took inspiration from this project for the testing: https://github.com/knaufk/flink-testing-pyramid/blob/master/src/test/java/com/github/knaufk/testing/java/StreamingJobIntegrationTest.java

This is a snippet from my integration test. 0L is the event timestamp that will be used by the Flink job, so here I'm firing all the events at 1970-01-01 00:00:00.

ParallelSourceFunction<List<ObjectNode>> source =
  new ParallelCollectionSource(
    Arrays.asList(
      // the last constructor argument (0L) is the event timestamp used by the job
      new Tuple2<>(1L, new Event("1", "111", "impression", "A", 0L)),
      new Tuple2<>(1L, new Event("1", "111", "click", "A", 0L)),
      new Tuple2<>(1L, new Event("2", "111", "impression", "A", 0L)),
      new Tuple2<>(1L, new Event("2", "111", "click", "A", 0L)),
      new Tuple2<>(1L, new Event("3", "111", "impression", "A", 0L)),
      new Tuple2<>(1L, new Event("4", "111", "impression", "A", 0L)),
      new Tuple2<>(1L, new Event("4", "111", "click", "A", 0L))));


CollectingSink sink = new CollectingSink();

new Pipeline().execute(source, sink);

(https://github.com/knaufk/flink-testing-pyramid/blob/master/src/test/java/com/github/knaufk/testing/java/utils/ParallelCollectionSource.java)

My Flink pipeline uses a tumbling window of 1 minute, and I add the window.rowtime to the output objects (which have a java.sql.Timestamp field); this is what gets written to the sink.
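Roughly, the windowed part of the pipeline looks like this (simplified sketch, field and class names are illustrative, Flink 1.8 Table API):

Table events = tableEnv.fromDataStream(stream, "id, account, eventType, variant, eventTime.rowtime");

Table counted = events
    .window(Tumble.over("1.minutes").on("eventTime").as("w"))
    .groupBy("w, eventType")
    .select("eventType, eventType.count as cnt, w.rowtime as windowEnd");

// Result has a java.sql.Timestamp field that receives windowEnd
DataStream<Result> resultStream = tableEnv.toAppendStream(counted, Result.class);
resultStream.addSink(sink);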

When I check the results in sink.result, all the timestamp.getTime() values are using my computer's timezone (GMT+1).
For example the first window, which is 1970-01-01 00:00:59.999, has a timestamp.getTime() of `-3540001`.

I expected it to be 59999, which would really correspond to 1970-01-01 00:00:59.999.
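(The difference is exactly the one-hour GMT+1 offset: 59999 ms - 3600000 ms = -3540001 ms.)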

Is this a bug, or do I have to set something up in order for Flink to treat all the timestamps as UTC?

Thank you!

Re: Using event timestamps, the sink gets them back in the machine timezone

Posted by Timo Walther <tw...@apache.org>.
Hi Faye,

the problem lies in the flawed design of JDK's java.sql.Timestamp. You
can also find a nice summary in the answer here [1]. java.sql.Timestamp
is timezone dependent. Internally, we subtract/normalize the timezone
and work with the UNIX timestamp. Beginning with Flink 1.9 we use the
new Java time classes such as LocalDateTime. Until then it would be
best to either set the JVM's timezone to UTC or remove the timezone in
both sources and sinks.
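For illustration, here is a small standalone sketch (not Flink-specific,
values taken from your example) of the effect and of the UTC workaround:

import java.sql.Timestamp;
import java.util.TimeZone;

public class TimestampTimezoneDemo {
    public static void main(String[] args) {
        // With the JVM default timezone at GMT+1, the wall-clock time
        // 1970-01-01 00:00:59.999 lies one hour before epoch + 59999 ms.
        TimeZone.setDefault(TimeZone.getTimeZone("GMT+1"));
        System.out.println(
            Timestamp.valueOf("1970-01-01 00:00:59.999").getTime()); // -3540001

        // Workaround until Flink 1.9: run the job (and the test) with the JVM
        // timezone set to UTC, e.g. via -Duser.timezone=UTC or programmatically:
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
        System.out.println(
            Timestamp.valueOf("1970-01-01 00:00:59.999").getTime()); // 59999
    }
}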

Regards,
Timo

[1] https://stackoverflow.com/a/43883203/806430

