You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Stefano Bortoli (JIRA)" <ji...@apache.org> on 2017/08/01 16:08:00 UTC
[jira] [Created] (FLINK-7339) aggregationToString fails when user
defined aggregation contains constants
Stefano Bortoli created FLINK-7339:
--------------------------------------
Summary: aggregationToString fails when user defined aggregation contains constants
Key: FLINK-7339
URL: https://issues.apache.org/jira/browse/FLINK-7339
Project: Flink
Issue Type: Bug
Components: DataStream API, Table API & SQL
Affects Versions: 1.3.1
Reporter: Stefano Bortoli
Issue related to FLINK-7338, when the user defined aggregation contains a constant it breaks the aggregation translation to string, which are mapped 1 to 1 to the input fields.
OverAggregates.scala aggregationToString function fails to find a parameter of the function among the input fields, and therefore throws a RuntimeException.
{code}
private[flink] def aggregationToString(
inputType: RelDataType,
rowType: RelDataType,
namedAggregates: Seq[CalcitePair[AggregateCall, String]]): String = {
val inFields = inputType.getFieldNames.asScala
val outFields = rowType.getFieldNames.asScala
val aggStrings = namedAggregates.map(_.getKey).map(
a => s"${a.getAggregation}(${
if (a.getArgList.size() > 0) {
// ERROR HAPPENS HERE!
a.getArgList.asScala.map(inFields(_)).mkString(", ")
} else {
"*"
}
})")
(inFields ++ aggStrings).zip(outFields).map {
case (f, o) => if (f == o) {
f
} else {
s"$f AS $o"
}
}.mkString(", ")
}
{code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)