You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Fabian Hueske (JIRA)" <ji...@apache.org> on 2017/05/28 12:56:04 UTC

[jira] [Commented] (FLINK-6736) Fix UDTF codegen bug when window follow by join( UDTF)

    [ https://issues.apache.org/jira/browse/FLINK-6736?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027810#comment-16027810 ] 

Fabian Hueske commented on FLINK-6736:
--------------------------------------

Yes, this looks like a bug that needs to be fixed, IMO.
If your analysis is correct, this would mean that we cannot use a window after a TableFunction was applied.

Although this is a serious limitation of the Table API, I'm not sure if this qualifies as a release blocker.
it would be good to fix it ASAP nonetheless, so we can include it if there is another RC or in the first bugfix release of 1.3.x.

> Fix UDTF codegen bug when window follow by join( UDTF)
> ------------------------------------------------------
>
>                 Key: FLINK-6736
>                 URL: https://issues.apache.org/jira/browse/FLINK-6736
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> When we run the tableAPI as follows:
> {code}
> val table = stream.toTable(tEnv, 'long.rowtime, 'int, 'double, 'float, 'bigdec, 'date,'pojo, 'string)
>     val windowedTable = table
>       .join(udtf2('string) as ('a, 'b))
>       .window(Slide over 5.milli every 2.milli on 'long as 'w)
>       .groupBy('w)
>       .select('int.count, agg1('pojo, 'bigdec, 'date, 'int), 'w.start, 'w.end)
> {code}
> We will get the error message:
> {code}
> org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
> 	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
> 	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> 	at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
> 	at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.compile(CRowCorrelateProcessRunner.scala:35)
> 	at org.apache.flink.table.runtime.CRowCorrelateProcessRunner.open(CRowCorrelateProcessRunner.scala:59)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:111)
> 	at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:377)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> 	at java.lang.Thread.run(Thread.java:745)
> Caused by: org.codehaus.commons.compiler.CompileException: Line 77, Column 62: Unknown variable or type "in2"
> 	at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11523)
> 	at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6292)
> 	at org.codehaus.janino.UnitCompiler.access$12900(UnitCompiler.java:209)
> 	at org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5904)
> 	at org.codehaus.janino.UnitCompiler$18.visitPackage(UnitCompiler.java:5901)
> 	at org.codehaus.janino.Java$Package.accept(Java.java:4074)
> 	at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:5901)
> 	at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6287)
> 	at org.codehaus.janino.UnitCompiler.access$13500(UnitCompiler.java:209)
> {code}
> The reason is {{val generator = new CodeGenerator(config, false, inputSchema.physicalTypeInfo)}} `physicalTypeInfo` will remove the TimeIndicator.
> I think we should fix this. What do you think [~fhueske] [~twalthr] , And hope your suggestions. :)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)