Posted to issues@flink.apache.org by twalthr <gi...@git.apache.org> on 2017/06/27 15:04:50 UTC

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

GitHub user twalthr opened a pull request:

    https://github.com/apache/flink/pull/4199

    [FLINK-6584] [table] Support multiple consecutive windows in SQL

    This PR allows accessing the rowtime and proctime attributes of windows in SQL by introducing `TUMBLE_ROWTIME`, `TUMBLE_PROCTIME`, etc. It also allows multiple consecutive windows, such as:
    
    ```
    SELECT
      TUMBLE_ROWTIME(rowtime, INTERVAL '0.004' SECOND),
      TUMBLE_END(rowtime, INTERVAL '0.004' SECOND),
      COUNT(`int`) as `int`
    FROM (
      SELECT
        TUMBLE_ROWTIME(rowtime, INTERVAL '0.002' SECOND) AS rowtime,
        COUNT(`int`) as `int`
      FROM $table
      GROUP BY TUMBLE(rowtime, INTERVAL '0.002' SECOND)
    )
    GROUP BY TUMBLE(rowtime, INTERVAL '0.004' SECOND)
    ```
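
    For intuition, the cascade can be modelled without Flink. The following is a minimal sketch in plain Scala, not the Table API: `tumble` and `WindowResult` are hypothetical names, timestamps are plain millisecond values, and it assumes that a window's rowtime is its inclusive upper bound (exclusive end minus 1 ms). As in the query, the outer aggregation counts the rows produced by the inner one.

    ```scala
    object ConsecutiveWindows {
      // One result row of a windowed aggregation (hypothetical model, not a Flink type).
      final case class WindowResult(start: Long, end: Long, rowtime: Long, count: Long)

      // Groups timestamps (milliseconds) into tumbling windows of `sizeMs` and counts them.
      // Assumption: the window rowtime is the inclusive upper bound, i.e. end - 1.
      def tumble(timestamps: Seq[Long], sizeMs: Long): Seq[WindowResult] =
        timestamps
          .groupBy(ts => ts - java.lang.Math.floorMod(ts, sizeMs))
          .toSeq
          .sortBy(_._1)
          .map { case (start, rows) =>
            WindowResult(start, start + sizeMs, start + sizeMs - 1, rows.size.toLong)
          }

      def main(args: Array[String]): Unit = {
        val events = Seq(1L, 2L, 3L, 5L, 6L, 7L, 9L)
        val inner = tumble(events, 2L)               // 2 ms windows over the raw events
        val outer = tumble(inner.map(_.rowtime), 4L) // 4 ms windows over the inner rowtimes
        inner.foreach(println)
        outer.foreach(println)
      }
    }
    ```

    Under that assumption, the inner window [2, 4) gets rowtime 3 and therefore lands in the outer window [0, 4). If the exclusive end (4) were used instead, the row would fall into [4, 8), which is why the sketch uses `end - 1`.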

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/twalthr/flink FLINK-6584

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4199.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4199
    
----
commit 28fcb9250e85d02d70a956ea36ade54905b6b56d
Author: twalthr <tw...@apache.org>
Date:   2017-05-23T13:57:23Z

    [FLINK-6584] [table] Support multiple consecutive windows in SQL

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r124638860
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala ---
    @@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule
         val project = call.rel(0).asInstanceOf[LogicalProject]
         val innerProject = call.rel(1).asInstanceOf[LogicalProject]
         val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
    +    val window = agg.getWindow
     
    -    // Retrieve window start and end properties
    +    val isRowtime = isRowtimeAttribute(window.timeAttribute)
    +    val isProctime = isProctimeAttribute(window.timeAttribute)
    +
    +    val startEndProperties = Seq(
    +      NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)),
    +      NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute)))
    +
    +    // allow rowtime/proctime for rowtime windows and proctime for proctime windows
    +    val timeProperties = if (isRowtime) {
    +      Seq(
    +        NamedWindowProperty("w$rowtime", RowtimeAttribute(window.aliasAttribute)),
    +        NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute)))
    --- End diff --
    
    It doesn't need it, but we don't forbid specifying a processing-time window after an event-time window.


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r146580553
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/validation/GroupWindowValidationTest.scala ---
    @@ -79,4 +79,32 @@ class GroupWindowValidationTest extends TableTestBase {
           "GROUP BY TUMBLE(ts, INTERVAL '4' MINUTE)"
         util.verifySql(sql, "n/a")
       }
    +
    +  @Test(expected = classOf[TableException])
    --- End diff --
    
    Please add comments explaining why the tests are expected to fail.


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r146349993
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
    @@ -29,6 +29,7 @@ import org.apache.calcite.rel.{RelNode, RelShuttle}
     import org.apache.calcite.util.ImmutableBitSet
     import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
     import org.apache.flink.table.calcite.FlinkTypeFactory
    +import org.apache.flink.table.expressions.ResolvedFieldReference
    --- End diff --
    
    Can this import be removed?


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r146350727
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala ---
    @@ -401,16 +402,84 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
         ScalarSqlFunctions.LOG,
     
         // EXTENSIONS
    -    SqlStdOperatorTable.TUMBLE,
    -    SqlStdOperatorTable.TUMBLE_START,
    -    SqlStdOperatorTable.TUMBLE_END,
    -    SqlStdOperatorTable.HOP,
    -    SqlStdOperatorTable.HOP_START,
    -    SqlStdOperatorTable.HOP_END,
    -    SqlStdOperatorTable.SESSION,
    -    SqlStdOperatorTable.SESSION_START,
    -    SqlStdOperatorTable.SESSION_END
    +    BasicOperatorTable.TUMBLE,
    +    BasicOperatorTable.HOP,
    +    BasicOperatorTable.SESSION,
    +    BasicOperatorTable.TUMBLE_START,
    +    BasicOperatorTable.TUMBLE_END,
    +    BasicOperatorTable.HOP_START,
    +    BasicOperatorTable.HOP_END,
    +    BasicOperatorTable.SESSION_START,
    +    BasicOperatorTable.SESSION_END,
    +    BasicOperatorTable.TUMBLE_PROCTIME,
    +    BasicOperatorTable.TUMBLE_ROWTIME,
    +    BasicOperatorTable.HOP_PROCTIME,
    +    BasicOperatorTable.HOP_ROWTIME,
    +    BasicOperatorTable.SESSION_PROCTIME,
    +    BasicOperatorTable.SESSION_ROWTIME
       )
     
       builtInSqlOperators.foreach(register)
     }
    +
    +object BasicOperatorTable {
    --- End diff --
    
    The SQL documentation needs to be updated to include the window time functions (`TUMBLE_ROWTIME`, etc.).


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r124344855
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala ---
    @@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule
         val project = call.rel(0).asInstanceOf[LogicalProject]
         val innerProject = call.rel(1).asInstanceOf[LogicalProject]
         val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
    +    val window = agg.getWindow
     
    -    // Retrieve window start and end properties
    +    val isRowtime = isRowtimeAttribute(window.timeAttribute)
    +    val isProctime = isProctimeAttribute(window.timeAttribute)
    +
    +    val startEndProperties = Seq(
    +      NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)),
    +      NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute)))
    +
    +    // allow rowtime/proctime for rowtime windows and proctime for proctime windows
    +    val timeProperties = if (isRowtime) {
    +      Seq(
    +        NamedWindowProperty("w$rowtime", RowtimeAttribute(window.aliasAttribute)),
    +        NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute)))
    +    } else if (isProctime) {
    +      Seq(NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute)))
    +    } else {
    +      Seq()
    +    }
    +
    +    val properties = startEndProperties ++ timeProperties
    +
    +    // retrieve window start and end properties
         val transformed = call.builder()
         val rexBuilder = transformed.getRexBuilder
         transformed.push(LogicalWindowAggregate.create(
    -      agg.getWindow,
    -      Seq(
    -        NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
    -        NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))
    -      ), agg)
    +      window,
    +      properties,
    +      agg)
         )
     
         // forward window start and end properties
         transformed.project(
    -      innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end")))
    +      innerProject.getProjects ++ properties.map(np => transformed.field(np.name)))
     
         def replaceGroupAuxiliaries(node: RexNode): RexNode = {
           node match {
             case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
               // replace expression by access to window start
               rexBuilder.makeCast(c.getType, transformed.field("w$start"), false)
    +
             case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
               // replace expression by access to window end
               rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
    +
    +        case c: RexCall if WindowStartEndPropertiesRule.isWindowRowtime(c) =>
    +          if (isProctime) {
    +            throw ValidationException("A proctime window cannot provide a rowtime attribute.")
    +          } else if (isRowtime) {
    +            // replace expression by access to window rowtime
    +            transformed.field("w$rowtime")
    +          } else {
    +            throw TableException("Accessing the rowtime attribute of a window is not yet " +
    +              "supported in a batch environment.")
    +          }
    +
    +        case c: RexCall if WindowStartEndPropertiesRule.isWindowProctime(c) =>
    +          if (isProctime) {
    +            // replace expression by access to window proctime
    +            transformed.field("w$proctime")
    +          } else {
    +            throw ValidationException("Proctime is not supported in a batch environment.")
    +          }
    --- End diff --
    
    This exception can also be thrown for a streaming rowtime window when the query uses `TUMBLE_PROCTIME`, so I think the message should be improved or an `isRowtime` branch should be added.
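
    One possible shape of that branch, as a minimal sketch in plain Scala. `WindowKind` and `resolveProctime` are hypothetical names that only model the three cases discussed here. Letting a rowtime window resolve to `w$proctime` is an assumption, based on the fact that `timeProperties` in this diff already registers `w$proctime` for rowtime windows; it is not necessarily what the PR ends up doing.

    ```scala
    object ProctimeAccess {
      // Hypothetical model of the three situations the rule distinguishes.
      sealed trait WindowKind
      case object RowtimeWindow extends WindowKind
      case object ProctimeWindow extends WindowKind
      case object BatchWindow extends WindowKind

      // Returns the field to access (Right) or an error message for the user (Left).
      def resolveProctime(kind: WindowKind): Either[String, String] = kind match {
        // Assumption: both streaming window kinds expose the proctime property.
        case ProctimeWindow | RowtimeWindow => Right("w$proctime")
        // Only a window without any time attribute reports the batch error.
        case BatchWindow => Left("Proctime is not supported in a batch environment.")
      }
    }
    ```

    With an explicit case per window kind, the batch message can no longer be reported for a streaming rowtime window.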


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r124630671
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java ---
    @@ -50,7 +50,8 @@
      * GROUP BY HOP(rowtime, INTERVAL '1' HOUR), productId
      * </pre></blockquote>
      */
    -class SqlGroupFunction extends SqlFunction {
    +// FLINK QUICK FIX
    +public class SqlGroupFunction extends SqlFunction {
    --- End diff --
    
    Calcite 1.13 was released and we are planning to update the dependency. 
    I guess we cannot simply drop this file and use Calcite's version, if we start modifying the code. 
    
    Do we have plans to contribute the changes back or do we want to keep this file?


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r125002758
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---
    @@ -349,6 +350,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
                 isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
               updatedCall
     
    +        case BasicOperatorTable.TUMBLE_ROWTIME |
    +            BasicOperatorTable.TUMBLE_PROCTIME |
    +            BasicOperatorTable.HOP_ROWTIME |
    +            BasicOperatorTable.HOP_PROCTIME |
    +            BasicOperatorTable.SESSION_ROWTIME |
    +            BasicOperatorTable.SESSION_PROCTIME if isTimeIndicatorType(updatedCall.getType) =>
    +          updatedCall
    --- End diff --
    
    I thought about this again, and I think we have to use this approach: the operands are always materialized because of the shape of the relational tree (groupings are always materialized, which is correct).


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r124725025
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala ---
    @@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule
         val project = call.rel(0).asInstanceOf[LogicalProject]
         val innerProject = call.rel(1).asInstanceOf[LogicalProject]
         val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
    +    val window = agg.getWindow
     
    -    // Retrieve window start and end properties
    +    val isRowtime = isRowtimeAttribute(window.timeAttribute)
    +    val isProctime = isProctimeAttribute(window.timeAttribute)
    +
    +    val startEndProperties = Seq(
    +      NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)),
    +      NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute)))
    +
    +    // allow rowtime/proctime for rowtime windows and proctime for proctime windows
    +    val timeProperties = if (isRowtime) {
    +      Seq(
    +        NamedWindowProperty("w$rowtime", RowtimeAttribute(window.aliasAttribute)),
    +        NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute)))
    +    } else if (isProctime) {
    +      Seq(NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute)))
    +    } else {
    +      Seq()
    +    }
    +
    +    val properties = startEndProperties ++ timeProperties
    +
    +    // retrieve window start and end properties
         val transformed = call.builder()
         val rexBuilder = transformed.getRexBuilder
         transformed.push(LogicalWindowAggregate.create(
    -      agg.getWindow,
    -      Seq(
    -        NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
    -        NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))
    -      ), agg)
    +      window,
    +      properties,
    +      agg)
         )
     
         // forward window start and end properties
         transformed.project(
    -      innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end")))
    +      innerProject.getProjects ++ properties.map(np => transformed.field(np.name)))
     
         def replaceGroupAuxiliaries(node: RexNode): RexNode = {
           node match {
             case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
               // replace expression by access to window start
               rexBuilder.makeCast(c.getType, transformed.field("w$start"), false)
    +
             case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
               // replace expression by access to window end
               rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
    +
    +        case c: RexCall if WindowStartEndPropertiesRule.isWindowRowtime(c) =>
    +          if (isProctime) {
    +            throw ValidationException("A proctime window cannot provide a rowtime attribute.")
    +          } else if (isRowtime) {
    +            // replace expression by access to window rowtime
    +            transformed.field("w$rowtime")
    +          } else {
    +            throw TableException("Accessing the rowtime attribute of a window is not yet " +
    +              "supported in a batch environment.")
    +          }
    +
    +        case c: RexCall if WindowStartEndPropertiesRule.isWindowProctime(c) =>
    +          if (isProctime) {
    --- End diff --
    
    Add an `isRowtime` condition branch?


---

[GitHub] flink issue #4199: [FLINK-6584] [table] Support multiple consecutive windows...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4199
  
    I opened a PR for Calcite https://github.com/apache/calcite/pull/549. I will rebase this branch.


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r124991860
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala ---
    @@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule
         val project = call.rel(0).asInstanceOf[LogicalProject]
         val innerProject = call.rel(1).asInstanceOf[LogicalProject]
         val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
    +    val window = agg.getWindow
     
    -    // Retrieve window start and end properties
    +    val isRowtime = isRowtimeAttribute(window.timeAttribute)
    +    val isProctime = isProctimeAttribute(window.timeAttribute)
    +
    +    val startEndProperties = Seq(
    +      NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)),
    +      NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute)))
    +
    +    // allow rowtime/proctime for rowtime windows and proctime for proctime windows
    +    val timeProperties = if (isRowtime) {
    +      Seq(
    +        NamedWindowProperty("w$rowtime", RowtimeAttribute(window.aliasAttribute)),
    +        NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute)))
    --- End diff --
    
    Actually, it is the other way around. We forbid an event-time window after a processing-time window.
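
    The rule can be written down as a small model. This is a sketch in plain Scala with hypothetical names (`TimeKind`, `exposed`, `canFollow`); it only mirrors which time attributes `timeProperties` registers per window kind in the diff above and assumes that a following window needs its own time attribute to be exposed by the previous one.

    ```scala
    object WindowOrdering {
      sealed trait TimeKind
      case object Rowtime extends TimeKind
      case object Proctime extends TimeKind

      // Time attributes a window exposes, mirroring `timeProperties` in the diff.
      def exposed(window: TimeKind): Set[TimeKind] = window match {
        case Rowtime  => Set(Rowtime, Proctime)
        case Proctime => Set(Proctime)
      }

      // Assumption: a following window is possible only if the previous window
      // exposes the time attribute the following window is defined on.
      def canFollow(previous: TimeKind, next: TimeKind): Boolean =
        exposed(previous).contains(next)
    }
    ```

    In this model `canFollow(Rowtime, Proctime)` holds, while `canFollow(Proctime, Rowtime)` does not, which matches the statement that an event-time window after a processing-time window is forbidden.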


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r124990940
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---
    @@ -349,6 +350,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
                 isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
               updatedCall
     
    +        case BasicOperatorTable.TUMBLE_ROWTIME |
    +            BasicOperatorTable.TUMBLE_PROCTIME |
    +            BasicOperatorTable.HOP_ROWTIME |
    +            BasicOperatorTable.HOP_PROCTIME |
    +            BasicOperatorTable.SESSION_ROWTIME |
    +            BasicOperatorTable.SESSION_PROCTIME if isTimeIndicatorType(updatedCall.getType) =>
    +          updatedCall
    --- End diff --
    
    But you are right: it checks the return type and not the operands, which is a bug.


---

[GitHub] flink issue #4199: [FLINK-6584] [table] Support multiple consecutive windows...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4199
  
    @fhueske @wuchong @sunjincheng121 I updated the PR. I will open an issue and PR for Calcite now.


---

[GitHub] flink issue #4199: [FLINK-6584] [table] Support multiple consecutive windows...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4199
  
    Hi @twalthr, I looked into this PR again. We should add this to the next release to also support window joins on a window aggregated table.
    
    I tried to rebase it on the current master. Unfortunately, this resulted in many conflicts. Can you rebase the PR (or manually port the changes to the current master)?
    
    Thanks, Fabian


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r124990474
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---
    @@ -349,6 +350,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
                 isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
               updatedCall
     
    +        case BasicOperatorTable.TUMBLE_ROWTIME |
    +            BasicOperatorTable.TUMBLE_PROCTIME |
    +            BasicOperatorTable.HOP_ROWTIME |
    +            BasicOperatorTable.HOP_PROCTIME |
    +            BasicOperatorTable.SESSION_ROWTIME |
    +            BasicOperatorTable.SESSION_PROCTIME if isTimeIndicatorType(updatedCall.getType) =>
    +          updatedCall
    --- End diff --
    
    No, we need this check because the previous visitCall might have changed the indicator, so the function is now called on a non-time attribute.
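
    To illustrate the effect of the guard, here is a plain-Scala model of the match. `Op`, `Call` and the `"materialize"` outcome are hypothetical stand-ins; what the real converter does in its fall-through case is not shown in this diff. The point is only that in Scala a guard written after `A | B` applies to every alternative, so a call that is no longer a time indicator skips this case.

    ```scala
    object GuardedMatch {
      sealed trait Op
      case object TumbleRowtime extends Op
      case object TumbleProctime extends Op
      case object Plus extends Op

      // Hypothetical call node: the operator plus whether its type is a time indicator.
      final case class Call(op: Op, isTimeIndicator: Boolean)

      def visit(call: Call): String = call.op match {
        // The guard applies to both alternatives, not only to the last one.
        case TumbleRowtime | TumbleProctime if call.isTimeIndicator => "keep"
        // Hypothetical fall-through for everything else.
        case _ => "materialize"
      }
    }
    ```

    Removing the guard would send every window time function through the first case, including calls that no longer carry a time indicator.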


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r124335785
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala ---
    @@ -349,6 +350,14 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
                 isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
               updatedCall
     
    +        case BasicOperatorTable.TUMBLE_ROWTIME |
    +            BasicOperatorTable.TUMBLE_PROCTIME |
    +            BasicOperatorTable.HOP_ROWTIME |
    +            BasicOperatorTable.HOP_PROCTIME |
    +            BasicOperatorTable.SESSION_ROWTIME |
    +            BasicOperatorTable.SESSION_PROCTIME if isTimeIndicatorType(updatedCall.getType) =>
    +          updatedCall
    --- End diff --
    
    Can we remove the condition `if isTimeIndicatorType(updatedCall.getType)`?


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r146575845
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.common
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject}
    +import org.apache.calcite.rex.{RexCall, RexNode}
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.table.api.{TableException, ValidationException}
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
    +import org.apache.flink.table.validate.BasicOperatorTable
    +
    +import scala.collection.JavaConversions._
    +
    +abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleName: String)
    +  extends RelOptRule(rulePredicate, ruleName) {
    +
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val project = call.rel(0).asInstanceOf[LogicalProject]
    +    // project includes at least on group auxiliary function
    +
    +    def hasGroupAuxiliaries(node: RexNode): Boolean = {
    +      node match {
    +        case c: RexCall if c.getOperator.isGroupAuxiliary => true
    +        case c: RexCall =>
    +          c.operands.exists(hasGroupAuxiliaries)
    +        case _ => false
    +      }
    +    }
    +
    +    project.getProjects.exists(hasGroupAuxiliaries)
    +  }
    +
    +  def convertWindowNodes(
    +      builder: RelBuilder,
    +      project: LogicalProject,
    +      filter: Option[LogicalFilter],
    +      innerProject: LogicalProject,
    +      agg: LogicalWindowAggregate)
    +    : RelNode = {
    +
    +    val rexBuilder = builder.getRexBuilder
    +
    +    val window = agg.getWindow
    +
    +    val isRowtime = ExpressionUtils.isRowtimeAttribute(window.timeAttribute)
    +    val isProctime = ExpressionUtils.isProctimeAttribute(window.timeAttribute)
    +
    +    def propertyName(name: String): String =
    +      window.aliasAttribute.asInstanceOf[WindowReference].name + name
    +
    +    val startEndProperties = Seq(
    +      NamedWindowProperty(propertyName("start"), WindowStart(window.aliasAttribute)),
    +      NamedWindowProperty(propertyName("end"), WindowEnd(window.aliasAttribute)))
    +
    +    // allow rowtime/proctime for rowtime windows and proctime for proctime windows
    +    val timeProperties = if (isRowtime) {
    +      Seq(
    +        NamedWindowProperty(propertyName("rowtime"), RowtimeAttribute(window.aliasAttribute)),
    +        NamedWindowProperty(propertyName("proctime"), ProctimeAttribute(window.aliasAttribute)))
    +    } else if (isProctime) {
    +      Seq(NamedWindowProperty(propertyName("proctime"), ProctimeAttribute(window.aliasAttribute)))
    +    } else {
    +      Seq()
    +    }
    +
    +    val properties = startEndProperties ++ timeProperties
    +
    +    // retrieve window start and end properties
    --- End diff --
    
    `retrieve window properties`?
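
    The reason for the rename: the list built above now contains more than start and end. A minimal sketch in plain Scala, where `propertyNames` is a hypothetical helper and the alias `"w$"` in the usage note is an assumed example value, not taken from this file:

    ```scala
    object WindowPropertyNames {
      // Mirrors how the diff derives property names from the window alias.
      def propertyNames(alias: String, isRowtime: Boolean, isProctime: Boolean): Seq[String] = {
        def propertyName(name: String): String = alias + name

        val startEnd = Seq(propertyName("start"), propertyName("end"))
        val time =
          if (isRowtime) Seq(propertyName("rowtime"), propertyName("proctime"))
          else if (isProctime) Seq(propertyName("proctime"))
          else Seq.empty[String]

        startEnd ++ time
      }
    }
    ```

    For a rowtime window with alias `"w$"` this yields four names (start, end, rowtime, proctime), so "window properties" describes the list better than "window start and end properties".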


---

[GitHub] flink issue #4199: [FLINK-6584] [table] Support multiple consecutive windows...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4199
  
    See https://issues.apache.org/jira/browse/CALCITE-1867


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r124988636
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java ---
    @@ -1900,7 +1900,7 @@ public void unparse(
     		new SqlGroupFunction(SqlKind.TUMBLE, null,
     			OperandTypes.or(OperandTypes.DATETIME_INTERVAL,
     				OperandTypes.DATETIME_INTERVAL_TIME)) {
    -			@Override List<SqlGroupFunction> getAuxiliaryFunctions() {
    +			@Override public List<SqlGroupFunction> getAuxiliaryFunctions() {
    --- End diff --
    
    As I mentioned in the PR description: "We need to get the visibility changes that I made in Calcite in the next Calcite release." Right now it is not possible to add custom SqlGroupFunctions.


---

[GitHub] flink issue #4199: [FLINK-6584] [table] Support multiple consecutive windows...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/4199
  
    I'll make a few last changes and will merge this PR.


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r125004836
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala ---
    @@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule
         val project = call.rel(0).asInstanceOf[LogicalProject]
         val innerProject = call.rel(1).asInstanceOf[LogicalProject]
         val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
    +    val window = agg.getWindow
     
    -    // Retrieve window start and end properties
    +    val isRowtime = isRowtimeAttribute(window.timeAttribute)
    +    val isProctime = isProctimeAttribute(window.timeAttribute)
    +
    +    val startEndProperties = Seq(
    +      NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)),
    +      NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute)))
    +
    +    // allow rowtime/proctime for rowtime windows and proctime for proctime windows
    +    val timeProperties = if (isRowtime) {
    +      Seq(
    +        NamedWindowProperty("w$rowtime", RowtimeAttribute(window.aliasAttribute)),
    --- End diff --
    
    I agree. I will change this.


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by wuchong <gi...@git.apache.org>.
Github user wuchong commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r124733642
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala ---
    @@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule
         val project = call.rel(0).asInstanceOf[LogicalProject]
         val innerProject = call.rel(1).asInstanceOf[LogicalProject]
         val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
    +    val window = agg.getWindow
     
    -    // Retrieve window start and end properties
    +    val isRowtime = isRowtimeAttribute(window.timeAttribute)
    +    val isProctime = isProctimeAttribute(window.timeAttribute)
    +
    +    val startEndProperties = Seq(
    +      NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)),
    +      NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute)))
    +
    +    // allow rowtime/proctime for rowtime windows and proctime for proctime windows
    +    val timeProperties = if (isRowtime) {
    +      Seq(
    +        NamedWindowProperty("w$rowtime", RowtimeAttribute(window.aliasAttribute)),
    --- End diff --
    
    We have many `w$...` literals here. Can we create String constants and reference them?
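
One way such constants could look is sketched below. The object name `WindowPropertyNames` and its members are hypothetical and are not taken from the PR; only the four string values come from the diff above.

```scala
// Hypothetical holder for the property names that the rule currently
// spells out as string literals. Names are illustrative only.
object WindowPropertyNames {
  final val Start = "w$start"
  final val End = "w$end"
  final val Rowtime = "w$rowtime"
  final val Proctime = "w$proctime"

  // All names in the order the rule appends them for a rowtime window.
  val All: Seq[String] = Seq(Start, End, Rowtime, Proctime)
}
```

A call site would then read, for example, `NamedWindowProperty(WindowPropertyNames.Start, WindowStart(window.aliasAttribute))`, so a typo in a property name becomes a compile error instead of a failed field lookup at planning time.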


---

[GitHub] flink issue #4199: [FLINK-6584] [table] Support multiple consecutive windows...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:

    https://github.com/apache/flink/pull/4199
  
    @fhueske @wuchong I updated the PR. 


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r124336278
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/LogicalWindowAggregateRule.scala ---
    @@ -28,6 +28,7 @@ import org.apache.calcite.util.ImmutableBitSet
     import org.apache.flink.table.api._
    --- End diff --
    
    Please remove the unused import at line 26: `import org.apache.calcite.sql.fun.SqlStdOperatorTable`.


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r124335359
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/functions/MathFunctions.scala ---
    @@ -16,7 +16,7 @@
      * limitations under the License.
      */
     
    -package org.apache.flink.table.functions.utils
    +package org.apache.flink.table.runtime.functions
    --- End diff --
    
    I suggest renaming `MathFunctions` to `ScalarFunctions` so that we can add all scalar functions in one file. The `...runtime.functions` package would then have three files: `ScalarFunctions`, `TableFunctions`, and `AggFunctions`, which is what FLINK-6810 plans to do. What do you think?


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r124341520
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala ---
    @@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule
         val project = call.rel(0).asInstanceOf[LogicalProject]
         val innerProject = call.rel(1).asInstanceOf[LogicalProject]
         val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
    +    val window = agg.getWindow
     
    -    // Retrieve window start and end properties
    +    val isRowtime = isRowtimeAttribute(window.timeAttribute)
    +    val isProctime = isProctimeAttribute(window.timeAttribute)
    +
    +    val startEndProperties = Seq(
    +      NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)),
    +      NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute)))
    +
    +    // allow rowtime/proctime for rowtime windows and proctime for proctime windows
    +    val timeProperties = if (isRowtime) {
    +      Seq(
    +        NamedWindowProperty("w$rowtime", RowtimeAttribute(window.aliasAttribute)),
    +        NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute)))
    --- End diff --
    
    Why do rowtime windows need the proctime property?


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4199


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r125003218
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlGroupFunction.java ---
    @@ -50,7 +50,8 @@
      * GROUP BY HOP(rowtime, INTERVAL '1' HOUR), productId
      * </pre></blockquote>
      */
    -class SqlGroupFunction extends SqlFunction {
    +// FLINK QUICK FIX
    +public class SqlGroupFunction extends SqlFunction {
    --- End diff --
    
    Yes, I will open an issue and PR for Calcite today. I think visibility changes should not be a big problem for the Calcite folks.


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r124329598
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/sql/fun/SqlStdOperatorTable.java ---
    @@ -1900,7 +1900,7 @@ public void unparse(
     		new SqlGroupFunction(SqlKind.TUMBLE, null,
     			OperandTypes.or(OperandTypes.DATETIME_INTERVAL,
     				OperandTypes.DATETIME_INTERVAL_TIME)) {
    -			@Override List<SqlGroupFunction> getAuxiliaryFunctions() {
    +			@Override public List<SqlGroupFunction> getAuxiliaryFunctions() {
    --- End diff --
    
    Is this method only used in `SqlStdOperatorTable.java`? If so, why do we add `public`? Calcite itself does not have this modifier.


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r146575894
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowPropertiesRule.scala ---
    @@ -0,0 +1,256 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.plan.rules.common
    +
    +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
    +import org.apache.calcite.rel.RelNode
    +import org.apache.calcite.rel.logical.{LogicalFilter, LogicalProject}
    +import org.apache.calcite.rex.{RexCall, RexNode}
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.table.api.{TableException, ValidationException}
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
    +import org.apache.flink.table.validate.BasicOperatorTable
    +
    +import scala.collection.JavaConversions._
    +
    +abstract class WindowPropertiesBaseRule(rulePredicate: RelOptRuleOperand, ruleName: String)
    +  extends RelOptRule(rulePredicate, ruleName) {
    +
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val project = call.rel(0).asInstanceOf[LogicalProject]
    +    // project includes at least on group auxiliary function
    +
    +    def hasGroupAuxiliaries(node: RexNode): Boolean = {
    +      node match {
    +        case c: RexCall if c.getOperator.isGroupAuxiliary => true
    +        case c: RexCall =>
    +          c.operands.exists(hasGroupAuxiliaries)
    +        case _ => false
    +      }
    +    }
    +
    +    project.getProjects.exists(hasGroupAuxiliaries)
    +  }
    +
    +  def convertWindowNodes(
    +      builder: RelBuilder,
    +      project: LogicalProject,
    +      filter: Option[LogicalFilter],
    +      innerProject: LogicalProject,
    +      agg: LogicalWindowAggregate)
    +    : RelNode = {
    +
    +    val rexBuilder = builder.getRexBuilder
    +
    +    val window = agg.getWindow
    +
    +    val isRowtime = ExpressionUtils.isRowtimeAttribute(window.timeAttribute)
    +    val isProctime = ExpressionUtils.isProctimeAttribute(window.timeAttribute)
    +
    +    def propertyName(name: String): String =
    +      window.aliasAttribute.asInstanceOf[WindowReference].name + name
    +
    +    val startEndProperties = Seq(
    +      NamedWindowProperty(propertyName("start"), WindowStart(window.aliasAttribute)),
    +      NamedWindowProperty(propertyName("end"), WindowEnd(window.aliasAttribute)))
    +
    +    // allow rowtime/proctime for rowtime windows and proctime for proctime windows
    +    val timeProperties = if (isRowtime) {
    +      Seq(
    +        NamedWindowProperty(propertyName("rowtime"), RowtimeAttribute(window.aliasAttribute)),
    +        NamedWindowProperty(propertyName("proctime"), ProctimeAttribute(window.aliasAttribute)))
    +    } else if (isProctime) {
    +      Seq(NamedWindowProperty(propertyName("proctime"), ProctimeAttribute(window.aliasAttribute)))
    +    } else {
    +      Seq()
    +    }
    +
    +    val properties = startEndProperties ++ timeProperties
    +
    +    // retrieve window start and end properties
    +    builder.push(agg.copy(properties))
    +
    +    // forward window start and end properties
    --- End diff --
    
    `forward window properties`?
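
The `matches` check quoted in the diff above walks each projected expression recursively. A self-contained sketch of that traversal follows. The `Node`, `Call`, and `Literal` types are hypothetical stand-ins for Calcite's `RexNode` and `RexCall`, and the `isGroupAuxiliary` flag replaces the operator lookup used in the real rule.

```scala
// Minimal expression tree standing in for Calcite's RexNode hierarchy.
// Hypothetical types, for illustration only.
sealed trait Node
case class Call(op: String, isGroupAuxiliary: Boolean, operands: Seq[Node]) extends Node
case class Literal(value: Any) extends Node

object AuxiliaryCheck {
  // True if the node itself, or any nested operand, is a group auxiliary call
  // (in SQL terms, something like TUMBLE_END or TUMBLE_ROWTIME).
  def hasGroupAuxiliaries(node: Node): Boolean = node match {
    case c: Call if c.isGroupAuxiliary => true
    case c: Call => c.operands.exists(hasGroupAuxiliaries)
    case _ => false
  }
}
```

The recursion matters because an auxiliary function may be wrapped in another call, such as a cast, in which case checking only the top-level projection would miss it.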


---

[GitHub] flink pull request #4199: [FLINK-6584] [table] Support multiple consecutive ...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4199#discussion_r125003073
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/WindowStartEndPropertiesRule.scala ---
    @@ -54,34 +56,75 @@ class WindowStartEndPropertiesRule
         val project = call.rel(0).asInstanceOf[LogicalProject]
         val innerProject = call.rel(1).asInstanceOf[LogicalProject]
         val agg = call.rel(2).asInstanceOf[LogicalWindowAggregate]
    +    val window = agg.getWindow
     
    -    // Retrieve window start and end properties
    +    val isRowtime = isRowtimeAttribute(window.timeAttribute)
    +    val isProctime = isProctimeAttribute(window.timeAttribute)
    +
    +    val startEndProperties = Seq(
    +      NamedWindowProperty("w$start", WindowStart(window.aliasAttribute)),
    +      NamedWindowProperty("w$end", WindowEnd(window.aliasAttribute)))
    +
    +    // allow rowtime/proctime for rowtime windows and proctime for proctime windows
    +    val timeProperties = if (isRowtime) {
    +      Seq(
    +        NamedWindowProperty("w$rowtime", RowtimeAttribute(window.aliasAttribute)),
    +        NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute)))
    +    } else if (isProctime) {
    +      Seq(NamedWindowProperty("w$proctime", ProctimeAttribute(window.aliasAttribute)))
    +    } else {
    +      Seq()
    +    }
    +
    +    val properties = startEndProperties ++ timeProperties
    +
    +    // retrieve window start and end properties
         val transformed = call.builder()
         val rexBuilder = transformed.getRexBuilder
         transformed.push(LogicalWindowAggregate.create(
    -      agg.getWindow,
    -      Seq(
    -        NamedWindowProperty("w$start", WindowStart(agg.getWindow.aliasAttribute)),
    -        NamedWindowProperty("w$end", WindowEnd(agg.getWindow.aliasAttribute))
    -      ), agg)
    +      window,
    +      properties,
    +      agg)
         )
     
         // forward window start and end properties
         transformed.project(
    -      innerProject.getProjects ++ Seq(transformed.field("w$start"), transformed.field("w$end")))
    +      innerProject.getProjects ++ properties.map(np => transformed.field(np.name)))
     
         def replaceGroupAuxiliaries(node: RexNode): RexNode = {
           node match {
             case c: RexCall if WindowStartEndPropertiesRule.isWindowStart(c) =>
               // replace expression by access to window start
               rexBuilder.makeCast(c.getType, transformed.field("w$start"), false)
    +
             case c: RexCall if WindowStartEndPropertiesRule.isWindowEnd(c) =>
               // replace expression by access to window end
               rexBuilder.makeCast(c.getType, transformed.field("w$end"), false)
    +
    +        case c: RexCall if WindowStartEndPropertiesRule.isWindowRowtime(c) =>
    +          if (isProctime) {
    +            throw ValidationException("A proctime window cannot provide a rowtime attribute.")
    +          } else if (isRowtime) {
    +            // replace expression by access to window rowtime
    +            transformed.field("w$rowtime")
    +          } else {
    +            throw TableException("Accessing the rowtime attribute of a window is not yet " +
    +              "supported in a batch environment.")
    +          }
    +
    +        case c: RexCall if WindowStartEndPropertiesRule.isWindowProctime(c) =>
    +          if (isProctime) {
    +            // replace expression by access to window proctime
    +            transformed.field("w$proctime")
    +          } else {
    +            throw ValidationException("Proctime is not supported in a batch environment.")
    +          }
    --- End diff --
    
    Good point.


---