You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/05/15 10:15:08 UTC
flink git commit: [FLINK-6517] [table] Support multiple consecutive
windows
Repository: flink
Updated Branches:
refs/heads/master ae423e1d1 -> c86f46cdc
[FLINK-6517] [table] Support multiple consecutive windows
This closes #3897.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c86f46cd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c86f46cd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c86f46cd
Branch: refs/heads/master
Commit: c86f46cdc1c0e2be30876ef358e7ab3c1daa14e9
Parents: ae423e1
Author: twalthr <tw...@apache.org>
Authored: Fri May 12 10:03:25 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon May 15 12:14:36 2017 +0200
----------------------------------------------------------------------
.../table/api/StreamTableEnvironment.scala | 3 +
.../flink/table/api/scala/expressionDsl.scala | 8 ++-
.../calcite/RelTimeIndicatorConverter.scala | 18 ++++--
.../table/expressions/fieldExpression.scala | 60 ++++++++++++++-----
.../table/expressions/windowProperties.scala | 25 +++++---
.../table/plan/logical/LogicalWindow.scala | 2 +-
.../flink/table/plan/logical/operators.scala | 36 +++++++++---
.../DataStreamGroupWindowAggregate.scala | 8 ++-
.../DataStreamLogicalWindowAggregateRule.scala | 10 ++--
.../table/typeutils/TimeIndicatorTypeInfo.scala | 3 +-
.../stream/StreamTableEnvironmentTest.scala | 7 +++
.../api/scala/stream/TableSourceTest.scala | 4 +-
.../scala/stream/table/GroupWindowTest.scala | 27 ++++-----
.../calcite/RelTimeIndicatorConverterTest.scala | 54 ++++++++++++++++-
.../datastream/TimeAttributesITCase.scala | 62 +++++++++++++++++++-
15 files changed, 260 insertions(+), 67 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 994ac80..c430b21 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -467,6 +467,9 @@ abstract class StreamTableEnvironment(
proctime = Some(idx, name)
}
case (u: UnresolvedFieldReference, _) => fieldNames = u.name :: fieldNames
+
+ case _ =>
+ throw new TableException("Time attributes can only be defined on field references.")
}
if (rowtime.isDefined && fieldNames.contains(rowtime.get._2)) {
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 6d15212..b87bb6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -234,12 +234,14 @@ trait ImplicitExpressionOperations {
def desc = Desc(expr)
/**
- * Returns the start time of a window when applied on a window reference.
+ * Returns the start time (inclusive) of a window when applied on a window reference.
*/
def start = WindowStart(expr)
/**
- * Returns the end time of a window when applied on a window reference.
+ * Returns the end time (exclusive) of a window when applied on a window reference.
+ *
+ * e.g. if a window ends at 10:59:59.999 this property will return 11:00:00.000.
*/
def end = WindowEnd(expr)
@@ -683,7 +685,7 @@ trait ImplicitExpressionOperations {
*/
def element() = ArrayElement(expr)
- // Schema definition
+ // Time definition
/**
* Declares a field as the rowtime attribute for indicating, accessing, and working in
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 7ceb397..21fa70b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -342,10 +342,20 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
}
// remove time indicator return type
- if (isTimeIndicatorType(updatedCall.getType)) {
- updatedCall.clone(timestamp, materializedOperands)
- } else {
- updatedCall.clone(updatedCall.getType, materializedOperands)
+ updatedCall.getOperator match {
+
+ // we do not modify AS if operand has not been materialized
+ case SqlStdOperatorTable.AS if
+ isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
+ updatedCall
+
+ // materialize function's result and operands
+ case _ if isTimeIndicatorType(updatedCall.getType) =>
+ updatedCall.clone(timestamp, materializedOperands)
+
+ // materialize function's operands only
+ case _ =>
+ updatedCall.clone(updatedCall.getType, materializedOperands)
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
index 362d846..99adabf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -19,8 +19,10 @@ package org.apache.flink.table.expressions
import org.apache.calcite.rex.RexNode
import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.{UnresolvedException, ValidationException}
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory.{isRowtimeIndicatorType, isTimeIndicatorType}
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
@@ -117,13 +119,13 @@ case class UnresolvedAlias(child: Expression) extends UnaryExpression with Named
override private[flink] lazy val valid = false
}
-case class WindowReference(name: String) extends Attribute {
+case class WindowReference(name: String, tpe: Option[TypeInformation[_]] = None) extends Attribute {
override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
throw new UnsupportedOperationException("A window reference can not be used solely.")
override private[flink] def resultType: TypeInformation[_] =
- throw new UnsupportedOperationException("A window reference has no result type.")
+ tpe.getOrElse(throw UnresolvedException("Could not resolve type of referenced window."))
override private[flink] def withName(newName: String): Attribute = {
if (newName == name) {
@@ -132,31 +134,61 @@ case class WindowReference(name: String) extends Attribute {
throw new ValidationException("Cannot rename window reference.")
}
}
+
+ override def toString: String = s"'$name"
}
abstract class TimeAttribute(val expression: Expression)
extends UnaryExpression
- with NamedExpression {
+ with WindowProperty {
override private[flink] def child: Expression = expression
-
- override private[flink] def name: String = expression match {
- case UnresolvedFieldReference(name) => name
- case _ => throw new ValidationException("Unresolved field reference expected.")
- }
-
- override private[flink] def toAttribute: Attribute =
- throw new UnsupportedOperationException("Time attribute can not be used solely.")
}
case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
- override private[flink] def resultType: TypeInformation[_] =
+ override private[flink] def validateInput(): ValidationResult = {
+ child match {
+ case WindowReference(_, Some(tpe)) if !isRowtimeIndicatorType(tpe) =>
+ ValidationFailure("A proctime window cannot provide a rowtime attribute.")
+ case WindowReference(_, Some(tpe)) if isRowtimeIndicatorType(tpe) =>
+ ValidationSuccess
+ case WindowReference(_, _) =>
+ ValidationFailure("Reference to a rowtime or proctime window required.")
+ case _ =>
+ ValidationFailure(
+ "The '.rowtime' expression can only be used for table definitions and windows.")
+ }
+ }
+
+ override def resultType: TypeInformation[_] =
TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+
+ override def toNamedWindowProperty(name: String): NamedWindowProperty =
+ NamedWindowProperty(name, this)
+
+ override def toString: String = s"rowtime($child)"
}
case class ProctimeAttribute(expr: Expression) extends TimeAttribute(expr) {
- override private[flink] def resultType: TypeInformation[_] =
+ override private[flink] def validateInput(): ValidationResult = {
+ child match {
+ case WindowReference(_, Some(tpe)) if isTimeIndicatorType(tpe) =>
+ ValidationSuccess
+ case WindowReference(_, _) =>
+ ValidationFailure("Reference to a rowtime or proctime window required.")
+ case _ =>
+ ValidationFailure(
+ "The '.proctime' expression can only be used for table definitions and windows.")
+ }
+ }
+
+ override def resultType: TypeInformation[_] =
TimeIndicatorTypeInfo.PROCTIME_INDICATOR
+
+ override def toNamedWindowProperty(name: String): NamedWindowProperty =
+ NamedWindowProperty(name, this)
+
+ override def toString: String = s"proctime($child)"
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
index 990d928..e119247 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
@@ -20,12 +20,22 @@ package org.apache.flink.table.expressions
import org.apache.calcite.rex.RexNode
import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.table.calcite.FlinkRelBuilder
import FlinkRelBuilder.NamedWindowProperty
import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess}
-abstract class WindowProperty(child: Expression) extends UnaryExpression {
+trait WindowProperty {
+
+ def toNamedWindowProperty(name: String): NamedWindowProperty
+
+ def resultType: TypeInformation[_]
+
+}
+
+abstract class AbstractWindowProperty(child: Expression)
+ extends UnaryExpression
+ with WindowProperty {
override def toString = s"WindowProperty($child)"
@@ -39,20 +49,19 @@ abstract class WindowProperty(child: Expression) extends UnaryExpression {
ValidationFailure("Child must be a window reference.")
}
- private[flink] def toNamedWindowProperty(name: String)(implicit relBuilder: RelBuilder)
- : NamedWindowProperty = NamedWindowProperty(name, this)
+ def toNamedWindowProperty(name: String): NamedWindowProperty = NamedWindowProperty(name, this)
}
-case class WindowStart(child: Expression) extends WindowProperty(child) {
+case class WindowStart(child: Expression) extends AbstractWindowProperty(child) {
- override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
+ override def resultType = SqlTimeTypeInfo.TIMESTAMP
override def toString: String = s"start($child)"
}
-case class WindowEnd(child: Expression) extends WindowProperty(child) {
+case class WindowEnd(child: Expression) extends AbstractWindowProperty(child) {
- override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
+ override def resultType = SqlTimeTypeInfo.TIMESTAMP
override def toString: String = s"end($child)"
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
index 6161ef0..a328703 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
@@ -36,7 +36,7 @@ abstract class LogicalWindow(
def resolveExpressions(resolver: (Expression) => Expression): LogicalWindow = this
def validate(tableEnv: TableEnvironment): ValidationResult = aliasAttribute match {
- case WindowReference(_) => ValidationSuccess
+ case WindowReference(_, _) => ValidationSuccess
case _ => ValidationFailure("Window reference for window expected.")
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 36067eb..bfb6cbf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -552,20 +552,38 @@ case class WindowAggregate(
override def resolveReference(
tableEnv: TableEnvironment,
name: String)
- : Option[NamedExpression] = window.aliasAttribute match {
+ : Option[NamedExpression] = {
+
+ def resolveAlias(alias: String) = {
+ // check if reference can already be resolved by input fields
+ val found = super.resolveReference(tableEnv, name)
+ if (found.isDefined) {
+ failValidation(s"Reference $name is ambiguous.")
+ } else {
+ // resolve type of window reference
+ val resolvedType = window.timeAttribute match {
+ case UnresolvedFieldReference(n) =>
+ super.resolveReference(tableEnv, n) match {
+ case Some(ResolvedFieldReference(_, tpe)) => Some(tpe)
+ case _ => None
+ }
+ case _ => None
+ }
+ // let validation phase throw an error if type could not be resolved
+ Some(WindowReference(name, resolvedType))
+ }
+ }
+
+ window.aliasAttribute match {
// resolve reference to this window's name
case UnresolvedFieldReference(alias) if name == alias =>
- // check if reference can already be resolved by input fields
- val found = super.resolveReference(tableEnv, name)
- if (found.isDefined) {
- failValidation(s"Reference $name is ambiguous.")
- } else {
- Some(WindowReference(name))
- }
+ resolveAlias(alias)
+
case _ =>
// resolve references as usual
super.resolveReference(tableEnv, name)
}
+ }
override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
val flinkRelBuilder = relBuilder.asInstanceOf[FlinkRelBuilder]
@@ -574,7 +592,7 @@ case class WindowAggregate(
window,
relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
propertyExpressions.map {
- case Alias(prop: WindowProperty, name, _) => prop.toNamedWindowProperty(name)(relBuilder)
+ case Alias(prop: WindowProperty, name, _) => prop.toNamedWindowProperty(name)
case _ => throw new RuntimeException("This should never happen.")
},
aggregateExpressions.map {
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 2a71592..c158579 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
import org.apache.flink.table.codegen.CodeGenerator
import org.apache.flink.table.expressions.ExpressionUtils._
import org.apache.flink.table.plan.logical._
@@ -118,6 +119,9 @@ class DataStreamGroupWindowAggregate(
inputSchema.mapAggregateCall(namedAggregate.left),
namedAggregate.right)
}
+ val physicalNamedProperties = namedProperties
+ .filter(np => !FlinkTypeFactory.isTimeIndicatorType(np.property.resultType))
+
val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
if (consumeRetraction) {
@@ -159,7 +163,7 @@ class DataStreamGroupWindowAggregate(
physicalGrouping.length,
physicalNamedAggregates.size,
schema.physicalArity,
- namedProperties)
+ physicalNamedProperties)
val keyedStream = inputDS.keyBy(physicalGrouping: _*)
val windowedStream =
@@ -185,7 +189,7 @@ class DataStreamGroupWindowAggregate(
val windowFunction = AggregateUtil.createAggregationAllWindowFunction(
window,
schema.physicalArity,
- namedProperties)
+ physicalNamedProperties)
val windowedStream =
createNonKeyedWindowedStream(window, inputDS)
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index d57d4cc..38de539 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -23,10 +23,10 @@ import java.math.{BigDecimal => JBigDecimal}
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex._
import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.flink.table.api.{TableException, Window}
import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
+import org.apache.flink.table.api.{TableException, Window}
import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.{Literal, ResolvedFieldReference, UnresolvedFieldReference}
+import org.apache.flink.table.expressions.{Literal, ResolvedFieldReference, WindowReference}
import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
@@ -84,7 +84,7 @@ class DataStreamLogicalWindowAggregateRule
val interval = getOperandAsLong(windowExpr, 1)
val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
- w.on(time).as("w$")
+ w.on(time).as(WindowReference("w$"))
case SqlStdOperatorTable.HOP =>
val time = getOperandAsTimeIndicator(windowExpr, 0)
@@ -93,14 +93,14 @@ class DataStreamLogicalWindowAggregateRule
.over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
.every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
- w.on(time).as("w$")
+ w.on(time).as(WindowReference("w$"))
case SqlStdOperatorTable.SESSION =>
val time = getOperandAsTimeIndicator(windowExpr, 0)
val gap = getOperandAsLong(windowExpr, 1)
val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
- w.on(time).as("w$")
+ w.on(time).as(WindowReference("w$"))
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
index 31dcb5c..083f1eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
@@ -34,7 +34,8 @@ class TimeIndicatorTypeInfo(val isEventTime: Boolean)
SqlTimestampSerializer.INSTANCE,
classOf[SqlTimestampComparator].asInstanceOf[Class[TypeComparator[Timestamp]]]) {
- override def toString: String = s"TimeIndicatorTypeInfo"
+ override def toString: String =
+ s"TimeIndicatorTypeInfo(${if (isEventTime) "rowtime" else "proctime" })"
}
object TimeIndicatorTypeInfo {
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
index 7797f22..3c1668f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
@@ -36,6 +36,13 @@ import org.mockito.Mockito.{mock, when}
class StreamTableEnvironmentTest extends TableTestBase {
@Test(expected = classOf[TableException])
+ def testInvalidTimeAttributes(): Unit = {
+ val util = streamTestUtil()
+ // table definition makes no sense
+ util.addTable[(Long, Int, String, Int, Long)]('a.rowtime.rowtime, 'b, 'c, 'd, 'e)
+ }
+
+ @Test(expected = classOf[TableException])
def testInvalidProctimeAttribute(): Unit = {
val util = streamTestUtil()
// cannot replace an attribute with proctime
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
index cda90f7..890ad32 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
@@ -71,8 +71,8 @@ class TableSourceTest extends TableTestBase {
term("where", ">(val, 100)")
),
term("groupBy", "name"),
- term("window", "TumblingGroupWindow(WindowReference(w), 'addTime, 600000.millis)"),
- term("select", "name", "AVG(val) AS TMP_1", "end(WindowReference(w)) AS TMP_0")
+ term("window", "TumblingGroupWindow('w, 'addTime, 600000.millis)"),
+ term("select", "name", "AVG(val) AS TMP_1", "end('w) AS TMP_0")
),
term("select", "name", "TMP_0", "TMP_1")
)
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
index ef071b7..b389183 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -170,7 +170,6 @@ class GroupWindowTest extends TableTestBase {
.select('string, weightedAvg('string, 'int)) // invalid UDAGG args
}
- @Ignore // TODO
@Test
def testMultiWindow(): Unit = {
val util = streamTestUtil()
@@ -179,7 +178,7 @@ class GroupWindowTest extends TableTestBase {
val windowedTable = table
.window(Tumble over 50.milli on 'proctime as 'w1)
.groupBy('w1, 'string)
- .select('w.end as 'proctime, 'string, 'int.count)
+ .select('w1.proctime as 'proctime, 'string, 'int.count)
.window(Slide over 20.milli every 10.milli on 'proctime as 'w2)
.groupBy('w2)
.select('string.count)
@@ -193,7 +192,7 @@ class GroupWindowTest extends TableTestBase {
unaryNode(
"DataStreamCalc",
streamTableNode(0),
- term("select", "string", "int")
+ term("select", "string", "int", "proctime")
),
term("groupBy", "string"),
term(
@@ -202,9 +201,9 @@ class GroupWindowTest extends TableTestBase {
WindowReference("w1"),
'proctime,
50.milli)),
- term("select", "string", "COUNT(int) AS TMP_0")
+ term("select", "string", "COUNT(int) AS TMP_1", "proctime('w1) AS TMP_0")
),
- term("select", "string")
+ term("select", "string", "TMP_0 AS proctime")
),
term(
"window",
@@ -213,7 +212,7 @@ class GroupWindowTest extends TableTestBase {
'proctime,
20.milli,
10.milli)),
- term("select", "COUNT(string) AS TMP_1")
+ term("select", "COUNT(string) AS TMP_2")
)
util.verifyTable(windowedTable, expected)
}
@@ -784,8 +783,8 @@ class GroupWindowTest extends TableTestBase {
term("select",
"string",
"COUNT(int) AS TMP_0",
- "start(WindowReference(w)) AS TMP_1",
- "end(WindowReference(w)) AS TMP_2")
+ "start('w) AS TMP_1",
+ "end('w) AS TMP_2")
)
util.verifyTable(windowedTable, expected)
@@ -852,8 +851,8 @@ class GroupWindowTest extends TableTestBase {
term("select",
"string",
"COUNT(int) AS TMP_0",
- "start(WindowReference(w)) AS TMP_1",
- "end(WindowReference(w)) AS TMP_2")
+ "start('w) AS TMP_1",
+ "end('w) AS TMP_2")
)
util.verifyTable(windowedTable, expected)
@@ -879,8 +878,8 @@ class GroupWindowTest extends TableTestBase {
term("select",
"string",
"COUNT(int) AS TMP_1",
- "end(WindowReference(w)) AS TMP_0",
- "start(WindowReference(w)) AS TMP_2")
+ "end('w) AS TMP_0",
+ "start('w) AS TMP_2")
),
term("select", "TMP_0 AS we1", "string", "TMP_1 AS cnt", "TMP_2 AS ws", "TMP_0 AS we2")
)
@@ -909,8 +908,8 @@ class GroupWindowTest extends TableTestBase {
term("select",
"string",
"SUM(int) AS TMP_0",
- "start(WindowReference(w)) AS TMP_1",
- "end(WindowReference(w)) AS TMP_2")
+ "start('w) AS TMP_1",
+ "end('w) AS TMP_2")
),
term("select",
"string",
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
index 7ac0874..cf55d48 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
@@ -243,10 +243,10 @@ class RelTimeIndicatorConverterTest extends TableTestBase {
term(
"window",
TumblingGroupWindow(
- WindowReference("w"),
+ 'w,
'rowtime,
100.millis)),
- term("select", "long", "SUM(int) AS TMP_1", "end(WindowReference(w)) AS TMP_0")
+ term("select", "long", "SUM(int) AS TMP_1", "end('w) AS TMP_0")
),
term("select", "TMP_0 AS rowtime", "long", "TMP_1")
)
@@ -273,7 +273,7 @@ class RelTimeIndicatorConverterTest extends TableTestBase {
term(
"window",
TumblingGroupWindow(
- 'w$,
+ WindowReference("w$"),
'rowtime,
100.millis)),
term("select", "long", "SUM(int) AS EXPR$2", "start('w$) AS w$start", "end('w$) AS w$end")
@@ -338,6 +338,54 @@ class RelTimeIndicatorConverterTest extends TableTestBase {
util.verifyTable(result, expected)
}
+ @Test
+ def testMultiWindow(): Unit = {
+ val util = streamTestUtil()
+ val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+ val result = t
+ .window(Tumble over 100.millis on 'rowtime as 'w)
+ .groupBy('w, 'long)
+ .select('w.rowtime as 'newrowtime, 'long, 'int.sum as 'int)
+ .window(Tumble over 1.second on 'newrowtime as 'w2)
+ .groupBy('w2, 'long)
+ .select('w2.end, 'long, 'int.sum)
+
+ val expected = unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupWindowAggregate",
+ unaryNode(
+ "DataStreamCalc",
+ unaryNode(
+ "DataStreamGroupWindowAggregate",
+ streamTableNode(0),
+ term("groupBy", "long"),
+ term(
+ "window",
+ TumblingGroupWindow(
+ 'w,
+ 'rowtime,
+ 100.millis)),
+ term("select", "long", "SUM(int) AS TMP_1", "rowtime('w) AS TMP_0")
+ ),
+ term("select", "TMP_0 AS newrowtime", "long", "TMP_1 AS int")
+ ),
+ term("groupBy", "long"),
+ term(
+ "window",
+ TumblingGroupWindow(
+ 'w2,
+ 'newrowtime,
+ 1000.millis)),
+ term("select", "long", "SUM(int) AS TMP_3", "end('w2) AS TMP_2")
+ ),
+ term("select", "TMP_2", "long", "TMP_3")
+ )
+
+ util.verifyTable(result, expected)
+ }
+
}
object RelTimeIndicatorConverterTest {
http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
index 7d7088e..3f12218 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.stream.utils.StreamITCase
-import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
+import org.apache.flink.table.api.{TableEnvironment, TableException, Types, ValidationException}
import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
import org.apache.flink.table.expressions.TimeIntervalUnit
import org.apache.flink.table.runtime.datastream.TimeAttributesITCase.TimestampWithEqualWatermark
@@ -62,6 +62,33 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
}
+ @Test(expected = classOf[ValidationException])
+ def testInvalidUseOfRowtime(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+ .select('rowtime.rowtime)
+ }
+
+ @Test(expected = classOf[ValidationException])
+ def testInvalidUseOfRowtime2(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ stream
+ .toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+ .window(Tumble over 2.millis on 'rowtime as 'w)
+ .groupBy('w)
+ .select('w.end.rowtime, 'int.count as 'int) // no rowtime on non-window reference
+ }
+
@Test
def testCalcMaterialization(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -215,6 +242,39 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
+ @Test
+ def testMultiWindow(): Unit = {
+ val env = StreamExecutionEnvironment.getExecutionEnvironment
+ env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+ val tEnv = TableEnvironment.getTableEnvironment(env)
+ StreamITCase.testResults = mutable.MutableList()
+
+ val stream = env
+ .fromCollection(data)
+ .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+ val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+ val t = table
+ .window(Tumble over 2.millis on 'rowtime as 'w)
+ .groupBy('w)
+ .select('w.rowtime as 'rowtime, 'int.count as 'int)
+ .window(Tumble over 4.millis on 'rowtime as 'w2)
+ .groupBy('w2)
+ .select('w2.rowtime, 'w2.end, 'int.count)
+
+ val results = t.toDataStream[Row]
+ results.addSink(new StreamITCase.StringSink)
+ env.execute()
+
+ val expected = Seq(
+ "1970-01-01 00:00:00.003,1970-01-01 00:00:00.004,2",
+ "1970-01-01 00:00:00.007,1970-01-01 00:00:00.008,2",
+ "1970-01-01 00:00:00.011,1970-01-01 00:00:00.012,1",
+ "1970-01-01 00:00:00.019,1970-01-01 00:00:00.02,1"
+ )
+ assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+ }
+
}
object TimeAttributesITCase {