You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "Barak Ben-Nathan (Jira)" <ji...@apache.org> on 2021/03/24 16:13:00 UTC
[jira] [Created] (FLINK-21962) SQL Group Windows do not work on
Flink 1.11
Barak Ben-Nathan created FLINK-21962:
----------------------------------------
Summary: SQL Group Windows do not work on Flink 1.11
Key: FLINK-21962
URL: https://issues.apache.org/jira/browse/FLINK-21962
Project: Flink
Issue Type: Bug
Components: Table SQL / API, Table SQL / Planner
Affects Versions: 1.11.3
Reporter: Barak Ben-Nathan
I am running the following code on Flink version 1.11.3:
{code:java}
// Reproducer: reads JSON records from one Kafka topic, aggregates them with a
// SQL HOP group window, and inserts the result into a second Kafka topic.

// Streaming table environment built with the Blink planner in streaming mode.
val bsEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
val bsSettings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val bsTableEnv: StreamTableEnvironment = StreamTableEnvironment.create(bsEnv, bsSettings)
// Register an in-memory catalog and make it (and its database) the current one,
// so the tables created below are resolved inside custom_catalog.customDB.
val cat = new GenericInMemoryCatalog("custom_catalog", "customDB")
bsTableEnv.registerCatalog("custom_catalog", cat)
bsTableEnv.useCatalog("custom_catalog")
bsTableEnv.useDatabase("customDB")
// DDL for the input table: Kafka topic 'test-in', JSON format, read from the
// earliest offset. timestamp1 is declared as the watermark column (with the
// watermark expression equal to the column itself) and is the time attribute
// used by the window query below.
val createInputTableDDL =
"""
|
|CREATE TABLE kafkaTable (
| user_id BIGINT,
| item_id BIGINT,
| category_id BIGINT,
| behavior STRING,
| timestamp1 TIMESTAMP(3),
| WATERMARK FOR timestamp1 AS timestamp1
|) WITH (
| 'connector' = 'kafka',
| 'topic' = 'test-in',
| 'properties.bootstrap.servers' = 'localhost:9092',
| 'properties.group.id' = 'testGroup',
| 'format' = 'json',
| 'scan.startup.mode' = 'earliest-offset'
|)
|""".stripMargin
// DDL for the output table: Kafka topic 'test-out', JSON format. Its two columns
// match the aliases (strt, sum_ijh) selected by the query below.
// NOTE(review): 'scan.startup.mode' and 'properties.group.id' look like
// source-side options carried over from the input DDL; confirm whether the sink needs them.
val createOutputTableDDL =
"""
|CREATE TABLE kafkaOutTable (
| strt TIMESTAMP(3),
| sum_ijh BIGINT
|) WITH (
| 'connector' = 'kafka',
| 'topic' = 'test-out',
| 'properties.bootstrap.servers' = 'localhost:9092',
| 'properties.group.id' = 'testGroup',
| 'format' = 'json',
| 'scan.startup.mode' = 'earliest-offset'
|)
|""".stripMargin
// Create both tables in the current catalog/database.
bsTableEnv.executeSql(createInputTableDDL)
bsTableEnv.executeSql(createOutputTableDDL)
// Group-window aggregation: sums item_id per HOP window over timestamp1 and
// emits each window's start time. The HOP_START arguments must repeat the HOP
// arguments in the GROUP BY clause exactly, as they do here.
// NOTE(review): per the Flink SQL syntax HOP(time_attr, slide, size) this is a
// 2-minute window sliding every 10 seconds; confirm that is the intended order.
val result = bsTableEnv.sqlQuery(
"SELECT HOP_START(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' MINUTE) as strt, sum(item_id) as sum_ijh FROM kafkaTable " +
"GROUP BY HOP(timestamp1, INTERVAL '10' SECOND, INTERVAL '2' MINUTE)"
)
// Submit the job that writes the query result into kafkaOutTable. The stack
// trace in this report is raised from this call (TableImpl.executeInsert).
result.executeInsert("kafkaOutTable")
{code}
The job keeps running locally and does not stop, but it does not work: no output is produced.
When setting log level to TRACE, the job fails with this exception:
{code:java}
Exception in thread "main" java.lang.ClassCastException: org.apache.calcite.plan.RelOptCostImpl$Factory cannot be cast to org.apache.flink.table.planner.plan.cost.FlinkCostFactory at org.apache.flink.table.planner.plan.nodes.common.CommonPhysicalExchange.computeSelfCost(CommonPhysicalExchange.scala:50) at org.apache.flink.table.planner.plan.metadata.FlinkRelMdNonCumulativeCost.getNonCumulativeCost(FlinkRelMdNonCumulativeCost.scala:41) at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost_$(Unknown Source) at GeneratedMetadataHandler_NonCumulativeCost.getNonCumulativeCost(Unknown Source) at org.apache.calcite.rel.metadata.RelMetadataQuery.getNonCumulativeCost(RelMetadataQuery.java:284) at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:38) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265) at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at 
scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265) at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) at 
GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265) at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.$anonfun$getCumulativeCost$1(FlinkRelMdCumulativeCost.scala:41) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at org.apache.flink.table.planner.plan.metadata.FlinkRelMdCumulativeCost.getCumulativeCost(FlinkRelMdCumulativeCost.scala:40) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost_$(Unknown Source) at GeneratedMetadataHandler_CumulativeCost.getCumulativeCost(Unknown Source) at org.apache.calcite.rel.metadata.RelMetadataQuery.getCumulativeCost(RelMetadataQuery.java:265) at org.apache.calcite.plan.AbstractRelOptPlanner.getCost(AbstractRelOptPlanner.java:249) at org.apache.calcite.plan.hep.HepPlanner.dumpGraph(HepPlanner.java:1045) at org.apache.calcite.plan.hep.HepPlanner.setRoot(HepPlanner.java:162) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:60) at 
org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$2(FlinkGroupProgram.scala:63) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1(FlinkGroupProgram.scala:60) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.$anonfun$optimize$1$adapted(FlinkGroupProgram.scala:55) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at scala.collection.immutable.Range.foreach(Range.scala:158) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) at scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:196) at 
scala.collection.TraversableOnce$folder$1.apply(TraversableOnce.scala:194) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at scala.collection.IterableLike.foreach(IterableLike.scala:74) at scala.collection.IterableLike.foreach$(IterableLike.scala:73) at scala.collection.AbstractIterable.foreach(Iterable.scala:56) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:108) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1270) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:701) at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565) at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549){code}
When I upgrade to version 1.12.2, the problem vanishes and there is output.
It might be related to this:
https://issues.apache.org/jira/browse/FLINK-15333
--
This message was sent by Atlassian Jira
(v8.3.4#803005)