Posted to issues@flink.apache.org by "Stefano Bortoli (JIRA)" <ji...@apache.org> on 2017/08/01 15:17:00 UTC
[jira] [Updated] (FLINK-7338) User Defined aggregation with
constants causes error under in lowerbound over window extraction
[ https://issues.apache.org/jira/browse/FLINK-7338?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stefano Bortoli updated FLINK-7338:
-----------------------------------
Description:
A user-defined aggregation that receives a constant among its arguments causes a RuntimeException when the lower boundary of the over window is extracted.
{code}
val sqlQuery = "SELECT a, " +
  " myAgg(a, CAST('1' as BIGINT)) " +
  " OVER (ORDER BY proctime RANGE BETWEEN INTERVAL '30' SECOND PRECEDING AND CURRENT ROW) " +
  "FROM MyTable"
{code}
The error is in org.apache.flink.table.plan.nodes.OverAggregate.scala, where the index is computed as: field count - lower bound index.
This yields -1, so the subsequent get on the constant array fails with a RuntimeException.
The index should instead be computed as: lower bound offset - field count, which locates the value in the constant array.
The code below should fix the problem.
{code}
private[flink] def getLowerBoundary(
    logicWindow: Window,
    overWindow: Group,
    input: RelNode): Long = {

  val ref: RexInputRef = overWindow.lowerBound.getOffset.asInstanceOf[RexInputRef]
  // subtract the field count from the reference index to get the position in the constant array
  val lowerBoundIndex = ref.getIndex - input.getRowType.getFieldCount
  val lowerBound = logicWindow.constants.get(lowerBoundIndex).getValue2
  lowerBound match {
    case x: java.math.BigDecimal => x.longValue()
    case _ => lowerBound.asInstanceOf[Long]
  }
}
{code}
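To illustrate the index arithmetic, here is a minimal, self-contained sketch that does not use the Calcite or Flink classes. It assumes that a reference whose index is at or beyond the input field count points into the window's constant list. The object and method names, the three-field input, and the order of the two constants are hypothetical and chosen only for illustration.

```scala
object LowerBoundIndexSketch {

  // Maps a reference index to a position in the constant list, assuming
  // constants are addressed as extra fields after the input fields.
  def constantIndex(refIndex: Int, fieldCount: Int): Int = {
    require(refIndex >= fieldCount, s"index $refIndex does not point at a constant")
    refIndex - fieldCount
  }

  // Converts the referenced constant to a Long, similar to the match in the
  // proposed fix (this sketch rejects other types instead of casting them).
  def lowerBound(constants: IndexedSeq[Any], refIndex: Int, fieldCount: Int): Long =
    constants(constantIndex(refIndex, fieldCount)) match {
      case x: java.math.BigDecimal => x.longValue()
      case x: Long => x
      case other => throw new IllegalArgumentException(s"unexpected constant: $other")
    }
}
```

With a hypothetical input of 3 fields and the constants 1 (the aggregation argument) and 30000 (the interval), a lower bound reference with index 4 resolves to constant position 1, that is, 30000. The reversed subtraction, 3 - 4, gives -1, which is consistent with the failure described above.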
> User Defined aggregation with constants causes error under in lowerbound over window extraction
> -----------------------------------------------------------------------------------------------
>
> Key: FLINK-7338
> URL: https://issues.apache.org/jira/browse/FLINK-7338
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.3.1
> Reporter: Stefano Bortoli
> Priority: Critical
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)