You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Stefano Bortoli (JIRA)" <ji...@apache.org> on 2017/08/01 15:17:00 UTC

[jira] [Updated] (FLINK-7338) User Defined aggregation with constants causes error under in lowerbound over window extraction

     [ https://issues.apache.org/jira/browse/FLINK-7338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stefano Bortoli updated FLINK-7338:
-----------------------------------
    Description: 
A user defined aggregation that passes a constant among the arguments causes a RuntimeException extracting the lower boundary over window.

{code}
val sqlQuery = "SELECT a, " +
      "  myAgg(a, CAST('1' as BIGINT)) "+
      "   OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '30' SECOND 
               PRECEDING AND CURRENT ROW) " +
      "FROM MyTable"
{code}

The error is in the org.apache.flink.table.plan.nodes.OverAggregate.scala

we do : field count - lower bound index
--  which causes a -1 get, and subsequent RuntimeException. 
We should do: lower bound offset - field count to find the value in the constant array.

The code below should fix the problem.

{code}
private[flink] def getLowerBoundary(
    logicWindow: Window,
    overWindow: Group,
    input: RelNode): Long = {

    val ref: RexInputRef = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef]
    val lowerBoundIndex = ref.getIndex - input.getRowType.getFieldCount
    val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2
    lowerBound match {
      case x: java.math.BigDecimal => x.asInstanceOf[java.math.BigDecimal].longValue()
      case _ => lowerBound.asInstanceOf[Long]
    }
  }
{code}

  was:
A user defined aggregation that passes a constant among the arguments causes a RuntimeException extracting the lower boundary over window.

[code]
val sqlQuery = "SELECT a, " +
      "  myAgg(a, CAST('1' as BIGINT)) "+
      "   OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '30' SECOND 
               PRECEDING AND CURRENT ROW) " +
      "FROM MyTable"
[code]

The error is in the org.apache.flink.table.plan.nodes.OverAggregate.scala

we do : field count - lower bound index
--  which causes a -1 get, and subsequent RuntimeException. 
We should do: lower bound offset - field count to find the value in the constant array.

The code below should fix the problem.

[code]
private[flink] def getLowerBoundary(
    logicWindow: Window,
    overWindow: Group,
    input: RelNode): Long = {

    val ref: RexInputRef = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef]
    val lowerBoundIndex = ref.getIndex - input.getRowType.getFieldCount
    val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2
    lowerBound match {
      case x: java.math.BigDecimal => x.asInstanceOf[java.math.BigDecimal].longValue()
      case _ => lowerBound.asInstanceOf[Long]
    }
  }
[code]


> User Defined aggregation with constants causes error under in lowerbound over window extraction
> -----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-7338
>                 URL: https://issues.apache.org/jira/browse/FLINK-7338
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.1
>            Reporter: Stefano Bortoli
>            Priority: Critical
>
> A user defined aggregation that passes a constant among the arguments causes a RuntimeException extracting the lower boundary over window.
> {code}
> val sqlQuery = "SELECT a, " +
>       "  myAgg(a, CAST('1' as BIGINT)) "+
>       "   OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '30' SECOND 
>                PRECEDING AND CURRENT ROW) " +
>       "FROM MyTable"
> {code}
> The error is in the org.apache.flink.table.plan.nodes.OverAggregate.scala
> we do : field count - lower bound index
> --  which causes a -1 get, and subsequent RuntimeException. 
> We should do: lower bound offset - field count to find the value in the constant array.
> The code below should fix the problem.
> {code}
> private[flink] def getLowerBoundary(
>     logicWindow: Window,
>     overWindow: Group,
>     input: RelNode): Long = {
>     val ref: RexInputRef = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef]
>     val lowerBoundIndex = ref.getIndex - input.getRowType.getFieldCount
>     val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2
>     lowerBound match {
>       case x: java.math.BigDecimal => x.asInstanceOf[java.math.BigDecimal].longValue()
>       case _ => lowerBound.asInstanceOf[Long]
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)