Posted to issues@flink.apache.org by "Roman Wozniak (JIRA)" <ji...@apache.org> on 2018/07/24 11:24:00 UTC

[jira] [Updated] (FLINK-9935) Batch Table API: grouping by window and attribute causes java.lang.ClassCastException:

     [ https://issues.apache.org/jira/browse/FLINK-9935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Roman Wozniak updated FLINK-9935:
---------------------------------
    Priority: Blocker  (was: Major)

> Batch Table API: grouping by window and attribute causes java.lang.ClassCastException:
> --------------------------------------------------------------------------------------
>
>                 Key: FLINK-9935
>                 URL: https://issues.apache.org/jira/browse/FLINK-9935
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API &amp; SQL
>    Affects Versions: 1.5.0, 1.5.1
>            Reporter: Roman Wozniak
>            Priority: Blocker
>
> Grouping by a window AND one or more other attributes appears to be broken. Test case below (a window-only variant is sketched after it for contrast):
> {code}
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.scala._
> import org.apache.flink.table.api.{Table, TableEnvironment, Types}
> import org.scalatest.{FlatSpec, Matchers}
>
> class BatchStatisticsIntegrationTest extends FlatSpec with Matchers {
>   trait BatchContext {
>     implicit lazy val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
>     implicit val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
>     val data = Seq(
>       (1532424567000L, "id1", "location1"),
>       (1532424567000L, "id2", "location1"),
>       (1532424567000L, "id3", "location1"),
>       (1532424568000L, "id1", "location2"),
>       (1532424568000L, "id2", "location3")
>     )
>     val rawDataSet: DataSet[(Long, String, String)] = env.fromCollection(data)
>     val table: Table = tableEnv.fromDataSet(rawDataSet, 'rowtime, 'id, 'location)
>   }
>   it should "be possible to run Table API queries with grouping by tumble window and column(s) on batch data" in new BatchContext {
>     // Group by the tumbling window AND a plain attribute -- this combination triggers the exception below.
>     val results = table
>       .window(Tumble over 1.second on 'rowtime as 'w)
>       .groupBy('w, 'location)
>       .select(
>         'w.start.cast(Types.LONG),
>         'w.end.cast(Types.LONG),
>         'location,
>         'id.count
>       )
>       .toDataSet[(Long, Long, String, Long)]
>       .collect()
>     results should contain theSameElementsAs Seq(
>       (1532424567000L, 1532424568000L, "location1", 3L),
>       (1532424568000L, 1532424569000L, "location2", 1L),
>       (1532424568000L, 1532424569000L, "location3", 1L)
>     )
>   }
> }
> {code}
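> For contrast, below is a sketch of the same query grouped only on the window, without the extra 'location attribute. This variant is an assumption on my side (not a verified run); the failure appears specific to combining the window with a plain grouping attribute:
> {code}
> // Sketch (assumption, not verified): identical tumbling window, grouped on 'w only.
> val windowOnly = table
>   .window(Tumble over 1.second on 'rowtime as 'w)
>   .groupBy('w)
>   .select('w.start.cast(Types.LONG), 'w.end.cast(Types.LONG), 'id.count)
>   .toDataSet[(Long, Long, Long)]
>   .collect()
> {code}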
> It seems that at execution time the 'rowtime attribute replaces 'location in the emitted row, which causes the ClassCastException:
> {code:java}
> [info]   Cause: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
> [info]   at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
> [info]   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
> [info]   at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
> [info]   at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> [info]   at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> [info]   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> [info]   at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> [info]   at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> [info]   at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> [info]   at org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper.combine(RichCombineToGroupCombineWrapper.java:52)
> {code}
> Here is some debug information I was able to capture. The field serializers do not match the types of the Row fields (a minimal sketch reproducing the mismatch follows this dump):
> {code}
> this.instance = {Row@68451} "1532424567000,(3),1532424567000"
>  fields = {Object[3]@68461} 
>   0 = {Long@68462} 1532424567000
>   1 = {CountAccumulator@68463} "(3)"
>   2 = {Long@68462} 1532424567000
> this.serializer = {RowSerializer@68452} 
>  fieldSerializers = {TypeSerializer[3]@68455} 
>   0 = {StringSerializer@68458} 
>   1 = {TupleSerializer@68459} 
>   2 = {LongSerializer@68460} 
>  arity = 3
>  nullMask = {boolean[3]@68457} 
> {code}
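> The mismatch above can be reproduced in isolation by serializing a Row whose field types no longer line up with the serializer built for the declared schema. The following is a minimal sketch for illustration only (it uses Flink's RowTypeInfo and DataOutputSerializer directly and is not part of the failing code path):
> {code}
> import org.apache.flink.api.common.ExecutionConfig
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.core.memory.DataOutputSerializer
> import org.apache.flink.types.Row
>
> object SerializerMismatchSketch {
>   def main(args: Array[String]): Unit = {
>     // RowSerializer built for a (String, Long) schema.
>     val rowType = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.LONG_TYPE_INFO)
>     val serializer = rowType.createSerializer(new ExecutionConfig)
>
>     // A row that carries (Long, String) instead, as if the fields had been swapped.
>     val swapped = Row.of(java.lang.Long.valueOf(1532424567000L), "location1")
>
>     // StringSerializer tries to cast the Long in field 0 and throws
>     // "java.lang.Long cannot be cast to java.lang.String", as in the stack trace above.
>     serializer.serialize(swapped, new DataOutputSerializer(64))
>   }
> }
> {code}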
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)