You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Stefano (Jira)" <ji...@apache.org> on 2019/09/03 09:43:00 UTC

[jira] [Updated] (FLINK-13944) Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.

     [ https://issues.apache.org/jira/browse/FLINK-13944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stefano updated FLINK-13944:
----------------------------
    Description: 
{{Using: Scala streaming API and the StreamTableEnvironment.}}

{{Given the classes:}}

object EntityType extends Enumeration {
  type EntityType = Value
  val ACTIVITY = Value
 }
sealed trait Entity extends Serializable

{{case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity}}

 

What I try to do is\{{ to convert a table after selection to an appendStream.}}

 

{{/** activity table **/}}
 {{val activityDataStream = partialComputation1}}
 \{{ .filter(_._1 == EntityType.ACTIVITY)}}
 {{ .map(x => x._3.asInstanceOf[Activity])}}
 {{tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)}}

{{val selectedTable = tableEnv.scan("activity").select("card_id, second")}}
 {{selectedTable.printSchema()}}
 {{// root}}
 {{// |-- card_id: BIGINT}}
 {{// |-- second: BIGINT}}

{{// ATTEMPT 1}}
 {{// val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)}}
 {{// output.print}}

{{// ATTEMPT 2}}
 {{// val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)}}
 {{// output.print}}

{{// ATTEMPT 3}}
 {{// val output = tableEnv.toAppendStream[Row](selectedTable)}}
 {{// output.print}}

{{// ATTEMPT 4}}
 {{case class Test(card_id: Long, second: Long) extends Entity}}{{val output = tableEnv.toAppendStream[Test](selectedTable)}}
 {{output.print}}

 

The result for each of the attempts is always the same:

 

{{------------------------------------------- The program finished with the following exception:}}
 {{org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 334fe364c516008ca34b76e27c5c6f79) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) at

... 23 more

Caused by: org.apache.flink.api.common.InvalidProgramException: *Table program cannot be compiled. This is a bug. Please file an issue.* at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748)}}

 

My project in which I face the error is attached.

  was:
{{Using: Scala streaming API and the StreamTableEnvironment.}}

Given the classes:

{{object EntityType extends Enumeration {}}
{{ type EntityType = Value}}
{{ val ACTIVITY = Value}}
{{}}}

{{sealed trait Entity extends Serializable}}

{{case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity}}

 

What I try to do is{{ to convert a table after selection to an appendStream.}}

 

{{/** activity table **/}}
{{val activityDataStream = partialComputation1}}
{{ .filter(_._1 == EntityType.ACTIVITY)}}
{{ .map(x => x._3.asInstanceOf[Activity])}}
{{tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)}}


{{val selectedTable = tableEnv.scan("activity").select("card_id, second")}}
{{selectedTable.printSchema()}}
{{// root}}
{{// |-- card_id: BIGINT}}
{{// |-- second: BIGINT}}

{{// ATTEMPT 1}}
{{// val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)}}
{{// output.print}}

{{// ATTEMPT 2}}
{{// val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)}}
{{// output.print}}

{{// ATTEMPT 3}}
{{// val output = tableEnv.toAppendStream[Row](selectedTable)}}
{{// output.print}}

{{// ATTEMPT 4}}
{{case class Test(card_id: Long, second: Long) extends Entity}}{{val output = tableEnv.toAppendStream[Test](selectedTable)}}
{{output.print}}

 

The result for each of the attempts is always the same:

 

{{------------------------------------------- The program finished with the following exception:}}
 {{org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 334fe364c516008ca34b76e27c5c6f79) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) at

... 23 more

Caused by: org.apache.flink.api.common.InvalidProgramException: *Table program cannot be compiled. This is a bug. Please file an issue.* at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748)}}

 

My project in which I face the error is attached.


> Table.toAppendStream: InvalidProgramException: Table program cannot be compiled.
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-13944
>                 URL: https://issues.apache.org/jira/browse/FLINK-13944
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Scala, Table SQL / API
>    Affects Versions: 1.8.1, 1.9.0
>         Environment: {{$ java -version}}
> {{ openjdk version "1.8.0_222"}}
> {{ OpenJDK Runtime Environment (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10)}}
> {{ OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)}}
> {{------}}
> {{$ scala -version}}
> {{Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL}}
> {{------}}
> {{build.}}{{sbt}}
> [...]
> ThisBuild / scalaVersion := "2.11.12"
> val flinkVersion = "1.9.0"
> val flinkDependencies = Seq(
>  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
>  "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided")
> [...]
>  
>            Reporter: Stefano
>            Priority: Major
>         Attachments: app.zip
>
>
> {{Using: Scala streaming API and the StreamTableEnvironment.}}
> {{Given the classes:}}
> object EntityType extends Enumeration {
>   type EntityType = Value
>   val ACTIVITY = Value
>  }
> sealed trait Entity extends Serializable
> {{case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity}}
>  
> What I try to do is\{{ to convert a table after selection to an appendStream.}}
>  
> {{/** activity table **/}}
>  {{val activityDataStream = partialComputation1}}
>  \{{ .filter(_._1 == EntityType.ACTIVITY)}}
>  {{ .map(x => x._3.asInstanceOf[Activity])}}
>  {{tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)}}
> {{val selectedTable = tableEnv.scan("activity").select("card_id, second")}}
>  {{selectedTable.printSchema()}}
>  {{// root}}
>  {{// |-- card_id: BIGINT}}
>  {{// |-- second: BIGINT}}
> {{// ATTEMPT 1}}
>  {{// val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)}}
>  {{// output.print}}
> {{// ATTEMPT 2}}
>  {{// val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)}}
>  {{// output.print}}
> {{// ATTEMPT 3}}
>  {{// val output = tableEnv.toAppendStream[Row](selectedTable)}}
>  {{// output.print}}
> {{// ATTEMPT 4}}
>  {{case class Test(card_id: Long, second: Long) extends Entity}}{{val output = tableEnv.toAppendStream[Test](selectedTable)}}
>  {{output.print}}
>  
> The result for each of the attempts is always the same:
>  
> {{------------------------------------------- The program finished with the following exception:}}
>  {{org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 334fe364c516008ca34b76e27c5c6f79) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) at
> ... 23 more
> Caused by: org.apache.flink.api.common.InvalidProgramException: *Table program cannot be compiled. This is a bug. Please file an issue.* at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748)}}
>  
> My project in which I face the error is attached.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)