You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by YI <uu...@protonmail.com> on 2020/06/28 07:37:17 UTC

Convert sql table with field of type MULITSET to datastream with field of type java.util.Map[T, java.lang.Integer]

Hi, all

I am trying to do something like this
```
tEnv
.sqlQuery("SELECT rawEvent.id, collect(rawEvent.name) FROM rawEvent GROUP BY rawEvent.id")
.toRetractStream[(Long, java.util.Map[String, java.lang.Integer])]
```

An exception is thrown when I ran the above code with the default planner setting in 1.10.1. I presume I am using the older planner.

```
Exception in thread "main" org.apache.flink.table.api.TableException: Result field does not match requested type. Requested: GenericType<java.util.Map>; Actual: Multiset<String>
at org.apache.flink.table.planner.Conversions$.$anonfun$generateRowConverterFunction$2(Conversions.scala:104)
at org.apache.flink.table.planner.Conversions$.$anonfun$generateRowConverterFunction$2$adapted(Conversions.scala:98)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at org.apache.flink.table.planner.Conversions$.generateRowConverterFunction(Conversions.scala:98)
at org.apache.flink.table.planner.DataStreamConversions$.getConversionMapperWithChanges(DataStreamConversions.scala:184)
at org.apache.flink.table.planner.DataStreamConversions$.convert(DataStreamConversions.scala:90)
at org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:413)
at org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:402)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
at org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:273)
at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
at org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:127)
at org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)
at io.redacted.sub.package$.getMatchesWithParent(package.scala:244)
at io.redacted.sub.package$.process(package.scala:156)
at io.redacted.DataAggregator$.main(DataAggregator.scala:15)
at io.redacted.DataAggregator.main(DataAggregator.scala)

Process finished with exit code 1
```

The result type of aggregation function collect is multiset. How do I convert it to a `java.util.Map[String, java.lang.Integer]`?

Cheers,
YI

Re: Convert sql table with field of type MULITSET to datastream with field of type java.util.Map[T, java.lang.Integer]

Posted by Timo Walther <tw...@apache.org>.
Hi YI,

not all conversion might be supported in the `toRetractStream` method. 
Unfortunately, the rework of the type system is still in progress. I 
hope we can improve the user experience there quite soon.

Have you tried to use `Row` instead? `toRetractStream[Row]` should work 
for all data types. A subsequent conversion MapFunction can then 
transform the data in your desired representation.

Regards,
Timo


On 28.06.20 09:37, YI wrote:
> Hi, all
> 
> I am trying to do something like this
> ```
> tEnv
>    .sqlQuery("SELECT rawEvent.id, collect(rawEvent.name) FROM rawEvent 
> GROUP BY rawEvent.id")
>    .toRetractStream[(Long, java.util.Map[String, java.lang.Integer])]
> ```
> 
> An exception is thrown when I ran the above code with the default 
> planner setting in 1.10.1. I presume I am using the older planner.
> 
> ```
> Exception in thread "main" org.apache.flink.table.api.TableException: 
> Result field does not match requested type. Requested: 
> GenericType<java.util.Map>; Actual: Multiset<String>
> at 
> org.apache.flink.table.planner.Conversions$.$anonfun$generateRowConverterFunction$2(Conversions.scala:104)
> at 
> org.apache.flink.table.planner.Conversions$.$anonfun$generateRowConverterFunction$2$adapted(Conversions.scala:98)
> at 
> scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
> at 
> scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
> at 
> org.apache.flink.table.planner.Conversions$.generateRowConverterFunction(Conversions.scala:98)
> at 
> org.apache.flink.table.planner.DataStreamConversions$.getConversionMapperWithChanges(DataStreamConversions.scala:184)
> at 
> org.apache.flink.table.planner.DataStreamConversions$.convert(DataStreamConversions.scala:90)
> at 
> org.apache.flink.table.planner.StreamPlanner.translateOptimized(StreamPlanner.scala:413)
> at 
> org.apache.flink.table.planner.StreamPlanner.translateToType(StreamPlanner.scala:402)
> at 
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:185)
> at 
> org.apache.flink.table.planner.StreamPlanner.$anonfun$translate$1(StreamPlanner.scala:117)
> at 
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:273)
> at scala.collection.Iterator.foreach(Iterator.scala:943)
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.TraversableLike.map(TraversableLike.scala:273)
> at scala.collection.TraversableLike.map$(TraversableLike.scala:266)
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> at 
> org.apache.flink.table.planner.StreamPlanner.translate(StreamPlanner.scala:117)
> at 
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.scala:210)
> at 
> org.apache.flink.table.api.scala.internal.StreamTableEnvironmentImpl.toRetractStream(StreamTableEnvironmentImpl.scala:127)
> at 
> org.apache.flink.table.api.scala.TableConversions.toRetractStream(TableConversions.scala:146)
> at io.redacted.sub.package$.getMatchesWithParent(package.scala:244)
> at io.redacted.sub.package$.process(package.scala:156)
> at io.redacted.DataAggregator$.main(DataAggregator.scala:15)
> at io.redacted.DataAggregator.main(DataAggregator.scala)
> 
> Process finished with exit code 1
> ```
> 
> The result type of aggregation function collect is multiset. How do I 
> convert it to a `java.util.Map[String, java.lang.Integer]`?
> 
> Cheers,
> YI