Posted to user@flink.apache.org by wangsan <wa...@163.com> on 2017/11/28 13:31:15 UTC

Question about Timestamp in Flink SQL

Hi all,

While using Timestamp in Flink SQL, how can I set timezone info? My current timezone is GMT+8, and I found that the selected processing time is always 8 hours behind the current time. The same happens with the extracted event time.

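Here’s my simplified code:

import java.text.SimpleDateFormat
import java.util.{Date, Locale}

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

val senv = StreamExecutionEnvironment.getExecutionEnvironment
senv.setParallelism(1)
senv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val sTableEnv = TableEnvironment.getTableEnvironment(senv)
println(s"current time: ${new SimpleDateFormat("yyyy.MM.dd HH:mm:ss.SSS", Locale.CHINA).format(new Date())}")

// Each socket line becomes a 3-tuple; 't.proctime appends a processing-time attribute named t.
val stream: DataStream[(String, String, String)] = senv.socketTextStream("localhost", 9999).map(line => (line, line, line))
val table = sTableEnv.fromDataStream(stream, 'col1, 'col2, 'col3, 't.proctime)
sTableEnv.registerTable("foo", table)
val result = sTableEnv.sql("select * from foo")
result.printSchema()
result.toAppendStream[Row].print()

senv.execute("foo")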
And here’s the result:

[screenshot not preserved: PastedGraphic-1.png]

Best,
wangsan

Re: Question about Timestamp in Flink SQL

Posted by Timo Walther <tw...@apache.org>.
Hi Wangsan,

I opened an issue to document the behavior properly in the future 
(https://issues.apache.org/jira/browse/FLINK-8169). Basically, both your 
event-time and processing-time timestamps should be in GMT. We plan to 
support offsets for windows in the future 
(https://issues.apache.org/jira/browse/FLINK-8168). Internally, the long 
values remain constant in GMT; only the toString() output is timezone 
dependent. For now, I would suggest either implementing a user-defined 
scalar function for your desired behavior or simply subtracting the 
offset (ts - INTERVAL '8' HOURS should work).
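A JDK-only sketch of the conversion logic such a user-defined scalar function might contain, assuming the goal is to render a UTC-based timestamp as Asia/Shanghai wall-clock time (or to go back again). The class and method names are hypothetical, and wrapping the method in a Flink `ScalarFunction` is not shown. For a zone without daylight saving time, such as Asia/Shanghai, each direction is equivalent to shifting by a fixed 8-hour interval; which direction you need depends on which representation you start from.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical helper: the body a timezone-shifting scalar UDF might delegate to.
class TimezoneShift {

    // Shift a UTC-based value so that it renders as wall-clock time in tz.
    static Timestamp toLocalWallClock(Timestamp ts, TimeZone tz) {
        long t = ts.getTime();
        return new Timestamp(t + tz.getOffset(t));
    }

    // Reverse direction. Uses the offset at the shifted value, so it is only
    // exact for fixed-offset zones (no DST transition nearby).
    static Timestamp fromLocalWallClock(Timestamp ts, TimeZone tz) {
        long t = ts.getTime();
        return new Timestamp(t - tz.getOffset(t));
    }

    public static void main(String[] args) {
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        TimeZone.setDefault(shanghai); // assumed JVM default zone
        // Epoch 0 after the offset subtraction discussed in this thread.
        Timestamp utcBased = new Timestamp(-28800000L);
        System.out.println(utcBased);                             // 1970-01-01 00:00:00.0
        System.out.println(toLocalWallClock(utcBased, shanghai)); // 1970-01-01 08:00:00.0
    }
}
```

Because `getTime()` is unchanged by the default zone, only the rendered text moves; the helper makes that shift explicit instead of relying on `toString()`.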

The timezone support must definitely be improved in future versions of 
Flink SQL.

Regards,
Timo



On 11/29/17 at 10:50 AM, wangsan wrote:


Re: Question about Timestamp in Flink SQL

Posted by wangsan <wa...@163.com>.
Hi Timo,

What I am doing is extracting a timestamp field (which may be a string such as “2017-11-28 11:00:00” or a long value based on my current timezone) as the event-time attribute. So in timestampAndWatermarkAssigner, for the string format I should parse the date-time string using GMT, and for the long value I should add the offset, the opposite of what internalToTimestamp does. But the processing-time attribute cannot be kept consistent this way. Am I understanding that correctly?
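A JDK-only sketch of the two event-time conversions described above, assuming the source strings use the `yyyy-MM-dd HH:mm:ss` pattern and that the long values are ordinary epoch milliseconds for events in Asia/Shanghai. The helper names are hypothetical, and how they would be called from a Flink timestamp extractor is not shown. Both paths yield the same value: the local wall-clock time encoded as if it were UTC.

```java
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.TimeZone;

// Hypothetical helpers for the two event-time input formats discussed above.
class EventTimeConversions {

    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // String case: parse the wall-clock text as if it were GMT/UTC.
    static long parseAsUtc(String text) {
        return LocalDateTime.parse(text, FORMAT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    // Long case: add the local offset, the opposite of what internalToTimestamp subtracts.
    static long shiftByLocalOffset(long epochMillis, TimeZone tz) {
        return epochMillis + tz.getOffset(epochMillis);
    }

    public static void main(String[] args) {
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        long fromString = parseAsUtc("2017-11-28 11:00:00");
        // 2017-11-28 11:00:00 in Shanghai (UTC+8) is 03:00:00 UTC.
        long epoch = LocalDateTime.parse("2017-11-28 11:00:00", FORMAT)
                .toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
        long fromLong = shiftByLocalOffset(epoch, shanghai);
        System.out.println(fromString == fromLong); // true
    }
}
```

Processing time has no such hook, because its value comes from the system clock rather than from a field you extract yourself.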

Best,
wangsan



> On 29 Nov 2017, at 4:43 PM, Timo Walther <tw...@apache.org> wrote:


Re: Question about Timestamp in Flink SQL

Posted by Timo Walther <tw...@apache.org>.
Hi Wangsan,

currently the timestamps in Flink SQL do not depend on a timezone. All 
calculations happen on the UTC timestamp. This also guarantees that an 
input parsed with Timestamp.valueOf("XXX") remains consistent when it is 
output again with toString().
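A small JDK-only sketch of this consistency, assuming a JVM default zone of Asia/Shanghai and a conversion that subtracts the default-zone offset, as in the `internalToTimestamp` snippet quoted in this thread, and adds it back in the other direction. The helper names are hypothetical. A string parsed with `Timestamp.valueOf`, converted to the internal long and back, prints unchanged, and the internal long equals the epoch millis of that wall-clock time read as UTC.

```java
import java.sql.Timestamp;
import java.time.Instant;
import java.util.TimeZone;

class ValueOfRoundTrip {

    // Timestamp -> internal long: add the default-zone offset (hypothetical helper).
    static long toInternal(Timestamp ts) {
        long t = ts.getTime();
        return t + TimeZone.getDefault().getOffset(t);
    }

    // Internal long -> Timestamp: subtract the offset, as in the quoted internalToTimestamp.
    static Timestamp fromInternal(long v) {
        return new Timestamp(v - TimeZone.getDefault().getOffset(v));
    }

    public static void main(String[] args) {
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai")); // assumed zone
        Timestamp parsed = Timestamp.valueOf("2017-11-28 11:00:00");
        long internal = toInternal(parsed);

        // The internal value is the wall-clock time interpreted as UTC.
        System.out.println(internal == Instant.parse("2017-11-28T11:00:00Z").toEpochMilli()); // true
        // Parsing and printing stay consistent.
        System.out.println(fromInternal(internal)); // 2017-11-28 11:00:00.0
    }
}
```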

Regards,
Timo


On 11/29/17 at 3:43 AM, wangsan wrote:


Re: Question about Timestamp in Flink SQL

Posted by wangsan <wa...@163.com>.
Hi Xingcan,

Thanks for your reply. 

The system default timezone is just what I expected (sun.util.calendar.ZoneInfo[id="Asia/Shanghai",offset=28800000,dstSavings=0,useDaylight=false,transitions=19,lastRule=null]).
I looked into the generated code and found the following snippet:

```
result$20 = org.apache.calcite.runtime.SqlFunctions.internalToTimestamp(result$19);
```

And this is what the `internalToTimestamp` function does:

```
public static Timestamp internalToTimestamp(long v) {
    return new Timestamp(v - (long)LOCAL_TZ.getOffset(v));
}
```

So if I give it an event time with Unix timestamp 0, I get Timestamp(-28800000). I am confused about why `internalToTimestamp` needs to subtract the offset.
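One way to read the subtraction: the internal long is treated as wall-clock time encoded as if it were UTC, and the conversion produces a `java.sql.Timestamp` whose local rendering shows that same wall-clock time. The sketch below mirrors the snippet above and pairs it with an inverse that adds the offset back. Both method names are hypothetical stand-ins, not Calcite's actual API, and Asia/Shanghai is an assumed zone.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical stand-ins illustrating the offset arithmetic; not Calcite's code.
class InternalTimestampRoundTrip {

    // Mirrors the quoted internalToTimestamp: subtract the zone offset at v.
    static Timestamp toTimestamp(long v, TimeZone tz) {
        return new Timestamp(v - tz.getOffset(v));
    }

    // Inverse direction: add the zone offset back to recover the internal long.
    static long toInternal(Timestamp ts, TimeZone tz) {
        long t = ts.getTime();
        return t + tz.getOffset(t);
    }

    public static void main(String[] args) {
        TimeZone shanghai = TimeZone.getTimeZone("Asia/Shanghai");
        Timestamp ts = toTimestamp(0L, shanghai);
        System.out.println(ts.getTime());             // -28800000
        System.out.println(toInternal(ts, shanghai)); // 0
    }
}
```

For zones with daylight saving time, the offset at `v` and at `v - offset` can differ near a transition, so this round trip may not be exact there; for a fixed-offset zone like Asia/Shanghai in this period it is.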

Best,
wangsan


> On 28 Nov 2017, at 11:32 PM, Xingcan Cui <xi...@gmail.com> wrote:


Re: Question about Timestamp in Flink SQL

Posted by Xingcan Cui <xi...@gmail.com>.
Hi wangsan,

in Flink, processing time is simply implemented by calling
System.currentTimeMillis(), and the long value is automatically wrapped
into a Timestamp with the following statement:

`new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time));`

You can check whether TimeZone.getDefault() returns the right
TimeZone. Generally, the returned value depends on the default time zone
of your operating system.
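A minimal JDK-only sketch of the wrapping statement above, assuming the JVM default zone is Asia/Shanghai (UTC+8). The class and method names are hypothetical; only `TimeZone` and `java.sql.Timestamp` are standard. It prints the default zone and shows that a value wrapped this way renders as UTC wall-clock time, i.e. 8 hours behind local time.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical helper reproducing the proctime wrapping described above.
class ProctimeWrapping {

    // Same arithmetic as: new java.sql.Timestamp(time - TimeZone.getDefault().getOffset(time))
    static Timestamp wrap(long time, TimeZone tz) {
        return new Timestamp(time - tz.getOffset(time));
    }

    public static void main(String[] args) {
        // Assumption for this demo: force the default zone to Asia/Shanghai (UTC+8).
        TimeZone.setDefault(TimeZone.getTimeZone("Asia/Shanghai"));
        System.out.println("default zone: " + TimeZone.getDefault().getID());

        long epochZero = 0L; // 1970-01-01T00:00:00Z, i.e. 08:00 in Shanghai
        Timestamp ts = wrap(epochZero, TimeZone.getDefault());
        System.out.println(ts.getTime()); // -28800000
        System.out.println(ts);           // 1970-01-01 00:00:00.0 (UTC wall clock, not 08:00)
    }
}
```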

Hope that helps.

Best,
Xingcan

On Tue, Nov 28, 2017 at 9:31 PM, wangsan <wa...@163.com> wrote:
