You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Stefano Bortoli (JIRA)" <ji...@apache.org> on 2017/08/01 16:08:00 UTC

[jira] [Created] (FLINK-7339) aggregationToString fails when user defined aggregation contains constants

Stefano Bortoli created FLINK-7339:
--------------------------------------

             Summary: aggregationToString fails when user defined aggregation contains constants
                 Key: FLINK-7339
                 URL: https://issues.apache.org/jira/browse/FLINK-7339
             Project: Flink
          Issue Type: Bug
          Components: DataStream API, Table API & SQL
    Affects Versions: 1.3.1
            Reporter: Stefano Bortoli


Issue related to FLINK-7338, when the user defined aggregation contains a constant it breaks the aggregation translation to string, which are mapped 1 to 1 to the input fields. 

OverAggregates.scala aggregationToString function fails to find a parameter of the function among the input fields, and therefore throws a RuntimeException. 

{code}
private[flink] def aggregationToString(
    inputType: RelDataType,
    rowType: RelDataType,
    namedAggregates: Seq[CalcitePair[AggregateCall, String]]): String = {

    val inFields = inputType.getFieldNames.asScala
    val outFields = rowType.getFieldNames.asScala
    val aggStrings = namedAggregates.map(_.getKey).map(
      a => s"${a.getAggregation}(${
        if (a.getArgList.size() > 0) {
          // ERROR HAPPENS HERE!
          a.getArgList.asScala.map(inFields(_)).mkString(", ")
        } else {
          "*"
        }
      })")
    (inFields ++ aggStrings).zip(outFields).map {
      case (f, o) => if (f == o) {
        f
      } else {
        s"$f AS $o"
      }
    }.mkString(", ")

  }

{code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)