You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2017/06/11 13:58:28 UTC

flink git commit: [FLINK-6817] [table] Add OverWindowWithPreceding class to guide users apply preceding in over window

Repository: flink
Updated Branches:
  refs/heads/master 5d3506e88 -> 5f63bf98c


[FLINK-6817] [table] Add OverWindowWithPreceding class to guide users apply preceding in over window

This closes #4055


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f63bf98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f63bf98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f63bf98

Branch: refs/heads/master
Commit: 5f63bf98c3dc7c1fe28630300b68179b6700dcad
Parents: 5d3506e
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Fri Jun 2 21:28:45 2017 +0800
Committer: Jark Wu <wu...@alibaba-inc.com>
Committed: Sun Jun 11 17:29:38 2017 +0800

----------------------------------------------------------------------
 .../apache/flink/table/api/java/windows.scala   | 22 ++++++++++++--
 .../apache/flink/table/api/scala/windows.scala  | 19 ++++++++++--
 .../org/apache/flink/table/api/windows.scala    | 31 ++++----------------
 3 files changed, 42 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5f63bf98/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
index 15208ce..f326f6f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.api.java
 
-import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithOrderBy, SlideWithSize, SessionWithGap}
+import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap}
 import org.apache.flink.table.expressions.{Expression, ExpressionParser}
 
 /**
@@ -98,7 +98,7 @@ object Over {
     */
   def orderBy(orderBy: String): OverWindowWithOrderBy = {
     val orderByExpr = ExpressionParser.parseExpression(orderBy)
-    new OverWindowWithOrderBy(Seq[Expression](), orderByExpr)
+    new OverWindowWithOrderBy(Array[Expression](), orderByExpr)
   }
 
   /**
@@ -127,3 +127,21 @@ class PartitionedOver(private val partitionByExpr: Array[Expression]) {
     new OverWindowWithOrderBy(partitionByExpr, orderByExpr)
   }
 }
+
+
+class OverWindowWithOrderBy(
+  private val partitionByExpr: Array[Expression],
+  private val orderByExpr: Expression) {
+
+  /**
+    * Set the preceding offset (based on time or row-count intervals) for over window.
+    *
+    * @param preceding preceding offset relative to the current row.
+    * @return a new over window with the preceding offset defined
+    */
+  def preceding(preceding: String): OverWindowWithPreceding = {
+    val precedingExpr = ExpressionParser.parseExpression(preceding)
+    new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f63bf98/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
index d0430c2..91bf1a6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.api.scala
 
-import org.apache.flink.table.api.{OverWindowWithOrderBy, SessionWithGap, SlideWithSize, TumbleWithSize}
+import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap}
 import org.apache.flink.table.expressions.Expression
 
 /**
@@ -121,6 +121,21 @@ case class PartitionedOver(partitionBy: Array[Expression]) {
     * For batch tables, refer to a timestamp or long attribute.
     */
   def orderBy(orderBy: Expression): OverWindowWithOrderBy = {
-    new OverWindowWithOrderBy(partitionBy, orderBy)
+    OverWindowWithOrderBy(partitionBy, orderBy)
   }
 }
+
+case class OverWindowWithOrderBy(partitionBy: Seq[Expression], orderBy: Expression) {
+
+
+  /**
+    * Set the preceding offset (based on time or row-count intervals) for over window.
+    *
+    * @param preceding preceding offset relative to the current row.
+    * @return a new over window with the preceding offset defined
+    */
+  def preceding(preceding: Expression): OverWindowWithPreceding = {
+    new OverWindowWithPreceding(partitionBy, orderBy, preceding)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5f63bf98/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
index 11ef360..ee022b1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
@@ -68,11 +68,11 @@ case class UnboundedRange() extends Expression {
 /**
   * A partially defined over window.
   */
-class OverWindowWithOrderBy(
+class OverWindowWithPreceding(
     private val partitionBy: Seq[Expression],
-    private val orderBy: Expression) {
+    private val orderBy: Expression,
+    private val preceding: Expression) {
 
-  private[flink] var preceding: Expression = _
   private[flink] var following: Expression = _
 
   /**
@@ -103,33 +103,12 @@ class OverWindowWithOrderBy(
   }
 
   /**
-    * Set the preceding offset (based on time or row-count intervals) for over window.
-    *
-    * @param preceding preceding offset relative to the current row.
-    * @return this over window
-    */
-  def preceding(preceding: String): OverWindowWithOrderBy = {
-    this.preceding(ExpressionParser.parseExpression(preceding))
-  }
-
-  /**
-    * Set the preceding offset (based on time or row-count intervals) for over window.
-    *
-    * @param preceding preceding offset relative to the current row.
-    * @return this over window
-    */
-  def preceding(preceding: Expression): OverWindowWithOrderBy = {
-    this.preceding = preceding
-    this
-  }
-
-  /**
     * Set the following offset (based on time or row-count intervals) for over window.
     *
     * @param following following offset that relative to the current row.
     * @return this over window
     */
-  def following(following: String): OverWindowWithOrderBy = {
+  def following(following: String): OverWindowWithPreceding = {
     this.following(ExpressionParser.parseExpression(following))
   }
 
@@ -139,7 +118,7 @@ class OverWindowWithOrderBy(
     * @param following following offset that relative to the current row.
     * @return this over window
     */
-  def following(following: Expression): OverWindowWithOrderBy = {
+  def following(following: Expression): OverWindowWithPreceding = {
     this.following = following
     this
   }