Posted to user@flink.apache.org by Dongwon Kim <ea...@gmail.com> on 2020/07/04 13:21:57 UTC

[Table API] how to configure a nested timestamp field

Hi,
I use Flink 1.10.1 and I want to use the Table API to read JSON messages.
The messages look like the one below.

>     {
>        "type":"Update",
>        "location":{
>           "id":"123e4567-e89b-12d3-a456-426652340000",
>           "lastUpdateTime":1593866161436
>        }
>     }


I wrote the following program just to see whether the JSON messages are
correctly parsed by the Table API:

>     StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>     EnvironmentSettings envSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
> envSettings);
>     tEnv
>       .connect(
>         new Kafka()
>           .version("universal")
>           .topic(consumerTopic)
>           .startFromLatest()
>           .properties(consumerProperties)
>       )
>       .withFormat(new Json())
>       .withSchema(new Schema().schema(
>         TableSchema.builder()
>           .field("type", STRING())
>           .field("location",
>             ROW(
>               FIELD("id", STRING()),
>               // (1)
>               FIELD("lastUpdateTime", BIGINT())
>               // (2)
>               FIELD("lastUpdateTime", TIMESTAMP())
>               // (3)
>               FIELD("lastUpdateTime",
> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
>             ))
>           .build()
>       ))
>       .createTemporaryTable("message");
>     tEnv.toAppendStream(tEnv.from("message"), Row.class)
>       .print();


Note that I tried BIGINT(), TIMESTAMP(), and
TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
(1) it works fine but later I can't use time-based operations like
windowing.

(2) it causes the following exception

> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field
> 'location' does not match with the physical type ROW<`id` STRING,
> `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource
> return type.
> at
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
> at
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
> at
> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> at
> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
> at
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
> at
> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
> at
> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
> at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
> Caused by: org.apache.flink.table.api.ValidationException: Type ROW<`id`
> STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not
> match with the physical type ROW<`id` STRING, `lastUpdateTime`
> TIMESTAMP(3)> of the 'location' field of the TableSource return type.
> at
> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
> at
> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
> ... 38 more


(3) it causes the following exception

> Caused by: java.time.format.DateTimeParseException: Text '1593868714814'
> could not be parsed at index 0
> at
> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> at
> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
> ... 7 more


Can I read such json messages with time information in Flink 1.10.1?

Thanks

Dongwon

Fwd: Re: [Table API] how to configure a nested timestamp field

Posted by Danny Chan <yu...@gmail.com>.
Best,
Danny Chan
---------- Forwarded message ----------
From: Danny Chan <yu...@gmail.com>
Date: Jul 20, 2020, 4:51 PM +0800
To: Dongwon Kim <ea...@gmail.com>
Subject: Re: [Table API] how to configure a nested timestamp field

> Or is it possible you pre-define a catalog there and register through the SQL CLI yaml ?
>
> Best,
> Danny Chan
> On Jul 20, 2020, 3:23 PM +0800, Dongwon Kim <ea...@gmail.com> wrote:
> > Hi Leonard,
> >
> > > Unfortunately the answer is no, the YAML you defined will parse by Table API and then execute, the root cause of your post error is Table API does not support computed column now,
> > > there is a FLIP under discussion[1], this should be ready in 1.12.0. BTW, I think DDL is recommended way since FLINK 1.11.0.
> > Okay, thanks a lot for your input.
> >
> > I just tried out Flink SQL client and wanted to store pre-defined YAML files each declaring a source table from a Kafka topic.
> > As you advised, I have to manually enter DDL in the SQL client on FLINK 1.11.x
> >
> > Best,
> >
> > Dongwon
> >
> >
> > > On Mon, Jul 20, 2020 at 3:59 PM Leonard Xu <xb...@gmail.com> wrote:
> > > > Hi, Kim
> > > >
> > > > > Hi Leonard,
> > > > >
> > > > > Can I have a YAML definition corresponding to the DDL you suggested?
> > > >
> > > > Unfortunately the answer is no, the YAML you defined will parse by Table API and then execute, the root cause of your post error is Table API does not support computed column now,
> > > >
> > > > there is a FLIP under discussion[1], this should be ready in 1.12.0. BTW, I think DDL is recommended way since FLINK 1.11.0.
> > > >
> > > > Best,
> > > > Leonard Xu
> > > > [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
> > > >
> > > > > On Jul 20, 2020, at 14:30, Dongwon Kim <ea...@gmail.com> wrote:
> > > > >
> > > > >
> > > > > I tried below (Flink 1.11.0) but got some error:
> > > > > > tables:
> > > > > >   - name: test
> > > > > >     type: source-table
> > > > > >     update-mode: append
> > > > > >     connector:
> > > > > >       property-version: 1
> > > > > >       type: kafka
> > > > > >       version: universal
> > > > > >       topic: ...
> > > > > >       properties:
> > > > > >         bootstrap.servers: ...
> > > > > >         group.id: ...
> > > > > >     format:
> > > > > >       property-version: 1
> > > > > >       type: json
> > > > > >     schema:
> > > > > >       - name: type
> > > > > >         data-type: STRING
> > > > > >       - name: location
> > > > > >         data-type: >
> > > > > >           ROW<
> > > > > >             id STRING,
> > > > > >             lastUpdateTime BIGINT
> > > > > >           >
> > > > > >       - name: timestampCol
> > > > > >         data-type: TIMESTAMP(3)
> > > > > >         rowtime:
> > > > > >           timestamps:
> > > > > >             type: from-field
> > > > > >             from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
> > > > > >           watermarks:
> > > > > >             type: periodic-bounded
> > > > > >             delay: 5000
> > > > >
> > > > > SQL client doesn't complain about the file but, when I execute "SELECT timestampCol from test", the job fails with the following error message:
> > > > > > Caused by: java.lang.NullPointerException
> > > > > > at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
> > > > > > at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
> > > > > > at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
> > > > > > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> > > > > > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> > > > > > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> > > > > > at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> > > > > > at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> > > > > > at SourceConversion$4.processElement(Unknown Source)
> > > > > > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> > > > > > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> > > > > > at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> > > > > > at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> > > > > > at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> > > > > > at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> > > > > > at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> > > > > > at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> > > > > > at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> > > > > > at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> > > > > > at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> > > > > > at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> > > > > > at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> > > > > > at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> > > > >
> > > > > > On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim <ea...@gmail.com> wrote:
> > > > > > > Hi Leonard,
> > > > > > >
> > > > > > > Wow, that's great! It works like a charm.
> > > > > > > I've never considered this approach at all.
> > > > > > > Thanks a lot.
> > > > > > >
> > > > > > > Best,
> > > > > > > Dongwon
> > > > > > >
> > > > > > > > On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <xb...@gmail.com> wrote:
> > > > > > > > > Hi, Kim
> > > > > > > > >
> > > > > > > > > The reason your attempts (2) and (3) failed is that the json format does not support convert a BIGINT to TIMESTAMP, you can first define the BIGINT field and then use a computed column to extract TIMESTAMP field, you can also define the time attribute on TIMESTAMP filed for using time-based operations in Flink 1.10.1. But the computed column only support in pure DDL, the Table API lacks the support and should be aligned in 1.12 as I know.
> > > > > > > > > The DDL syntax  as following:
> > > > > > > > >
> > > > > > > > > create table test (
> > > > > > > > >   `type` STRING,
> > > > > > > > >   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
> > > > > > > > >    timestampCol as TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), —computed column
> > > > > > > > >    WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
> > > > > > > > > )   with (
> > > > > > > > >   'connector' = '...',
> > > > > > > > >   'format' = 'json',
> > > > > > > > >   ...
> > > > > > > > > );
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Leonard Xu
> > > > > > > > > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > On Jul 4, 2020, at 21:21, Dongwon Kim <ea...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi,
> > > > > > > > > > I use Flink 1.10.1 and I want to use Table API to read JSON messages. The message looks like below.
> > > > > > > > > > >     {
> > > > > > > > > > >        "type":"Update",
> > > > > > > > > > >        "location":{
> > > > > > > > > > >           "id":"123e4567-e89b-12d3-a456-426652340000",
> > > > > > > > > > >           "lastUpdateTime":1593866161436
> > > > > > > > > > >        }
> > > > > > > > > > >     }
> > > > > > > > > >
> > > > > > > > > > I wrote the following program just to see whether json messages are correctly parsed by Table API:
> > > > > > > > > > >     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> > > > > > > > > > >     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> > > > > > > > > > >     EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> > > > > > > > > > >     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
> > > > > > > > > > >     tEnv
> > > > > > > > > > >       .connect(
> > > > > > > > > > >         new Kafka()
> > > > > > > > > > >           .version("universal")
> > > > > > > > > > >           .topic(consumerTopic)
> > > > > > > > > > >           .startFromLatest()
> > > > > > > > > > >           .properties(consumerProperties)
> > > > > > > > > > >       )
> > > > > > > > > > >       .withFormat(new Json())
> > > > > > > > > > >       .withSchema(new Schema().schema(
> > > > > > > > > > >         TableSchema.builder()
> > > > > > > > > > >           .field("type", STRING())
> > > > > > > > > > >           .field("location",
> > > > > > > > > > >             ROW(
> > > > > > > > > > >               FIELD("id", STRING()),
> > > > > > > > > > >               // (1)
> > > > > > > > > > >               FIELD("lastUpdateTime", BIGINT())
> > > > > > > > > > >               // (2)
> > > > > > > > > > >               FIELD("lastUpdateTime", TIMESTAMP())
> > > > > > > > > > >               // (3)
> > > > > > > > > > >               FIELD("lastUpdateTime", TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
> > > > > > > > > > >             ))
> > > > > > > > > > >           .build()
> > > > > > > > > > >       ))
> > > > > > > > > > >       .createTemporaryTable("message");
> > > > > > > > > > >     tEnv.toAppendStream(tEnv.from("message"), Row.class)
> > > > > > > > > > >       .print();
> > > > > > > > > >
> > > > > > > > > > Note that I tried BIGINT(), TIMESTAMP(), and TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
> > > > > > > > > > (1) it works fine but later I can't use time-based operations like windowing.
> > > > > > > > > >
> > > > > > > > > > (2) it causes the following exception
> > > > > > > > > > > Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
> > > > > > > > > > > at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
> > > > > > > > > > > at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
> > > > > > > > > > > at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
> > > > > > > > > > > at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
> > > > > > > > > > > at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
> > > > > > > > > > > at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
> > > > > > > > > > > at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> > > > > > > > > > > at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> > > > > > > > > > > at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
> > > > > > > > > > > at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> > > > > > > > > > > at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
> > > > > > > > > > > at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
> > > > > > > > > > > at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
> > > > > > > > > > > at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
> > > > > > > > > > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
> > > > > > > > > > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
> > > > > > > > > > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
> > > > > > > > > > > at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> > > > > > > > > > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
> > > > > > > > > > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
> > > > > > > > > > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
> > > > > > > > > > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
> > > > > > > > > > > at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
> > > > > > > > > > > at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
> > > > > > > > > > > at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> > > > > > > > > > > at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
> > > > > > > > > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > > > > > > > > > > at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> > > > > > > > > > > at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> > > > > > > > > > > at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> > > > > > > > > > > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> > > > > > > > > > > at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> > > > > > > > > > > at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> > > > > > > > > > > at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> > > > > > > > > > > at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
> > > > > > > > > > > at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
> > > > > > > > > > > at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
> > > > > > > > > > > at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
> > > > > > > > > > > at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
> > > > > > > > > > > at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
> > > > > > > > > > > Caused by: org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
> > > > > > > > > > > at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
> > > > > > > > > > > at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
> > > > > > > > > > > ... 38 more
> > > > > > > > > >
> > > > > > > > > > (3) it causes the following exception
> > > > > > > > > > > Caused by: java.time.format.DateTimeParseException: Text '1593868714814' could not be parsed at index 0
> > > > > > > > > > > at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
> > > > > > > > > > > at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
> > > > > > > > > > > at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
> > > > > > > > > > > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> > > > > > > > > > > at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> > > > > > > > > > > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> > > > > > > > > > > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> > > > > > > > > > > at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> > > > > > > > > > > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> > > > > > > > > > > at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> > > > > > > > > > > at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
> > > > > > > > > > > ... 7 more
> > > > > > > > > >
> > > > > > > > > > Can I read such json messages with time information in Flink 1.10.1?
> > > > > > > > > >
> > > > > > > > > > Thanks
> > > > > > > > > >
> > > > > > > > > > Dongwon
> > > > > > > > >
> > > >

Re: [Table API] how to configure a nested timestamp field

Posted by Dongwon Kim <ea...@gmail.com>.
Hi Leonard,

> Unfortunately the answer is no, the YAML you defined will parse by Table
> API and then execute, the root cause of your post error is Table API does
> not support computed column now,
> there is a FLIP under discussion[1], this should be ready in 1.12.0. BTW,
> I think DDL is recommended way since FLINK 1.11.0.

Okay, thanks a lot for your input.

I just tried out the Flink SQL client and wanted to store pre-defined YAML
files, each declaring a source table backed by a Kafka topic.
As you advised, I will have to manually enter the DDL in the SQL client on
Flink 1.11.x.
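
A minimal sketch of such a DDL statement, combining Leonard's suggested
schema with Flink 1.11-style Kafka connector options (the topic, servers,
and group id stay as placeholders, and the option keys are assumptions to
be checked against the 1.11 connector documentation):

    CREATE TABLE test (
      `type` STRING,
      `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
      timestampCol AS TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime / 1000, 'yyyy-MM-dd HH:mm:ss')),
      WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'kafka',
      'topic' = '...',
      'properties.bootstrap.servers' = '...',
      'properties.group.id' = '...',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );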

Best,

Dongwon


On Mon, Jul 20, 2020 at 3:59 PM Leonard Xu <xb...@gmail.com> wrote:

> Hi, Kim
>
> Hi Leonard,
>
> Can I have a YAML definition corresponding to the DDL you suggested?
>
>
> Unfortunately the answer is no, the YAML you defined will parse by Table
> API and then execute, the root cause of your post error is Table API does
> not support computed column now,
>
> there is a FLIP under discussion[1], this should be ready in 1.12.0. BTW,
> I think DDL is recommended way since FLINK 1.11.0.
>
> Best,
> Leonard Xu
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API
>
> On Jul 20, 2020, at 14:30, Dongwon Kim <ea...@gmail.com> wrote:
>
>
> I tried below (Flink 1.11.0) but got some error:
>
>> tables:
>>   - name: test
>>     type: source-table
>>     update-mode: append
>>     connector:
>>       property-version: 1
>>       type: kafka
>>       version: universal
>>       topic: ...
>>       properties:
>>         bootstrap.servers: ...
>>         group.id: ...
>>     format:
>>       property-version: 1
>>       type: json
>>     schema:
>>       - name: type
>>         data-type: STRING
>>       - name: location
>>         data-type: >
>>           ROW<
>>             id STRING,
>>             lastUpdateTime BIGINT
>>           >
>>       - name: timestampCol
>>         data-type: TIMESTAMP(3)
>>         rowtime:
>>           timestamps:
>>             type: from-field
>>             from:
>> TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd
>> HH:mm:ss'))
>>           watermarks:
>>             type: periodic-bounded
>>             delay: 5000
>>
>
> SQL client doesn't complain about the file but, when I execute "SELECT
> timestampCol from test", the job fails with the following error message:
>
>> Caused by: java.lang.NullPointerException
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
>> at
>> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
>> at
>> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at SourceConversion$4.processElement(Unknown Source)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
>> at
>> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
>> at
>> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
>> at
>> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
>> at
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> at
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> at
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
>
>
> On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim <ea...@gmail.com> wrote:
>
>> Hi Leonard,
>>
>> Wow, that's great! It works like a charm.
>> I've never considered this approach at all.
>> Thanks a lot.
>>
>> Best,
>> Dongwon
>>
>> On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <xb...@gmail.com> wrote:
>>
>>> Hi, Kim
>>>
>>> The reason your attempts (2) and (3) failed is that the json format does
>>> not support convert a BIGINT to TIMESTAMP, you can first define the BIGINT
>>> field and then use a computed column to extract TIMESTAMP field, you can
>>> also define the time attribute on TIMESTAMP filed for using time-based
>>> operations in Flink 1.10.1. But the computed column only support in pure
>>> DDL, the Table API lacks the support and should be aligned in 1.12 as I
>>> know.
>>> The DDL syntax  as following:
>>>
>>> create table test (
>>>   `type` STRING,
>>>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>>>    timestampCol as
>>> TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd
>>> HH:mm:ss')), —computed column
>>>    WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
>>> )   with (
>>>   'connector' = '...',
>>>   'format' = 'json',
>>>   ...
>>> );
>>>
>>>
>>> Best,
>>> Leonard Xu
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
>>>
>>>
>>> On Jul 4, 2020, at 21:21, Dongwon Kim <ea...@gmail.com> wrote:
>>>
>>> Hi,
>>> I use Flink 1.10.1 and I want to use Table API to read JSON messages.
>>> The message looks like below.
>>>
>>>>     {
>>>>        "type":"Update",
>>>>        "location":{
>>>>           "id":"123e4567-e89b-12d3-a456-426652340000",
>>>>           "lastUpdateTime":1593866161436
>>>>        }
>>>>     }
>>>
>>>
>>> I wrote the following program just to see whether json messages are
>>> correctly parsed by Table API:
>>>
>>>>     StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>>>     EnvironmentSettings envSettings =
>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env,
>>>> envSettings);
>>>>     tEnv
>>>>       .connect(
>>>>         new Kafka()
>>>>           .version("universal")
>>>>           .topic(consumerTopic)
>>>>           .startFromLatest()
>>>>           .properties(consumerProperties)
>>>>       )
>>>>       .withFormat(new Json())
>>>>       .withSchema(new Schema().schema(
>>>>         TableSchema.builder()
>>>>           .field("type", STRING())
>>>>           .field("location",
>>>>             ROW(
>>>>               FIELD("id", STRING()),
>>>>               // (1)
>>>>               FIELD("lastUpdateTime", BIGINT())
>>>>               // (2)
>>>>               FIELD("lastUpdateTime", TIMESTAMP())
>>>>               // (3)
>>>>               FIELD("lastUpdateTime",
>>>> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
>>>>             ))
>>>>           .build()
>>>>       ))
>>>>       .createTemporaryTable("message");
>>>>     tEnv.toAppendStream(tEnv.from("message"), Row.class)
>>>>       .print();
>>>
>>>
>>> Note that I tried BIGINT(), TIMESTAMP(), and
>>> TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
>>> (1) it works fine but later I can't use time-based operations like
>>> windowing.
>>>
>>> (2) it causes the following exception
>>>
>>>> Exception in thread "main"
>>>> org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING,
>>>> `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match
>>>> with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of
>>>> the 'location' field of the TableSource return type.
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
>>>> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>>>> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>>>> at
>>>> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>>>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>>>> at
>>>> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>>>> at
>>>> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>>>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>>>> at
>>>> java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>>>> at
>>>> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>>>> at
>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>>>> at
>>>> org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>>>> at
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>> at
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>>>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>>> at
>>>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>>>> at
>>>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>>>> at
>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>>>> at
>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>>>> at
>>>> org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>>>> at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
>>>> Caused by: org.apache.flink.table.api.ValidationException: Type
>>>> ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location'
>>>> does not match with the physical type ROW<`id` STRING, `lastUpdateTime`
>>>> TIMESTAMP(3)> of the 'location' field of the TableSource return type.
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>>>> at
>>>> org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
>>>> ... 38 more
>>>
>>>
>>> (3) it causes the following exception
>>>
>>>> Caused by: java.time.format.DateTimeParseException: Text
>>>> '1593868714814' could not be parsed at index 0
>>>> at
>>>> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>>>> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>> at
>>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>>>> ... 7 more
>>>
>>>
>>> Can I read such json messages with time information in Flink 1.10.1?
>>>
>>> Thanks
>>>
>>> Dongwon
>>>
>>>
>>>
>

Re: [Table API] how to configure a nested timestamp field

Posted by Leonard Xu <xb...@gmail.com>.
Hi, Kim

> Hi Leonard,
> 
> Can I have a YAML definition corresponding to the DDL you suggested?

Unfortunately the answer is no. The YAML you defined is parsed by the Table API and then executed, and the root cause of the error you posted is that the Table API does not support computed columns yet.

There is a FLIP under discussion [1]; it should be ready in 1.12.0. BTW, I think DDL is the recommended way since Flink 1.11.0.
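
Once a rowtime attribute is defined through such a computed column, time-based operations become possible. A minimal sketch of a windowed query over the `test` table from the DDL quoted further below (the one-minute interval is only illustrative):

    SELECT
      TUMBLE_START(timestampCol, INTERVAL '1' MINUTE) AS window_start,
      COUNT(*) AS updates
    FROM test
    GROUP BY TUMBLE(timestampCol, INTERVAL '1' MINUTE);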

Best,
Leonard Xu
[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-129%3A+Refactor+Descriptor+API+to+register+connectors+in+Table+API

> On Jul 20, 2020, at 14:30, Dongwon Kim <ea...@gmail.com> wrote:
> 
> 
> I tried below (Flink 1.11.0) but got some error:
> tables:
>   - name: test
>     type: source-table
>     update-mode: append
>     connector:
>       property-version: 1
>       type: kafka
>       version: universal
>       topic: ...
>       properties:
>         bootstrap.servers: ...
>         group.id: ...
>     format:
>       property-version: 1
>       type: json
>     schema:
>       - name: type
>         data-type: STRING
>       - name: location
>         data-type: >
>           ROW<
>             id STRING,
>             lastUpdateTime BIGINT
>           >
>       - name: timestampCol
>         data-type: TIMESTAMP(3)
>         rowtime:
>           timestamps:
>             type: from-field
>             from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss'))
>           watermarks:
>             type: periodic-bounded
>             delay: 5000
> 
> SQL client doesn't complain about the file but, when I execute "SELECT timestampCol from test", the job fails with the following error message:
> Caused by: java.lang.NullPointerException
> at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
> at org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at SourceConversion$4.processElement(Unknown Source)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
> 
> On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim <eastcirclek@gmail.com> wrote:
> Hi Leonard,
> 
> Wow, that's great! It works like a charm.
> I've never considered this approach at all.
> Thanks a lot.
> 
> Best,
> Dongwon
> 
> On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <xbjtdcq@gmail.com> wrote:
> Hi, Kim
> 
> The reason your attempts (2) and (3) failed is that the json format does not support convert a BIGINT to TIMESTAMP, you can first define the BIGINT field and then use a computed column to extract TIMESTAMP field, you can also define the time attribute on TIMESTAMP filed for using time-based operations in Flink 1.10.1. But the computed column only support in pure DDL, the Table API lacks the support and should be aligned in 1.12 as I know.
> The DDL syntax  as following:
> 
> create table test (
>   `type` STRING,
>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>    timestampCol as TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), —computed column
>    WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
> )   with (
>   'connector' = '...',
>   'format' = 'json',
>   ...
> );
> 
> 
> Best,
> Leonard Xu
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
> 
> 
>> On Jul 4, 2020, at 21:21, Dongwon Kim <eastcirclek@gmail.com> wrote:
>> 
>> Hi,
>> I use Flink 1.10.1 and I want to use Table API to read JSON messages. The message looks like below.
>>     {
>>        "type":"Update",
>>        "location":{
>>           "id":"123e4567-e89b-12d3-a456-426652340000",
>>           "lastUpdateTime":1593866161436
>>        }
>>     }
>> 
>> I wrote the following program just to see whether json messages are correctly parsed by Table API:
>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>>     EnvironmentSettings envSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>     StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
>>     tEnv
>>       .connect(
>>         new Kafka()
>>           .version("universal")
>>           .topic(consumerTopic)
>>           .startFromLatest()
>>           .properties(consumerProperties)
>>       )
>>       .withFormat(new Json())
>>       .withSchema(new Schema().schema(
>>         TableSchema.builder()
>>           .field("type", STRING())
>>           .field("location",
>>             ROW(
>>               FIELD("id", STRING()),
>>               // (1)
>>               FIELD("lastUpdateTime", BIGINT())
>>               // (2)
>>               FIELD("lastUpdateTime", TIMESTAMP())
>>               // (3)
>>               FIELD("lastUpdateTime", TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class))
>>             ))
>>           .build()
>>       ))
>>       .createTemporaryTable("message");
>>     tEnv.toAppendStream(tEnv.from("message"), Row.class)
>>       .print();
>> 
>> Note that I tried BIGINT(), TIMESTAMP(), and TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class).
>> (1) it works fine but later I can't use time-based operations like windowing.
>> 
>> (2) it causes the following exception
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
>> at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>> at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:191)
>> at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:252)
>> at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
>> at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
>> at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
>> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
>> at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
>> at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
>> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
>> at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:234)
>> at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:212)
>> at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:116)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
>> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:153)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
>> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
>> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
>> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
>> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>> at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
>> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
>> at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:351)
>> at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:259)
>> at org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.java:250)
>> at com.kakaomobility.mobdata.Finder.main(Finder.java:133)
>> Caused by: org.apache.flink.table.api.ValidationException: Type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(6)> of table field 'location' does not match with the physical type ROW<`id` STRING, `lastUpdateTime` TIMESTAMP(3)> of the 'location' field of the TableSource return type.
>> at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:166)
>> at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:188)
>> ... 38 more
>> 
>> (3) it causes the following exception
>> Caused by: java.time.format.DateTimeParseException: Text '1593868714814' could not be parsed at index 0
>> at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalDateTime(JsonRowDeserializationSchema.java:366)
>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>> ... 7 more
>> 
>> Can I read such json messages with time information in Flink 1.10.1?
>> 
>> Thanks
>> 
>> Dongwon
> 


Re: [Table API] how to configure a nested timestamp field

Posted by Dongwon Kim <ea...@gmail.com>.
Hi Leonard,

Can I have a YAML definition corresponding to the DDL you suggested?

I tried the definition below (Flink 1.11.0) but got an error:

> tables:
>   - name: test
>     type: source-table
>     update-mode: append
>     connector:
>       property-version: 1
>       type: kafka
>       version: universal
>       topic: ...
>       properties:
>         bootstrap.servers: ...
>         group.id: ...
>     format:
>       property-version: 1
>       type: json
>     schema:
>       - name: type
>         data-type: STRING
>       - name: location
>         data-type: >
>           ROW<
>             id STRING,
>             lastUpdateTime BIGINT
>           >
>       - name: timestampCol
>         data-type: TIMESTAMP(3)
>         rowtime:
>           timestamps:
>             type: from-field
>             from: TO_TIMESTAMP(FROM_UNIXTIME(location.lastUpdateTime/1000,
> 'yyyy-MM-dd HH:mm:ss'))
>           watermarks:
>             type: periodic-bounded
>             delay: 5000
>

The SQL client doesn't complain about the file, but when I execute "SELECT
timestampCol from test", the job fails with the following error message:

> Caused by: java.lang.NullPointerException
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:236)
> at
> org.apache.flink.table.planner.plan.nodes.physical.stream.PeriodicWatermarkAssignerWrapper.extractTimestamp(StreamExecLegacyTableSourceScan.scala:228)
> at
> org.apache.flink.streaming.runtime.operators.TimestampsAndWatermarksOperator.processElement(TimestampsAndWatermarksOperator.java:94)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at SourceConversion$4.processElement(Unknown Source)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
> at
> org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310)
> at
> org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409)
> at
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
> at
> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> at
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> at
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
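
For reference, the legacy rowtime descriptor seems to expect "from" to name an existing
BIGINT or TIMESTAMP field rather than a SQL expression, so maybe that is why the extractor
ends up with a null timestamp here. A minimal sketch of the documented shape, assuming a
hypothetical top-level epoch-millis field "ts" (which my messages don't actually have,
since the value is nested inside "location"), would be:

>       - name: timestampCol
>         data-type: TIMESTAMP(3)
>         rowtime:
>           timestamps:
>             type: from-field
>             # name of an existing field in the input, not an expression
>             from: ts
>           watermarks:
>             type: periodic-bounded
>             delay: 5000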


On Mon, Jul 6, 2020 at 3:09 PM Dongwon Kim <ea...@gmail.com> wrote:

> Hi Leonard,
>
> Wow, that's great! It works like a charm.
> I've never considered this approach at all.
> Thanks a lot.
>
> Best,
> Dongwon
>
> On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <xb...@gmail.com> wrote:
>
>> Hi, Kim
>>
>> The reason your attempts (2) and (3) failed is that the JSON format does
>> not support converting a BIGINT to TIMESTAMP. You can first define the
>> BIGINT field and then use a computed column to extract a TIMESTAMP field;
>> you can also define the time attribute on the TIMESTAMP field to use
>> time-based operations in Flink 1.10.1. But computed columns are only
>> supported in pure DDL; the Table API lacks this support and should be
>> aligned in 1.12 as far as I know.
>> The DDL syntax is as follows:
>>
>> create table test (
>>   `type` STRING,
>>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>>    timestampCol as
>> TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd
>> HH:mm:ss')), -- computed column
>>    WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
>> )   with (
>>   'connector' = '...',
>>   'format' = 'json',
>>   ...
>> );
>>
>>
>> Best,
>> Leonard Xu
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
>>

Re: [Table API] how to configure a nested timestamp field

Posted by Dongwon Kim <ea...@gmail.com>.
Hi Leonard,

Wow, that's great! It works like a charm.
I've never considered this approach at all.
Thanks a lot.

Best,
Dongwon

On Mon, Jul 6, 2020 at 11:26 AM Leonard Xu <xb...@gmail.com> wrote:

> Hi, Kim
>
> The reason your attempts (2) and (3) failed is that the JSON format does
> not support converting a BIGINT to TIMESTAMP. You can first define the
> BIGINT field and then use a computed column to extract a TIMESTAMP field;
> you can also define the time attribute on the TIMESTAMP field to use
> time-based operations in Flink 1.10.1. But computed columns are only
> supported in pure DDL; the Table API lacks this support and should be
> aligned in 1.12 as far as I know.
> The DDL syntax is as follows:
>
> create table test (
>   `type` STRING,
>   `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
>    timestampCol as
> TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd
> HH:mm:ss')), -- computed column
>    WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
> )   with (
>   'connector' = '...',
>   'format' = 'json',
>   ...
> );
>
>
> Best,
> Leonard Xu
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html
>

Re: [Table API] how to configure a nested timestamp field

Posted by Leonard Xu <xb...@gmail.com>.
Hi, Kim

The reason your attempts (2) and (3) failed is that the JSON format does not support converting a BIGINT to TIMESTAMP. You can first define the BIGINT field and then use a computed column to extract a TIMESTAMP field; you can also define the time attribute on the TIMESTAMP field to use time-based operations in Flink 1.10.1. But computed columns are only supported in pure DDL; the Table API lacks this support and should be aligned in 1.12 as far as I know.
The DDL syntax is as follows:

create table test (
  `type` STRING,
  `location` ROW<`id` STRING, lastUpdateTime BIGINT>,
   timestampCol as TO_TIMESTAMP(FROM_UNIXTIME(`location`.lastUpdateTime/1000, 'yyyy-MM-dd HH:mm:ss')), -- computed column
   WATERMARK FOR timestampCol AS timestampCol - INTERVAL '5' SECOND
)   with (
  'connector' = '...',
  'format' = 'json',
  ...
);
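
With timestampCol declared as the rowtime attribute above, time-based operations work as usual. Just as an illustration (the one-minute window size is arbitrary), a tumbling-window count over this table could look like:

-- group-window aggregation on the rowtime attribute
SELECT
  TUMBLE_START(timestampCol, INTERVAL '1' MINUTE) AS window_start,
  COUNT(*) AS cnt
FROM test
GROUP BY TUMBLE(timestampCol, INTERVAL '1' MINUTE);

Note that FROM_UNIXTIME works in seconds and the division by 1000 is integer division, so timestampCol ends up with second granularity; the millisecond part of lastUpdateTime is dropped.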


Best,
Leonard Xu
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/sql/create.html

