You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2017/06/11 13:58:28 UTC
flink git commit: [FLINK-6817] [table] Add OverWindowWithPreceding
class to guide users apply preceding in over window
Repository: flink
Updated Branches:
refs/heads/master 5d3506e88 -> 5f63bf98c
[FLINK-6817] [table] Add OverWindowWithPreceding class to guide users apply preceding in over window
This closes #4055
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f63bf98
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f63bf98
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f63bf98
Branch: refs/heads/master
Commit: 5f63bf98c3dc7c1fe28630300b68179b6700dcad
Parents: 5d3506e
Author: Jark Wu <wu...@alibaba-inc.com>
Authored: Fri Jun 2 21:28:45 2017 +0800
Committer: Jark Wu <wu...@alibaba-inc.com>
Committed: Sun Jun 11 17:29:38 2017 +0800
----------------------------------------------------------------------
.../apache/flink/table/api/java/windows.scala | 22 ++++++++++++--
.../apache/flink/table/api/scala/windows.scala | 19 ++++++++++--
.../org/apache/flink/table/api/windows.scala | 31 ++++----------------
3 files changed, 42 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5f63bf98/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
index 15208ce..f326f6f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/java/windows.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.api.java
-import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithOrderBy, SlideWithSize, SessionWithGap}
+import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap}
import org.apache.flink.table.expressions.{Expression, ExpressionParser}
/**
@@ -98,7 +98,7 @@ object Over {
*/
def orderBy(orderBy: String): OverWindowWithOrderBy = {
val orderByExpr = ExpressionParser.parseExpression(orderBy)
- new OverWindowWithOrderBy(Seq[Expression](), orderByExpr)
+ new OverWindowWithOrderBy(Array[Expression](), orderByExpr)
}
/**
@@ -127,3 +127,21 @@ class PartitionedOver(private val partitionByExpr: Array[Expression]) {
new OverWindowWithOrderBy(partitionByExpr, orderByExpr)
}
}
+
+
+class OverWindowWithOrderBy(
+ private val partitionByExpr: Array[Expression],
+ private val orderByExpr: Expression) {
+
+ /**
+ * Set the preceding offset (based on time or row-count intervals) for over window.
+ *
+ * @param preceding preceding offset relative to the current row.
+ * @return this over window
+ */
+ def preceding(preceding: String): OverWindowWithPreceding = {
+ val precedingExpr = ExpressionParser.parseExpression(preceding)
+ new OverWindowWithPreceding(partitionByExpr, orderByExpr, precedingExpr)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f63bf98/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
index d0430c2..91bf1a6 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/windows.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.api.scala
-import org.apache.flink.table.api.{OverWindowWithOrderBy, SessionWithGap, SlideWithSize, TumbleWithSize}
+import org.apache.flink.table.api.{TumbleWithSize, OverWindowWithPreceding, SlideWithSize, SessionWithGap}
import org.apache.flink.table.expressions.Expression
/**
@@ -121,6 +121,21 @@ case class PartitionedOver(partitionBy: Array[Expression]) {
* For batch tables, refer to a timestamp or long attribute.
*/
def orderBy(orderBy: Expression): OverWindowWithOrderBy = {
- new OverWindowWithOrderBy(partitionBy, orderBy)
+ OverWindowWithOrderBy(partitionBy, orderBy)
}
}
+
+case class OverWindowWithOrderBy(partitionBy: Seq[Expression], orderBy: Expression) {
+
+
+ /**
+ * Set the preceding offset (based on time or row-count intervals) for over window.
+ *
+ * @param preceding preceding offset relative to the current row.
+ * @return this over window
+ */
+ def preceding(preceding: Expression): OverWindowWithPreceding = {
+ new OverWindowWithPreceding(partitionBy, orderBy, preceding)
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/5f63bf98/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
index 11ef360..ee022b1 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/windows.scala
@@ -68,11 +68,11 @@ case class UnboundedRange() extends Expression {
/**
* A partially defined over window.
*/
-class OverWindowWithOrderBy(
+class OverWindowWithPreceding(
private val partitionBy: Seq[Expression],
- private val orderBy: Expression) {
+ private val orderBy: Expression,
+ private val preceding: Expression) {
- private[flink] var preceding: Expression = _
private[flink] var following: Expression = _
/**
@@ -103,33 +103,12 @@ class OverWindowWithOrderBy(
}
/**
- * Set the preceding offset (based on time or row-count intervals) for over window.
- *
- * @param preceding preceding offset relative to the current row.
- * @return this over window
- */
- def preceding(preceding: String): OverWindowWithOrderBy = {
- this.preceding(ExpressionParser.parseExpression(preceding))
- }
-
- /**
- * Set the preceding offset (based on time or row-count intervals) for over window.
- *
- * @param preceding preceding offset relative to the current row.
- * @return this over window
- */
- def preceding(preceding: Expression): OverWindowWithOrderBy = {
- this.preceding = preceding
- this
- }
-
- /**
* Set the following offset (based on time or row-count intervals) for over window.
*
* @param following following offset that relative to the current row.
* @return this over window
*/
- def following(following: String): OverWindowWithOrderBy = {
+ def following(following: String): OverWindowWithPreceding = {
this.following(ExpressionParser.parseExpression(following))
}
@@ -139,7 +118,7 @@ class OverWindowWithOrderBy(
* @param following following offset that relative to the current row.
* @return this over window
*/
- def following(following: Expression): OverWindowWithOrderBy = {
+ def following(following: Expression): OverWindowWithPreceding = {
this.following = following
this
}