Posted to issues@flink.apache.org by "godfrey he (Jira)" <ji...@apache.org> on 2019/11/22 03:00:00 UTC

[jira] [Updated] (FLINK-14899) can not be translated to StreamExecDeduplicate when PROCTIME() is defined in query

     [ https://issues.apache.org/jira/browse/FLINK-14899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

godfrey he updated FLINK-14899:
-------------------------------
    Summary: can not be translated to StreamExecDeduplicate when PROCTIME() is defined in query  (was: can not be translated to StreamExecDeduplicate when PROCTIME is defined in query)

> can not be translated to StreamExecDeduplicate when PROCTIME() is defined in query
> ----------------------------------------------------------------------------------
>
>                 Key: FLINK-14899
>                 URL: https://issues.apache.org/jira/browse/FLINK-14899
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.0, 1.9.1
>            Reporter: godfrey he
>            Priority: Major
>             Fix For: 1.10.0
>
>
> CREATE TABLE user_log (
>     user_id VARCHAR,
>     item_id VARCHAR,
>     category_id VARCHAR,
>     behavior VARCHAR,
>     ts TIMESTAMP
> ) WITH (
>     'connector.type' = 'kafka',
>     'connector.version' = 'universal',
>     'connector.topic' = 'user_behavior',
>     'connector.startup-mode' = 'earliest-offset',
>     'connector.properties.0.key' = 'zookeeper.connect',
>     'connector.properties.0.value' = 'localhost:2181',
>     'connector.properties.1.key' = 'bootstrap.servers',
>     'connector.properties.1.value' = 'localhost:9092',
>     'update-mode' = 'append',
>     'format.type' = 'json',
>     'format.derive-schema' = 'true'
> );
>
> CREATE TABLE user_dist (
>     dt VARCHAR,
>     user_id VARCHAR,
>     behavior VARCHAR
> ) WITH (
>     'connector.type' = 'jdbc',
>     'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
>     'connector.table' = 'user_behavior_dup',
>     'connector.username' = 'root',
>     'connector.password' = '******',
>     'connector.write.flush.max-rows' = '1'
> );
>
> INSERT INTO user_dist
> SELECT
>   dt,
>   user_id,
>   behavior
> FROM (
>    SELECT
>       dt,
>       user_id,
>       behavior,
>       ROW_NUMBER() OVER (PARTITION BY dt, user_id, behavior ORDER BY proc ASC) AS rownum
>    FROM (
>       SELECT
>          DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') AS dt,
>          user_id,
>          behavior,
>          PROCTIME() AS proc
>       FROM user_log
>    )
> )
> WHERE rownum = 1;
>
> Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:114)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
> at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
> at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
> at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:60)
> at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:149)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:439)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:348)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)