Posted to user-zh@flink.apache.org by Fei Han <ha...@aliyun.com.INVALID> on 2020/02/23 07:43:40 UTC

timestamp issue

Hi all,
   I ran the following DDL and SQL in Zeppelin and got the error below:
  DDL:
DROP TABLE IF EXISTS user_log ;
CREATE TABLE user_log (
    user_id VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'ods',
  'connector.startup-mode' = 'earliest-offset',
  'connector.properties.zookeeper.connect' = 'fdw1:2181,fdw2:2181,fdw3:2181',
  'connector.properties.bootstrap.servers' = 'fdw1:9092,fdw2:9092,fdw3:9092',
  'connector.properties.group.id' = 'testGroup',
  'format.type' = 'json',
  'update-mode' = 'append',
  'format.derive-schema' = 'true');

DROP TABLE IF EXISTS pvuv_sinks ;
CREATE TABLE pvuv_sinks (
    dt VARCHAR,
    pv BIGINT,
    uv BIGINT
) WITH (
    'connector.type' = 'jdbc', -- use the jdbc connector
    'connector.url' = 'jdbc:mysql://fdw1:3306/flink', -- jdbc url
    'connector.table' = 'pvuv_sinks', -- table name
    'connector.username' = 'flink', -- username
    'connector.password' = 'flink', -- password
    'connector.write.flush.max-rows' = '5' -- defaults to 5000 rows; lowered for the demo
);
SQL:
INSERT INTO pvuv_sinks
SELECT
  DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
  COUNT(*) AS pv,
  COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00');

Error:
 org.apache.flink.table.api.ValidationException: Type TIMESTAMP(6) of table field 'ts' does not match with the physical type TIMESTAMP(3) of the 'ts' field of the TableSource return type.
 at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
 at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:277)
 at org.apache.flink.table.utils.TypeMappingUtils$1.defaultMethod(TypeMappingUtils.java:254)
 at org.apache.flink.table.types.logical.utils.LogicalTypeDefaultVisitor.visit(LogicalTypeDefaultVisitor.java:132)
 at org.apache.flink.table.types.logical.TimestampType.accept(TimestampType.java:151)
 at org.apache.flink.table.utils.TypeMappingUtils.checkIfCompatible(TypeMappingUtils.java:254)
 at org.apache.flink.table.utils.TypeMappingUtils.checkPhysicalLogicalTypeCompatible(TypeMappingUtils.java:160)
 at org.apache.flink.table.utils.TypeMappingUtils.lambda$computeInCompositeType$8(TypeMappingUtils.java:232)
 at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
 at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
 at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
 at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
 at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
 at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
 at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
 at org.apache.flink.table.utils.TypeMappingUtils.computeInCompositeType(TypeMappingUtils.java:214)
 at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndices(TypeMappingUtils.java:192)
 at org.apache.flink.table.utils.TypeMappingUtils.computePhysicalIndicesOrTimeAttributeMarkers(TypeMappingUtils.java:112)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.computeIndexMapping(StreamExecTableSourceScan.scala:212)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:107)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlanInternal(StreamExecTableSourceScan.scala:62)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTableSourceScan.translateToPlan(StreamExecTableSourceScan.scala:62)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:54)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalc.translateToPlanInternal(StreamExecCalc.scala:39)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:84)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlanInternal(StreamExecExchange.scala:44)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecExchange.translateToPlan(StreamExecExchange.scala:44)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:139)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlanInternal(StreamExecGroupAggregate.scala:55)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecGroupAggregate.translateToPlan(StreamExecGroupAggregate.scala:55)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToTransformation(StreamExecSink.scala:184)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:118)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
 at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
 at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
 at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
 at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at scala.collection.Iterator$class.foreach(Iterator.scala:891)
 at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
 at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
 at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
 at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
 at scala.collection.AbstractTraversable.map(Traversable.scala:104)
 at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
 at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
 at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
 at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callInsertInto(FlinkSqlInterrpeter.java:341)
 at org.apache.zeppelin.flink.FlinkSqlInterrpeter.callCommand(FlinkSqlInterrpeter.java:207)
 at org.apache.zeppelin.flink.FlinkSqlInterrpeter.runSqlList(FlinkSqlInterrpeter.java:151)
 at org.apache.zeppelin.flink.FlinkSqlInterrpeter.interpret(FlinkSqlInterrpeter.java:104)
 at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:103)
 at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:676)
 at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:569)
 at org.apache.zeppelin.scheduler.Job.run(Job.java:172)
 at org.apache.zeppelin.scheduler.AbstractScheduler.runJob(AbstractScheduler.java:121)
 at org.apache.zeppelin.scheduler.ParallelScheduler.lambda$runJobInScheduler$0(ParallelScheduler.java:39)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

Could anyone help take a look? Thanks.

Re: timestamp issue

Posted by Jark Wu <im...@gmail.com>.
Hi Fei,

The Kafka source/sink does not support the TIMESTAMP(6) type; only precision 3 is supported. A TIMESTAMP declared without an explicit precision currently defaults to precision 6, so you need to change TIMESTAMP in your DDL to TIMESTAMP(3).
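
A minimal sketch of the corrected source table, assuming the rest of your
WITH clause stays exactly as in your original DDL:

    CREATE TABLE user_log (
        user_id VARCHAR,
        item_id VARCHAR,
        category_id VARCHAR,
        behavior VARCHAR,
        ts TIMESTAMP(3)  -- precision 3 matches the physical TIMESTAMP(3) of the source
    ) WITH (
      ...
    );

With precision 3, the logical type of 'ts' matches the physical type reported
in the ValidationException, so the plan should validate.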

Best,
Jark
