Posted to issues@flink.apache.org by "Roman Wozniak (JIRA)" <ji...@apache.org> on 2018/07/24 11:24:00 UTC
[jira] [Updated] (FLINK-9935) Batch Table API: grouping by window and attribute causes java.lang.ClassCastException:
[ https://issues.apache.org/jira/browse/FLINK-9935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Roman Wozniak updated FLINK-9935:
---------------------------------
Priority: Blocker (was: Major)
> Batch Table API: grouping by window and attribute causes java.lang.ClassCastException:
> --------------------------------------------------------------------------------------
>
> Key: FLINK-9935
> URL: https://issues.apache.org/jira/browse/FLINK-9935
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.5.0, 1.5.1
> Reporter: Roman Wozniak
> Priority: Blocker
>
> Grouping by a window AND some other attribute(s) seems to be broken in the batch Table API. Test case:
> {code}
> // Imports assumed for Flink 1.5.x and ScalaTest; they were not part of the original report.
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.{Table, TableEnvironment, Types}
> import org.apache.flink.table.api.scala._
> import org.scalatest.{FlatSpec, Matchers}
>
> class BatchStatisticsIntegrationTest extends FlatSpec with Matchers {
>
>   trait BatchContext {
>     implicit lazy val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
>     implicit val tableEnv: BatchTableEnvironment = TableEnvironment.getTableEnvironment(env)
>
>     // (rowtime in epoch milliseconds, id, location)
>     val data = Seq(
>       (1532424567000L, "id1", "location1"),
>       (1532424567000L, "id2", "location1"),
>       (1532424567000L, "id3", "location1"),
>       (1532424568000L, "id1", "location2"),
>       (1532424568000L, "id2", "location3")
>     )
>     val rawDataSet: DataSet[(Long, String, String)] = env.fromCollection(data)
>     val table: Table = tableEnv.fromDataSet(rawDataSet, 'rowtime, 'id, 'location)
>   }
>
>   it should "be possible to run Table API queries with grouping by tumble window and column(s) on batch data" in new BatchContext {
>     // Group by a 1-second tumbling window AND the 'location column.
>     val results = table
>       .window(Tumble over 1.second on 'rowtime as 'w)
>       .groupBy('w, 'location)
>       .select(
>         'w.start.cast(Types.LONG),
>         'w.end.cast(Types.LONG),
>         'location,
>         'id.count
>       )
>       .toDataSet[(Long, Long, String, Long)]
>       .collect()
>
>     // Expected rows: (window start, window end, location, count of ids)
>     results should contain theSameElementsAs Seq(
>       (1532424567000L, 1532424568000L, "location1", 3L),
>       (1532424568000L, 1532424569000L, "location2", 1L),
>       (1532424568000L, 1532424569000L, "location3", 1L)
>     )
>   }
> }
> {code}
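For reference, the result the test expects can be reproduced with plain Scala collections, without Flink. This is only a sketch of the intended semantics, assuming epoch-aligned 1-second tumbling windows and non-negative timestamps; the object and method names (`ExpectedTumbleCounts`, `windowStart`, `countsPerWindowAndLocation`) are made up for illustration and are not Flink APIs.

```scala
object ExpectedTumbleCounts {
  // Same sample rows as the test case: (rowtime in epoch ms, id, location).
  val data: Seq[(Long, String, String)] = Seq(
    (1532424567000L, "id1", "location1"),
    (1532424567000L, "id2", "location1"),
    (1532424567000L, "id3", "location1"),
    (1532424568000L, "id1", "location2"),
    (1532424568000L, "id2", "location3")
  )

  // Window size in milliseconds (Tumble over 1.second).
  val windowSizeMs: Long = 1000L

  // Start of the tumbling window that contains ts (assumes ts >= 0).
  def windowStart(ts: Long): Long = ts - (ts % windowSizeMs)

  // Group by (window start, location) and count the rows in each group.
  def countsPerWindowAndLocation(
      rows: Seq[(Long, String, String)]): Set[(Long, Long, String, Long)] =
    rows
      .groupBy { case (ts, _, location) => (windowStart(ts), location) }
      .map { case ((start, location), group) =>
        (start, start + windowSizeMs, location, group.size.toLong)
      }
      .toSet

  def main(args: Array[String]): Unit =
    countsPerWindowAndLocation(data).toSeq.sorted.foreach(println)
}
```

The key point is that the grouping key is the pair (window, location), so 'location has to keep its String type all the way through the aggregation.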
> It seems that at execution time the 'rowtime attribute takes the place of 'location, which causes the ClassCastException.
> {code:java}
> [info] Cause: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
> [info] at org.apache.flink.api.common.typeutils.base.StringSerializer.serialize(StringSerializer.java:28)
> [info] at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:160)
> [info] at org.apache.flink.api.java.typeutils.runtime.RowSerializer.serialize(RowSerializer.java:46)
> [info] at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:54)
> [info] at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:88)
> [info] at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:129)
> [info] at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
> [info] at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
> [info] at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
> [info] at org.apache.flink.api.java.operators.translation.RichCombineToGroupCombineWrapper.combine(RichCombineToGroupCombineWrapper.java:52)
> {code}
> Here is the debug information I was able to collect. It shows that the field serializers don't match the types of the Row fields:
> {code}
> this.instance = {Row@68451} "1532424567000,(3),1532424567000"
>   fields = {Object[3]@68461}
>     0 = {Long@68462} 1532424567000
>     1 = {CountAccumulator@68463} "(3)"
>     2 = {Long@68462} 1532424567000
> this.serializer = {RowSerializer@68452}
>   fieldSerializers = {TypeSerializer[3]@68455}
>     0 = {StringSerializer@68458}
>     1 = {TupleSerializer@68459}
>     2 = {LongSerializer@68460}
>   arity = 3
>   nullMask = {boolean[3]@68457}
> {code}
>
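The mismatch in the debug output can be checked position by position. The sketch below is not Flink code: `FieldSlot` is a hypothetical stand-in for a field serializer that only records which class it accepts, and a Scala `Tuple1` stands in for the accumulator at position 1. Under those assumptions it reports position 0 as the only mismatch, which is consistent with the `Long cannot be cast to String` failure in `StringSerializer`.

```scala
object SerializerMismatchSketch {
  // Hypothetical stand-in for a field serializer: records only the accepted class.
  final case class FieldSlot(name: String, accepts: Class[_])

  // Indices whose runtime value is not an instance of the class the slot accepts.
  def mismatches(fields: Seq[Any], slots: Seq[FieldSlot]): Seq[Int] =
    fields.zip(slots).zipWithIndex.collect {
      case ((value, slot), i) if !slot.accepts.isInstance(value) => i
    }

  // Row contents as in the debug dump: Long, accumulator, Long.
  // Tuple1 is an assumption used in place of the accumulator type.
  val rowFields: Seq[Any] = Seq(1532424567000L, Tuple1(3L), 1532424567000L)

  // Serializer layout as in the debug dump: String, Tuple, Long.
  val slots: Seq[FieldSlot] = Seq(
    FieldSlot("StringSerializer", classOf[String]),
    FieldSlot("TupleSerializer", classOf[Tuple1[_]]),
    FieldSlot("LongSerializer", classOf[java.lang.Long])
  )

  def main(args: Array[String]): Unit =
    mismatches(rowFields, slots).foreach { i =>
      println(s"position $i: ${slots(i).name} cannot handle ${rowFields(i).getClass.getName}")
    }
}
```

The serializer layout (String, Tuple, Long) looks like it expects the row to be (location, accumulator, rowtime), while the row actually holds the rowtime value in the first position.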