Posted to issues@flink.apache.org by haohui <gi...@git.apache.org> on 2017/02/01 22:08:19 UTC

[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

GitHub user haohui opened a pull request:

    https://github.com/apache/flink/pull/3252

    [FLINK-5624] Support tumbling window on streaming tables in the SQL API.

    This is a POC to add tumbling window support for streaming tables in SQL.
    
    Essentially it recognizes the `LogicalWindow` construct in Calcite and transforms it to the `LogicalWindowAggregate` in Flink.
    
    Feedback is highly appreciated.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/haohui/flink FLINK-5624

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3252.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3252
    
----
commit a8d4b5042e8bcd1b149f8915118c116e419690e0
Author: Haohui Mai <wh...@apache.org>
Date:   2017-02-01T22:03:44Z

    [FLINK-5624] Support tumbling window on streaming tables in the SQL API.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3252
  
    Thanks for the update @haohui.
    PR is good to merge.



[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3252
  
    Adding `ROWTIME()` as an expression to enable users to specify event time windows.
    
    After trying multiple approaches, I settled on translating `LogicalAggregate` directly to `DataStreamAggregate`. The translation removes the group-by expression from the aggregate and adds the same expression as a window.
    
    Note that `ROWTIME()` is actually translated to a call to the local timestamp. The expression has to be executable because Calcite creates a new project operator to compute the group-by expression, where the expression has to be executed. For example, the following query
    
    ```
    SELECT COUNT(*) FROM table GROUP BY FLOOR(ROWTIME() TO HOUR)
    ```
    
    will be translated to:
    
    ```
    LogicalAggregate(group={$0}, agg={COUNT(*)})
      LogicalProject($0=FLOOR(ROWTIME() TO HOUR))
      ...
    ```
    
    It's tempting to remove the group-by expression from the logical plan. However, this cannot be done using the optimization frameworks in Calcite. These frameworks expect the output types of the operators to stay the same before and after a transformation. Removing the field changes the types, so Calcite will complain.
    
    The downside of this approach is that it might be difficult for Flink to catch malformed queries such as `SELECT COUNT(*) FROM table GROUP BY FLOOR(ROWTIME() / 2 TO HOUR)` at compile time. Any ideas to improve the situation?
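
For readers following the window semantics: grouping by `FLOOR(ROWTIME() TO HOUR)` puts every record into the one-hour bucket that starts at its timestamp rounded down to the hour, which is the assignment a one-hour tumbling window performs. A minimal Java sketch of that arithmetic follows. The class and method names are made up for illustration, timestamps are assumed to be epoch milliseconds in UTC, and none of this is code from the PR.

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: TumbleFloorSketch and windowStart are hypothetical names.
final class TumbleFloorSketch {

    // Start of the tumbling window that contains timestampMillis.
    // Math.floorDiv rounds toward negative infinity, so timestamps
    // before the epoch also land in the correct bucket.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return Math.floorDiv(timestampMillis, sizeMillis) * sizeMillis;
    }

    public static void main(String[] args) {
        long hour = Duration.ofHours(1).toMillis();
        long ts = Instant.parse("2017-02-01T22:08:19Z").toEpochMilli();
        // Prints the hour boundary at or before the timestamp.
        System.out.println(Instant.ofEpochMilli(windowStart(ts, hour)));
    }
}
```

Two records fall into the same window exactly when `windowStart` returns the same value for both, which is why the group-by expression can be replaced by a window definition.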



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3252#discussion_r101248097
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala ---
    @@ -0,0 +1,139 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.plan.rules.datastream
    +
    +import java.util.Calendar
    +
    +import com.google.common.collect.ImmutableList
    +import org.apache.calcite.avatica.util.TimeUnitRange
    +import org.apache.calcite.plan._
    +import org.apache.calcite.plan.hep.HepRelVertex
    +import org.apache.calcite.rel.logical.{LogicalAggregate, LogicalProject}
    +import org.apache.calcite.rex.{RexCall, RexLiteral, RexNode}
    +import org.apache.calcite.sql.fun.SqlFloorFunction
    +import org.apache.calcite.util.ImmutableBitSet
    +import org.apache.flink.table.api.scala.Tumble
    +import org.apache.flink.table.api.{TableException, TumblingWindow, Window}
    +import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
    +import org.apache.flink.table.expressions._
    +import org.apache.flink.table.functions.EventTimeExtractor
    +import org.apache.flink.table.plan.logical.rel.LogicalWindowAggregate
    +
    +import scala.collection.JavaConversions._
    +
    +class LogicalWindowAggregateRule
    +  extends RelOptRule(
    +    LogicalWindowAggregateRule.LOGICAL_WINDOW_PREDICATE,
    +    "LogicalWindowAggregateRule") {
    +
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val agg = call.rel(0).asInstanceOf[LogicalAggregate]
    +
    +    val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
    +    val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
    +
    +    val windowClause = recognizeWindow(agg)
    +    !distinctAggs && !groupSets && !agg.indicator && windowClause.isDefined
    +  }
    +
    +  /**
    +    * Transform LogicalAggregate with windowing expression to LogicalProject
    +    * + LogicalWindowAggregate + LogicalProject.
    +    *
    +    * The transformation adds an additional LogicalProject at the top to ensure
    +    * that the types are equivalent.
    +    */
    +  override def onMatch(call: RelOptRuleCall): Unit = {
    +    val agg = call.rel[LogicalAggregate](0)
    +    val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
    +    val (windowExprIdx, window) = recognizeWindow(agg).get
    +    val newGroupSet = agg.getGroupSet.except(ImmutableBitSet.of(windowExprIdx))
    +
    +    val builder = call.builder()
    +    val rexBuilder = builder.getRexBuilder
    +    val zero = rexBuilder.makeTimestampLiteral(LogicalWindowAggregateRule.TIMESTAMP_ZERO, 3)
    +
    +    val newAgg = builder
    +      .push(project.getInput)
    +      .project(project.getChildExps.updated(windowExprIdx, zero))
    +      .aggregate(builder.groupKey(
    +        newGroupSet,
    +        agg.indicator, ImmutableList.of(newGroupSet)), agg.getAggCallList)
    +      .build().asInstanceOf[LogicalAggregate]
    +
    +    // Create an additional project to conform with types
    +    val transformed = call.builder()
    +    transformed.push(LogicalWindowAggregate.create(
    +      window.toLogicalWindow,
    +      Seq[NamedWindowProperty](),
    +      newAgg))
    +      .project(List(zero) ++ transformed.fields())
    --- End diff --
    
    The `zero` element must be injected at the position of the window attribute in the grouping set.
    If you change the order of the grouping attributes in the SQL query in `testMultiGroup()` to `GROUP BY id, FLOOR(rowtime() TO HOUR)`, the planning fails.
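
The positional problem can be illustrated without Calcite. In the sketch below, plain strings stand in for the `RexNode` fields, and `insertAt` is a hypothetical helper: it places the placeholder at the index the window expression had in the original grouping set instead of always prepending it. This illustrates the idea and is not necessarily the fix that was applied to the PR.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: strings stand in for Calcite RexNode fields.
final class WindowFieldPositionSketch {

    // Returns a copy of fields with placeholder inserted at windowExprIdx,
    // so the output row keeps the column order of the original aggregate.
    static <T> List<T> insertAt(List<T> fields, int windowExprIdx, T placeholder) {
        List<T> result = new ArrayList<>(fields);
        result.add(windowExprIdx, placeholder);
        return result;
    }

    public static void main(String[] args) {
        // GROUP BY id, FLOOR(rowtime() TO HOUR): the window expression was at index 1.
        List<String> fieldsAfterWindowAggregate = Arrays.asList("id", "cnt");
        // Prints [id, zero, cnt]
        System.out.println(insertAt(fieldsAfterWindowAggregate, 1, "zero"));
    }
}
```

Prepending unconditionally only yields the right row type when the window expression happens to be the first grouping attribute.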



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3252#discussion_r100990608
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TimeModeIndicatorFunctions.scala ---
    @@ -33,10 +34,18 @@ object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
         SqlMonotonicity.INCREASING
     }
     
    -case class RowTime() extends LeafExpression {
    +case class TimeIndicator() extends LeafExpression {
    --- End diff --
    
    Should be `RowTime` (`ProcTime` will be added later).



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3252#discussion_r101239149
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/stream/sql/WindowAggregateITCase.java ---
    @@ -0,0 +1,135 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.api.java.stream.sql;
    +
    +import org.apache.flink.api.java.tuple.Tuple4;
    +import com.google.common.collect.ImmutableList;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    +import org.apache.flink.table.api.Table;
    +import org.apache.flink.table.api.TableEnvironment;
    +import org.apache.flink.table.api.TableException;
    +import org.apache.flink.table.api.java.StreamTableEnvironment;
    +import org.apache.flink.table.api.scala.stream.utils.StreamITCase;
    +import org.apache.flink.types.Row;
    +import org.junit.Before;
    +import org.junit.Test;
    +
    +import java.sql.Timestamp;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +public class WindowAggregateITCase extends StreamingMultipleProgramsTestBase {
    --- End diff --
    
    I don't think we need integration tests for this feature. We try to keep those to a minimum to not blow up the build time. Since window aggregates are already tested for the Table API, it should be sufficient to test the resulting execution plan.
    
    We have the `TableTestBase`, which can be extended to validate the result of the optimization. This should be enough to test this feature. Since these tests are quite cheap, we can also test more queries, including ones with differently ordered grouping expressions.



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3252#discussion_r100991050
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala ---
    @@ -225,12 +231,29 @@ abstract class StreamTableEnvironment(
         // decorrelate
         val decorPlan = RelDecorrelator.decorrelateQuery(relNode)
     
    +    val prePlanner = createHepPlanner
    --- End diff --
    
    I'll merge PR #3101 later today which adds a normalization phase before optimization by adding a HepPlanner.
    Could you integrate your changes with #3101?



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3252#discussion_r100845516
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/streaming.scala ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.expressions
    +
    +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeName}
    +import org.apache.calcite.sql.validate.SqlMonotonicity
    +import org.apache.calcite.sql._
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
    +import org.apache.flink.table.api.TableException
    +
    +object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
    +  ReturnTypes.explicit(SqlTypeName.TIMESTAMP), null, OperandTypes.NILADIC,
    +  SqlFunctionCategory.SYSTEM) {
    +  override def getSyntax: SqlSyntax = SqlSyntax.FUNCTION
    +
    +  override def getMonotonicity(call: SqlOperatorBinding): SqlMonotonicity =
    +    SqlMonotonicity.INCREASING
    +}
    +
    +case class RowTime() extends LeafExpression {
    --- End diff --
    
    Use `CurrentTimestamp` like PR #3271



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3252



[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3252
  
    Merging



[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3252
  
    The v3 PR gets the best of both worlds: the code generator will throw an exception if a query actually executes `rowtime()`.
    
    Essentially it rewrites the project and the aggregate operators before passing the operators into the Volcano planner. @fhueske please take another look.



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3252#discussion_r100995825
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamAggregateRule.scala ---
    @@ -159,9 +152,25 @@ class LogicalWindowAggregateRule
         }
         false
       }
    +
    +  private def rewriteTimeIndicatorOperators(agg: LogicalAggregate, groupExprIdx: Int) = {
    +    val project = agg.getInput.asInstanceOf[HepRelVertex].getCurrentRel.asInstanceOf[LogicalProject]
    +    val newProjectExpr = mutable.ArrayBuffer[RexNode]()
    +    newProjectExpr.appendAll(project.getChildExps)
    +    val rexBuilder = agg.getCluster.getRexBuilder
    +    newProjectExpr(groupExprIdx) = rexBuilder.makeTimestampLiteral(
    +      DataStreamAggregateRule.TIMESTAMP_ZERO, 3)
    +    val newProject = project.copy(project.getTraitSet, project.getInput,
    +      newProjectExpr, project.getRowType)
    +
    +    agg.copy(agg.getTraitSet, List(newProject)).asInstanceOf[LogicalAggregate]
    --- End diff --
    
    Can we create a `LogicalAggregate` with an adapted `groupSet` here?
    I think adding the `windowingGroupSet` to the `LogicalWindowAggregate` is not a very nice solution. It would be better if we could keep the existing code as it is without introducing workarounds for the SQL case.



[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3252
  
    Updated the PR to recognize the `GROUP BY` clause instead of the `OVER` clause.



[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on the issue:

    https://github.com/apache/flink/pull/3252
  
    Discussed with @fhueske offline. Thanks a lot for the comments. The V4 PR implements the following:
    
    * Rebased on top of #3101 
    * `LogicalWindowAggregateRule` now extends `RelOptRule` instead of `ConverterRule`.
    * Instead of adding a new field in `LogicalWindowAggregate`, the implementation now transforms the original `LogicalAggregate(LogicalProject())` expression to `LogicalProject(LogicalWindowAggregate(LogicalProject(...)))`. The outermost projection ensures that the operators have the same row types before and after the transformation.
    
    Please take another look.



[GitHub] flink issue #3252: [FLINK-5624] Support tumbling window on streaming tables ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3252
  
    Hi @haohui, thanks for your contribution!
    
    The referenced JIRA is about adding support for group windows to SQL, not OVER (or row) windows. It should enable queries such as:
    
    ```
    SELECT a, sum(b) as sumB, TUMBLE_END(rowtime(), INTERVAL '1' HOUR) AS t
      FROM myT
    GROUP BY TUMBLE(rowtime(), INTERVAL '1' HOUR), a;
    ```
    
    I saw that you contributed `TUMBLE` just very recently to Calcite, so this feature is not yet available in a Calcite release that we could link against. Until then, we could add support for the more manual version of SQL tumbling windows:
    
    ```
    SELECT a, SUM(b) AS sumB, CEIL(rowtime() TO HOUR) AS t
      FROM myT
    GROUP BY CEIL(rowtime() TO HOUR), a
    ```
    
    We would also need to find a way to reference the `rowtime`. We do not want to expose this as an actual attribute in Flink's SQL (internally, Flink treats record timestamps as metadata which may not be modified by a query). The current approach would be to implement a built-in function which serves as a marker and is replaced during the translation.
    
    Best, Fabian



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3252#discussion_r100844487
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/streaming.scala ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.expressions
    +
    +import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes, SqlTypeName}
    +import org.apache.calcite.sql.validate.SqlMonotonicity
    +import org.apache.calcite.sql._
    +import org.apache.calcite.tools.RelBuilder
    +import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
    +import org.apache.flink.table.api.TableException
    +
    +object EventTimeExtractor extends SqlFunction("ROWTIME", SqlKind.OTHER_FUNCTION,
    --- End diff --
    
    Can you move this to a `TimeModeIndicatorFunctions` class, as suggested on `FlinkStreamingFunctionCatalog` of PR #3271?



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3252#discussion_r100917108
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala ---
    @@ -290,6 +291,15 @@ object FunctionGenerator {
         Seq(),
         new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
     
    +  // Make ROWTIME() return the local timestamp
    +  // The function has to be executable as in windowed queries it is used
    +  // in the GroupBy expression. The results of the function, however, does
    +  // not matter.
    +  addSqlFunction(
    +    EventTimeExtractor,
    +    Seq(),
    +    new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
    --- End diff --
    
    It turns out that the function is used all the way in runtime -- the translated plan looks like the following:
    
    ```
    LogicalAggregate(group={0}, ...)
      LogicalProject($0=FLOOR(ROWTIME() TO HOUR))
    ```
    
    The expression is used in the projection. Unfortunately, there is no trivial way to exclude it in Calcite, as mentioned in my previous comment.
    
    The result of the expression is not used in the query, though.



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3252#discussion_r100837291
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala ---
    @@ -290,6 +291,15 @@ object FunctionGenerator {
         Seq(),
         new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
     
    +  // Make ROWTIME() return the local timestamp
    +  // The function has to be executable as in windowed queries it is used
    +  // in the GroupBy expression. The results of the function, however, does
    +  // not matter.
    +  addSqlFunction(
    +    EventTimeExtractor,
    +    Seq(),
    +    new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
    --- End diff --
    
    This function should not be called. So, I would suggest creating a `CallGenerator` that throws an exception, if possible when the code is generated, or otherwise in the generated code. PR #3271 will need the same call generator.
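
One way to picture such a generator, as a rough Java sketch rather than Flink's actual `CallGenerator` API: the `CallGen` interface, the `NotCallableGen` class, and the message text below are all hypothetical, and the exception is raised at code-generation time, the first of the two options mentioned above.

```java
import java.util.Collections;
import java.util.List;

// Illustrative only: every name in this class is hypothetical.
final class ThrowingCallGenSketch {

    // Hypothetical stand-in for a code-generation hook that turns
    // operand code snippets into the code for a function call.
    interface CallGen {
        String generate(List<String> operandCode);
    }

    // Generator for marker functions: it never produces code and
    // fails as soon as code generation is attempted.
    static final class NotCallableGen implements CallGen {
        private final String functionName;

        NotCallableGen(String functionName) {
            this.functionName = functionName;
        }

        @Override
        public String generate(List<String> operandCode) {
            throw new UnsupportedOperationException(
                functionName + "() is a marker function and cannot be evaluated.");
        }
    }

    public static void main(String[] args) {
        CallGen rowtime = new NotCallableGen("ROWTIME");
        try {
            rowtime.generate(Collections.emptyList());
        } catch (UnsupportedOperationException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Failing during generation surfaces the problem before the job runs, whereas throwing from the generated code would only surface it once a record reaches the operator.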



[GitHub] flink pull request #3252: [FLINK-5624] Support tumbling window on streaming ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3252#discussion_r100918614
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/FunctionGenerator.scala ---
    @@ -290,6 +291,15 @@ object FunctionGenerator {
         Seq(),
         new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
     
    +  // Make ROWTIME() return the local timestamp
    +  // The function has to be executable as in windowed queries it is used
    +  // in the GroupBy expression. The results of the function, however, does
    +  // not matter.
    +  addSqlFunction(
    +    EventTimeExtractor,
    +    Seq(),
    +    new CurrentTimePointCallGen(SqlTimeTypeInfo.TIMESTAMP, local = true))
    --- End diff --
    
    Ah, yes. You are right. It is still called in the DataStreamCalc and cannot be easily removed as you noted.
    
    Alright, then I'd suggest just emitting a cast `null`. This is not very nice, as the function might also be called elsewhere, but since we will remove the marker function soon, it should not be a big issue.

