Posted to user@flink.apache.org by Luke Xiong <le...@gmail.com> on 2023/05/02 04:18:03 UTC

Table API behaves differently in STREAMING mode vs. BATCH mode

Many thanks if anybody could help. I ran into this in version 1.15.

I have a DataStream of a custom type *X* which has a field *foo* of type
*Map<String, Long>*. I need to query this DataStream to find elements that
meet conditions like *foo['bar'] == value*.

The table is created with
*tenv.createTemporaryView(tableName, tenv.fromDataStream(XStream, schema))*
then
*tenv.toChangelogStream(tenv.sqlQuery(sql))*
or
*tenv.toDataStream(tenv.sqlQuery(sql))*

My code works in streaming mode, but in batch mode I get the following
error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Unsupported type(MAP<STRING, BIGINT>) to generate hash code, the type(MAP<STRING, BIGINT>) is not supported as a GROUP_BY/PARTITION_BY/JOIN_EQUAL/UNION field.
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:836)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:247)
        at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1078)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1156)
        at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1156)
Caused by: java.lang.UnsupportedOperationException: Unsupported type(MAP<STRING, BIGINT>) to generate hash code, the type(MAP<STRING, BIGINT>) is not supported as a GROUP_BY/PARTITION_BY/JOIN_EQUAL/UNION field.
        at org.apache.flink.table.planner.codegen.CodeGenUtils$.hashCodeForType(CodeGenUtils.scala:317)
        at org.apache.flink.table.planner.codegen.HashCodeGenerator$.$anonfun$generateCodeBody$1(HashCodeGenerator.scala:102)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:37)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.codegen.HashCodeGenerator$.generateCodeBody(HashCodeGenerator.scala:95)
        at org.apache.flink.table.planner.codegen.HashCodeGenerator$.generateRowHash(HashCodeGenerator.scala:60)
        at org.apache.flink.table.planner.codegen.CodeGenUtils$.hashCodeForType(CodeGenUtils.scala:323)
        at org.apache.flink.table.planner.codegen.HashCodeGenerator$.$anonfun$generateCodeBody$1(HashCodeGenerator.scala:102)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:32)
        at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:29)
        at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:37)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.codegen.HashCodeGenerator$.generateCodeBody(HashCodeGenerator.scala:95)
        at org.apache.flink.table.planner.codegen.HashCodeGenerator$.generateRowHash(HashCodeGenerator.scala:60)
        at org.apache.flink.table.planner.codegen.HashCodeGenerator.generateRowHash(HashCodeGenerator.scala)
        at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange.createHashPartitioner(BatchExecExchange.java:211)
        at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange.translateToPlanInternal(BatchExecExchange.java:160)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249)
        at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecHashAggregate.translateToPlanInternal(BatchExecHashAggregate.java:94)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249)
        at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:249)
        at org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecSink.translateToPlanInternal(BatchExecSink.java:65)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:158)
        at org.apache.flink.table.planner.delegation.BatchPlanner.$anonfun$translateToPlan$1(BatchPlanner.scala:86)
        at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233)
        at scala.collection.Iterator.foreach(Iterator.scala:937)
        at scala.collection.Iterator.foreach$(Iterator.scala:937)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1425)
        at scala.collection.IterableLike.foreach(IterableLike.scala:70)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:69)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike.map(TraversableLike.scala:233)
        at scala.collection.TraversableLike.map$(TraversableLike.scala:226)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.BatchPlanner.translateToPlan(BatchPlanner.scala:85)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:188)
        at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:223)
        at org.apache.flink.table.api.bridge.internal.AbstractStreamTableEnvironmentImpl.toStreamInternal(AbstractStreamTableEnvironmentImpl.java:218)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:245)
        at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:218)
        ...

- Luke

Re: Table API behaves differently in STREAMING mode vs. BATCH mode

Posted by Shammon FY <zj...@gmail.com>.
Hi Luke,

This issue has been fixed in FLINK-25645 [1]. You can try 1.16 or a later
version of Flink.

[1] https://issues.apache.org/jira/browse/FLINK-25645
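(Editor's note: until an upgrade is possible, a common workaround on 1.15 — a suggestion not made in the thread itself — is to keep the MAP column out of any GROUP BY/PARTITION BY and project the needed entry to a scalar first, e.g. something like `SELECT foo['bar'] AS bar ...` in the SQL; all names here are hypothetical. A minimal plain-Java sketch of the same idea, using a list of maps as a stand-in for rows of *X*:)

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class MapKeyWorkaround {

    // Instead of grouping rows by the whole Map<String, Long> column
    // (which Flink 1.15 batch mode cannot hash-partition), extract the
    // scalar entry first and group by that.
    static Map<Long, Long> countByBar(List<Map<String, Long>> rows) {
        return rows.stream()
                .map(foo -> foo.getOrDefault("bar", -1L)) // BIGINT-like key
                .collect(Collectors.groupingBy(k -> k, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Map<String, Long>> rows = List.of(
                Map.of("bar", 1L), Map.of("bar", 2L), Map.of("bar", 1L));
        // Prints the count of rows per extracted 'bar' value.
        System.out.println(countByBar(rows));
    }
}
```

The point is that a scalar (BIGINT) key has a well-defined hash for the batch exchange, while a MAP column did not before FLINK-25645.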

Best,
Shammon FY
