Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/10/02 21:12:00 UTC

[jira] [Commented] (FLINK-7371) user defined aggregator assumes nr of arguments smaller or equal than number of row fields

    [ https://issues.apache.org/jira/browse/FLINK-7371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16188843#comment-16188843 ] 

ASF GitHub Bot commented on FLINK-7371:
---------------------------------------

Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4736#discussion_r142254410
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/OverWindowITCase.scala ---
    @@ -85,6 +85,46 @@ class OverWindowITCase extends StreamingWithStateTestBase {
       }
     
       @Test
    +  def testOverWindowWithConstant(): Unit = {
    +
    +    val data = List(
    +      (1L, 1, "Hello"),
    +      (2L, 2, "Hello"),
    +      (3L, 3, "Hello"),
    +      (4L, 4, "Hello"),
    +      (5L, 5, "Hello"),
    +      (6L, 6, "Hello"),
    +      (7L, 7, "Hello World"),
    +      (8L, 8, "Hello World"),
    +      (8L, 8, "Hello World"),
    +      (20L, 20, "Hello World"))
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(1)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    StreamITCase.clear
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
    +    val weightAvgFun = new WeightedAvg
    +
    +    val windowedTable = table
    +      .window(
    +        Over partitionBy 'c orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
    +      .select('c, weightAvgFun('a, 42, 'b, "2") over 'w as 'wAvg)
    +      .select('c, 'wAvg)
    --- End diff --
    
    This second select is redundant and can be removed.


> user defined aggregator assumes nr of arguments smaller or equal than number of row fields
> ------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7371
>                 URL: https://issues.apache.org/jira/browse/FLINK-7371
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.1
>            Reporter: Stefano Bortoli
>            Assignee: Timo Walther
>
> Defining a user-defined aggregation with more parameters than the row has fields causes an ArrayIndexOutOfBoundsException, because the indexing is based on a linear iteration over the row fields. This does not account for cases where fields are used more than once or constant values are passed to the aggregation function.
> For example:
> {code}
> window(partition {} order by [2] rows between $5 PRECEDING and CURRENT ROW aggs [myAgg($0, $1, $3, $0, $4)])
> {code}
> where $3 and $4 are references to constants and $0 and $1 are fields, causes:
> {code}
> java.lang.ArrayIndexOutOfBoundsException: 4
> 	at org.apache.flink.table.plan.schema.RowSchema.mapIndex(RowSchema.scala:134)
> 	at org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
> 	at org.apache.flink.table.plan.schema.RowSchema$$anonfun$mapAggregateCall$1.apply(RowSchema.scala:147)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> 	at org.apache.flink.table.plan.schema.RowSchema.mapAggregateCall(RowSchema.scala:147)
> 	at org.apache.flink.table.plan.nodes.datastream.DataStreamOverAggregate$$anonfun$9.apply(DataStreamOverAggregate.scala:362)
> {code}
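
For illustration, the failure mode described in the report can be sketched roughly as follows. This is a hypothetical, self-contained sketch and not the actual RowSchema code: the names IndexMappingSketch, fieldMapping, naiveMapIndex and guardedMapIndex, the three-field row, and the choice to treat indices past the row fields as positions in a constants list are all assumptions made for the example.

```scala
// Hypothetical sketch only; this is NOT Flink's RowSchema implementation.
object IndexMappingSketch {

  // Assumed mapping with one entry per row field (logical index -> physical index).
  val fieldMapping: Array[Int] = Array(0, 1, 2)

  // Naive lookup: assumes every aggregate argument index refers to a row field.
  // An index that refers to a constant lies past the end of the array and throws.
  def naiveMapIndex(logicalIndex: Int): Int = fieldMapping(logicalIndex)

  // Guarded lookup (an assumed fix, for illustration): indices past the row fields
  // are returned as Left(position in a hypothetical constants list), field
  // references as Right(physical index).
  def guardedMapIndex(logicalIndex: Int): Either[Int, Int] =
    if (logicalIndex < fieldMapping.length) Right(fieldMapping(logicalIndex))
    else Left(logicalIndex - fieldMapping.length)

  def main(args: Array[String]): Unit = {
    // Argument indices of myAgg($0, $1, $3, $0, $4) from the report.
    val aggArgs = Seq(0, 1, 3, 0, 4)

    // The naive mapping fails as soon as it reaches an index beyond the row fields.
    val naiveFailed = scala.util.Try(aggArgs.map(naiveMapIndex)).isFailure
    println(s"naive mapping failed: $naiveFailed")

    // The guarded mapping handles repeated fields and constant references.
    println(aggArgs.map(guardedMapIndex))
  }
}
```

The sketch only shows why a mapping sized to the row fields cannot serve argument lists that also reference constants; how the actual fix distinguishes constants from fields is not stated in this message.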


