Posted to issues@flink.apache.org by "gkgkgk (Jira)" <ji...@apache.org> on 2020/02/07 01:00:08 UTC

[jira] [Comment Edited] (FLINK-15943) Rowtime field name cannot be the same as the json field

    [ https://issues.apache.org/jira/browse/FLINK-15943?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17031751#comment-17031751 ] 

gkgkgk edited comment on FLINK-15943 at 2/7/20 12:59 AM:
---------------------------------------------------------

Tracing through the source code:
 # At org.apache.flink.table.descriptors.SchemaValidator.deriveFieldMapping (SchemaValidator.java:275), the proctime/rowtime fields are removed from the field mapping.
 # At org.apache.flink.table.planner.sources.TableSourceUtil.resolveInputField (TableSourceUtil.scala:357), the field is resolved through the field mapping.

Is there a conflict between the above two?
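
To make the suspected conflict concrete, here is a minimal, self-contained Java sketch. It is a simplified model of the two steps described above, not Flink's actual code: the class name, method signatures, and exception type are assumptions chosen for illustration, and only the error message text is taken from the stack trace in this issue.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the two steps; this is NOT the Flink implementation.
public class FieldMappingConflictSketch {

    // Step 1 (assumed behaviour, per the comment above): map every table field
    // to a physical field of the same name, then drop the time attributes.
    static Map<String, String> deriveFieldMapping(List<String> tableFields,
                                                  Set<String> timeAttributes) {
        Map<String, String> mapping = new LinkedHashMap<>();
        for (String field : tableFields) {
            mapping.put(field, field);
        }
        for (String timeAttribute : timeAttributes) {
            mapping.remove(timeAttribute);
        }
        return mapping;
    }

    // Step 2 (assumed behaviour): look up a field through the mapping and
    // fail when no entry exists.
    static String resolveInputField(String fieldName, Map<String, String> mapping) {
        String physicalField = mapping.get(fieldName);
        if (physicalField == null) {
            throw new IllegalStateException(
                "Field '" + fieldName + "' could not be resolved by the field mapping.");
        }
        return physicalField;
    }

    public static void main(String[] args) {
        // 'ctime' is both the rowtime attribute and the JSON field it is read from.
        Map<String, String> mapping =
            deriveFieldMapping(List.of("id", "ctime", "pageId"), Set.of("ctime"));
        System.out.println("mapping = " + mapping);
        try {
            // The timestamp extractor asks for its source field 'ctime'.
            resolveInputField("ctime", mapping);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the lookup fails only when the rowtime attribute has the same name as the field named in 'schema.1.rowtime.timestamps.from'. With different names the physical field would stay in the mapping and resolve normally, which would match the issue title.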

was (Author: gkgkgk):

Tracing through the source code:
 # At org.apache.flink.table.descriptors.SchemaValidator.deriveFieldMapping (SchemaValidator.java:275), the proctime/rowtime fields are removed from the field mapping.
 # At org.apache.flink.table.sources.TableSourceValidation.resolveField (TableSourceValidation.java:245), the field is resolved from the field mapping.

Is there a conflict between the above two?

> Rowtime field name cannot be the same as the json field
> -------------------------------------------------------
>
>                 Key: FLINK-15943
>                 URL: https://issues.apache.org/jira/browse/FLINK-15943
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API, Table SQL / Planner
>    Affects Versions: 1.9.0
>            Reporter: gkgkgk
>            Priority: Major
>
> Run the following SQL:
> -- sql start 
> --source
> CREATE TABLE dwd_user_log (
>   id VARCHAR,
>   ctime TIMESTAMP,
>   sessionId VARCHAR,
>   pageId VARCHAR,
>   eventId VARCHAR,
>   deviceId DECIMAL
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'dev_dwd_user_log_02',
>   'connector.startup-mode' = 'latest-offset',
>   'connector.properties.0.key' = 'zookeeper.connect',
>   'connector.properties.0.value' = 'node01:2181',
>   'connector.properties.1.key' = 'bootstrap.servers',
>   'connector.properties.1.value' = 'node01:9092',
>   'connector.properties.2.key' = 'group.id',
>   'connector.properties.2.value' = 'dev-group',
>   'update-mode' = 'append',
>   'format.type' = 'json',
>   -- 'format.derive-schema' = 'true',
>   'format.json-schema' = '{
>     "type": "object",
>     "properties": {
>       "id": {
>       "type": "string"
>       },
>       "ctime": {
>       "type": "string",
>       "format": "date-time"
>       },
>       "pageId": {
>       "type": "string"
>       },
>       "eventId": {
>       "type": "string"
>       },
>       "sessionId": {
>       "type": "string"
>       },
>       "deviceId": {
>       "type": "number"
>       }
>     }
>   }',
>   'schema.1.rowtime.timestamps.type' = 'from-field',
>   'schema.1.rowtime.timestamps.from' = 'ctime',
>   'schema.1.rowtime.watermarks.type' = 'periodic-bounded',
>   'schema.1.rowtime.watermarks.delay' = '10000'
> );  
> -- sink
> -- sink for pv
> CREATE TABLE dws_pv (
>   windowStart TIMESTAMP,
>   windowEnd TIMESTAMP,
>   pageId VARCHAR,
>   viewCount BIGINT
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'dev_dws_pvuv_02',
>   'connector.startup-mode' = 'latest-offset',
>   'connector.properties.0.key' = 'zookeeper.connect',
>   'connector.properties.0.value' = 'node01:2181',
>   'connector.properties.1.key' = 'bootstrap.servers',
>   'connector.properties.1.value' = 'node01:9092',
>   'connector.properties.2.key' = 'group.id',
>   'connector.properties.2.value' = 'dev-group',
>   'update-mode' = 'append',
>   'format.type' = 'json',
>   'format.derive-schema' = 'true'
> );
> -- pv
> INSERT INTO dws_pv
> SELECT
>   TUMBLE_START(ctime, INTERVAL '20' SECOND)  AS windowStart,
>   TUMBLE_END(ctime, INTERVAL '20' SECOND)  AS windowEnd,
>   pageId,
>   COUNT(deviceId) AS viewCount
> FROM dwd_user_log
> GROUP BY TUMBLE(ctime, INTERVAL '20' SECOND),pageId;
> -- sql end
> And hit the following error:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException: Field 'ctime' could not be resolved by the field mapping.
>   at org.apache.flink.table.planner.sources.TableSourceUtil$.org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputField(TableSourceUtil.scala:357)
>   at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:388)
>   at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields$1.apply(TableSourceUtil.scala:388)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>   at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
>   at org.apache.flink.table.planner.sources.TableSourceUtil$.org$apache$flink$table$planner$sources$TableSourceUtil$$resolveInputFields(TableSourceUtil.scala:388)
>   at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:275)
>   at org.apache.flink.table.planner.sources.TableSourceUtil$$anonfun$getRowtimeExtractionExpression$1.apply(TableSourceUtil.scala:270)
>   at scala.Option.map(Option.scala:146)
>   at org.apache.flink.table.planner.sources.TableSourceUtil$.getRowtimeExtractionExpression(TableSourceUtil.scala:270)
>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:117)
>   at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:55)
>   at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54
> {code}


