Posted to user@flink.apache.org by "Youngwoo Kim (김영우)" <yw...@apache.org> on 2020/08/15 04:11:51 UTC

Format for timestamp type in Flink SQL

Hi,

I'm trying to create a table using Flink SQL to query a Kafka topic.
Messages from Kafka look like the following:

(snip)
"svc_mgmt_num":"7749b6a7e17127d43431e21b94f4eb0c116..."
"log_ymdt":"2020-08-15T02:01:33.968Z"
"snapshot_dt":"2020-08-13"
"network_type":"LTE"

I'd like to declare the log_ymdt column as 'TIMESTAMP' type, but it does
not work. Logs from the TaskManager (TM):

Caused by: java.time.format.DateTimeParseException: Text
'2020-08-15T00:01:29.003Z' could not be parsed at index 10
    at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949) ~[?:1.8.0_265]
    at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777) ~[?:1.8.0_265]
......

Also, if I declare the column with a time zone (... log_ymdt TIMESTAMP
WITH TIME ZONE, ...), the DDL does not work either.

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at line
5, column 29.
Was expecting:
    "LOCAL" ...

How can I map my date-time format to the timestamp type? I'm running Flink
1.11.1 and executing SQL with the Flink SQL CLI.

Thanks,
Youngwoo
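
A note on the first error: "could not be parsed at index 10" points at the
'T' between the date and the time. The sketch below reproduces that index in
plain java.time, assuming the value was parsed with a SQL-style layout that
expects a space at that position. The pattern is an illustrative stand-in,
not Flink's actual formatter, and the class and method names are hypothetical.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

class SqlLayoutParseDemo {
    // Assumed stand-in for a SQL-style layout: a space separates date and time.
    static final DateTimeFormatter SQL_LIKE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    // Returns the index at which parsing fails, or -1 if the text parses.
    static int errorIndex(String text) {
        try {
            LocalDateTime.parse(text, SQL_LIKE);
            return -1;
        } catch (DateTimeParseException e) {
            return e.getErrorIndex();
        }
    }

    public static void main(String[] args) {
        // The 'T' sits where the pattern expects a space, so parsing stops there.
        System.out.println(errorIndex("2020-08-15T00:01:29.003Z"));
        // A space-separated value without a zone suffix matches the pattern.
        System.out.println(errorIndex("2020-08-15 00:01:29.003"));
    }
}
```

With this stand-in pattern, the first call reports index 10 and the second
parses cleanly, which matches the symptom in the TM log.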

Re: Format for timestamp type in Flink SQL

Posted by "Youngwoo Kim (김영우)" <yw...@apache.org>.
Thanks Godfrey for the detailed explanation.

Youngwoo

On Wed, Aug 19, 2020 at 12:47 PM, godfrey he <go...@gmail.com> wrote:

> Hi Youngwoo,
>
> > 1. TIMESTAMP WITH LOCAL TIME ZONE
> Currently, SQL client uses legacy types for the collect sink, that means `TIMESTAMP
> WITH LOCAL TIME ZONE` is not supported.
> you can refer to [1] to find the supported types, and there is a pr [2] to
> fix this.
>
> >2. TIMESTAMP(3) WITH LOCAL TIME ZONE
> I do not reproduce the exception
>
> > 3. TIMESTAMP WITH TIME ZONE and TIMESTAMP(3) WITH TIME ZONE
> sql parser does not support them yet.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sqlClient.html
> [2] https://github.com/apache/flink/pull/12872
>
> Best,
> Godfrey
>

Re: Format for timestamp type in Flink SQL

Posted by godfrey he <go...@gmail.com>.
Hi Youngwoo,

> 1. TIMESTAMP WITH LOCAL TIME ZONE
Currently, the SQL client uses legacy types for the collect sink, which
means `TIMESTAMP WITH LOCAL TIME ZONE` is not supported.
You can refer to [1] for the supported types, and there is a PR [2] to
fix this.

> 2. TIMESTAMP(3) WITH LOCAL TIME ZONE
I cannot reproduce this exception.

> 3. TIMESTAMP WITH TIME ZONE and TIMESTAMP(3) WITH TIME ZONE
The SQL parser does not support them yet.

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sqlClient.html
[2] https://github.com/apache/flink/pull/12872

Best,
Godfrey

On Sun, Aug 16, 2020 at 1:23 AM, Youngwoo Kim (김영우) <yw...@apache.org> wrote:

> Hi Benchao,
>
> I include ['json.timestamp-format.standard' = 'ISO-8601'] to table's DDL
> but it does not work with slightly different errors:
>
> 1. TIMESTAMP WITH LOCAL TIME ZONE
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> Caused by: org.apache.flink.table.api.TableException: Unsupported
> conversion from data type 'TIMESTAMP(6) WITH LOCAL TIME ZONE' (conversion
> class: java.time.Instant) to type information. Only data types that
> originated from type information fully support a reverse conversion.
>     at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:259)
>     at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
>     at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
>     at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
>     at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
>     at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
>     at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
>     at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
>     at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.convertToRowTypeInfo(LegacyTypeInfoDataTypeConverter.java:329)
>     at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:237)
>     at org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)
>     at org.apache.flink.table.api.TableSchema.toRowType(TableSchema.java:271)
>     at org.apache.flink.table.client.gateway.local.result.CollectStreamResult.<init>(CollectStreamResult.java:71)
>     at org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.<init>(MaterializedCollectStreamResult.java:101)
>     at org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.<init>(MaterializedCollectStreamResult.java:129)
>     at org.apache.flink.table.client.gateway.local.ResultStore.createResult(ResultStore.java:83)
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:608)
>     at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:465)
>     at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:555)
>     at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:311)
>     at java.util.Optional.ifPresent(Optional.java:159)
>     at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)
>     at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
>
>
> 2. TIMESTAMP(3) WITH LOCAL TIME ZONE
>
>
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.UnsupportedOperationException: Unsupported type: TIMESTAMP(3)
> WITH LOCAL TIME ZONE
>
>
> 3. TIMESTAMP WITH TIME ZONE and TIMESTAMP(3) WITH TIME ZONE
>
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at
> line 3, column 32.
> Was expecting:
>     "LOCAL" ...
>
>
>
> It looks like the timestamp 'yyyy-MM-ddTHH:mm:ss.SSSZ' is not supported in
> both 'SQL' and 'ISO-8601' format standard.
>
> Just curious, what is the default format for timestamp type with a time
> zone?
>
>
> Thanks,
>
> Youngwoo
>
>

Re: Format for timestamp type in Flink SQL

Posted by "Youngwoo Kim (김영우)" <yw...@apache.org>.
Hi Benchao,

I added ['json.timestamp-format.standard' = 'ISO-8601'] to the table's DDL,
but it still does not work, and the errors are slightly different:

1. TIMESTAMP WITH LOCAL TIME ZONE

Exception in thread "main"
org.apache.flink.table.client.SqlClientException: Unexpected exception.
This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.api.TableException: Unsupported
conversion from data type 'TIMESTAMP(6) WITH LOCAL TIME ZONE' (conversion
class: java.time.Instant) to type information. Only data types that
originated from type information fully support a reverse conversion.
    at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:259)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:546)
    at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
    at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:505)
    at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.convertToRowTypeInfo(LegacyTypeInfoDataTypeConverter.java:329)
    at org.apache.flink.table.types.utils.LegacyTypeInfoDataTypeConverter.toLegacyTypeInfo(LegacyTypeInfoDataTypeConverter.java:237)
    at org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo(TypeConversions.java:49)
    at org.apache.flink.table.api.TableSchema.toRowType(TableSchema.java:271)
    at org.apache.flink.table.client.gateway.local.result.CollectStreamResult.<init>(CollectStreamResult.java:71)
    at org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.<init>(MaterializedCollectStreamResult.java:101)
    at org.apache.flink.table.client.gateway.local.result.MaterializedCollectStreamResult.<init>(MaterializedCollectStreamResult.java:129)
    at org.apache.flink.table.client.gateway.local.ResultStore.createResult(ResultStore.java:83)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQueryInternal(LocalExecutor.java:608)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:465)
    at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:555)
    at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:311)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)


2. TIMESTAMP(3) WITH LOCAL TIME ZONE


[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Unsupported type: TIMESTAMP(3)
WITH LOCAL TIME ZONE


3. TIMESTAMP WITH TIME ZONE and TIMESTAMP(3) WITH TIME ZONE


[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at line
3, column 32.
Was expecting:
    "LOCAL" ...



It looks like the timestamp format 'yyyy-MM-ddTHH:mm:ss.SSSZ' is supported
by neither the 'SQL' nor the 'ISO-8601' format standard.

Just curious, what is the default format for timestamp type with a time
zone?


Thanks,

Youngwoo
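
The first trace in this message names java.time.Instant as the conversion
class for TIMESTAMP WITH LOCAL TIME ZONE. As a rough illustration in plain
java.time (this is not Flink's JSON format code, and the class and helper
names are hypothetical), the sample value from the topic is a valid ISO-8601
instant but not a valid zone-less local date-time, because of the trailing 'Z':

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.format.DateTimeParseException;

class InstantVsLocalDateTimeDemo {
    // True if the text is accepted by the default ISO local date-time parser.
    static boolean parsesAsLocalDateTime(String text) {
        try {
            LocalDateTime.parse(text);
            return true;
        } catch (DateTimeParseException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String text = "2020-08-15T02:01:33.968Z";

        // 'Z' marks UTC, so the text identifies a point on the time line.
        Instant instant = Instant.parse(text);
        System.out.println(instant.toEpochMilli());

        // A local date-time carries no zone, so the 'Z' suffix is rejected.
        System.out.println(parsesAsLocalDateTime(text));
        System.out.println(parsesAsLocalDateTime("2020-08-15T02:01:33.968"));
    }
}
```

Whether a given Flink version accepts the 'Z' suffix for a particular column
type depends on its JSON format implementation; the sketch only shows how the
two java.time classes differ.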


On Sat, Aug 15, 2020 at 8:16 PM Benchao Li <li...@apache.org> wrote:

> Hi Youngwoo,
>
> What version of Flink and Json Format are you using?
> From 1.11, we introduced  `json.timestamp-format.standard` to declare the
> timestamp format.
> You can try `timestamp with local zone` data type with `ISO-8601`
> timestamp format.
>

Re: Format for timestamp type in Flink SQL

Posted by Benchao Li <li...@apache.org>.
Hi Youngwoo,

Which versions of Flink and the JSON format are you using?
In 1.11, we introduced `json.timestamp-format.standard` to declare the
timestamp format.
You can try the `TIMESTAMP WITH LOCAL TIME ZONE` data type with the
`ISO-8601` timestamp format.

On Sat, Aug 15, 2020 at 12:12 PM, Youngwoo Kim (김영우) <yw...@apache.org> wrote:

> Hi,
>
> I'm trying to create a table using Flink SQL to query from a Kafka topic.
> Messages from Kafka look like following:
>
> (snip)
> "svc_mgmt_num":"7749b6a7e17127d43431e21b94f4eb0c116..."
> "log_ymdt":"2020-08-15T02:01:33.968Z"
> "snapshot_dt":"2020-08-13"
> "network_type":"LTE"
>
> I'd like to make the column as 'TIMESTAMP' type but it does not work. Logs
> from TM:
>
> Caused by: java.time.format.DateTimeParseException: Text
> '2020-08-15T00:01:29.003Z' could not be parsed at index 10
>     at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter
> .java:1949) ~[?:1.8.0_265]
>     at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:
> 1777) ~[?:1.8.0_265]
> ......
>
> And also, If I declare the column with TIMEZONE (... log_ymdt TIMESTAMP
> WITH TIME ZONE, ), the DDL does not work as well.
>
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.sql.parser.impl.ParseException: Encountered "TIME" at
> line 5, column 29.
> Was expecting:
>     "LOCAL" ...
>
> How can I make my date-time format as the timestamp type? I'm running
> Flink 1.11.1 and executing sql using FlinkSQL CLI.
>
> Thanks,
> Youngwoo
>


-- 

Best,
Benchao Li
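
One possible workaround is to normalize the value to UTC and drop the zone
suffix before it reaches the table, for example in the producer. This sketch
assumes the 'ISO-8601' standard in this Flink version expects a zone-less
value such as 2020-08-15T02:01:33.968 for a TIMESTAMP(3) column. That
assumption is drawn from the errors in this thread and is worth checking
against the JSON format documentation. The class and method names are
hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class NormalizeTimestampDemo {
    // Zone-less ISO-style layout with millisecond precision.
    static final DateTimeFormatter ZONELESS_ISO =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");

    // Parses an ISO-8601 instant such as "...Z" and renders it as a
    // zone-less date-time in UTC.
    static String toZonelessUtc(String isoInstant) {
        return Instant.parse(isoInstant)
                .atOffset(ZoneOffset.UTC)
                .toLocalDateTime()
                .format(ZONELESS_ISO);
    }

    public static void main(String[] args) {
        System.out.println(toZonelessUtc("2020-08-15T02:01:33.968Z"));
    }
}
```

Dropping the suffix discards the explicit zone, so every consumer of the
column has to agree that the values are in UTC.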