Posted to issues@flink.apache.org by haohui <gi...@git.apache.org> on 2017/04/03 21:34:28 UTC

[GitHub] flink pull request #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window i...

GitHub user haohui opened a pull request:

    https://github.com/apache/flink/pull/3665

    [FLINK-6011] Support TUMBLE, HOP, SESSION window in streaming SQL.

    This PR adds support for the `TUMBLE`, `HOP`, and `SESSION` windows in Flink's streaming SQL.
    
    Support for WindowStart and WindowEnd expressions is deferred to FLINK-6012.
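The three windows differ in how they assign a row's timestamp to windows. As a hedged illustration only, the helpers below compute window bounds on epoch-millisecond timestamps. `WindowSemantics`, `TimeWindow`, `tumble`, `hop`, and `session` are hypothetical names, not Flink's window assigners, and the session merge rule is a simplification.

```scala
// Hypothetical helpers for illustration only; not Flink's window assigners.
object WindowSemantics {
  // A half-open interval [start, end) in epoch milliseconds.
  final case class TimeWindow(start: Long, end: Long)

  // TUMBLE: each timestamp falls into exactly one fixed-size window (size > 0).
  def tumble(ts: Long, size: Long): TimeWindow = {
    val start = ts - Math.floorMod(ts, size)
    TimeWindow(start, start + size)
  }

  // HOP: fixed-size windows start every `slide` ms, so a timestamp falls into
  // size / slide overlapping windows when size is a multiple of slide.
  def hop(ts: Long, size: Long, slide: Long): List[TimeWindow] = {
    val lastStart = ts - Math.floorMod(ts, slide)
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start > ts - size)
      .map(start => TimeWindow(start, start + size))
      .toList
      .reverse
  }

  // SESSION: no fixed size; a timestamp less than `gap` ms after the previous
  // one joins the current session, which ends `gap` ms after its last element.
  def session(timestamps: Seq[Long], gap: Long): List[TimeWindow] =
    timestamps.sorted.foldLeft(List.empty[TimeWindow]) {
      case (current :: closed, ts) if ts < current.end =>
        TimeWindow(current.start, ts + gap) :: closed
      case (acc, ts) =>
        TimeWindow(ts, ts + gap) :: acc
    }.reverse
}
```

For example, `tumble(7, 5)` yields `[5, 10)`, `hop(7, 10, 5)` yields `[0, 10)` and `[5, 15)`, and `session(Seq(1, 3, 10, 11), 4)` yields `[1, 7)` and `[10, 15)`. Only the session windows vary in length; their gap parameter stays fixed.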

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/haohui/flink FLINK-6011

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3665.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3665
    
----
commit ef54aea409ea95feb04e651a0fde2fd4b620d888
Author: Haohui Mai <wh...@apache.org>
Date:   2017-04-03T21:32:09Z

    [FLINK-6011] Support TUMBLE, HOP, SESSION window in streaming SQL.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window i...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3665#discussion_r109856293
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala ---
    @@ -117,46 +119,86 @@ class LogicalWindowAggregateRule
       }
     
       private def identifyWindow(field: RexNode): Option[Window] = {
    -    // Detects window expressions by pattern matching
    -    //   supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx),
    -    //   with time being equal to proctime() or rowtime()
         field match {
           case call: RexCall =>
             call.getOperator match {
    -          case _: SqlFloorFunction =>
    -            val operand = call.getOperands.get(1).asInstanceOf[RexLiteral]
    -            val unit: TimeUnitRange = operand.getValue.asInstanceOf[TimeUnitRange]
    -            val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit)
    -            call.getType match {
    -              case TimeModeTypes.PROCTIME =>
    -                return Some(w)
    -              case TimeModeTypes.ROWTIME =>
    -                return Some(w.on("rowtime"))
    -              case _ =>
    -            }
    -          case _ =>
    +          case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.TUMBLE => TumbleWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.HOP => SlidingWindowTranslator(call).toWindow
    +          case SqlStdOperatorTable.SESSION => SessionWindowTranslator(call).toWindow
    +          case _ => None
             }
    -      case _ =>
    +      case _ => None
         }
    -    None
       }
    -
     }
     
    -object LogicalWindowAggregateRule {
    +private abstract class WindowTranslator {
    +  val call: RexCall
     
    -  private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate],
    -    RelOptRule.operand(classOf[LogicalProject], RelOptRule.none()))
    +  protected def unwrapLiteral[T](node: RexNode): T =
    +    node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T]
     
    -  private[flink] val INSTANCE = new LogicalWindowAggregateRule
    +  protected def getOperandAsLong(idx: Int): Long =
    +    unwrapLiteral[BigDecimal](call.getOperands.get(idx)).longValue()
    --- End diff --
    
    Flink only supports windows with a fixed configuration (SESSION windows have a variable length, but the gap parameter is fixed). I'm also not sure whether variable parameters would make sense. IMO, it's quite hard to reason about the behavior of a window with variable parameters.
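The refactored `identifyWindow` in the diff dispatches on the operator, delegates to one translator per window type, and returns `None` for anything else. Below is a self-contained sketch of that shape. `Node`, `Literal`, `Call`, `InputRef`, `WindowSpec`, `Tumble`, `Slide`, `Session`, and `WindowDispatch` are hypothetical stand-ins, not Calcite or Flink classes. The sketch assumes interval operands arrive as `BigDecimal` millisecond literals (as `getOperandAsLong` in the diff suggests) and that `HOP` takes the slide before the size. A non-literal operand yields `None` here, which is one possible way to keep window configurations fixed.

```scala
// Hypothetical stand-ins for Calcite's RexNode hierarchy; NOT the Calcite API.
sealed trait Node
final case class Literal(value: Any) extends Node
final case class Call(operator: String, operands: List[Node]) extends Node
final case class InputRef(index: Int) extends Node

// Hypothetical window descriptions, one per supported SQL window.
sealed trait WindowSpec
final case class Tumble(sizeMs: Long) extends WindowSpec
final case class Slide(sizeMs: Long, slideMs: Long) extends WindowSpec
final case class Session(gapMs: Long) extends WindowSpec

object WindowDispatch {
  // Reads operand `idx` as a Long, but only if it is a BigDecimal literal.
  private def longOperand(call: Call, idx: Int): Option[Long] =
    call.operands.lift(idx).collect { case Literal(v: BigDecimal) => v.longValue }

  // One case per window operator; anything else is not a group window.
  def identifyWindow(node: Node): Option[WindowSpec] = node match {
    case call @ Call("TUMBLE", _) => longOperand(call, 1).map(Tumble(_))
    case call @ Call("HOP", _) =>
      // Assumed operand order: HOP(time, slide, size).
      for (slide <- longOperand(call, 1); size <- longOperand(call, 2))
        yield Slide(size, slide)
    case call @ Call("SESSION", _) => longOperand(call, 1).map(Session(_))
    case _ => None
  }
}
```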



[GitHub] flink pull request #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window i...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3665#discussion_r109857875
  
    
    Agree. Commit 97d1a45 throws a `TableException` if the window configuration is not fixed.
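A minimal sketch of such a check, assuming a simplified operand model. `Operand`, `LiteralOperand`, `ExpressionOperand`, `TableExceptionStub`, `FixedWindowConfig`, and `requireLiteralLong` are hypothetical names; `TableExceptionStub` merely stands in for Flink's `TableException`, and the error message is invented. The actual contents of commit 97d1a45 are not reproduced here.

```scala
// Hypothetical operand model; the real code works on Calcite's RexNode.
sealed trait Operand
final case class LiteralOperand(value: BigDecimal) extends Operand
final case class ExpressionOperand(description: String) extends Operand

// Stand-in for Flink's TableException.
final class TableExceptionStub(message: String) extends RuntimeException(message)

object FixedWindowConfig {
  // Returns a literal operand's value as a Long (truncating, like longValue()
  // in the diff) and rejects any operand that would be computed per row.
  def requireLiteralLong(operand: Operand, what: String): Long = operand match {
    case LiteralOperand(v) => v.longValue
    case ExpressionOperand(desc) =>
      throw new TableExceptionStub(
        s"Only a constant window $what is supported, but found: $desc")
  }
}
```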



[GitHub] flink pull request #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window i...

Posted by haohui <gi...@git.apache.org>.
Github user haohui commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3665#discussion_r109810006
  
    
    I just tried it out: Calcite does not stop you from passing something like a `RexCall`, so yes, the operand can be dynamic.
    
    One question: does Flink actually support a `GroupWindow` with a dynamic size? Maybe I'm wrong, but it does not seem so.
    
    If the answer is no, maybe we should check whether the operand is a `RexLiteral`?
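The failure mode described here can be reproduced with a simplified model: an unchecked cast like the one in `unwrapLiteral` throws a `ClassCastException` when the operand is a call, whereas a pattern match can detect the non-literal case up front. `RexLike`, `RexLiteralLike`, `RexCallLike`, and `OperandCheck` are hypothetical stand-ins, not Calcite's classes.

```scala
// Hypothetical stand-ins for Calcite's RexNode, RexLiteral and RexCall.
sealed trait RexLike
final case class RexLiteralLike(value: BigDecimal) extends RexLike
final case class RexCallLike(op: String, operands: List[RexLike]) extends RexLike

object OperandCheck {
  // Mirrors the diff's unwrapLiteral: an unchecked cast that fails with a
  // ClassCastException when the operand is a call rather than a literal.
  def unchecked(node: RexLike): Long =
    node.asInstanceOf[RexLiteralLike].value.longValue

  // The suggested alternative: check for a literal first and report absence.
  def checked(node: RexLike): Option[Long] = node match {
    case RexLiteralLike(v) => Some(v.longValue)
    case _                 => None
  }
}
```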



[GitHub] flink pull request #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window i...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3665#discussion_r109616209
  
    
    Does Calcite ensure that operands can only be literals and not input references?



[GitHub] flink pull request #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window i...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3665



[GitHub] flink issue #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window in strea...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3665
  
    Thanks for the update @haohui!
    Will merge this PR.



[GitHub] flink issue #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window in strea...

Posted by shaoxuan-wang <gi...@git.apache.org>.
Github user shaoxuan-wang commented on the issue:

    https://github.com/apache/flink/pull/3665
  
    So we finally got these supported by Calcite 1.12? Really excited to see these features supported in Flink SQL. Thanks @haohui.



[GitHub] flink issue #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window in strea...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/3665
  
    Thanks for the PR @haohui!
    +1 to merge.
    I've been waiting for this feature :-)



[GitHub] flink pull request #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window i...

Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3665#discussion_r109615593
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala ---
    @@ -350,7 +350,16 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable {
         SqlStdOperatorTable.EXISTS,
         // EXTENSIONS
         EventTimeExtractor,
    -    ProcTimeExtractor
    +    ProcTimeExtractor,
    +    SqlStdOperatorTable.TUMBLE,
    +    SqlStdOperatorTable.TUMBLE_START,
    --- End diff --
    
    Do we already support the `START/END` functions? We should leave them unsupported until they are implemented and tested.
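One way to keep the auxiliary functions unavailable is to register only what is implemented. The sketch below is hypothetical (`SupportedWindowFunctions` and `lookup` are not Flink names) and models the operator table as a set of names rather than Calcite's `ReflectiveSqlOperatorTable`.

```scala
// Hypothetical sketch: expose only the window operators that are implemented.
object SupportedWindowFunctions {
  // Group window functions handled by this PR.
  private val implemented: Set[String] = Set("TUMBLE", "HOP", "SESSION")

  // Auxiliary start/end functions, deferred to FLINK-6012 and not registered.
  private val deferred: Set[String] = Set(
    "TUMBLE_START", "TUMBLE_END", "HOP_START", "HOP_END",
    "SESSION_START", "SESSION_END")

  // Right(name) for a usable function, Left(reason) otherwise.
  def lookup(name: String): Either[String, String] = {
    val upper = name.toUpperCase(java.util.Locale.ROOT)
    if (implemented.contains(upper)) Right(upper)
    else if (deferred.contains(upper)) Left(s"$upper is not supported yet")
    else Left(s"Unknown function: $upper")
  }
}
```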

