Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/02/03 14:00:00 UTC

[jira] [Resolved] (FLINK-15840) ClassCastException is thrown when using tEnv.from for temp/catalog table under Blink planner

     [ https://issues.apache.org/jira/browse/FLINK-15840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu resolved FLINK-15840.
-----------------------------
    Resolution: Fixed

Fixed in 1.11.0: 183c8d143bc3bb5a31747417355690adaba3d226
Fixed in 1.10.0: 9a8f15cba18d431550a2ec587d69b948b4bedabd

> ClassCastException is thrown when using tEnv.from for temp/catalog table under Blink planner
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-15840
>                 URL: https://issues.apache.org/jira/browse/FLINK-15840
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>            Reporter: sunjincheng
>            Assignee: Jingsong Lee
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.10.0
>
>          Time Spent: 20m
>  Remaining Estimate: 0h
>
> ClassCastException is thrown when using a ConnectorDescriptor under the Blink planner.
> The exception can be reproduced by the following test:
> {code:java}
> @Test
> def testDescriptor(): Unit = {
>   this.env = StreamExecutionEnvironment.getExecutionEnvironment
>   val setting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>   this.tEnv = StreamTableEnvironment.create(env, setting)
>   tEnv.connect(new FileSystem().path("/tmp/input"))
>     .withFormat(new OldCsv().field("word", DataTypes.STRING()))
>     .withSchema(new Schema().field("word", DataTypes.STRING()))
>     .createTemporaryTable("sourceTable")
>   val sink = new TestingAppendSink
>   tEnv.from("sourceTable").toAppendStream[Row].addSink(sink)
>   env.execute()
> }
> {code}
> Exception stack trace:
> {code:java}
> java.lang.ClassCastException: org.apache.calcite.plan.ViewExpanders$2 cannot be cast to org.apache.flink.table.planner.calcite.FlinkToRelContext
>  at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:89)
>  at org.apache.calcite.rel.rules.TableScanRule.onMatch(TableScanRule.java:55)
>  at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:319)
>  at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:560)
>  at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:419)
>  at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:256)
>  at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
>  at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:215)
>  at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:202)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.immutable.Range.foreach(Range.scala:160)
>  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>  at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
>  at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104)
>  at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:57)
>  at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:167)
>  at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:89)
>  at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77)
>  at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:248)
>  at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:151)
>  at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
>  at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toAppendStream(StreamTableEnvironmentImpl.scala:107)
>  at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:101)
>  at org.apache.flink.table.planner.runtime.stream.table.CalcITCase.testDescriptor(CalcITCase.scala:541)
> {code}
> It seems we should not cast `context` to `FlinkToRelContext` directly, as it could also be an anonymous class defined in `org.apache.calcite.plan.ViewExpanders`.
> What do you think?
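The defensive approach suggested above can be sketched with a pattern match instead of an unconditional cast. This is only an illustration of the technique, not the actual Flink fix: the traits and method below (`ToRelContext`, `FlinkToRelContext`, `createConverter`) are simplified stand-ins for `RelOptTable.ToRelContext` and the planner's `FlinkToRelContext`.

```scala
// Stand-in for org.apache.calcite.plan.RelOptTable.ToRelContext.
trait ToRelContext

// Stand-in for the planner-specific subtype that offers extra capabilities.
trait FlinkToRelContext extends ToRelContext {
  def createConverter(): String = "expr-to-rex converter"
}

// Instead of `context.asInstanceOf[FlinkToRelContext]`, which throws a
// ClassCastException for anonymous contexts (e.g. those created by
// Calcite's ViewExpanders), match on the runtime type and fall back.
def toRel(context: ToRelContext): String = context match {
  case flinkContext: FlinkToRelContext =>
    flinkContext.createConverter()      // safe: planner-created context
  case _ =>
    "fallback path"                     // e.g. an anonymous Calcite context
}
```

With this shape, a context coming from `ViewExpanders` simply takes the fallback branch rather than failing the whole plan translation.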



--
This message was sent by Atlassian Jira
(v8.3.4#803005)