Posted to issues@flink.apache.org by "Jingsong Lee (Jira)" <ji...@apache.org> on 2020/05/21 02:51:00 UTC

[jira] [Commented] (FLINK-17803) Should throws a readable exception when group by Map type

    [ https://issues.apache.org/jira/browse/FLINK-17803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112742#comment-17112742 ] 

Jingsong Lee commented on FLINK-17803:
--------------------------------------

[~zouyunhe] Reopening this; we should throw a better exception, FYI.
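
To illustrate what a "better exception" could look like, here is a minimal, self-contained Java sketch. It is not Flink's actual code: TypeRoot is a hypothetical stand-in for LogicalTypeRoot, the set of supported roots is an assumption made for the example, and validateGroupKey and the field name event_map are made up. The idea is only to check grouping keys up front and fail with a message that names the field and its type, rather than surfacing a scala.MatchError from code generation.

```java
import java.util.EnumSet;
import java.util.Set;

final class GroupKeyCheck {
    // Hypothetical stand-in for Flink's LogicalTypeRoot; only a few roots are listed.
    enum TypeRoot { BOOLEAN, TINYINT, INTEGER, VARCHAR, MAP, ARRAY }

    // Roots this sketch treats as usable grouping keys (an assumption, not Flink's real list).
    static final Set<TypeRoot> SUPPORTED_KEYS =
        EnumSet.of(TypeRoot.BOOLEAN, TypeRoot.TINYINT, TypeRoot.INTEGER, TypeRoot.VARCHAR);

    // Fails fast with a message naming the field and its type.
    static void validateGroupKey(String fieldName, TypeRoot root) {
        if (!SUPPORTED_KEYS.contains(root)) {
            throw new UnsupportedOperationException(
                "Cannot GROUP BY field '" + fieldName + "' of type " + root
                    + ": this type is not supported as a grouping key.");
        }
    }

    public static void main(String[] args) {
        validateGroupKey("uid", TypeRoot.VARCHAR); // supported, returns normally
        try {
            validateGroupKey("event_map", TypeRoot.MAP); // hypothetical map-typed field
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

A check of this kind would run at planning time, before any code is generated, so the user sees which column is the problem instead of a stack trace from the code generator.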

> Should throws a readable exception when group by Map type
> ---------------------------------------------------------
>
>                 Key: FLINK-17803
>                 URL: https://issues.apache.org/jira/browse/FLINK-17803
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: zouyunhe
>            Priority: Major
>             Fix For: 1.11.0
>
>
> We use Flink 1.10.0 with the Blink planner to submit a batch SQL job that reads from a Hive table containing map-type fields and then aggregates. The SQL is as follows:
> ```
>  create view aaa
>  as select * from table1 where event_id = '0103002' and `day`='2020-05-13'
>  and `hour`='13';
>  create view view_1
>  as
>  select
>  `day`,
>  a.rtime as itime,
>  a.uid as uid,
>  trim(BOTH a.event.log_1['scene']) as refer_list,
>  T.s as abflags,
>  a.hdid as hdid,
>  a.country as country
>  from aaa as a
>  left join LATERAL TABLE(splitByChar(trim(BOTH a.event.log_1['abflag']),
>  ',')) as T(s) on true;
>  CREATE VIEW view_6 as
>  SELECT
>  `uid`,
>  `refer_list`,
>  `abflag`,
>  last_value(country)
>  FROM view_1
>  where `refer_list` in ('WELOG_NEARBY', 'WELOG_FOLLOW', 'WELOG_POPULAR')
>  GROUP BY `uid`, `refer_list`, abflag;
>  insert into ............
>  ``` 
> When the job is submitted, the following exception occurs:
>  org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
>          at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
>          at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
>          at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
>          at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
>          at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
>          at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
>          at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
>          at java.security.AccessController.doPrivileged(Native Method)
>          at javax.security.auth.Subject.doAs(Subject.java:422)
>          at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
>          at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>          at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
>  Caused by: java.lang.RuntimeException: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
>          at sg.bigo.streaming.sql.StreamingSqlRunner.main(StreamingSqlRunner.java:143)
>          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>          at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>          at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>          at java.lang.reflect.Method.invoke(Method.java:498)
>          at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:321)
>          ... 11 more
>  Caused by: scala.MatchError: MAP (of class org.apache.flink.table.types.logical.LogicalTypeRoot)
>          at org.apache.flink.table.planner.codegen.CodeGenUtils$.hashCodeForType(CodeGenUtils.scala:212)
>          at org.apache.flink.table.planner.codegen.HashCodeGenerator$.$anonfun$generateCodeBody$1(HashCodeGenerator.scala:97)
>          at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
>          at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
>   
>  We then found that the hashCodeForType method in the CodeGenUtils class does not match the MAP type, so we patched it as follows:
> ```
>  def hashCodeForType(
>      ctx: CodeGeneratorContext, t: LogicalType, term: String): String = t.getTypeRoot match {
>    case BOOLEAN => s"${className[JBoolean]}.hashCode($term)"
>    case MAP => s"${className[BaseMap]}.getHashCode($term)"  // the case we added
>    case TINYINT => s"${className[JByte]}.hashCode($term)"
>    // ... remaining cases omitted in this excerpt
>  }
>  ```
>  With this change the job can be submitted, but after running for a while another exception occurs:
>  java.lang.RuntimeException: Could not instantiate generated class 'HashAggregateWithKeys$1543'
>  at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)
>  at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:46)
>  at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:48)
>  at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:156)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:433)
>  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>  at java.lang.Thread.run(Thread.java:745)
>  Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>  at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
>  at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
>  at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65)
>  ... 8 more
>  Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>  at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
>  at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
>  at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
>  at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
>  ... 10 more
>  Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
>  at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
>  at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
>  at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
>  at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
>  at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
>  at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
>  at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
>  ... 13 more
>  Caused by: org.codehaus.commons.compiler.CompileException: Line 459, Column 57: A method named "compareTo" is not declared in any enclosing class nor any supertype, nor through a static import
>  at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
>  at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997)
>  at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060)
>  at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)
>  at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)
>  at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)
>  at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)
>  at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)
>  at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)
>  at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2580)
>  at org.codehaus.janino.UnitCompiler.access$2700(UnitCompiler.java:215)
>   


