Posted to issues@flink.apache.org by "Benchao Li (Jira)" <ji...@apache.org> on 2019/12/27 07:42:00 UTC

[jira] [Created] (FLINK-15421) GroupAggsHandler throws java.time.LocalDateTime cannot be cast to java.sql.Timestamp

Benchao Li created FLINK-15421:
----------------------------------

             Summary: GroupAggsHandler throws java.time.LocalDateTime cannot be cast to java.sql.Timestamp
                 Key: FLINK-15421
                 URL: https://issues.apache.org/jira/browse/FLINK-15421
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.9.1, 1.10.0
            Reporter: Benchao Li


`TimestampType` has two physical representations: `Timestamp` and `LocalDateTime`. With the following SQL, the two representations conflict with each other:
{quote}SELECT
 SUM(cnt) as s,
 MAX(ts)
 FROM
 (SELECT
 `string`,
 `int`,
 COUNT(*) AS cnt,
 MAX(rowtime) as ts
 FROM T1
 GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
 GROUP BY `string`
{quote}
with 'table.exec.emit.early-fire.enabled' = true.

The exception is below:
{quote}Caused by: java.lang.ClassCastException: java.time.LocalDateTime cannot be cast to java.sql.Timestamp
 at GroupAggsHandler$83.getValue(GroupAggsHandler$83.java:529)
 at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:164)
 at org.apache.flink.table.runtime.operators.aggregate.GroupAggFunction.processElement(GroupAggFunction.java:43)
 at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
 at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
 at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
 at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:488)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
 at java.lang.Thread.run(Thread.java:748)
{quote}
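For illustration only, the cast failure in the trace above can be reproduced outside Flink. The sketch below is not Flink code and not a proposed fix: `TimestampCastDemo`, `unsafeCast`, and `toSqlTimestamp` are hypothetical names, and the only assumption is that a value declared as a timestamp may arrive in either physical representation.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

public class TimestampCastDemo {

    // Hypothetical stand-in for code that assumes a single physical
    // representation; throws ClassCastException for a LocalDateTime.
    static Timestamp unsafeCast(Object value) {
        return (Timestamp) value;
    }

    // Hypothetical helper that accepts either representation and
    // converts explicitly instead of casting blindly.
    static Timestamp toSqlTimestamp(Object value) {
        if (value instanceof Timestamp) {
            return (Timestamp) value;
        }
        if (value instanceof LocalDateTime) {
            return Timestamp.valueOf((LocalDateTime) value);
        }
        throw new IllegalArgumentException(
            "Unsupported timestamp value: " + String.valueOf(value));
    }

    public static void main(String[] args) {
        Object ts = LocalDateTime.of(2019, 12, 27, 7, 42, 0);
        try {
            unsafeCast(ts);
        } catch (ClassCastException e) {
            // Same kind of failure as in the stack trace above.
            System.out.println("cast failed: " + e.getMessage());
        }
        // The explicit conversion succeeds for the same value.
        System.out.println("converted: " + toSqlTimestamp(ts));
    }
}
```

The point of the sketch is that both classes can back the same logical type, so any code path that hard-codes one of them has to convert rather than cast.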
I also created a unit test in `WindowAggregateITCase` to quickly reproduce this bug:
{quote} @Test
  def testEarlyFireWithTumblingWindow(): Unit = {
    val stream = failingDataSource(data)
      .assignTimestampsAndWatermarks(
        new TimestampAndWatermarkWithOffset
          [(Long, Int, Double, Float, BigDecimal, String, String)](10L))
    val table = stream.toTable(tEnv,
      'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'name)
    tEnv.registerTable("T1", table)
    tEnv.getConfig.getConfiguration.setBoolean("table.exec.emit.early-fire.enabled", true)
    tEnv.getConfig.getConfiguration.setString("table.exec.emit.early-fire.delay", "1000 ms")

    val sql =
      """
        |SELECT
        |  SUM(cnt) as s,
        |  MAX(ts)
        |FROM
        |  (SELECT
        |    `string`,
        |    `int`,
        |    COUNT(*) AS cnt,
        |    MAX(rowtime) as ts
        |  FROM T1
        |  GROUP BY `string`, `int`, TUMBLE(rowtime, INTERVAL '10' SECOND))
        |GROUP BY `string`
        |""".stripMargin

    tEnv.sqlQuery(sql).toRetractStream[Row].print()
    env.execute()
  }{quote}


