Posted to dev@flink.apache.org by "Timo Walther (JIRA)" <ji...@apache.org> on 2017/04/24 11:58:04 UTC

[jira] [Created] (FLINK-6368) Grouping keys in stream aggregations have wrong order

Timo Walther created FLINK-6368:
-----------------------------------

             Summary: Grouping keys in stream aggregations have wrong order
                 Key: FLINK-6368
                 URL: https://issues.apache.org/jira/browse/FLINK-6368
             Project: Flink
          Issue Type: Bug
          Components: Table API & SQL
            Reporter: Timo Walther


FLINK-5768 removed the `AggregateUtil.createPrepareMapFunction` stage. It seems that the order of grouping keys is now sometimes wrong. The following test fails:

{code}
  @Test
  def testEventTimeSlidingGroupWindow(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = TableEnvironment.getTableEnvironment(env)
    StreamITCase.testResults = mutable.MutableList()

    val stream = env
      .fromCollection(data)
      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
      .map(t => (t._2, t._6))
    val table = stream.toTable(tEnv, 'int, 'string)

    // Group by the sliding window 'w and the String column 'string.
    val windowedTable = table
      .window(Slide over 10.milli every 5.milli on 'rowtime as 'w)
      .groupBy('w, 'string)
      .select('string, 'int.count, 'w.start, 'w.end)

    val results = windowedTable.toDataStream[Row]
    results.addSink(new StreamITCase.StringSink)
    env.execute()
  }
{code}

Exception:
{code}
Caused by: java.lang.RuntimeException: Could not forward element to next operator
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:532)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:505)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:485)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:871)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:849)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:50)
	at org.apache.flink.table.runtime.aggregate.TimeWindowPropertyCollector.collect(TimeWindowPropertyCollector.scala:29)
	at org.apache.flink.table.runtime.aggregate.IncrementalAggregateWindowFunction.apply(IncrementalAggregateWindowFunction.scala:74)
	at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:64)
	at org.apache.flink.table.runtime.aggregate.IncrementalAggregateTimeWindowFunction.apply(IncrementalAggregateTimeWindowFunction.scala:35)
	at org.apache.flink.streaming.runtime.operators.windowing.functions.InternalSingleValueWindowFunction.process(InternalSingleValueWindowFunction.java:45)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:598)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:505)
	at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:276)
	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:119)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:940)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:288)
	... 7 more
Caused by: java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
{code}
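
The ClassCastException is what one would expect if a grouping key is read from the wrong field position. The following is only a minimal sketch of that failure mode, not Flink's actual code: `GroupingKeyOrderSketch`, `row`, `readStringKey` and the field indices are hypothetical stand-ins for a row of the ('int, 'string) table used in the test.

{code}
object GroupingKeyOrderSketch {
  // Hypothetical stand-in for one row of the ('int, 'string) table.
  val row: Array[Any] = Array[Any](1, "Hello")

  // Reads the field at keyIndex and treats it as the String grouping key,
  // as a consumer expecting the 'string key would.
  def readStringKey(r: Array[Any], keyIndex: Int): String =
    r(keyIndex).asInstanceOf[String]

  def main(args: Array[String]): Unit = {
    // Correct position: field 1 holds the String.
    println(readStringKey(row, 1))

    // Wrong position: field 0 holds a boxed Integer, so the cast fails at runtime.
    try {
      readStringKey(row, 0)
    } catch {
      case e: ClassCastException => println("ClassCastException: " + e.getMessage)
    }
  }
}
{code}

If the key indices were shuffled somewhere between the grouping and the output row, the same kind of cast failure would surface downstream, as in the stack trace above.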



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)