Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2019/12/30 06:53:00 UTC

[jira] [Assigned] (FLINK-15421) GroupAggsHandler throws java.time.LocalDateTime cannot be cast to java.sql.Timestamp

     [ https://issues.apache.org/jira/browse/FLINK-15421?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu reassigned FLINK-15421:
-------------------------------

    Assignee: Zhenghua Gao

> GroupAggsHandler throws java.time.LocalDateTime cannot be cast to java.sql.Timestamp
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-15421
>                 URL: https://issues.apache.org/jira/browse/FLINK-15421
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.9.1, 1.10.0
>            Reporter: Benchao Li
>            Assignee: Zhenghua Gao
>            Priority: Critical
>             Fix For: 1.9.2, 1.10.0
>
>
> `TimestampType` has two physical representations: `Timestamp` (`java.sql.Timestamp`) and `LocalDateTime` (`java.time.LocalDateTime`). With the following SQL, the two representations conflict with each other:
> {code:java}
> SELECT
>   SUM(cnt) as s,
>   MAX(ts)
> FROM
>   (SELECT
>     `string`,
>     `int`,
>     COUNT(*) AS cnt,
>     MAX(rowtime) as ts
>   FROM T1
>   GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
> GROUP BY `string`
> {code}
> with 'table.exec.emit.early-fire.enabled' = true.
> The exception is below:
> {quote}Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
>  at GroupAggsHandler$83.getValue(GroupAggsHandler$83.java:529)
>  at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:164)
>  at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
>  at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
>  at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
>  at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
>  at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
>  at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
>  at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
>  at java.lang.Thread.run(Thread.java:748)
> {quote}
> I also created a unit test in `WindowAggregateITCase` that quickly reproduces this bug:
> {code:java}
> @Test
> def testEarlyFireWithTumblingWindow(): Unit = {
>   val stream = failingDataSource(data)
>     .assignTimestampsAndWatermarks(
>       new TimestampAndWatermarkWithOffset
>         [(Long, Int, Double, Float, BigDecimal, String, String)](10L))
>   val table = stream.toTable(tEnv,
>     'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
>   tEnv.registerTable("T1", table)
>   tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled", true)
>   tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay", "1000 ms")
>   val sql =
>     """
>       |SELECT
>       |  SUM(cnt) as s,
>       |  MAX(ts)
>       |FROM
>       |  (SELECT
>       |    `string`,
>       |    `int`,
>       |    COUNT(*) AS cnt,
>       |    MAX(rowtime) as ts
>       |  FROM T1
>       |  GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
>       |GROUP BY `string`
>       |""".stripMargin
>   tEnv.sqlQuery(sql).toRetractStream[Row].print()
>   env.execute()
> }
> {code}
>  
>  
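
The cast failure in the stack trace can be illustrated outside Flink with plain JDK classes. The following is only a minimal sketch, not the generated `GroupAggsHandler` code and not the actual fix: the class `TimestampCastSketch` and the helpers `blindCastFails` and `toSqlTimestamp` are hypothetical names introduced for illustration. It assumes a value whose runtime class is `java.time.LocalDateTime` reaches code that expects `java.sql.Timestamp`, and shows that an explicit conversion via `Timestamp.valueOf(LocalDateTime)` avoids the blind cast.

{code:java}
import java.sql.Timestamp;
import java.time.LocalDateTime;

public class TimestampCastSketch {

    // Hypothetical helper: mimics code that assumes one physical representation
    // and reports whether the blind cast throws ClassCastException.
    static boolean blindCastFails(Object value) {
        try {
            Timestamp ignored = (Timestamp) value;
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    // Hypothetical helper: accepts either physical representation and
    // normalizes it to java.sql.Timestamp instead of casting blindly.
    static Timestamp toSqlTimestamp(Object value) {
        if (value instanceof Timestamp) {
            return (Timestamp) value;
        }
        if (value instanceof LocalDateTime) {
            return Timestamp.valueOf((LocalDateTime) value);
        }
        throw new IllegalArgumentException(
            "Unsupported timestamp representation: " + value);
    }

    public static void main(String[] args) {
        // Assumed input: the same logical timestamp in its LocalDateTime form.
        Object value = LocalDateTime.of(2019, 12, 30, 6, 53, 0);

        System.out.println("blind cast fails: " + blindCastFails(value));
        System.out.println("converted: " + toSqlTimestamp(value));
    }
}
{code}

Checking the runtime class before converting keeps both representations usable at the boundary between two operators. Whether the planner should instead agree on a single representation for the intermediate result is a separate design question that this sketch does not address.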



--
This message was sent by Atlassian Jira
(v8.3.4#803005)