Posted to issues@flink.apache.org by sunjincheng121 <gi...@git.apache.org> on 2017/04/20 03:19:28 UTC

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

GitHub user sunjincheng121 opened a pull request:

    https://github.com/apache/flink/pull/3743

    [FLINK-6228][table] Integrating the OVER windows in the Table API (st…

    In this PR I integrated the OVER windows into the Table API. The syntax and usage examples are as follows:
    * Syntax:
    ```
    table
       .window(
        (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy order_by_expression]
          (preceding UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
         [following UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
        as alias,...[n])
       )
      .select([col1, ...[n]], agg(col1) OVER overWindowAlias, ...[n])
    
    ```
    * Examples:
    ```
    // Rows clause
    table
       .window(Over partitionBy 'c orderBy 'rowTime  preceding 2.rows as 'w1)
       .select(
         'c,
         'b.count over 'w1 as 'countB,
         'e.sum over 'w1 as 'sumE)
    
    // Range clause
    table
       .window(Over partitionBy 'c orderBy 'rowTime preceding 2.milli as 'w1)
       .select(
         'c,
         'b.count over 'w1 as 'countB,
         'e.sum over 'w1 as 'sumE)
    ```
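To make the difference between the two examples concrete, here is a minimal, self-contained Scala sketch (not the PR's implementation) that evaluates `count` over a bounded preceding frame on an in-memory partition. It assumes standard SQL frame semantics: `preceding 2.rows` covers the current row plus the two rows before it, while `preceding 2.milli` covers every row whose timestamp lies within 2 ms before the current row's timestamp, including rows with the same timestamp. `Event` and `OverFrames` are hypothetical names.

```scala
// Hypothetical input row: partition key and event timestamp in milliseconds.
final case class Event(c: String, rowTime: Long)

object OverFrames {

  // ROWS frame: the current row plus up to `preceding` earlier rows of the partition.
  def rowsCount(partition: Seq[Event], preceding: Int): Seq[Long] =
    partition.indices.map { i =>
      val start = math.max(0, i - preceding)
      (i - start + 1).toLong
    }

  // RANGE frame: every row whose timestamp t satisfies
  // current.rowTime - precedingMillis <= t <= current.rowTime,
  // so rows with the same timestamp (peers) are always included.
  def rangeCount(partition: Seq[Event], precedingMillis: Long): Seq[Long] =
    partition.map { cur =>
      partition.count { e =>
        e.rowTime >= cur.rowTime - precedingMillis && e.rowTime <= cur.rowTime
      }.toLong
    }

  def main(args: Array[String]): Unit = {
    // Assumed to be a single partition ('c = "x"), already sorted by 'rowTime.
    val events = Seq(Event("x", 1L), Event("x", 2L), Event("x", 2L), Event("x", 10L))
    println(rowsCount(events, 2).mkString(", "))   // 1, 2, 3, 3
    println(rangeCount(events, 2L).mkString(", ")) // 1, 3, 3, 1
  }
}
```

The RANGE count for the first row with `rowTime = 2` already includes the second row with the same timestamp (a peer), while the ROWS count does not; that is the main behavioral difference between the two clauses.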
    * More details: https://docs.google.com/document/d/13Z-Ovx3jwtmzkSweJyGkMy751BouhuJ29Y1CTNZt2DI/edit#
    
    NOTE: Documentation for the OVER Table API is not included in this PR.
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-6228][table] Integrating the OVER windows in the Table API")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sunjincheng121/flink FLINK-6228-PR

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3743.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3743
    
----
commit 03d4153be93d505cdb47d174e5aafe10eb93a45f
Author: sunjincheng121 <su...@gmail.com>
Date:   2017-04-13T09:36:18Z

    [FLINK-6228][table] Integrating the OVER windows in the Table API (stream)

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112567644
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    +    this.aggAlias = aggAlias
    +    this
    +  }
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.toSqlAggFunction()
    +
    +    val aggReturnType: TypeInformation[_] = agg.resultType
    +
    +    val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
    +        // for stream
    +        relBuilder.literal(orderName)
    +      } else {
    +        // for batch
    +        relBuilder.field(orderName)
    +      }
    +
    +    orderKeys.add(new RexFieldCollation(rexNode,sets))
    +
    +    val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    overWindow.partitionBy.foreach(x=>
    +      partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
    +
    +    val preceding = overWindow.preceding.asInstanceOf[Literal]
    +    val following = overWindow.following.asInstanceOf[Literal]
    +
    +    val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
    +
    +    val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
    +    val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
    +
    +    rexBuilder.makeOver(
    +      relDataType,
    +      operator,
    +      aggExprs,
    +      partitionKeys,
    +      orderKeys.build,
    +      lowerBound,
    +      upperBound,
    +      isPhysical,
    +      true,
    +      false)
    +  }
    +
    +  private def createBound(
    +    relBuilder: RelBuilder,
    +    precedingValue: Long,
    +    sqlKind: SqlKind): RexWindowBound = {
    +
    +    if (precedingValue == Long.MaxValue) {
    --- End diff --
    
    Please use the constants defined in `expressionDsl.scala` for the checks
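A minimal sketch of the suggestion: compare the bound against named sentinel literals instead of testing the raw value against `Long.MaxValue`. The names mirror `UNBOUNDED_ROW`, `UNBOUNDED_RANGE`, `CURRENT_ROW` and `CURRENT_RANGE`, which a later revision of this diff compares against, but the `Literal` model, the interval-type tags and the sentinel values below are assumptions for illustration, not the actual definitions in `expressionDsl.scala`.

```scala
// Hypothetical stand-ins for the Table API types; not Flink's real classes.
sealed trait IntervalType
case object RowInterval extends IntervalType
case object TimeInterval extends IntervalType

final case class Literal(value: Long, resultType: IntervalType)

// Named sentinels with assumed values, playing the role of the expressionDsl.scala constants.
object Bounds {
  val UNBOUNDED_ROW: Literal = Literal(Long.MaxValue, RowInterval)
  val UNBOUNDED_RANGE: Literal = Literal(Long.MaxValue, TimeInterval)
  val CURRENT_ROW: Literal = Literal(0L, RowInterval)
  val CURRENT_RANGE: Literal = Literal(0L, TimeInterval)
}

// Simplified result of converting a bound (stands in for Calcite's RexWindowBound).
sealed trait WindowBound
case object Unbounded extends WindowBound
case object CurrentRow extends WindowBound
final case class Offset(amount: Long, isRows: Boolean) extends WindowBound

object BoundFactory {
  import Bounds._

  // The sentinel values live only in Bounds, so there is no magic Long.MaxValue check here.
  // Case-class equality compares both the value and the interval type.
  def createBound(bound: Literal): WindowBound =
    if (bound == UNBOUNDED_ROW || bound == UNBOUNDED_RANGE) Unbounded
    else if (bound == CURRENT_ROW || bound == CURRENT_RANGE) CurrentRow
    else Offset(bound.value, bound.resultType == RowInterval)
}
```

With this shape, changing how UNBOUNDED or CURRENT ROW is encoded only requires touching the constants, not every check.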



[GitHub] flink issue #3743: [FLINK-6228][table] Integrating the OVER windows in the T...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3743
  
    @fhueske Thanks a lot for your comments and suggestions. I like the idea of moving all validations into the `validateInput` method. I have updated the PR with the following changes:
    1. Check that the orderBy field is correctly validated (only rowtime and proctime are allowed).
    2. Check that the partitionBy fields are correctly validated (they must be existing fields of the input, not complex expressions).
    3. Check that preceding and following are of the same type.
    4. Check that preceding and following are literals.
    5. Check that the preceding value is > 0.
    6. Check that the following value is >= -1.
    7. Move all validations into the `validateInput` method.
    BTW, in this PR we only add the batch check to the `table.scala#window(overWindows: OverWindow*)` method.
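The checks listed above can be sketched as a single validation pass that returns the first failure instead of throwing. This is a simplified model, not the PR's `OverCall.validateInput()`: `Expr`, `FieldRef`, `Lit`, `Call`, `OverSpec`, `OverValidation` and the string interval tags are hypothetical, the validation result types are local stand-ins mirroring the names used in the diff, and the `> 0` and `>= -1` bounds are taken literally from the list.

```scala
// Simplified, hypothetical expression model (not Flink's Expression hierarchy).
sealed trait Expr
final case class FieldRef(name: String) extends Expr
final case class Lit(value: Long, intervalKind: String) extends Expr // e.g. "rows" or "millis"
final case class Call(function: String, args: Seq[Expr]) extends Expr

// Local stand-ins mirroring the ValidationResult names used in the diff.
sealed trait ValidationResult
case object ValidationSuccess extends ValidationResult
final case class ValidationFailure(message: String) extends ValidationResult

final case class OverSpec(partitionBy: Seq[Expr], orderBy: Expr, preceding: Expr, following: Expr)

object OverValidation {

  // Each check returns Some(error) on failure; checks run in order and stop at the first error.
  def validateInput(spec: OverSpec, inputFields: Set[String]): ValidationResult = {
    val checks: Seq[() => Option[String]] = Seq(
      // (1) orderBy must be 'rowtime or 'proctime
      () => spec.orderBy match {
        case FieldRef(n) if n.equalsIgnoreCase("rowtime") || n.equalsIgnoreCase("proctime") => None
        case other => Some(s"OrderBy expression must be 'rowtime or 'proctime, but got $other")
      },
      // (2) partition keys must be existing input fields, not complex expressions
      () => spec.partitionBy.collectFirst {
        case FieldRef(n) if !inputFields.contains(n) => s"Partition key '$n does not exist in the input"
        case other if !other.isInstanceOf[FieldRef] => s"Partition key $other must be a field reference"
      },
      // (4) bounds must be literals; checked before any check that reads their values
      () => (spec.preceding, spec.following) match {
        case (_: Lit, _: Lit) => None
        case _ => Some("Preceding and following must be literals")
      },
      // (3), (5), (6) same interval type, preceding > 0, following >= -1
      () => (spec.preceding, spec.following) match {
        case (Lit(_, pk), Lit(_, fk)) if pk != fk =>
          Some(s"Preceding ($pk) and following ($fk) must be of the same type")
        case (Lit(p, _), _) if p <= 0 => Some(s"Preceding value must be > 0, but got $p")
        case (_, Lit(f, _)) if f < -1 => Some(s"Following value must be >= -1, but got $f")
        case _ => None
      }
    )
    checks.iterator
      .map(check => check())
      .collectFirst { case Some(message) => ValidationFailure(message) }
      .getOrElse(ValidationSuccess)
  }
}
```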
    
    Thanks,
    SunJincheng



[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112456485
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -810,6 +810,47 @@ class Table(
         new WindowedTable(this, window)
       }
     
    +  /**
    +    * Groups the records of a table by assigning them to windows defined by a time or row interval.
    +    *
    +    * For streaming tables of infinite size, grouping into windows is required to define finite
    +    * groups on which over-based aggregates can be computed.
    +    *
    +    * Over window for batch tables are currently not supported.
    +    *
    +    * @param overWindows windows that specifies how elements are grouped.
    +    * @return Over windowed table
    +    */
    +  def window(overWindows: OverWindow*): OverWindowedTable = {
    +
    +    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
    +      throw TableException("Over window for batch tables are currently not supported.")
    +    } else {
    +      overWindows.foreach { overWindow =>
    +        val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +        if (!orderName.equalsIgnoreCase("rowtime")
    +          && !orderName.equalsIgnoreCase("proctime")) {
    +          throw ValidationException(
    +            s"OrderBy expression must be ['rowtime] or ['proctime], but got ['${orderName}]")
    +        }
    +      }
    +    }
    +
    +    if (overWindows.size != 1) {
    +      throw TableException("OverWindow only supported single window at current time.")
    +    }
    +
    +    overWindows.foreach { overWindow =>
    +      if (!overWindow.preceding.asInstanceOf[Literal].resultType.getClass
    --- End diff --
    
    This check should be done in `OverCall.validateInput()` as well.



[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112605911
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala ---
    @@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, ExpressionParser}
     import org.apache.flink.table.plan.logical._
     
     /**
    +  * An over window specification.
    +  *
    +  * Over window is similar to the traditional OVER SQL.
    +  */
    +class OverWindow {
    +
    +  private[flink] var alias: Expression = _
    +  private[flink] var partitionBy: Seq[Expression] = Seq[Expression]()
    +  private[flink] var orderBy: Expression = _
    +  private[flink] var preceding: Expression = _
    +  private[flink] var following: Expression = null
    +
    +  /**
    +    * Assigns an alias for this window that the following `select()` clause can refer to.
    +    *
    +    * @param alias alias for this over window
    +    * @return this over window
    +    */
    +  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
    +
    +  /**
    +    * Assigns an alias for this window that the following `select()` clause can refer to.
    +    *
    +    * @param alias alias for this over window
    +    * @return this over window
    +    */
    +  def as(alias: Expression): OverWindow = {
    +    this.alias = alias
    +    this
    +  }
    +
    +  /**
    +    * Partitions the elements on some partition keys.
    +    *
    +    * @param partitionBy
    +    * @return this over window
    +    */
    +  def partitionBy(partitionBy: String): OverWindow = {
    +    this.partitionBy(ExpressionParser.parseExpression(partitionBy))
    +  }
    +
    +  /**
    +    * Partitions the elements on some partition keys.
    +    *
    +    * @param partitionBy
    +    * @return this over window
    +    */
    +  def partitionBy(partitionBy: Expression*): OverWindow = {
    +    this.partitionBy = partitionBy
    +    this
    +  }
    +
    +
    +  /**
    +    * Specifies the time mode.
    +    *
    +    * @param orderBy For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]]
    +    *                to specify time mode.
    +    * @return this over window
    +    */
    +  def orderBy(orderBy: String): OverWindow = {
    +    this.orderBy(ExpressionParser.parseExpression(orderBy))
    +  }
    +
    +  /**
    +    * Specifies the time mode.
    +    *
    +    * @param orderBy For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]]
    +    *                to specify time mode.
    +    * @return this over window
    +    */
    +  def orderBy(orderBy: Expression): OverWindow = {
    +    this.orderBy = orderBy
    +    this
    +  }
    +
    +  /**
    +    * Set the preceding offset (based on time or row-count intervals) for over window
    +    *
    +    * @param preceding forward offset that relative to the current row
    +    * @return this over window
    +    */
    +  def preceding(preceding: String): OverWindow = {
    +    this.preceding(ExpressionParser.parseExpression(preceding))
    +  }
    +
    +  /**
    +    * Set the preceding offset (based on time or row-count intervals) for over window
    +    *
    +    * @param preceding forward offset that relative to the current row
    +    * @return this over window
    +    */
    +  def preceding(preceding: Expression): OverWindow = {
    +    this.preceding = preceding
    +    this
    +  }
    +
    +  /**
    +    * Set the following offset (based on time or row-count intervals) for over window
    +    *
    +    * @param following subsequent offset that relative to the current row
    +    * @return this over window
    +    */
    +  def following(following: String): OverWindow = {
    --- End diff --
    
    That's a good idea. +1



[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112570230
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -93,28 +96,43 @@ class DataStreamOverAggregate(
     
         val orderKeys = overWindow.orderKeys.getFieldCollations
     
    -    if (orderKeys.size() != 1) {
    -      throw new TableException(
    -        "Unsupported use of OVER windows. The window can only be ordered by a single time column.")
    -    }
    -    val orderKey = orderKeys.get(0)
    +    val timeType = if (!orderKeys.isEmpty) {
    +      if (orderKeys.size() != 1) {
    +        throw new TableException(
    +          "Unsupported use of OVER windows. The window can only be ordered by a single time " +
    +            "column.")
    +      }
    +      val orderKey = orderKeys.get(0)
     
    -    if (!orderKey.direction.equals(ASCENDING)) {
    -      throw new TableException(
    -        "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
    +      if (!orderKey.direction.equals(ASCENDING)) {
    +        throw new TableException(
    +          "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
    +      }
    +      inputType
    +        .getFieldList
    +        .get(orderKey.getFieldIndex)
    +        .getValue.asInstanceOf[TimeModeType]
    +    } else {
    --- End diff --
    
    This could be removed if we don't use literals for the `orderBy()` field.



[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112559640
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    +    this.aggAlias = aggAlias
    +    this
    +  }
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.toSqlAggFunction()
    +
    +    val aggReturnType: TypeInformation[_] = agg.resultType
    +
    +    val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
    +        // for stream
    +        relBuilder.literal(orderName)
    --- End diff --
    
    can we use `relBuilder.call(EventTimeExtractor)` and `relBuilder.call(ProcTimeExtractor)` here to make the logic identical to SQL?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112499806
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -93,28 +96,43 @@ class DataStreamOverAggregate(
     
         val orderKeys = overWindow.orderKeys.getFieldCollations
     
    -    if (orderKeys.size() != 1) {
    -      throw new TableException(
    -        "Unsupported use of OVER windows. The window can only be ordered by a single time column.")
    -    }
    -    val orderKey = orderKeys.get(0)
    +    val timeType = if (!orderKeys.isEmpty) {
    +      if (orderKeys.size() != 1) {
    +        throw new TableException(
    +          "Unsupported use of OVER windows. The window can only be ordered by a single time " +
    +            "column.")
    +      }
    +      val orderKey = orderKeys.get(0)
     
    -    if (!orderKey.direction.equals(ASCENDING)) {
    -      throw new TableException(
    -        "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
    +      if (!orderKey.direction.equals(ASCENDING)) {
    +        throw new TableException(
    +          "Unsupported use of OVER windows. The window can only be ordered in ASCENDING mode.")
    +      }
    +      inputType
    +        .getFieldList
    +        .get(orderKey.getFieldIndex)
    +        .getValue.asInstanceOf[TimeModeType]
    +    } else {
    +      val it = logicWindow.constants.listIterator()
    +      if (it.hasNext) {
    +        val item = it.next().getValue
    +        if (item.isInstanceOf[NlsString]) {
    +          val value = item.asInstanceOf[NlsString].getValue
    +          if (value.equalsIgnoreCase("rowtime")) {
    +            new RowTimeType
    +          } else {
    +            new ProcTimeType
    +          }
    +        }
    +      }
         }
     
         val inputDS = input.asInstanceOf[DataStreamRel].translateToPlan(tableEnv)
     
         val generator = new CodeGenerator(
    -      tableEnv.getConfig,
    -      false,
    -      inputDS.getType)
    -
    -    val timeType = inputType
    -      .getFieldList
    -      .get(orderKey.getFieldIndex)
    -      .getValue
    +    tableEnv.getConfig,
    --- End diff --
    
    Please fix the indentation here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112555311
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala ---
    @@ -83,3 +83,49 @@ object Session {
         */
       def withGap(gap: Expression): SessionWindow = new SessionWindow(gap)
     }
    +
    +/**
    +  * Helper object for creating a over window.
    +  */
    +object Over {
    --- End diff --
    
    I think we need an equivalent class for the Java Table API, similar to `org.apache.flink.table.api.java.groupWindows.scala`.
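A rough sketch of what such a Java-facing entry point could look like: a top-level Scala `object` whose methods take only `String` expressions, which Java callers can pass directly (unlike Scala `Symbol`s and implicit conversions). Because the object has no companion class, the Scala compiler emits static forwarders, so Java code can call its methods statically. `JavaOver` and `OverWindowSpec` are hypothetical names; a real version would parse the strings with `ExpressionParser`, as the `String` overloads in `windows.scala` do, rather than keep them raw.

```scala
// Hypothetical immutable window spec that keeps the raw string expressions;
// a real implementation would parse them (e.g. with ExpressionParser).
final case class OverWindowSpec(
    partitionKeys: Seq[String],
    orderKey: Option[String] = None,
    precedingExpr: Option[String] = None,
    followingExpr: Option[String] = None,
    windowAlias: Option[String] = None) {

  def orderBy(field: String): OverWindowSpec = copy(orderKey = Some(field))
  def preceding(bound: String): OverWindowSpec = copy(precedingExpr = Some(bound))
  def following(bound: String): OverWindowSpec = copy(followingExpr = Some(bound))
  def as(alias: String): OverWindowSpec = copy(windowAlias = Some(alias))
}

// Java-facing entry point: String-only methods, no Scala Symbols or implicits required.
object JavaOver {
  // Accepts a comma-separated field list such as "a, b".
  def partitionBy(fields: String): OverWindowSpec =
    OverWindowSpec(fields.split(",").map(_.trim).filter(_.nonEmpty).toSeq)

  // Entry point for windows without partition keys.
  def orderBy(field: String): OverWindowSpec = OverWindowSpec(Seq.empty).orderBy(field)
}
```

From Java, this hypothetical helper would read `JavaOver.partitionBy("c").orderBy("rowtime").preceding("2.rows").as("w1")`.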


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112654550
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +59,160 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.getSqlAggFunction()
    +
    +    val relDataType = relBuilder
    +      .getTypeFactory.asInstanceOf[FlinkTypeFactory]
    +      .createTypeFromTypeInfo(agg.resultType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets: util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime")) {
    +        // for stream event-time
    +        relBuilder.call(EventTimeExtractor)
    +      }
    +      else if (orderName.equalsIgnoreCase("proctime")) {
    +        // for stream proc-time
    +        relBuilder.call(ProcTimeExtractor)
    +      } else {
    +        // for batch event-time
    +        relBuilder.field(orderName)
    +      }
    +
    +    orderKeys.add(new RexFieldCollation(rexNode, sets))
    +
    +    val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    overWindow.partitionBy.foreach {
    +      x =>
    +        val partitionKey = relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)
    +        if (!FlinkTypeFactory.toTypeInfo(partitionKey.getType).isKeyType) {
    +          throw ValidationException(
    +            s"expression $partitionKey cannot be used as a partition key expression " +
    +            "because it's not a valid key type which must be hashable and comparable")
    +        }
    +        partitionKeys.add(partitionKey)
    +    }
    +
    +    val preceding = overWindow.preceding.asInstanceOf[Literal]
    +    val following = overWindow.following.asInstanceOf[Literal]
    +
    +    val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
    +
    +    val lowerBound = createBound(relBuilder, preceding, SqlKind.PRECEDING)
    +    val upperBound = createBound(relBuilder, following, SqlKind.FOLLOWING)
    +
    +    rexBuilder.makeOver(
    +      relDataType,
    +      operator,
    +      aggExprs,
    +      partitionKeys,
    +      orderKeys.build,
    +      lowerBound,
    +      upperBound,
    +      isPhysical,
    +      true,
    +      false)
    +  }
    +
    +  private def createBound(
    +    relBuilder: RelBuilder,
    +    bound: Literal,
    +    sqlKind: SqlKind): RexWindowBound = {
    +
    +    if (bound == UNBOUNDED_RANGE || bound == UNBOUNDED_ROW) {
    +      val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
    +      create(unbounded, null)
    +    } else if (bound == CURRENT_RANGE || bound == CURRENT_ROW) {
    +      val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
    +      create(currentRow, null)
    +    } else {
    +      val returnType = relBuilder
    +        .getTypeFactory.asInstanceOf[FlinkTypeFactory]
    +        .createTypeFromTypeInfo(Types.DECIMAL)
    +
    +      val sqlOperator = new SqlPostfixOperator(
    +        sqlKind.name,
    +        sqlKind,
    +        2,
    +        new OrdinalReturnTypeInference(0),
    +        null,
    +        null)
    +
    +      val operands: Array[SqlNode] = new Array[SqlNode](1)
    +      operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO))
    +
    +      val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
    +
    +      val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +      expressions.add(relBuilder.literal(bound.value))
    +
    +      val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
    +
    +      create(node, rexNode)
    +    }
    +  }
    +
    +  override private[flink] def children: Seq[Expression] = Seq(agg)
    +
    +  override def toString = s"${this.getClass.getCanonicalName}(${overWindowAlias.toString})"
    +
    +  override private[flink] def resultType = agg.resultType
    +
    +  override private[flink] def validateInput(): ValidationResult = {
    +    var validationResult: ValidationResult = ValidationSuccess
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +    if (!orderName.equalsIgnoreCase("rowtime")
    +      && !orderName.equalsIgnoreCase("proctime")) {
    +      ValidationFailure(
    +        s"OrderBy expression must be ['rowtime] or ['proctime], but got ['${orderName}]")
    +    }
    +    if (!overWindow.preceding.asInstanceOf[Literal].resultType.getClass
    --- End diff --
    
    We need to check if `preceding` and `following` are of type `Literal` first.
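Casting with `asInstanceOf[Literal]` throws a `ClassCastException` when a user passes a non-literal bound, instead of producing a validation error. A sketch of checking the type first with pattern matching; `Expr`, `Literal`, `FieldRef`, `BoundCheck` and the validation types are simplified, hypothetical stand-ins for the classes in the diff. Note also that in the quoted `validateInput()`, the `ValidationFailure(...)` for the orderBy check appears to be created but never assigned to `validationResult`, so it would be silently dropped; returning the value of a single `match`, as below, avoids that class of bug.

```scala
// Simplified, hypothetical stand-ins for the expression and validation types.
sealed trait Expr
final case class Literal(value: Long, resultType: String) extends Expr
final case class FieldRef(name: String) extends Expr

sealed trait ValidationResult
case object ValidationSuccess extends ValidationResult
final case class ValidationFailure(message: String) extends ValidationResult

object BoundCheck {
  // Verify both bounds are literals before reading their types; never cast blindly.
  def validateBounds(preceding: Expr, following: Expr): ValidationResult =
    (preceding, following) match {
      case (p: Literal, f: Literal) if p.resultType == f.resultType =>
        ValidationSuccess
      case (p: Literal, f: Literal) =>
        ValidationFailure(
          s"Preceding and following must be of the same type, got ${p.resultType} and ${f.resultType}")
      case (p, f) =>
        ValidationFailure(s"Preceding and following must be literals, got $p and $f")
    }
}
```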


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112456197
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -810,6 +810,47 @@ class Table(
         new WindowedTable(this, window)
       }
     
    +  /**
    +    * Groups the records of a table by assigning them to windows defined by a time or row interval.
    +    *
    +    * For streaming tables of infinite size, grouping into windows is required to define finite
    +    * groups on which over-based aggregates can be computed.
    +    *
    +    * Over window for batch tables are currently not supported.
    +    *
    +    * @param overWindows windows that specifies how elements are grouped.
    +    * @return Over windowed table
    +    */
    +  def window(overWindows: OverWindow*): OverWindowedTable = {
    +
    +    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
    +      throw TableException("Over window for batch tables are currently not supported.")
    +    } else {
    +      overWindows.foreach { overWindow =>
    +        val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    --- End diff --
    
    The other operators are validated later. Can you add this check to `OverCall.validateInput()`?
    
    Please add tests to validate that the checks work.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112571486
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala ---
    @@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, ExpressionParser}
     import org.apache.flink.table.plan.logical._
     
     /**
    +  * An over window specification.
    +  *
    +  * Over window is similar to the traditional OVER SQL.
    +  */
    +class OverWindow {
    +
    +  private[flink] var alias: Expression = _
    +  private[flink] var partitionBy: Seq[Expression] = Seq[Expression]()
    +  private[flink] var orderBy: Expression = _
    +  private[flink] var preceding: Expression = _
    +  private[flink] var following: Expression = null
    +
    +  /**
    +    * Assigns an alias for this window that the following `select()` clause can refer to.
    +    *
    +    * @param alias alias for this over window
    +    * @return this over window
    +    */
    +  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
    +
    +  /**
    +    * Assigns an alias for this window that the following `select()` clause can refer to.
    +    *
    +    * @param alias alias for this over window
    +    * @return this over window
    +    */
    +  def as(alias: Expression): OverWindow = {
    +    this.alias = alias
    +    this
    +  }
    +
    +  /**
    +    * Partitions the elements on some partition keys.
    +    *
    +    * @param partitionBy
    +    * @return this over window
    +    */
    +  def partitionBy(partitionBy: String): OverWindow = {
    +    this.partitionBy(ExpressionParser.parseExpression(partitionBy))
    +  }
    +
    +  /**
    +    * Partitions the elements on some partition keys.
    +    *
    +    * @param partitionBy
    +    * @return this over window
    +    */
    +  def partitionBy(partitionBy: Expression*): OverWindow = {
    +    this.partitionBy = partitionBy
    +    this
    +  }
    +
    +
    +  /**
    +    * Specifies the time mode.
    +    *
    +    * @param orderBy For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]]
    +    *                to specify time mode.
    +    * @return this over window
    +    */
    +  def orderBy(orderBy: String): OverWindow = {
    +    this.orderBy(ExpressionParser.parseExpression(orderBy))
    +  }
    +
    +  /**
    +    * Specifies the time mode.
    +    *
    +    * @param orderBy For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]]
    +    *                to specify time mode.
    +    * @return this over window
    +    */
    +  def orderBy(orderBy: Expression): OverWindow = {
    +    this.orderBy = orderBy
    +    this
    +  }
    +
    +  /**
    +    * Set the preceding offset (based on time or row-count intervals) for over window
    +    *
    +    * @param preceding forward offset that relative to the current row
    +    * @return this over window
    +    */
    +  def preceding(preceding: String): OverWindow = {
    +    this.preceding(ExpressionParser.parseExpression(preceding))
    +  }
    +
    +  /**
    +    * Set the preceding offset (based on time or row-count intervals) for over window
    +    *
    +    * @param preceding forward offset that relative to the current row
    +    * @return this over window
    +    */
    +  def preceding(preceding: Expression): OverWindow = {
    +    this.preceding = preceding
    +    this
    +  }
    +
    +  /**
    +    * Set the following offset (based on time or row-count intervals) for over window
    +    *
    +    * @param following subsequent offset that relative to the current row
    +    * @return this over window
    +    */
    +  def following(following: String): OverWindow = {
    --- End diff --
    
    Should we make `following` optional and default to CURRENT_ROW / CURRENT_RANGE depending on the type of `preceding`? I think that would be a nice shortcut and also be aligned with SQL.
    
    What do you think, @sunjincheng121?
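    
    A minimal sketch of the suggested default, assuming a hypothetical bound model (`Bound`, `RowBound`, `RangeBound` and their cases are illustrative names, not classes from this PR). When no `following` bound is given, it falls back to CURRENT ROW for row-count bounds and to CURRENT RANGE for time bounds. This matches SQL, where a frame with only a start bound such as `ROWS 2 PRECEDING` means `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW`.
    
    ```scala
    // Hypothetical bound model for illustration only.
    sealed trait Bound
    sealed trait RowBound extends Bound
    sealed trait RangeBound extends Bound
    final case class RowsPreceding(n: Long) extends RowBound
    case object UnboundedRow extends RowBound
    case object CurrentRow extends RowBound
    final case class MillisPreceding(ms: Long) extends RangeBound
    case object UnboundedRange extends RangeBound
    case object CurrentRange extends RangeBound

    // Uses the explicit `following` bound if present; otherwise picks
    // CURRENT ROW or CURRENT RANGE depending on the kind of `preceding`.
    def defaultFollowing(preceding: Bound, following: Option[Bound]): Bound =
      following.getOrElse {
        preceding match {
          case _: RowBound   => CurrentRow
          case _: RangeBound => CurrentRange
        }
      }
    ```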


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112456740
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -928,6 +969,27 @@ class WindowedTable(
     
     }
     
    +class OverWindowedTable(
    +    private[flink] val table: Table,
    +    private[flink] val overWindows: OverWindow*) {
    --- End diff --
    
    Please use `Array` instead of varargs for internal methods & classes.
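    
    A sketch of the pattern with hypothetical types (`Window`, `WindowedView`, `TableLike`): the user-facing method keeps varargs for convenience and converts them to an `Array` once, so the internal class has a plain, explicit constructor signature. One relevant detail: a Scala varargs parameter `T*` is compiled to a `Seq[T]`, so Java code cannot call it as a Java varargs method unless it is annotated with `@varargs`.
    
    ```scala
    // Hypothetical types for illustration; not the actual Flink classes.
    final case class Window(alias: String)

    // Internal class takes a plain Array rather than varargs.
    class WindowedView(windows: Array[Window]) {
      def aliases: Seq[String] = windows.map(_.alias).toSeq
    }

    class TableLike {
      // Public API keeps varargs so callers can write window(w1, w2).
      def window(windows: Window*): WindowedView = new WindowedView(windows.toArray)
    }
    ```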


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112635892
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    --- End diff --
    
    If we remove the `as` method from `OverCall`, it will be handled as a regular `Alias` expression which wraps the `OverCall`. 
    Since the `OverCall` is wrapped, the `ProjectionTranslator.translateOverWindow()` method needs another `case Alias(OverCall(agg, alias, _), aggAlias, _)` when inserting the OverWindow into the call.
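    
    A minimal sketch of that extra case, using a simplified, hypothetical expression tree (`Expr`, `Agg`, `Over`, `Alias`, `WindowSpec`, `attachWindows`) rather than Flink's classes. The window definition is looked up by its alias and inserted both into bare over calls and into over calls wrapped in an `Alias`; an unknown alias simply yields `None` here, where real code would report a validation error.
    
    ```scala
    // Hypothetical, simplified expression tree for illustration.
    sealed trait Expr
    final case class Field(name: String) extends Expr
    final case class Agg(fn: String, child: Expr) extends Expr
    final case class WindowSpec(partitionBy: Seq[String], orderBy: String)
    final case class Over(agg: Agg, windowAlias: String, window: Option[WindowSpec] = None) extends Expr
    final case class Alias(child: Expr, name: String) extends Expr

    // Inserts the matching window definition into every over call, whether it
    // is used directly or renamed via a regular Alias.
    def attachWindows(projection: Seq[Expr], windows: Map[String, WindowSpec]): Seq[Expr] =
      projection.map {
        case o @ Over(_, alias, _) =>
          o.copy(window = windows.get(alias))
        case Alias(o @ Over(_, alias, _), name) =>
          Alias(o.copy(window = windows.get(alias)), name)
        case other => other
      }
    ```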


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112628642
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
    @@ -54,6 +54,9 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
                 case expr if !expr.valid => u
                 case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
                 case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i"))
    +            case over: OverCall if null != over.aggAlias =>
    --- End diff --
    
    If I understand you correctly, you want to add the alias to the aggregation, e.g.:
    `.select('c, 'b.count over 'w as 'mycount)` --> `.select('c, 'b.count as 'mycount over 'w)`
    but in SQL the alias is added after the OVER clause, e.g.:
    `SELECT c, count(a) OVER (PARTITION BY c ORDER BY ProcTime() RANGE UNBOUNDED preceding) as cnt1 from T1`
    So I want to be consistent with SQL. What do you think?


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112636908
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala ---
    @@ -0,0 +1,265 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream.table
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.streaming.api.functions.source.SourceFunction
    +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.api.watermark.Watermark
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.scala.stream.table.OverWindowITCase.{RowTimeSourceFunction}
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit.Test
    +
    +import scala.collection.mutable
    +
    +class OverWindowITCase extends StreamingWithStateTestBase {
    +
    +  @Test
    +  def testProcTimeUnBoundedPartitionedRowOver(): Unit = {
    +
    +    val data = List(
    +      (1L, 1, "Hello"),
    +      (2L, 2, "Hello"),
    +      (3L, 3, "Hello"),
    +      (4L, 4, "Hello"),
    +      (5L, 5, "Hello"),
    +      (6L, 6, "Hello"),
    +      (7L, 7, "Hello World"),
    +      (8L, 8, "Hello World"),
    +      (20L, 20, "Hello World"))
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(1)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    StreamITCase.clear
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'a, 'b, 'c)
    +
    +    val windowedTable = table
    +      .window(
    +        Over partitionBy 'c orderBy 'procTime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w)
    +      .select('c, 'b.count over 'w as 'mycount)
    +      .select('c, 'mycount)
    +
    +    val results = windowedTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq(
    +      "Hello World,1", "Hello World,2", "Hello World,3",
    +      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +    env.setStateBackend(getStateBackend)
    +    StreamITCase.testResults = mutable.MutableList()
    +    StreamITCase.clear
    +    env.setParallelism(1)
    +
    +    val data = Seq(
    +      Left(14000005L, (1, 1L, "Hi")),
    +      Left(14000000L, (2, 1L, "Hello")),
    +      Left(14000002L, (1, 1L, "Hello")),
    +      Left(14000002L, (1, 2L, "Hello")),
    +      Left(14000002L, (1, 3L, "Hello world")),
    +      Left(14000003L, (2, 2L, "Hello world")),
    +      Left(14000003L, (2, 3L, "Hello world")),
    +      Right(14000020L),
    +      Left(14000021L, (1, 4L, "Hello world")),
    +      Left(14000022L, (1, 5L, "Hello world")),
    +      Left(14000022L, (1, 6L, "Hello world")),
    +      Left(14000022L, (1, 7L, "Hello world")),
    +      Left(14000023L, (2, 4L, "Hello world")),
    +      Left(14000023L, (2, 5L, "Hello world")),
    +      Right(14000030L)
    +    )
    +    val table = env
    +      .addSource(new RowTimeSourceFunction[(Int, Long, String)](data))
    +      .toTable(tEnv).as('a, 'b, 'c)
    +
    +    val windowedTable = table
    +      .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following
    +         CURRENT_RANGE as 'w)
    +      .select(
    +        'a, 'b, 'c,
    +        'b.sum over 'w,
    +        'b.count over 'w,
    +        'b.avg over 'w,
    +        'b.max over 'w,
    +        'b.min over 'w)
    +
    +    val result = windowedTable.toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,1,Hello,6,3,2,3,1",
    +      "1,2,Hello,6,3,2,3,1",
    +      "1,3,Hello world,6,3,2,3,1",
    +      "1,1,Hi,7,4,1,3,1",
    +      "2,1,Hello,1,1,1,1,1",
    +      "2,2,Hello world,6,3,2,3,1",
    +      "2,3,Hello world,6,3,2,3,1",
    +      "1,4,Hello world,11,5,2,4,1",
    +      "1,5,Hello world,29,8,3,7,1",
    +      "1,6,Hello world,29,8,3,7,1",
    +      "1,7,Hello world,29,8,3,7,1",
    +      "2,4,Hello world,15,5,3,5,1",
    +      "2,5,Hello world,15,5,3,5,1"
    +    )
    +
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testProcTimeBoundedPartitionedRangeOver(): Unit = {
    +
    +    val data = List(
    +      (1, 1L, 0, "Hallo", 1L),
    +      (2, 2L, 1, "Hallo Welt", 2L),
    +      (2, 3L, 2, "Hallo Welt wie", 1L),
    +      (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
    +      (3, 5L, 4, "ABC", 2L),
    +      (3, 6L, 5, "BCD", 3L),
    +      (4, 7L, 6, "CDE", 2L),
    +      (4, 8L, 7, "DEF", 1L),
    +      (4, 9L, 8, "EFG", 1L),
    +      (4, 10L, 9, "FGH", 2L),
    +      (5, 11L, 10, "GHI", 1L),
    +      (5, 12L, 11, "HIJ", 3L),
    +      (5, 13L, 12, "IJK", 3L),
    +      (5, 14L, 13, "JKL", 2L),
    +      (5, 15L, 14, "KLM", 2L))
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setParallelism(1)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
    +
    +    val windowedTable = table
    +      .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
    +      .select('a, 'c.sum over 'w, 'c.min over 'w)
    +    val result = windowedTable.toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,0,0",
    +      "2,1,1",
    +      "2,3,1",
    +      "3,3,3",
    +      "3,7,3",
    +      "3,12,3",
    +      "4,6,6",
    +      "4,13,6",
    +      "4,21,6",
    +      "4,30,6",
    +      "5,10,10",
    +      "5,21,10",
    +      "5,33,10",
    +      "5,46,10",
    +      "5,60,10")
    +
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testRowTimeBoundedPartitionedRowOver(): Unit = {
    --- End diff --
    
    Fair enough. The `OverWindowTest` should be sufficient.
    However, I would keep a few integration tests for the Table API as well.


---

[GitHub] flink issue #3743: [FLINK-6228][table] Integrating the OVER windows in the T...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3743
  
    Hi @fhueske, thanks for everything you did. I have merged the changes, moved the validation of `partitionBy` into the `validateInput` method, and added two tests for `partitionBy`:
    * TestPartitionByWithUnresolved
    * TestPartitionByWithNotKeyType
    
    Thanks,
    SunJincheng


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112495783
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
    @@ -54,6 +54,9 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
                 case expr if !expr.valid => u
                 case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
                 case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i"))
    +            case over: OverCall if null != over.aggAlias =>
    --- End diff --
    
    We don't need this if we handle the aggregation alias as a regular `Alias` expression.


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112653396
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +59,160 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.getSqlAggFunction()
    +
    +    val relDataType = relBuilder
    +      .getTypeFactory.asInstanceOf[FlinkTypeFactory]
    +      .createTypeFromTypeInfo(agg.resultType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets: util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime")) {
    +        // for stream event-time
    +        relBuilder.call(EventTimeExtractor)
    +      }
    +      else if (orderName.equalsIgnoreCase("proctime")) {
    +        // for stream proc-time
    +        relBuilder.call(ProcTimeExtractor)
    +      } else {
    +        // for batch event-time
    +        relBuilder.field(orderName)
    +      }
    +
    +    orderKeys.add(new RexFieldCollation(rexNode, sets))
    +
    +    val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    overWindow.partitionBy.foreach {
    +      x =>
    +        val partitionKey = relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)
    +        if (!FlinkTypeFactory.toTypeInfo(partitionKey.getType).isKeyType) {
    --- End diff --
    
    move this check to `validateInput()`


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112495160
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    --- End diff --
    
    The proper translation can be done in `ProjectionTranslator.translateOverWindow()`.


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112469838
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala ---
    @@ -35,6 +37,18 @@ abstract sealed class Aggregation extends UnaryExpression {
         * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
         */
       private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall
    +
    +  /**
    +    * Because SqlAggFunction from Calcite's AggCallImpl is invisible,
    +    * we have to manually create sqlAggFunction in flink code base.
    +    *
    +    */
    +  private[flink] def toSqlAggFunction()(implicit relBuilder: RelBuilder): SqlAggFunction
    +
    +  /**
    +    * Attach the Resolved Child to aggregation
    +    */
    +  private[flink] def withResolvedChild(child: Expression): Aggregation
    --- End diff --
    
    Remove the `withResolvedChild()` method, as we validate the aggregation argument with the default validation.


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112652207
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +59,160 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.getSqlAggFunction()
    +
    +    val relDataType = relBuilder
    +      .getTypeFactory.asInstanceOf[FlinkTypeFactory]
    +      .createTypeFromTypeInfo(agg.resultType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets: util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime")) {
    --- End diff --
    
    We cannot properly distinguish batch and streaming here.
    If a batch table has a column `rowtime`, this will fail.
    
    We could replace the `UnresolvedFieldReference` for `rowtime` / `proctime` by `RowTime` / `ProcTime` `TimeIndicator` expressions in `ProjectionTranslator.translateOverWindow()`.
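    
    A minimal sketch of that rewrite, assuming a simplified, hypothetical model (`OrderExpr`, `UnresolvedField`, `RowTime`, `ProcTime`, `resolveTimeAttribute`). The special names are only turned into time-indicator expressions for streaming tables, so a batch table with a regular column named `rowtime` keeps referring to that column, and later code can match on the expression type instead of the field name.
    
    ```scala
    // Hypothetical, simplified model for illustration; not Flink's classes.
    sealed trait OrderExpr
    final case class UnresolvedField(name: String) extends OrderExpr
    case object RowTime extends OrderExpr
    case object ProcTime extends OrderExpr

    // Resolves the ORDER BY expression once, during translation, instead of
    // inspecting the raw field name later when building the RexNode.
    def resolveTimeAttribute(orderBy: OrderExpr, isStreaming: Boolean): OrderExpr =
      orderBy match {
        case UnresolvedField(n) if isStreaming && n.equalsIgnoreCase("rowtime")  => RowTime
        case UnresolvedField(n) if isStreaming && n.equalsIgnoreCase("proctime") => ProcTime
        case other => other
      }
    ```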


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112476982
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---
    @@ -364,6 +365,21 @@ trait ImplicitExpressionOperations {
       def position(haystack: Expression) = Position(expr, haystack)
     
       /**
    +    * For windowing function to config over window
    +    * e.g.:
    +    * table
    +    *   .window(Over partitionBy 'c orderBy 'rowtime preceding 2.rows following CURRENT_ROW as 'w)
    +    *   .select('c, 'a, 'a.count over 'w, 'a.sum over 'w)
    +    */
    +  def over(alias: Expression) = {
    +    expr match {
    +      case _: Aggregation => new OverCall(expr.asInstanceOf[Aggregation], null, alias)
    --- End diff --
    
    rm `new` (case classes should be instantiated with the `apply()` method, i.e., without `new`)
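    
    A small standalone illustration of the convention, with `OverCallLike` as a hypothetical stand-in for `OverCall`: the compiler generates a companion `apply()` for every case class, so instances are normally created without `new`. Both forms below produce equal values.
    
    ```scala
    // Hypothetical stand-in for OverCall, for illustration only.
    final case class OverCallLike(agg: String, windowAlias: String)

    // Idiomatic: uses the compiler-generated companion apply().
    val viaApply = OverCallLike("count(b)", "w")
    // Compiles too, but bypasses apply() and is not idiomatic for case classes.
    val viaNew = new OverCallLike("count(b)", "w")
    ```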


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112636652
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
    @@ -54,6 +54,9 @@ case class Project(projectList: Seq[NamedExpression], child: LogicalNode) extend
                 case expr if !expr.valid => u
                 case c @ Cast(ne: NamedExpression, tp) => Alias(c, s"${ne.name}-$tp")
                 case gcf: GetCompositeField => Alias(gcf, gcf.aliasName().getOrElse(s"_c$i"))
    +            case over: OverCall if null != over.aggAlias =>
    --- End diff --
    
    Currently `.select('c, 'b.count over 'w as 'mycount)` would be translated into `OverCall(Count(b), mycount, w, _)`. If we remove `as` from `OverCall`, it will be wrapped in a regular `Alias` expression: `Alias(OverCall(Count(b), w, _), mycount)`.
    
    The syntax would be the same, but the internal representation would use the existing `Alias` expression, just like any other expression that is renamed.
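    
    The syntax stays the same because, in Scala, infix methods whose names start with a letter (such as `over` and `as`) share the lowest precedence and are left-associative, so `'b.count over 'w as 'mycount` parses as `(('b.count) over 'w) as 'mycount`. The sketch below checks this with a hypothetical mini DSL (`E`, `Ref`, `Agg`, `Over`, `Named`) in which `as` is defined once for every expression, like a generic alias.
    
    ```scala
    // Hypothetical mini DSL for illustration; not the Flink Table API.
    sealed trait E {
      // Like a generic alias: any expression can be renamed.
      def as(name: String): E = Named(this, name)
      def over(window: String): E = Over(this, window)
    }
    final case class Ref(name: String) extends E {
      def count: E = Agg("count", this)
    }
    final case class Agg(fn: String, child: E) extends E
    final case class Over(agg: E, window: String) extends E
    final case class Named(child: E, name: String) extends E

    // `over` and `as` are alphanumeric infix methods: same precedence, left-associative.
    val expr = Ref("b").count over "w" as "mycount"
    ```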


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112501222
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/SqlTypeUtils.scala ---
    @@ -0,0 +1,47 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.typeutils
    +
    +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory}
    +import org.apache.calcite.sql.`type`.SqlTypeName
    +import org.apache.flink.api.common.typeinfo.TypeInformation
    +import org.apache.flink.table.api.Types._
    +import org.apache.flink.table.api.TableException
    +
    +object SqlTypeUtils {
    --- End diff --
    
    The util class can be removed. We can create a `RelDataType` from a `TypeInformation` as follows:
    
    ```
    relBuilder.getTypeFactory.asInstanceOf[FlinkTypeFactory].createTypeFromTypeInfo(typeInfo)
    ```


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112650081
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala ---
    @@ -82,3 +84,32 @@ object Session {
         */
       def withGap(gap: String): SessionWindow = new SessionWindow(gap)
     }
    +
    +/**
    +  * Helper object for creating a over window.
    +  */
    +object Over {
    +
    +  /**
    +    * Specifies the time attribute on which rows are grouped.
    +    *
    +    * For streaming tables call [[orderBy 'rowtime or orderBy 'proctime]] to specify time mode.
    +    *
    +    * For batch tables, refer to a timestamp or long attribute.
    +    */
    +  def orderBy(orderBy: Expression): OverWindowPredefined = {
    --- End diff --
    
    The Java API is based on `String`, not `Expression`.


[GitHub] flink issue #3743: [FLINK-6228][table] Integrating the OVER windows in the T...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3743
  
    Hi @sunjincheng121,
    
    thanks for the update! I was also working on some refactoring of this PR to clean up the validation logic, but I think your approach is much nicer.
    I will rebase my changes on top of yours and open a PR against your PR branch soon.
    
    Cheers, Fabian


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112567381
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    +    this.aggAlias = aggAlias
    +    this
    +  }
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.toSqlAggFunction()
    +
    +    val aggReturnType: TypeInformation[_] = agg.resultType
    +
    +    val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
    +        // for stream
    +        relBuilder.literal(orderName)
    +      } else {
    +        // for batch
    +        relBuilder.field(orderName)
    +      }
    +
    +    orderKeys.add(new RexFieldCollation(rexNode,sets))
    +
    +    val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    overWindow.partitionBy.foreach(x=>
    +      partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
    +
    +    val preceding = overWindow.preceding.asInstanceOf[Literal]
    +    val following = overWindow.following.asInstanceOf[Literal]
    +
    +    val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
    +
    +    val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
    +    val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
    +
    +    rexBuilder.makeOver(
    +      relDataType,
    +      operator,
    +      aggExprs,
    +      partitionKeys,
    +      orderKeys.build,
    +      lowerBound,
    +      upperBound,
    +      isPhysical,
    +      true,
    +      false)
    +  }
    +
    +  private def createBound(
    +    relBuilder: RelBuilder,
    +    precedingValue: Long,
    --- End diff --
    
    Rename `precedingValue` to `value`, since this method also creates the `FOLLOWING` bound.

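    Here is a minimal sketch of the classification that `createBound` performs. It assumes a hypothetical `Bound` ADT in place of Calcite's `RexWindowBound`. The helper is called for both the `PRECEDING` and the `FOLLOWING` bound, which is why a neutral parameter name fits.
    
    There is one deliberate difference from the diff. The diff calls `SqlWindow.createUnboundedPreceding` for `Long.MaxValue` regardless of the kind. This sketch keeps the kind, so an unbounded `FOLLOWING` bound stays distinguishable.
    
    ```scala
    // Hypothetical stand-in for Calcite's RexWindowBound; illustration only.
    sealed trait Bound
    case class Unbounded(kind: String) extends Bound
    case object CurrentRow extends Bound
    case class Offset(value: Long, kind: String) extends Bound
    
    object BoundSketch {
      // `value` rather than `precedingValue`: the same helper builds the
      // lower (PRECEDING) and the upper (FOLLOWING) bound.
      def createBound(value: Long, kind: String): Bound =
        if (value == Long.MaxValue) Unbounded(kind) // sentinel the diff treats as UNBOUNDED
        else if (value == 0L) CurrentRow            // sentinel the diff treats as CURRENT ROW
        else Offset(value, kind)                    // e.g. 2 PRECEDING
    }
    ```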

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112460318
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
    @@ -216,6 +216,26 @@ object ProjectionTranslator {
                 projectList += unresolved
             }
     
    +      case OverCall(agg, aggAlias, alias, _) =>
    +        val overWindow = overWindows.find(_.alias.equals(alias))
    +        val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
    +        val childrenOutput = parent.output
    --- End diff --
    
    `childOutput` or `parentOutput`?


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112560976
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    +    this.aggAlias = aggAlias
    +    this
    +  }
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.toSqlAggFunction()
    +
    +    val aggReturnType: TypeInformation[_] = agg.resultType
    +
    +    val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
    +        // for stream
    +        relBuilder.literal(orderName)
    --- End diff --
    
    If we do this, we could remove the special literal handling in `OverAggregate` and `DataStreamOverAggregate`.


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112471307
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
    @@ -216,6 +216,26 @@ object ProjectionTranslator {
                 projectList += unresolved
             }
     
    +      case OverCall(agg, aggAlias, alias, _) =>
    +        val overWindow = overWindows.find(_.alias.equals(alias))
    +        val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
    +        val childrenOutput = parent.output
    +        val candidates = childrenOutput.filter(_.name.equalsIgnoreCase(aggName))
    +
    +        val resolvedFieldReference = if (candidates.length > 1) {
    --- End diff --
    
    The aggregations are automatically resolved and validated if we add `agg` to the children of `OverCall`.


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112570917
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala ---
    @@ -0,0 +1,265 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream.table
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.streaming.api.functions.source.SourceFunction
    +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.api.watermark.Watermark
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.scala.stream.table.OverWindowITCase.{RowTimeSourceFunction}
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit.Test
    +
    +import scala.collection.mutable
    +
    +class OverWindowITCase extends StreamingWithStateTestBase {
    +
    +  @Test
    +  def testProcTimeUnBoundedPartitionedRowOver(): Unit = {
    +
    +    val data = List(
    +      (1L, 1, "Hello"),
    +      (2L, 2, "Hello"),
    +      (3L, 3, "Hello"),
    +      (4L, 4, "Hello"),
    +      (5L, 5, "Hello"),
    +      (6L, 6, "Hello"),
    +      (7L, 7, "Hello World"),
    +      (8L, 8, "Hello World"),
    +      (20L, 20, "Hello World"))
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(1)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    StreamITCase.clear
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'a, 'b, 'c)
    +
    +    val windowedTable = table
    +      .window(
    +        Over partitionBy 'c orderBy 'procTime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w)
    +      .select('c, 'b.count over 'w as 'mycount)
    +      .select('c, 'mycount)
    +
    +    val results = windowedTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq(
    +      "Hello World,1", "Hello World,2", "Hello World,3",
    +      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +    env.setStateBackend(getStateBackend)
    +    StreamITCase.testResults = mutable.MutableList()
    +    StreamITCase.clear
    +    env.setParallelism(1)
    +
    +    val data = Seq(
    +      Left(14000005L, (1, 1L, "Hi")),
    +      Left(14000000L, (2, 1L, "Hello")),
    +      Left(14000002L, (1, 1L, "Hello")),
    +      Left(14000002L, (1, 2L, "Hello")),
    +      Left(14000002L, (1, 3L, "Hello world")),
    +      Left(14000003L, (2, 2L, "Hello world")),
    +      Left(14000003L, (2, 3L, "Hello world")),
    +      Right(14000020L),
    +      Left(14000021L, (1, 4L, "Hello world")),
    +      Left(14000022L, (1, 5L, "Hello world")),
    +      Left(14000022L, (1, 6L, "Hello world")),
    +      Left(14000022L, (1, 7L, "Hello world")),
    +      Left(14000023L, (2, 4L, "Hello world")),
    +      Left(14000023L, (2, 5L, "Hello world")),
    +      Right(14000030L)
    +    )
    +    val table = env
    +      .addSource(new RowTimeSourceFunction[(Int, Long, String)](data))
    +      .toTable(tEnv).as('a, 'b, 'c)
    +
    +    val windowedTable = table
    +      .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following
    +         CURRENT_RANGE as 'w)
    +      .select(
    +        'a, 'b, 'c,
    +        'b.sum over 'w,
    +        'b.count over 'w,
    +        'b.avg over 'w,
    +        'b.max over 'w,
    +        'b.min over 'w)
    +
    +    val result = windowedTable.toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,1,Hello,6,3,2,3,1",
    +      "1,2,Hello,6,3,2,3,1",
    +      "1,3,Hello world,6,3,2,3,1",
    +      "1,1,Hi,7,4,1,3,1",
    +      "2,1,Hello,1,1,1,1,1",
    +      "2,2,Hello world,6,3,2,3,1",
    +      "2,3,Hello world,6,3,2,3,1",
    +      "1,4,Hello world,11,5,2,4,1",
    +      "1,5,Hello world,29,8,3,7,1",
    +      "1,6,Hello world,29,8,3,7,1",
    +      "1,7,Hello world,29,8,3,7,1",
    +      "2,4,Hello world,15,5,3,5,1",
    +      "2,5,Hello world,15,5,3,5,1"
    +    )
    +
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testProcTimeBoundedPartitionedRangeOver(): Unit = {
    +
    +    val data = List(
    +      (1, 1L, 0, "Hallo", 1L),
    +      (2, 2L, 1, "Hallo Welt", 2L),
    +      (2, 3L, 2, "Hallo Welt wie", 1L),
    +      (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
    +      (3, 5L, 4, "ABC", 2L),
    +      (3, 6L, 5, "BCD", 3L),
    +      (4, 7L, 6, "CDE", 2L),
    +      (4, 8L, 7, "DEF", 1L),
    +      (4, 9L, 8, "EFG", 1L),
    +      (4, 10L, 9, "FGH", 2L),
    +      (5, 11L, 10, "GHI", 1L),
    +      (5, 12L, 11, "HIJ", 3L),
    +      (5, 13L, 12, "IJK", 3L),
    +      (5, 14L, 13, "JKL", 2L),
    +      (5, 15L, 14, "KLM", 2L))
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setParallelism(1)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
    +
    +    val windowedTable = table
    +      .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
    +      .select('a, 'c.sum over 'w, 'c.min over 'w)
    +    val result = windowedTable.toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,0,0",
    +      "2,1,1",
    +      "2,3,1",
    +      "3,3,3",
    +      "3,7,3",
    +      "3,12,3",
    +      "4,6,6",
    +      "4,13,6",
    +      "4,21,6",
    +      "4,30,6",
    +      "5,10,10",
    +      "5,21,10",
    +      "5,33,10",
    +      "5,46,10",
    +      "5,60,10")
    +
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testRowTimeBoundedPartitionedRowOver(): Unit = {
    --- End diff --
    
    Add a test for rowtime bounded range as well?
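    A rowtime bounded RANGE test needs expected values. One way to derive them is a small plain-Scala reference model of `RANGE BETWEEN n PRECEDING AND CURRENT ROW` per partition. In the sketch below, `Event`, `boundedRangeSumCount` and any sample data are hypothetical. The model assumes there are no late records, and it includes all peers that share the current rowtime, as RANGE semantics require.
    
    ```scala
    // Hypothetical event type for the reference model; not part of the Flink test utilities.
    case class Event(rowtime: Long, key: Int, value: Long)
    
    object RangeOverReference {
      // For each event, computes (event, SUM(value), COUNT(value)) over events with the
      // same key whose rowtime lies in [rowtime - range, rowtime], i.e.
      // PARTITION BY key ORDER BY rowtime RANGE BETWEEN `range` PRECEDING AND CURRENT ROW.
      // Events sharing the current rowtime (peers) are included.
      def boundedRangeSumCount(events: Seq[Event], range: Long): Seq[(Event, Long, Long)] =
        events.map { e =>
          val frame = events.filter { o =>
            o.key == e.key && o.rowtime >= e.rowtime - range && o.rowtime <= e.rowtime
          }
          (e, frame.map(_.value).sum, frame.size.toLong)
        }
    }
    ```
    
    Filtering the full input for every event is quadratic. That is acceptable for the handful of rows an ITCase uses.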


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112564205
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    --- End diff --
    
    We also need to validate other properties of the overWindow:
    - Check that the partitionBy expressions are valid fields in the input.
    - If a partitionBy expression is not a field, we would need to push the expression into a Project before the Project with the overWindow and reference the new field. This would need to happen in `Table.window()`. For now, I think we can restrict partitionBy expressions to field references.
    - Validate that `preceding` and `following` are literals.

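    The checks above could look roughly like the following sketch. It uses a hypothetical, simplified model: `FieldRef`, `Lit`, `Plus` and `OverWindowSpec` are illustrative, not Flink's classes. The function reports the first violation it finds. That is an unknown partitionBy field, a partitionBy expression that is not a field reference, or a non-literal `preceding`/`following` bound.
    
    ```scala
    // Hypothetical, simplified model of the over-window checks; not Flink's classes.
    sealed trait Expr
    case class FieldRef(name: String) extends Expr
    case class Lit(value: Long) extends Expr
    case class Plus(left: Expr, right: Expr) extends Expr
    
    case class OverWindowSpec(partitionBy: Seq[Expr], preceding: Expr, following: Expr)
    
    object OverWindowValidation {
      // Returns Left(message) for the first violation, Right(()) if the window is valid.
      def validate(w: OverWindowSpec, inputFields: Set[String]): Either[String, Unit] = {
        val partitionErrors = w.partitionBy.collect {
          case FieldRef(n) if !inputFields.contains(n) => s"Unknown partitionBy field '$n'."
          case other if !other.isInstanceOf[FieldRef] =>
            s"partitionBy only supports field references, got $other."
        }
        val boundErrors = Seq("preceding" -> w.preceding, "following" -> w.following).collect {
          case (name, e) if !e.isInstanceOf[Lit] => s"$name must be a literal, got $e."
        }
        (partitionErrors ++ boundErrors).headOption.toLeft(())
      }
    }
    ```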

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112461145
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
    @@ -216,6 +216,26 @@ object ProjectionTranslator {
                 projectList += unresolved
             }
     
    +      case OverCall(agg, aggAlias, alias, _) =>
    +        val overWindow = overWindows.find(_.alias.equals(alias))
    +        val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
    +        val childrenOutput = parent.output
    +        val candidates = childrenOutput.filter(_.name.equalsIgnoreCase(aggName))
    +
    +        val resolvedFieldReference = if (candidates.length > 1) {
    +          throw new TableException(s"Reference $aggName is ambiguous.")
    +        } else if (candidates.isEmpty) {
    +          throw new TableException(s"Can not resolve [$aggName].")
    +        } else {
    +          Some(candidates.head.withName(aggName))
    +        }
    +
    +        projectList += new OverCall(
    --- End diff --
    
    Remove `new`; `OverCall` is a case class.


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112570409
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/OverAggregate.scala ---
    @@ -23,7 +23,9 @@ import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFieldImpl}
     import org.apache.calcite.rel.core.AggregateCall
     import org.apache.calcite.rel.core.Window.Group
     import org.apache.calcite.rel.core.Window
    -import org.apache.calcite.rex.{RexInputRef}
    +import org.apache.calcite.rex.{RexInputRef, RexLiteral}
    --- End diff --
    
    All these changes could be undone if we don't use a literal for the `orderBy()` expression.


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112469357
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    +    this.aggAlias = aggAlias
    +    this
    +  }
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.toSqlAggFunction()
    +
    +    val aggReturnType: TypeInformation[_] = agg.resultType
    +
    +    val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
    +        // for stream
    +        relBuilder.literal(orderName)
    +      } else {
    +        // for batch
    +        relBuilder.field(orderName)
    +      }
    +
    +    orderKeys.add(new RexFieldCollation(rexNode,sets))
    +
    +    val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    overWindow.partitionBy.foreach(x=>
    +      partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
    +
    +    val preceding = overWindow.preceding.asInstanceOf[Literal]
    +    val following = overWindow.following.asInstanceOf[Literal]
    +
    +    val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
    +
    +    val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
    +    val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
    +
    +    rexBuilder.makeOver(
    +      relDataType,
    +      operator,
    +      aggExprs,
    +      partitionKeys,
    +      orderKeys.build,
    +      lowerBound,
    +      upperBound,
    +      isPhysical,
    +      true,
    +      false)
    +  }
    +
    +  private def createBound(
    +    relBuilder: RelBuilder,
    +    precedingValue: Long,
    +    sqlKind: SqlKind): RexWindowBound = {
    +
    +    if (precedingValue == Long.MaxValue) {
    +      val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
    +      create(unbounded, null)
    +    } else if (precedingValue == 0L) {
    +      val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
    +      create(currentRow, null)
    +    } else {
    +
    +      val returnType = new BasicSqlType(
    +        relBuilder.getTypeFactory.getTypeSystem,
    +        SqlTypeName.DECIMAL)
    +
    +      val sqlOperator = new SqlPostfixOperator(
    +        sqlKind.name,
    +        sqlKind,
    +        2,
    +        new OrdinalReturnTypeInference(0),
    +        null,
    +        null)
    +
    +      val operands: Array[SqlNode] = new Array[SqlNode](1)
    +      operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO))
    +
    +      val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
    +
    +      val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +      expressions.add(relBuilder.literal(precedingValue))
    +
    +      val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
    +
    +      create(node, rexNode)
    +    }
    +  }
    +
    +  override private[flink] def children: Seq[Expression] = Seq()
    --- End diff --
    
    Change to `Seq(agg)` to automatically validate the aggregation call and its arguments.

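    Here is a toy illustration of why exposing `agg` as a child matters. It assumes a generic validator that recurses over `children`. The classes below are hypothetical stand-ins, not Flink's `Expression` hierarchy. With an empty child list, the unresolvable field inside the aggregation is never visited. With `Seq(agg)`, it is reported.
    
    ```scala
    // Hypothetical toy expression tree; not Flink's Expression classes.
    sealed trait Expr {
      def children: Seq[Expr]
      // Node-local check; the generic traversal below handles the recursion.
      def validateSelf(input: Set[String]): Option[String] = None
    }
    case class FieldRef(name: String) extends Expr {
      def children: Seq[Expr] = Seq()
      override def validateSelf(input: Set[String]): Option[String] =
        if (input.contains(name)) None else Some(s"Cannot resolve [$name].")
    }
    case class Sum(child: Expr) extends Expr {
      def children: Seq[Expr] = Seq(child)
    }
    // exposeAgg = false mimics `children = Seq()`, true mimics `children = Seq(agg)`.
    case class OverCall(agg: Expr, exposeAgg: Boolean) extends Expr {
      def children: Seq[Expr] = if (exposeAgg) Seq(agg) else Seq()
    }
    
    object ChildrenValidation {
      // Generic validation: check the children first, then the node itself.
      def validate(e: Expr, input: Set[String]): Option[String] = {
        val childError = e.children.iterator.map(validate(_, input)).collectFirst { case Some(err) => err }
        childError.orElse(e.validateSelf(input))
      }
    }
    ```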

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112628964
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    --- End diff --
    
    Please tell me more about this. :)


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112460885
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
    @@ -216,6 +216,26 @@ object ProjectionTranslator {
                 projectList += unresolved
             }
     
    +      case OverCall(agg, aggAlias, alias, _) =>
    +        val overWindow = overWindows.find(_.alias.equals(alias))
    +        val aggName = agg.child.asInstanceOf[UnresolvedFieldReference].name
    +        val childrenOutput = parent.output
    +        val candidates = childrenOutput.filter(_.name.equalsIgnoreCase(aggName))
    +
    +        val resolvedFieldReference = if (candidates.length > 1) {
    --- End diff --
    
    The other expressions are resolved later. We should follow this pattern to keep the code base consistent.


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112633124
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    --- End diff --
    
    Hm, not sure. Would need to look into this further. But, IMO it is very important to validate that the `partitionBy` fields exist.


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112494372
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    --- End diff --
    
    The `aggAlias` should be removed.
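    One way to drop the mutable `aggAlias` field is to keep the alias in a generic wrapper node, so the over call itself stays immutable. The sketch below is a hypothetical mini model (`Expr`, `Alias`, `OverCallNoAlias`, `outputName` and the `_c<index>` naming are assumptions), not the Flink classes.

    ```scala
    object AliasWrapperSketch {

      sealed trait Expr {
        // Aliasing wraps the expression instead of mutating a field on it.
        def as(name: String): Alias = Alias(this, name)
      }

      final case class FieldRef(name: String) extends Expr
      final case class Count(child: Expr) extends Expr

      // Immutable over call: no alias field and no setter.
      final case class OverCallNoAlias(agg: Expr, windowAlias: String) extends Expr

      final case class Alias(child: Expr, name: String) extends Expr

      // Output name of a projected expression: explicit alias or a
      // (hypothetical) generated name based on its position.
      def outputName(e: Expr, index: Int): String = e match {
        case Alias(_, name) => name
        case _              => s"_c$index"
      }
    }
    ```

    Because `as` returns a new node, the same over call can be reused under different aliases without side effects.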


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112620324
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    --- End diff --
    
    Can we check `partitionBy` as follows (in the `toRexNode` method)?
    ```
    overWindow.partitionBy.foreach { x =>
      val partitionKey = relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)
      if (!FlinkTypeFactory.toTypeInfo(partitionKey.getType).isKeyType) {
        throw ValidationException(
          s"expression $partitionKey cannot be used as a partition key expression " +
          "because it's not a valid key type which must be hashable and comparable")
      }
    }
    ```
    I ask because it does not work (the partition key field cannot be resolved) when I add `partitionBy` to `children` and add the check logic to the `validateInput` method:
    `def children: Seq[Expression] = Seq(agg) ++ overWindow.partitionBy`
    Is there something wrong with the code?
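    A `partitionBy` check that does not depend on `toRexNode` could look roughly like the sketch below. It models the input schema as a map from field name to a flag saying whether the field's type can be used as a key (standing in for `TypeInformation.isKeyType`). `Schema`, `validatePartitionKeys` and the error texts are hypothetical.

    ```scala
    object PartitionKeyValidationSketch {

      // Hypothetical input schema: field name -> "type is hashable and comparable".
      type Schema = Map[String, Boolean]

      // Returns the first validation error, or None if all partition keys are valid.
      def validatePartitionKeys(partitionBy: Seq[String], input: Schema): Option[String] =
        partitionBy.iterator.map { key =>
          input.get(key) match {
            case None =>
              Some(s"Partition key '$key' does not exist in the input")
            case Some(false) =>
              Some(s"Partition key '$key' is not a valid key type (must be hashable and comparable)")
            case Some(true) =>
              None
          }
        }.collectFirst { case Some(err) => err }
    }
    ```

    Returning an error value (or throwing) is what makes the check effective; merely constructing an exception object has no effect.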



[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112482490
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala ---
    @@ -22,6 +22,126 @@ import org.apache.flink.table.expressions.{Expression, ExpressionParser}
     import org.apache.flink.table.plan.logical._
     
     /**
    +  * An over window specification.
    +  *
    +  * Over window is similar to the traditional OVER SQL.
    +  */
    +class OverWindow {
    +
    +  private[flink] var alias: Expression = _
    +  private[flink] var partitionBy: Seq[Expression] = Seq[Expression]()
    +  private[flink] var orderBy: Expression = _
    +  private[flink] var preceding: Expression = _
    +  private[flink] var following: Expression = null
    +
    +  /**
    +    * Assigns an alias for this window that the following `select()` clause can refer to.
    +    *
    +    * @param alias alias for this over window
    +    * @return this over window
    +    */
    +  def as(alias: String): OverWindow = as(ExpressionParser.parseExpression(alias))
    +
    +  /**
    +    * Assigns an alias for this window that the following `select()` clause can refer to.
    +    *
    +    * @param alias alias for this over window
    +    * @return this over window
    +    */
    +  def as(alias: Expression): OverWindow = {
    --- End diff --
    
    I think it would be good if we could enforce the alias.
    This could be done by first building a window that is not of type `OverWindow` and letting `as()` return the complete `OverWindow`.
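    The idea can be sketched with two types: a partial window that supports all builder methods but cannot be passed to `window()`, and a complete window that can only be obtained through `as()`. The names below (`PartialOverWindow`, `OverWindowSpec`, `Over`, `windowAliases`) are hypothetical, and plain strings stand in for expressions.

    ```scala
    object AliasEnforcedSketch {

      // Complete window: the only type accepted by windowAliases(...).
      final case class OverWindowSpec(
          alias: String,
          partitionBy: Seq[String],
          orderBy: String,
          preceding: String)

      // Partial window: it has no alias yet, so it is not an OverWindowSpec.
      final case class PartialOverWindow(
          keys: Seq[String] = Seq(),
          order: Option[String] = None,
          precedingBound: Option[String] = None) {

        def partitionBy(fields: String*): PartialOverWindow = copy(keys = fields)
        def orderBy(field: String): PartialOverWindow = copy(order = Some(field))
        def preceding(bound: String): PartialOverWindow = copy(precedingBound = Some(bound))

        // The only way to obtain a complete window; fails if mandatory parts are missing.
        def as(alias: String): OverWindowSpec = {
          require(order.isDefined, "orderBy is required")
          require(precedingBound.isDefined, "preceding is required")
          OverWindowSpec(alias, keys, order.get, precedingBound.get)
        }
      }

      // Entry point, loosely mirroring the `Over` object in the examples.
      val Over: PartialOverWindow = PartialOverWindow()

      // Stand-in for window(...): a window without alias is a compile error here.
      def windowAliases(windows: OverWindowSpec*): Seq[String] = windows.map(_.alias)
    }
    ```

    For example, `windowAliases(Over.orderBy("rowtime"))` does not compile, because a `PartialOverWindow` is not an `OverWindowSpec`.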


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112473915
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -810,6 +810,47 @@ class Table(
         new WindowedTable(this, window)
       }
     
    +  /**
    +    * Groups the records of a table by assigning them to windows defined by a time or row interval.
    +    *
    +    * For streaming tables of infinite size, grouping into windows is required to define finite
    +    * groups on which over-based aggregates can be computed.
    +    *
    +    * Over window for batch tables are currently not supported.
    +    *
    +    * @param overWindows windows that specifies how elements are grouped.
    +    * @return Over windowed table
    +    */
    +  def window(overWindows: OverWindow*): OverWindowedTable = {
    +
    +    if (tableEnv.isInstanceOf[BatchTableEnvironment]) {
    +      throw TableException("Over window for batch tables are currently not supported.")
    +    } else {
    +      overWindows.foreach { overWindow =>
    +        val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    --- End diff --
    
    This check should be done in `OverCall.validateInput()`


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112459590
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
    @@ -190,8 +190,8 @@ object ProjectionTranslator {
       def expandProjectList(
    --- End diff --
    
    I would not change the `expandProjectList()` method which has a dedicated purpose.
    
    Instead, I would add a new method to `ProjectionTranslator`, which just translates the over windows:
    
    ```
    def translateOverWindows(
          exprs: Seq[Expression],
          overWindows: Array[OverWindow]): Seq[Expression]
    ```
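    A `translateOverWindows` of this shape could, for example, look up each over call's window alias in the array of defined windows and attach the matching window, failing on unknown aliases. Only the method name and the `Array` parameter come from the comment above; `Expr`, `OverRef`, `OverResolved`, `Window` and the error text are a hypothetical mini model, not Flink's classes.

    ```scala
    object TranslateOverWindowsSketch {

      final case class Window(alias: String, orderBy: String)

      sealed trait Expr
      final case class FieldRef(name: String) extends Expr
      final case class Alias(child: Expr, name: String) extends Expr
      // Before translation an over call only knows the alias of its window.
      final case class OverRef(agg: String, windowAlias: String) extends Expr
      // After translation the window definition is attached.
      final case class OverResolved(agg: String, window: Window) extends Expr

      def translateOverWindows(exprs: Seq[Expr], overWindows: Array[Window]): Seq[Expr] = {
        def translate(e: Expr): Expr = e match {
          case OverRef(agg, alias) =>
            val window = overWindows.find(_.alias == alias).getOrElse(
              throw new IllegalArgumentException(s"Over window '$alias' is not defined"))
            OverResolved(agg, window)
          case Alias(child, name) => Alias(translate(child), name)
          case other              => other
        }
        exprs.map(translate)
      }
    }
    ```

    Keeping this step separate leaves `expandProjectList()` unchanged, as suggested; field references are still resolved later by the usual mechanism.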


[GitHub] flink issue #3743: [FLINK-6228][table] Integrating the OVER windows in the T...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3743
  
    Thanks for the update @sunjincheng121!
    
    Will merge this PR.


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112570095
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    +    this.aggAlias = aggAlias
    +    this
    +  }
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.toSqlAggFunction()
    +
    +    val aggReturnType: TypeInformation[_] = agg.resultType
    +
    +    val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
    +        // for stream
    +        relBuilder.literal(orderName)
    +      } else {
    +        // for batch
    +        relBuilder.field(orderName)
    +      }
    +
    +    orderKeys.add(new RexFieldCollation(rexNode,sets))
    +
    +    val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    overWindow.partitionBy.foreach(x=>
    +      partitionKeys.add(relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)))
    +
    +    val preceding = overWindow.preceding.asInstanceOf[Literal]
    +    val following = overWindow.following.asInstanceOf[Literal]
    +
    +    val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
    +
    +    val lowerBound = createBound(relBuilder, preceding.value.asInstanceOf[Long], SqlKind.PRECEDING)
    +    val upperBound = createBound(relBuilder, following.value.asInstanceOf[Long], SqlKind.FOLLOWING)
    +
    +    rexBuilder.makeOver(
    +      relDataType,
    +      operator,
    +      aggExprs,
    +      partitionKeys,
    +      orderKeys.build,
    +      lowerBound,
    +      upperBound,
    +      isPhysical,
    +      true,
    +      false)
    +  }
    +
    +  private def createBound(
    +    relBuilder: RelBuilder,
    +    precedingValue: Long,
    +    sqlKind: SqlKind): RexWindowBound = {
    +
    +    if (precedingValue == Long.MaxValue) {
    +      val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
    +      create(unbounded, null)
    +    } else if (precedingValue == 0L) {
    +      val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
    +      create(currentRow, null)
    +    } else {
    +
    +      val returnType = new BasicSqlType(
    +        relBuilder.getTypeFactory.getTypeSystem,
    +        SqlTypeName.DECIMAL)
    +
    +      val sqlOperator = new SqlPostfixOperator(
    +        sqlKind.name,
    +        sqlKind,
    +        2,
    +        new OrdinalReturnTypeInference(0),
    +        null,
    +        null)
    +
    +      val operands: Array[SqlNode] = new Array[SqlNode](1)
    +      operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO))
    +
    +      val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
    +
    +      val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +      expressions.add(relBuilder.literal(precedingValue))
    +
    +      val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
    +
    +      create(node, rexNode)
    +    }
    +  }
    +
    +  override private[flink] def children: Seq[Expression] = Seq()
    --- End diff --
    
    Maybe we can check the `partitionBy` expressions here as well?


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112488004
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/aggregations.scala ---
    @@ -35,6 +37,18 @@ abstract sealed class Aggregation extends UnaryExpression {
         * Convert Aggregate to its counterpart in Calcite, i.e. AggCall
         */
       private[flink] def toAggCall(name: String)(implicit relBuilder: RelBuilder): AggCall
    +
    +  /**
    +    * Because SqlAggFunction from Calcite's AggCallImpl is invisible,
    --- End diff --
    
    Thanks for the explanation for this method.
    Can you update the doc to `Returns the SqlAggFunction for this Aggregation` and change the method name to `getSqlAggFunction()`? The method does not convert the Aggregation because the returned SqlAggFunction does not contain the arguments.


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112655769
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
    @@ -221,6 +221,23 @@ object ProjectionTranslator {
         projectList
       }
     
    +  def translateOverWindows(
    +      exprs: Seq[Expression],
    +      overWindows: Array[OverWindow]): Seq[Expression] = {
    --- End diff --
    
    Add a `tableEnv` parameter to identify whether this is a batch or a streaming query.
    In case of a streaming query, check that `overWindow.orderBy` is an `UnresolvedFieldReference` and translate `rowtime` to a `RowTime` expression and `proctime` to a `ProcTime` expression.
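    A sketch of this check, assuming the time attributes are referenced by the names `rowtime` and `proctime` (matched case-insensitively, as in the quoted `toRexNode` code) and that batch tables are rejected, as the `window()` docs in this PR state. `OrderExpr`, `UnresolvedField`, `RowTimeMarker`, `ProcTimeMarker` and `translateOrderBy` are hypothetical stand-ins for the real expressions.

    ```scala
    object OrderByTranslationSketch {

      sealed trait OrderExpr
      final case class UnresolvedField(name: String) extends OrderExpr
      final case class Literal(value: Any) extends OrderExpr
      case object RowTimeMarker extends OrderExpr  // stands in for RowTime
      case object ProcTimeMarker extends OrderExpr // stands in for ProcTime

      def translateOrderBy(orderBy: OrderExpr, isStreaming: Boolean): Either[String, OrderExpr] =
        if (!isStreaming) {
          Left("Over windows for batch tables are currently not supported.")
        } else {
          orderBy match {
            case UnresolvedField(name) if name.equalsIgnoreCase("rowtime")  => Right(RowTimeMarker)
            case UnresolvedField(name) if name.equalsIgnoreCase("proctime") => Right(ProcTimeMarker)
            case UnresolvedField(name) =>
              Left(s"Ordering by '$name' is not supported; use 'rowtime' or 'proctime'.")
            case other =>
              Left(s"orderBy must be a field reference, but was $other.")
          }
        }
    }
    ```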


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112657301
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowTest.scala ---
    @@ -0,0 +1,528 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.api.scala.stream.table
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableTestUtil._
    +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
    +import org.junit.Test
    +
    +class OverWindowTest extends TableTestBase {
    --- End diff --
    
    I think we should add a few more tests here:
    - Add two tests without `following`, one for row and one for range windows
    - Check that the `orderBy` field is correctly validated (only `rowtime` and `proctime`)
    - Check that the `partitionBy` fields are correctly validated (existing fields in the input, no complex expressions)
    - Check that `preceding` and `following` are of the same type
    - Check that `preceding` and `following` are literals
    - Any other validation checks?
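    Two of these checks (both bounds are literals, and both are of the same kind, row count or time interval) could be exercised against a validation function like the one below. The bound model (`Bound`, `RowsBound`, `RangeBound`, `NonLiteral`) and `checkBounds` are hypothetical; in the PR, the bounds are `Literal`s, and a row-based bound is recognized by a `RowIntervalTypeInfo` result type.

    ```scala
    object BoundTypeCheckSketch {

      sealed trait Bound
      final case class RowsBound(rows: Long) extends Bound             // e.g. 2.rows
      final case class RangeBound(millis: Long) extends Bound          // e.g. 2.milli
      final case class NonLiteral(description: String) extends Bound   // e.g. a field reference

      def checkBounds(preceding: Bound, following: Bound): Either[String, Unit] =
        (preceding, following) match {
          case (NonLiteral(d), _) => Left(s"preceding must be a literal, but was $d")
          case (_, NonLiteral(d)) => Left(s"following must be a literal, but was $d")
          case (RowsBound(_), RowsBound(_)) | (RangeBound(_), RangeBound(_)) => Right(())
          case _ => Left("preceding and following must be of the same type (rows or range)")
        }
    }
    ```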


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112557404
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    +    this.aggAlias = aggAlias
    +    this
    +  }
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.toSqlAggFunction()
    +
    +    val aggReturnType: TypeInformation[_] = agg.resultType
    +
    +    val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    --- End diff --
    
    +space `val sets: util.HashSet`


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112457606
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/ProjectionTranslator.scala ---
    @@ -190,8 +190,8 @@ object ProjectionTranslator {
       def expandProjectList(
           exprs: Seq[Expression],
           parent: LogicalNode,
    -      tableEnv: TableEnvironment)
    -    : Seq[Expression] = {
    +      tableEnv: TableEnvironment,
    +      overWindows: OverWindow*): Seq[Expression] = {
    --- End diff --
    
    Please use `Array` instead of varargs.


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112479103
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---
    @@ -586,6 +602,13 @@ trait ImplicitExpressionOperations {
      * to [[ImplicitExpressionOperations]].
      */
     trait ImplicitExpressionConversions {
    +
    +  implicit val UNBOUNDED_ROW = toRowInterval(Long.MaxValue)
    +  implicit val UNBOUNDED_RANGE = toMilliInterval(1, Long.MaxValue)
    +
    +  implicit val CURRENT_ROW = toRowInterval(0L)
    +  implicit val CURRENT_RANGE = toMilliInterval(0L, Long.MaxValue)
    --- End diff --
    
    Isn't a range interval of `0` a valid range? It would mean all rows that arrived in the same millisecond as the current row, no?
    Should we use `-1` as the constant for the current row instead?
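    The concern can be made concrete with a small decoder for bound values. It assumes `Long.MaxValue` keeps meaning UNBOUNDED and that `-1` becomes the CURRENT ROW marker as suggested; `0` then remains an ordinary offset ("0 ms preceding" for a range window). `FrameBound`, `Offset` and `decodeBound` are hypothetical names.

    ```scala
    object BoundSentinelSketch {

      // Proposed encoding (an assumption based on the review comment).
      val UnboundedValue: Long = Long.MaxValue
      val CurrentRowValue: Long = -1L

      sealed trait FrameBound
      case object Unbounded extends FrameBound
      case object CurrentRow extends FrameBound
      final case class Offset(value: Long) extends FrameBound

      // Upper-case names in patterns are matched as constants, not new variables.
      def decodeBound(value: Long): FrameBound = value match {
        case UnboundedValue  => Unbounded
        case CurrentRowValue => CurrentRow
        case v if v >= 0     => Offset(v)
        case v               => throw new IllegalArgumentException(s"Invalid bound value $v")
      }
    }
    ```

    With the encoding in the diff, where `createBound` treats `0` as CURRENT ROW, a range of exactly 0 ms cannot be distinguished from the current-row bound.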


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3743


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112561580
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    +    this.aggAlias = aggAlias
    +    this
    +  }
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.toSqlAggFunction()
    +
    +    val aggReturnType: TypeInformation[_] = agg.resultType
    +
    +    val relDataType = SqlTypeUtils.createSqlType(relBuilder.getTypeFactory, aggReturnType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets:util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime") || orderName.equalsIgnoreCase("proctime")) {
    +        // for stream
    +        relBuilder.literal(orderName)
    +      } else {
    +        // for batch
    +        relBuilder.field(orderName)
    +      }
    +
    +    orderKeys.add(new RexFieldCollation(rexNode,sets))
    +
    +    val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    overWindow.partitionBy.foreach(x=>
    --- End diff --
    
    +space `foreach( x =>`


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112455555
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/table.scala ---
    @@ -810,6 +810,47 @@ class Table(
         new WindowedTable(this, window)
       }
     
    +  /**
    --- End diff --
    
    Extend docs to
    
    ```
      /**
        * Defines over-windows on the records of a table.
        *
        * An over-window defines for each record an interval of records over which aggregation 
        * functions can be computed.
        *
        * Example:
        *
        * {{{
        *   table
        *     .window(Over partitionBy 'c orderBy 'rowTime preceding 10.seconds as 'ow)
        *     .select('c, 'b.count over 'ow, 'e.sum over 'ow)
        * }}}
        *
        * __Note__: Computing over window aggregates on a streaming table is only a parallel operation
        * if the window is partitioned. Otherwise, the whole stream will be processed by a single
        * task, i.e., with parallelism 1.
        *
        * __Note__: Over-windows for batch tables are currently not supported.
        *
        * @param overWindows windows that specify the record interval over which aggregations are
        *                    computed.
        * @return An OverWindowedTable to specify the aggregations.
        */
    ```


[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112485134
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    --- End diff --
    
    This class should override `checkInputs()` and check that the interval configuration is valid (UNBOUNDED PRECEDING to CURRENT ROW, or bounded PRECEDING to CURRENT ROW). It should also perform the checks that are moved out of `table.scala`.
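    A minimal sketch of such an interval check, assuming a simplified, hypothetical bound model (`Bound`, `RowsPreceding`, `TimePreceding`, `OverBoundsCheck` are illustrative names, not Flink classes). In the PR the result would be a `ValidationResult`; here it is modeled as `Either[String, Unit]`. Only the two configurations listed above are accepted.

    ```scala
    // Hypothetical, simplified model of over-window bounds (illustrative names only).
    sealed trait Bound
    case object UnboundedPreceding extends Bound
    final case class RowsPreceding(rows: Long) extends Bound   // e.g. `2.rows`
    final case class TimePreceding(millis: Long) extends Bound // e.g. `2.milli`
    case object CurrentRow extends Bound
    case object UnboundedFollowing extends Bound

    object OverBoundsCheck {

      /** Accepts only UNBOUNDED PRECEDING .. CURRENT ROW and
        * bounded PRECEDING .. CURRENT ROW; everything else is rejected. */
      def validate(preceding: Bound, following: Bound): Either[String, Unit] =
        if (following != CurrentRow)
          Left(s"Only CURRENT ROW is supported as the upper bound, but found $following.")
        else
          preceding match {
            case UnboundedPreceding           => Right(())
            case RowsPreceding(n) if n >= 0   => Right(())
            case TimePreceding(ms) if ms >= 0 => Right(())
            case RowsPreceding(_) | TimePreceding(_) =>
              Left(s"The preceding interval must not be negative, but found $preceding.")
            case other =>
              Left(s"$other is not a valid lower bound.")
          }
    }
    ```

    For example, `validate(RowsPreceding(2), CurrentRow)` is accepted, while `validate(RowsPreceding(2), UnboundedFollowing)` is rejected.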


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112654288
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +59,160 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = {
    +
    +    val rexBuilder = relBuilder.getRexBuilder
    +
    +    val operator: SqlAggFunction = agg.getSqlAggFunction()
    +
    +    val relDataType = relBuilder
    +      .getTypeFactory.asInstanceOf[FlinkTypeFactory]
    +      .createTypeFromTypeInfo(agg.resultType)
    +
    +    val aggExprs: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    val aggChildName = agg.child.asInstanceOf[ResolvedFieldReference].name
    +
    +    aggExprs.add(relBuilder.field(aggChildName))
    +
    +    val orderKeys: ImmutableList.Builder[RexFieldCollation] =
    +      new ImmutableList.Builder[RexFieldCollation]()
    +
    +    val sets: util.HashSet[SqlKind] = new util.HashSet[SqlKind]()
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +
    +    val rexNode =
    +      if (orderName.equalsIgnoreCase("rowtime")) {
    +        // for stream event-time
    +        relBuilder.call(EventTimeExtractor)
    +      }
    +      else if (orderName.equalsIgnoreCase("proctime")) {
    +        // for stream proc-time
    +        relBuilder.call(ProcTimeExtractor)
    +      } else {
    +        // for batch event-time
    +        relBuilder.field(orderName)
    +      }
    +
    +    orderKeys.add(new RexFieldCollation(rexNode, sets))
    +
    +    val partitionKeys: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +    overWindow.partitionBy.foreach {
    +      x =>
    +        val partitionKey = relBuilder.field(x.asInstanceOf[UnresolvedFieldReference].name)
    +        if (!FlinkTypeFactory.toTypeInfo(partitionKey.getType).isKeyType) {
    +          throw ValidationException(
    +            s"expression $partitionKey cannot be used as a partition key expression " +
    +            "because it's not a valid key type which must be hashable and comparable")
    +        }
    +        partitionKeys.add(partitionKey)
    +    }
    +
    +    val preceding = overWindow.preceding.asInstanceOf[Literal]
    +    val following = overWindow.following.asInstanceOf[Literal]
    +
    +    val isPhysical: Boolean = preceding.resultType.isInstanceOf[RowIntervalTypeInfo]
    +
    +    val lowerBound = createBound(relBuilder, preceding, SqlKind.PRECEDING)
    +    val upperBound = createBound(relBuilder, following, SqlKind.FOLLOWING)
    +
    +    rexBuilder.makeOver(
    +      relDataType,
    +      operator,
    +      aggExprs,
    +      partitionKeys,
    +      orderKeys.build,
    +      lowerBound,
    +      upperBound,
    +      isPhysical,
    +      true,
    +      false)
    +  }
    +
    +  private def createBound(
    +    relBuilder: RelBuilder,
    +    bound: Literal,
    +    sqlKind: SqlKind): RexWindowBound = {
    +
    +    if (bound == UNBOUNDED_RANGE || bound == UNBOUNDED_ROW) {
    +      val unbounded = SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
    +      create(unbounded, null)
    +    } else if (bound == CURRENT_RANGE || bound == CURRENT_ROW) {
    +      val currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO)
    +      create(currentRow, null)
    +    } else {
    +      val returnType = relBuilder
    +        .getTypeFactory.asInstanceOf[FlinkTypeFactory]
    +        .createTypeFromTypeInfo(Types.DECIMAL)
    +
    +      val sqlOperator = new SqlPostfixOperator(
    +        sqlKind.name,
    +        sqlKind,
    +        2,
    +        new OrdinalReturnTypeInference(0),
    +        null,
    +        null)
    +
    +      val operands: Array[SqlNode] = new Array[SqlNode](1)
    +      operands(0) = (SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO))
    +
    +      val node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO)
    +
    +      val expressions: util.ArrayList[RexNode] = new util.ArrayList[RexNode]()
    +      expressions.add(relBuilder.literal(bound.value))
    +
    +      val rexNode = relBuilder.getRexBuilder.makeCall(returnType, sqlOperator, expressions)
    +
    +      create(node, rexNode)
    +    }
    +  }
    +
    +  override private[flink] def children: Seq[Expression] = Seq(agg)
    +
    +  override def toString = s"${this.getClass.getCanonicalName}(${overWindowAlias.toString})"
    +
    +  override private[flink] def resultType = agg.resultType
    +
    +  override private[flink] def validateInput(): ValidationResult = {
    +    var validationResult: ValidationResult = ValidationSuccess
    +    val orderName = overWindow.orderBy.asInstanceOf[UnresolvedFieldReference].name
    +    if (!orderName.equalsIgnoreCase("rowtime")
    --- End diff --
    
    If we replace the `UnresolvedFieldReference` with a `RowTime` or `ProcTime` expression, we should check here whether it is a `TimeIndicator` or a valid field reference. This is the same check that we need to perform for the `partitionBy` fields.
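    A sketch of that check under a hypothetical expression model (`Expr`, `FieldRef`, `RowTimeAttr`, `ProcTimeAttr`, `OverKeyCheck` are illustrative, not Flink's classes). It assumes the `orderBy` expression may be either a time indicator or an existing field, while each `partitionBy` expression must be an existing field; the key-type check from `toRexNode()` is left out.

    ```scala
    // Hypothetical, simplified expression model (illustrative names, not Flink classes).
    sealed trait Expr
    final case class FieldRef(name: String) extends Expr
    case object RowTimeAttr extends Expr  // stands in for a `RowTime` time indicator
    case object ProcTimeAttr extends Expr // stands in for a `ProcTime` time indicator

    object OverKeyCheck {

      private def isTimeIndicator(e: Expr): Boolean = e match {
        case RowTimeAttr | ProcTimeAttr => true
        case _                          => false
      }

      // A "valid field reference" is a field that exists in the input schema.
      private def isValidField(schema: Set[String], e: Expr): Boolean = e match {
        case FieldRef(name) => schema.contains(name)
        case _              => false
      }

      /** orderBy: time indicator or valid field; partitionBy: valid fields only. */
      def validate(schema: Set[String], orderBy: Expr, partitionBy: Seq[Expr]): Either[String, Unit] =
        if (!isTimeIndicator(orderBy) && !isValidField(schema, orderBy))
          Left(s"orderBy must be a time indicator or an existing field, but found $orderBy.")
        else
          partitionBy.find(p => !isValidField(schema, p)) match {
            case Some(invalid) => Left(s"$invalid is not a valid partitionBy field.")
            case None          => Right(())
          }
    }
    ```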


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112604758
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/OverWindowITCase.scala ---
    @@ -0,0 +1,265 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream.table
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.streaming.api.functions.source.SourceFunction
    +import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.api.watermark.Watermark
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.scala.stream.table.OverWindowITCase.{RowTimeSourceFunction}
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit.Test
    +
    +import scala.collection.mutable
    +
    +class OverWindowITCase extends StreamingWithStateTestBase {
    +
    +  @Test
    +  def testProcTimeUnBoundedPartitionedRowOver(): Unit = {
    +
    +    val data = List(
    +      (1L, 1, "Hello"),
    +      (2L, 2, "Hello"),
    +      (3L, 3, "Hello"),
    +      (4L, 4, "Hello"),
    +      (5L, 5, "Hello"),
    +      (6L, 6, "Hello"),
    +      (7L, 7, "Hello World"),
    +      (8L, 8, "Hello World"),
    +      (20L, 20, "Hello World"))
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setParallelism(1)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    StreamITCase.clear
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'a, 'b, 'c)
    +
    +    val windowedTable = table
    +      .window(
    +        Over partitionBy 'c orderBy 'procTime preceding UNBOUNDED_ROW following CURRENT_ROW as 'w)
    +      .select('c, 'b.count over 'w as 'mycount)
    +      .select('c, 'mycount)
    +
    +    val results = windowedTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq(
    +      "Hello World,1", "Hello World,2", "Hello World,3",
    +      "Hello,1", "Hello,2", "Hello,3", "Hello,4", "Hello,5", "Hello,6")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testRowTimeUnBoundedPartitionedRangeOver(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +    env.setStateBackend(getStateBackend)
    +    StreamITCase.testResults = mutable.MutableList()
    +    StreamITCase.clear
    +    env.setParallelism(1)
    +
    +    val data = Seq(
    +      Left(14000005L, (1, 1L, "Hi")),
    +      Left(14000000L, (2, 1L, "Hello")),
    +      Left(14000002L, (1, 1L, "Hello")),
    +      Left(14000002L, (1, 2L, "Hello")),
    +      Left(14000002L, (1, 3L, "Hello world")),
    +      Left(14000003L, (2, 2L, "Hello world")),
    +      Left(14000003L, (2, 3L, "Hello world")),
    +      Right(14000020L),
    +      Left(14000021L, (1, 4L, "Hello world")),
    +      Left(14000022L, (1, 5L, "Hello world")),
    +      Left(14000022L, (1, 6L, "Hello world")),
    +      Left(14000022L, (1, 7L, "Hello world")),
    +      Left(14000023L, (2, 4L, "Hello world")),
    +      Left(14000023L, (2, 5L, "Hello world")),
    +      Right(14000030L)
    +    )
    +    val table = env
    +      .addSource(new RowTimeSourceFunction[(Int, Long, String)](data))
    +      .toTable(tEnv).as('a, 'b, 'c)
    +
    +    val windowedTable = table
    +      .window(Over partitionBy 'a orderBy 'rowtime preceding UNBOUNDED_RANGE following
    +         CURRENT_RANGE as 'w)
    +      .select(
    +        'a, 'b, 'c,
    +        'b.sum over 'w,
    +        'b.count over 'w,
    +        'b.avg over 'w,
    +        'b.max over 'w,
    +        'b.min over 'w)
    +
    +    val result = windowedTable.toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,1,Hello,6,3,2,3,1",
    +      "1,2,Hello,6,3,2,3,1",
    +      "1,3,Hello world,6,3,2,3,1",
    +      "1,1,Hi,7,4,1,3,1",
    +      "2,1,Hello,1,1,1,1,1",
    +      "2,2,Hello world,6,3,2,3,1",
    +      "2,3,Hello world,6,3,2,3,1",
    +      "1,4,Hello world,11,5,2,4,1",
    +      "1,5,Hello world,29,8,3,7,1",
    +      "1,6,Hello world,29,8,3,7,1",
    +      "1,7,Hello world,29,8,3,7,1",
    +      "2,4,Hello world,15,5,3,5,1",
    +      "2,5,Hello world,15,5,3,5,1"
    +    )
    +
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testProcTimeBoundedPartitionedRangeOver(): Unit = {
    +
    +    val data = List(
    +      (1, 1L, 0, "Hallo", 1L),
    +      (2, 2L, 1, "Hallo Welt", 2L),
    +      (2, 3L, 2, "Hallo Welt wie", 1L),
    +      (3, 4L, 3, "Hallo Welt wie gehts?", 2L),
    +      (3, 5L, 4, "ABC", 2L),
    +      (3, 6L, 5, "BCD", 3L),
    +      (4, 7L, 6, "CDE", 2L),
    +      (4, 8L, 7, "DEF", 1L),
    +      (4, 9L, 8, "EFG", 1L),
    +      (4, 10L, 9, "FGH", 2L),
    +      (5, 11L, 10, "GHI", 1L),
    +      (5, 12L, 11, "HIJ", 3L),
    +      (5, 13L, 12, "IJK", 3L),
    +      (5, 14L, 13, "JKL", 2L),
    +      (5, 15L, 14, "KLM", 2L))
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setParallelism(1)
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv).as('a, 'b, 'c, 'd, 'e)
    +
    +    val windowedTable = table
    +      .window(Over partitionBy 'a orderBy 'proctime preceding 4.rows following CURRENT_ROW as 'w)
    +      .select('a, 'c.sum over 'w, 'c.min over 'w)
    +    val result = windowedTable.toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList(
    +      "1,0,0",
    +      "2,1,1",
    +      "2,3,1",
    +      "3,3,3",
    +      "3,7,3",
    +      "3,12,3",
    +      "4,6,6",
    +      "4,13,6",
    +      "4,21,6",
    +      "4,30,6",
    +      "5,10,10",
    +      "5,21,10",
    +      "5,33,10",
    +      "5,46,10",
    +      "5,60,10")
    +
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testRowTimeBoundedPartitionedRowOver(): Unit = {
    --- End diff --
    
    In fact, I do not want to add more ITCases; I would even like to remove this one. The Table API and SQL use the same process functions, so I think the full tests in `OverWindowTest` are enough. I would prefer to keep this class to the fewest test cases that still cover the specific features, i.e., ROWS/RANGE windows on proc-time/row-time. What do you think?
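    For reference, the two window flavours discussed here can be modeled in a few lines of plain Scala, which also shows how the expected values of the ITCase above can be derived by hand. This is a sketch (the object `OverReference` and its signatures are hypothetical): `boundedRows` models `preceding 4.rows following CURRENT_ROW` over arrival (proc-time) order, and `unboundedRange` models `preceding UNBOUNDED_RANGE following CURRENT_RANGE` over event time, where rows with an equal timestamp are peers. The truncating integer average mirrors the `avg` values in the expected results.

    ```scala
    import scala.collection.mutable

    // Plain-Scala reference model (not Flink code) of the two over-window flavours
    // exercised by the ITCase. Names and signatures are hypothetical.
    object OverReference {

      /** ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW, partitioned by key.
        * Rows are processed in arrival (proc-time) order; each emits (key, sum, min). */
      def boundedRows[K](rows: Seq[(K, Long)], preceding: Int): Seq[(K, Long, Long)] = {
        val history = mutable.Map.empty[K, Vector[Long]]
        rows.map { case (key, value) =>
          val seen = history.getOrElse(key, Vector.empty[Long]) :+ value
          history(key) = seen
          // The current row plus at most `preceding` earlier rows of the same partition.
          val window = seen.takeRight(preceding + 1)
          (key, window.sum, window.min)
        }
      }

      /** RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW over event time.
        * Input rows are (key, timestamp, value); every row of the same partition with a
        * timestamp <= the current one is in the window, including later-arriving peers.
        * Emits (key, value, sum, count, avg, max, min) with a truncating integer average. */
      def unboundedRange(rows: Seq[(Int, Long, Long)]): Seq[(Int, Long, Long, Long, Long, Long, Long)] =
        rows.map { case (key, ts, value) =>
          val window = rows.collect { case (k, t, v) if k == key && t <= ts => v }
          val count = window.size.toLong
          (key, value, window.sum, count, window.sum / count, window.max, window.min)
        }
    }
    ```

    For example, in partition `a = 1` of the row-time test, the three rows at 14000002 are peers, so each of them sees `sum = 6, count = 3`, while the row at 14000005 sees `sum = 7, count = 4`.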


---

[GitHub] flink pull request #3743: [FLINK-6228][table] Integrating the OVER windows i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3743#discussion_r112494282
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala ---
    @@ -49,6 +57,128 @@ case class Call(functionName: String, args: Seq[Expression]) extends Expression
     }
     
     /**
    +  * Over expression for calcite over transform.
    +  *
    +  * @param agg             over-agg expression
    +  * @param aggAlias        agg alias for following `select()` clause.
    +  * @param overWindowAlias over window alias
    +  * @param overWindow      over window
    +  */
    +case class OverCall(
    +    agg: Aggregation,
    +    var aggAlias: Expression,
    +    overWindowAlias: Expression,
    +    var overWindow: OverWindow = null) extends Expression {
    +
    +  private[flink] def as(aggAlias: Expression): OverCall = {
    --- End diff --
    
    `OverCall` should not implement its own `as` method. We should use the regular `Alias` expression for renaming expressions.
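    A minimal sketch of the suggested design, using a hypothetical, simplified expression tree (not Flink's classes): renaming is a single generic `Alias` node created by one `as` method on the base type, so an over-aggregation needs no alias field or `as` method of its own, and code that derives output names handles aliases in exactly one place.

    ```scala
    // Hypothetical, simplified expression tree (not Flink's classes).
    sealed trait Expression {
      // One generic renaming operation shared by all expression types.
      def as(name: String): Expression = Alias(this, name)
    }
    final case class Field(name: String) extends Expression
    final case class Sum(child: Expression) extends Expression
    // An over-aggregation carries no alias of its own.
    final case class Over(agg: Expression, windowAlias: String) extends Expression
    final case class Alias(child: Expression, name: String) extends Expression

    object OutputNames {
      /** Derives the output column name; aliases are handled in exactly one place. */
      def nameOf(e: Expression, position: Int): String = e match {
        case Alias(_, name) => name
        case Field(name)    => name
        case _              => s"_c$position" // assumed fallback naming scheme
      }
    }
    ```

    Here `Over(Sum(Field("e")), "w1").as("sumE")` yields an `Alias` wrapping the unchanged `Over` node, and `nameOf` returns `sumE` for it.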


---