You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2017/01/23 14:13:13 UTC

[GitHub] flink pull request #3026: [FLINK-2980] [table] Support for GROUPING SETS cla...

Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3026#discussion_r97311604
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -928,5 +1032,151 @@ class GroupWindowedTable(
         val fieldExprs = ExpressionParser.parseExpressionList(fields)
         select(fieldExprs: _*)
       }
    +}
    +
    +/**
    +  * A table that has been grouped on several sets of grouping keys.
    +  *
    +  * @param table the input table whose records are grouped.
    +  * @param groups the grouping key expressions as a sequence of expression groups. For kinds
    +  *               other than CUBE and ROLLUP they are used directly as the grouping sets.
    +  * @param sqlKind determines how `groups` is expanded into grouping sets in `select`:
    +  *                `SqlKind.CUBE` expands via `ExpressionUtils.cube`, `SqlKind.ROLLUP` via
    +  *                `ExpressionUtils.rollup`; any other kind leaves `groups` unchanged.
    +  */
    +class GroupingSetsTable(
    +  private[flink] val table: Table,
    +  private[flink] val groups: Seq[Seq[Expression]],
    +  private[flink] val sqlKind: SqlKind) {
    +
    +  /**
    +    * Performs a selection operation on a table grouped by grouping sets. Similar to an SQL
    +    * SELECT statement with a GROUPING SETS, CUBE, or ROLLUP clause.
    +    * The field expressions can contain complex expressions and aggregations.
    +    *
    +    * The grouping sets are derived from `groups` according to `sqlKind` (CUBE and ROLLUP are
    +    * expanded). Together with the aggregations extracted from `fields`, they form a
    +    * `GroupingAggregation` over a projection of all referenced fields and grouping keys.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   tab.groupingSets('key).select('key, 'value.avg + " The average" as 'average)
    +    * }}}
    +    *
    +    * @param fields the selection expressions, possibly containing aggregations.
    +    * @return a new table containing the selected fields.
    +    * @throws ValidationException if the expressions reference window properties, which can
    +    *                             only be used on windowed tables.
    +    */
    +  def select(fields: Expression*): Table = {
    +
    +    val (aggNames, propNames) = extractAggregationsAndProperties(fields, table.tableEnv)
    +
    +    if (propNames.nonEmpty) {
    +      throw ValidationException("Window properties can only be used on windowed tables.")
    +    }
    +
    +    val groupingSets = sqlKind match {
    +      case SqlKind.CUBE => ExpressionUtils.cube(groups)
    +      case SqlKind.ROLLUP => ExpressionUtils.rollup(groups)
    +      case _ => groups
    +    }
    +
    +    val projectsOnAgg = replaceAggregationsAndProperties(
    +      fields, table.tableEnv, aggNames, propNames)
    +    val projectFields = extractFieldReferences(fields ++ groupingSets.flatten.distinct)
    +
    +    val logical =
    +      Project(projectsOnAgg,
    +        GroupingAggregation(groupingSets, aggNames.map(a => Alias(a._1, a._2)).toSeq,
    +                            Project(projectFields, table.logicalPlan).validate(table.tableEnv)
    +        ).validate(table.tableEnv)
    +      ).validate(table.tableEnv)
    +
    +    new Table(table.tableEnv, logical)
    +  }
    +
    +  /**
    +    * Performs a selection operation on a table grouped by grouping sets. Similar to an SQL
    +    * SELECT statement. The field expressions are given as a single string. That string is
    +    * parsed into an expression list and passed to the expression-based `select`.
    +    * The field expressions can contain complex expressions and aggregations.
    +    *
    +    * Example:
    +    *
    +    * {{{
    +    *   tab.groupingSets('key).select("key, value.avg + ' The average' as average")
    +    * }}}
    +    *
    +    * @param fields comma-separated selection expressions, possibly containing aggregations.
    +    * @return a new table containing the selected fields.
    +    */
    +  def select(fields: String): Table = {
    +    val fieldExprs = ExpressionParser.parseExpressionList(fields)
    +    select(fieldExprs: _*)
    +  }
     
    +  /**
    +    * Groups the records of a table by assigning them to windows defined by a time or row interval.
    +    *
    +    * For streaming tables of infinite size, grouping into windows is required to define finite
    +    * groups on which group-based aggregates can be computed.
    +    *
    +    * Windows are currently not supported on batch tables.
    +    *
    +    * @param groupWindow group-window that specifies how elements are grouped.
    +    * @return A windowed table that carries this table's `groups` and `sqlKind`.
    +    * @throws ValidationException if this table belongs to a [[BatchTableEnvironment]].
    +    */
    +  def window(groupWindow: GroupWindow): GroupingSetsWindowedTable = {
    +    if (table.tableEnv.isInstanceOf[BatchTableEnvironment]) {
    +      throw new ValidationException(s"Windows on batch tables are currently not supported.")
    +    }
    +    new GroupingSetsWindowedTable(table, groups, sqlKind, groupWindow)
    +  }
    +}
    +
    +class GroupingSetsWindowedTable(
    --- End diff --
    
    We decided not to support grouping sets in a streaming environment yet, and there are also no tests for it. Could you remove this class?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact the infrastructure team at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---