You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tw...@apache.org on 2017/05/15 10:15:08 UTC

flink git commit: [FLINK-6517] [table] Support multiple consecutive windows

Repository: flink
Updated Branches:
  refs/heads/master ae423e1d1 -> c86f46cdc


[FLINK-6517] [table] Support multiple consecutive windows

This closes #3897.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c86f46cd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c86f46cd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c86f46cd

Branch: refs/heads/master
Commit: c86f46cdc1c0e2be30876ef358e7ab3c1daa14e9
Parents: ae423e1
Author: twalthr <tw...@apache.org>
Authored: Fri May 12 10:03:25 2017 +0200
Committer: twalthr <tw...@apache.org>
Committed: Mon May 15 12:14:36 2017 +0200

----------------------------------------------------------------------
 .../table/api/StreamTableEnvironment.scala      |  3 +
 .../flink/table/api/scala/expressionDsl.scala   |  8 ++-
 .../calcite/RelTimeIndicatorConverter.scala     | 18 ++++--
 .../table/expressions/fieldExpression.scala     | 60 ++++++++++++++-----
 .../table/expressions/windowProperties.scala    | 25 +++++---
 .../table/plan/logical/LogicalWindow.scala      |  2 +-
 .../flink/table/plan/logical/operators.scala    | 36 +++++++++---
 .../DataStreamGroupWindowAggregate.scala        |  8 ++-
 .../DataStreamLogicalWindowAggregateRule.scala  | 10 ++--
 .../table/typeutils/TimeIndicatorTypeInfo.scala |  3 +-
 .../stream/StreamTableEnvironmentTest.scala     |  7 +++
 .../api/scala/stream/TableSourceTest.scala      |  4 +-
 .../scala/stream/table/GroupWindowTest.scala    | 27 ++++-----
 .../calcite/RelTimeIndicatorConverterTest.scala | 54 ++++++++++++++++-
 .../datastream/TimeAttributesITCase.scala       | 62 +++++++++++++++++++-
 15 files changed, 260 insertions(+), 67 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 994ac80..c430b21 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -467,6 +467,9 @@ abstract class StreamTableEnvironment(
           proctime = Some(idx, name)
         }
       case (u: UnresolvedFieldReference, _) => fieldNames = u.name :: fieldNames
+
+      case _ =>
+        throw new TableException("Time attributes can only be defined on field references.")
     }
 
     if (rowtime.isDefined && fieldNames.contains(rowtime.get._2)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 6d15212..b87bb6d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -234,12 +234,14 @@ trait ImplicitExpressionOperations {
   def desc = Desc(expr)
 
   /**
-    * Returns the start time of a window when applied on a window reference.
+    * Returns the start time (inclusive) of a window when applied on a window reference.
     */
   def start = WindowStart(expr)
 
   /**
-    * Returns the end time of a window when applied on a window reference.
+    * Returns the end time (exclusive) of a window when applied on a window reference.
+    *
+    * e.g. if a window ends at 10:59:59.999 this property will return 11:00:00.000.
     */
   def end = WindowEnd(expr)
 
@@ -683,7 +685,7 @@ trait ImplicitExpressionOperations {
     */
   def element() = ArrayElement(expr)
 
-  // Schema definition
+  // Time definition
 
   /**
     * Declares a field as the rowtime attribute for indicating, accessing, and working in

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
index 7ceb397..21fa70b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverter.scala
@@ -342,10 +342,20 @@ class RelTimeIndicatorConverter(rexBuilder: RexBuilder) extends RelShuttle {
       }
 
       // remove time indicator return type
-      if (isTimeIndicatorType(updatedCall.getType)) {
-        updatedCall.clone(timestamp, materializedOperands)
-      } else {
-        updatedCall.clone(updatedCall.getType, materializedOperands)
+      updatedCall.getOperator match {
+
+        // we do not modify AS if operand has not been materialized
+        case SqlStdOperatorTable.AS if
+            isTimeIndicatorType(updatedCall.getOperands.get(0).getType) =>
+          updatedCall
+
+        // materialize function's result and operands
+        case _ if isTimeIndicatorType(updatedCall.getType) =>
+          updatedCall.clone(timestamp, materializedOperands)
+
+        // materialize function's operands only
+        case _ =>
+          updatedCall.clone(updatedCall.getType, materializedOperands)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
index 362d846..99adabf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/fieldExpression.scala
@@ -19,8 +19,10 @@ package org.apache.flink.table.expressions
 
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.table.api.{UnresolvedException, ValidationException}
+import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory.{isRowtimeIndicatorType, isTimeIndicatorType}
 import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess}
 
@@ -117,13 +119,13 @@ case class UnresolvedAlias(child: Expression) extends UnaryExpression with Named
   override private[flink] lazy val valid = false
 }
 
-case class WindowReference(name: String) extends Attribute {
+case class WindowReference(name: String, tpe: Option[TypeInformation[_]] = None) extends Attribute {
 
   override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode =
     throw new UnsupportedOperationException("A window reference can not be used solely.")
 
   override private[flink] def resultType: TypeInformation[_] =
-    throw new UnsupportedOperationException("A window reference has no result type.")
+    tpe.getOrElse(throw UnresolvedException("Could not resolve type of referenced window."))
 
   override private[flink] def withName(newName: String): Attribute = {
     if (newName == name) {
@@ -132,31 +134,61 @@ case class WindowReference(name: String) extends Attribute {
       throw new ValidationException("Cannot rename window reference.")
     }
   }
+
+  override def toString: String = s"'$name"
 }
 
 abstract class TimeAttribute(val expression: Expression)
   extends UnaryExpression
-  with NamedExpression {
+  with WindowProperty {
 
   override private[flink] def child: Expression = expression
-
-  override private[flink] def name: String = expression match {
-    case UnresolvedFieldReference(name) => name
-    case _ => throw new ValidationException("Unresolved field reference expected.")
-  }
-
-  override private[flink] def toAttribute: Attribute =
-    throw new UnsupportedOperationException("Time attribute can not be used solely.")
 }
 
 case class RowtimeAttribute(expr: Expression) extends TimeAttribute(expr) {
 
-  override private[flink] def resultType: TypeInformation[_] =
+  override private[flink] def validateInput(): ValidationResult = {
+    child match {
+      case WindowReference(_, Some(tpe)) if !isRowtimeIndicatorType(tpe) =>
+        ValidationFailure("A proctime window cannot provide a rowtime attribute.")
+      case WindowReference(_, Some(tpe)) if isRowtimeIndicatorType(tpe) =>
+        ValidationSuccess
+      case WindowReference(_, _) =>
+        ValidationFailure("Reference to a rowtime or proctime window required.")
+      case _ =>
+        ValidationFailure(
+          "The '.rowtime' expression can only be used for table definitions and windows.")
+    }
+  }
+
+  override def resultType: TypeInformation[_] =
     TimeIndicatorTypeInfo.ROWTIME_INDICATOR
+
+  override def toNamedWindowProperty(name: String): NamedWindowProperty =
+    NamedWindowProperty(name, this)
+
+  override def toString: String = s"rowtime($child)"
 }
 
 case class ProctimeAttribute(expr: Expression) extends TimeAttribute(expr) {
 
-  override private[flink] def resultType: TypeInformation[_] =
+  override private[flink] def validateInput(): ValidationResult = {
+    child match {
+      case WindowReference(_, Some(tpe)) if isTimeIndicatorType(tpe) =>
+        ValidationSuccess
+      case WindowReference(_, _) =>
+        ValidationFailure("Reference to a rowtime or proctime window required.")
+      case _ =>
+        ValidationFailure(
+          "The '.proctime' expression can only be used for table definitions and windows.")
+    }
+  }
+
+  override def resultType: TypeInformation[_] =
     TimeIndicatorTypeInfo.PROCTIME_INDICATOR
+
+  override def toNamedWindowProperty(name: String): NamedWindowProperty =
+    NamedWindowProperty(name, this)
+
+  override def toString: String = s"proctime($child)"
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
index 990d928..e119247 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/windowProperties.scala
@@ -20,12 +20,22 @@ package org.apache.flink.table.expressions
 
 import org.apache.calcite.rex.RexNode
 import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
+import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
 import org.apache.flink.table.calcite.FlinkRelBuilder
 import FlinkRelBuilder.NamedWindowProperty
 import org.apache.flink.table.validate.{ValidationFailure, ValidationSuccess}
 
-abstract class WindowProperty(child: Expression) extends UnaryExpression {
+trait WindowProperty {
+
+  def toNamedWindowProperty(name: String): NamedWindowProperty
+
+  def resultType: TypeInformation[_]
+
+}
+
+abstract class AbstractWindowProperty(child: Expression)
+  extends UnaryExpression
+  with WindowProperty {
 
   override def toString = s"WindowProperty($child)"
 
@@ -39,20 +49,19 @@ abstract class WindowProperty(child: Expression) extends UnaryExpression {
       ValidationFailure("Child must be a window reference.")
     }
 
-  private[flink] def toNamedWindowProperty(name: String)(implicit relBuilder: RelBuilder)
-    : NamedWindowProperty = NamedWindowProperty(name, this)
+  def toNamedWindowProperty(name: String): NamedWindowProperty = NamedWindowProperty(name, this)
 }
 
-case class WindowStart(child: Expression) extends WindowProperty(child) {
+case class WindowStart(child: Expression) extends AbstractWindowProperty(child) {
 
-  override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
+  override def resultType = SqlTimeTypeInfo.TIMESTAMP
 
   override def toString: String = s"start($child)"
 }
 
-case class WindowEnd(child: Expression) extends WindowProperty(child) {
+case class WindowEnd(child: Expression) extends AbstractWindowProperty(child) {
 
-  override private[flink] def resultType = SqlTimeTypeInfo.TIMESTAMP
+  override def resultType = SqlTimeTypeInfo.TIMESTAMP
 
   override def toString: String = s"end($child)"
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
index 6161ef0..a328703 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/LogicalWindow.scala
@@ -36,7 +36,7 @@ abstract class LogicalWindow(
   def resolveExpressions(resolver: (Expression) => Expression): LogicalWindow = this
 
   def validate(tableEnv: TableEnvironment): ValidationResult = aliasAttribute match {
-    case WindowReference(_) => ValidationSuccess
+    case WindowReference(_, _) => ValidationSuccess
     case _ => ValidationFailure("Window reference for window expected.")
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
index 36067eb..bfb6cbf 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala
@@ -552,20 +552,38 @@ case class WindowAggregate(
   override def resolveReference(
       tableEnv: TableEnvironment,
       name: String)
-    : Option[NamedExpression] = window.aliasAttribute match {
+    : Option[NamedExpression] = {
+
+    def resolveAlias(alias: String) = {
+      // check if reference can already be resolved by input fields
+      val found = super.resolveReference(tableEnv, name)
+      if (found.isDefined) {
+        failValidation(s"Reference $name is ambiguous.")
+      } else {
+        // resolve type of window reference
+        val resolvedType = window.timeAttribute match {
+          case UnresolvedFieldReference(n) =>
+            super.resolveReference(tableEnv, n) match {
+              case Some(ResolvedFieldReference(_, tpe)) => Some(tpe)
+              case _ => None
+            }
+          case _ => None
+        }
+        // let validation phase throw an error if type could not be resolved
+        Some(WindowReference(name, resolvedType))
+      }
+    }
+
+    window.aliasAttribute match {
       // resolve reference to this window's name
       case UnresolvedFieldReference(alias) if name == alias =>
-        // check if reference can already be resolved by input fields
-        val found = super.resolveReference(tableEnv, name)
-        if (found.isDefined) {
-          failValidation(s"Reference $name is ambiguous.")
-        } else {
-          Some(WindowReference(name))
-        }
+        resolveAlias(alias)
+
       case _ =>
         // resolve references as usual
         super.resolveReference(tableEnv, name)
     }
+  }
 
   override protected[logical] def construct(relBuilder: RelBuilder): RelBuilder = {
     val flinkRelBuilder = relBuilder.asInstanceOf[FlinkRelBuilder]
@@ -574,7 +592,7 @@ case class WindowAggregate(
       window,
       relBuilder.groupKey(groupingExpressions.map(_.toRexNode(relBuilder)).asJava),
       propertyExpressions.map {
-        case Alias(prop: WindowProperty, name, _) => prop.toNamedWindowProperty(name)(relBuilder)
+        case Alias(prop: WindowProperty, name, _) => prop.toNamedWindowProperty(name)
         case _ => throw new RuntimeException("This should never happen.")
       },
       aggregateExpressions.map {

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 2a71592..c158579 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.windows.{Window => DataStreamWindow}
 import org.apache.flink.table.api.{StreamQueryConfig, StreamTableEnvironment, TableException}
 import org.apache.flink.table.calcite.FlinkRelBuilder.NamedWindowProperty
+import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.codegen.CodeGenerator
 import org.apache.flink.table.expressions.ExpressionUtils._
 import org.apache.flink.table.plan.logical._
@@ -118,6 +119,9 @@ class DataStreamGroupWindowAggregate(
         inputSchema.mapAggregateCall(namedAggregate.left),
         namedAggregate.right)
     }
+    val physicalNamedProperties = namedProperties
+      .filter(np => !FlinkTypeFactory.isTimeIndicatorType(np.property.resultType))
+
     val consumeRetraction = DataStreamRetractionRules.isAccRetract(input)
 
     if (consumeRetraction) {
@@ -159,7 +163,7 @@ class DataStreamGroupWindowAggregate(
         physicalGrouping.length,
         physicalNamedAggregates.size,
         schema.physicalArity,
-        namedProperties)
+        physicalNamedProperties)
 
       val keyedStream = inputDS.keyBy(physicalGrouping: _*)
       val windowedStream =
@@ -185,7 +189,7 @@ class DataStreamGroupWindowAggregate(
       val windowFunction = AggregateUtil.createAggregationAllWindowFunction(
         window,
         schema.physicalArity,
-        namedProperties)
+        physicalNamedProperties)
 
       val windowedStream =
         createNonKeyedWindowedStream(window, inputDS)

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
index d57d4cc..38de539 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamLogicalWindowAggregateRule.scala
@@ -23,10 +23,10 @@ import java.math.{BigDecimal => JBigDecimal}
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.flink.table.api.{TableException, Window}
 import org.apache.flink.table.api.scala.{Session, Slide, Tumble}
+import org.apache.flink.table.api.{TableException, Window}
 import org.apache.flink.table.calcite.FlinkTypeFactory
-import org.apache.flink.table.expressions.{Literal, ResolvedFieldReference, UnresolvedFieldReference}
+import org.apache.flink.table.expressions.{Literal, ResolvedFieldReference, WindowReference}
 import org.apache.flink.table.plan.rules.common.LogicalWindowAggregateRule
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 
@@ -84,7 +84,7 @@ class DataStreamLogicalWindowAggregateRule
         val interval = getOperandAsLong(windowExpr, 1)
         val w = Tumble.over(Literal(interval, TimeIntervalTypeInfo.INTERVAL_MILLIS))
 
-        w.on(time).as("w$")
+        w.on(time).as(WindowReference("w$"))
 
       case SqlStdOperatorTable.HOP =>
         val time = getOperandAsTimeIndicator(windowExpr, 0)
@@ -93,14 +93,14 @@ class DataStreamLogicalWindowAggregateRule
           .over(Literal(size, TimeIntervalTypeInfo.INTERVAL_MILLIS))
           .every(Literal(slide, TimeIntervalTypeInfo.INTERVAL_MILLIS))
 
-        w.on(time).as("w$")
+        w.on(time).as(WindowReference("w$"))
 
       case SqlStdOperatorTable.SESSION =>
         val time = getOperandAsTimeIndicator(windowExpr, 0)
         val gap = getOperandAsLong(windowExpr, 1)
         val w = Session.withGap(Literal(gap, TimeIntervalTypeInfo.INTERVAL_MILLIS))
 
-        w.on(time).as("w$")
+        w.on(time).as(WindowReference("w$"))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
index 31dcb5c..083f1eb 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TimeIndicatorTypeInfo.scala
@@ -34,7 +34,8 @@ class TimeIndicatorTypeInfo(val isEventTime: Boolean)
     SqlTimestampSerializer.INSTANCE,
     classOf[SqlTimestampComparator].asInstanceOf[Class[TypeComparator[Timestamp]]]) {
 
-  override def toString: String = s"TimeIndicatorTypeInfo"
+  override def toString: String =
+    s"TimeIndicatorTypeInfo(${if (isEventTime) "rowtime" else "proctime" })"
 }
 
 object TimeIndicatorTypeInfo {

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
index 7797f22..3c1668f 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/StreamTableEnvironmentTest.scala
@@ -36,6 +36,13 @@ import org.mockito.Mockito.{mock, when}
 class StreamTableEnvironmentTest extends TableTestBase {
 
   @Test(expected = classOf[TableException])
+  def testInvalidTimeAttributes(): Unit = {
+    val util = streamTestUtil()
+    // table definition makes no sense
+    util.addTable[(Long, Int, String, Int, Long)]('a.rowtime.rowtime, 'b, 'c, 'd, 'e)
+  }
+
+  @Test(expected = classOf[TableException])
   def testInvalidProctimeAttribute(): Unit = {
     val util = streamTestUtil()
     // cannot replace an attribute with proctime

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
index cda90f7..890ad32 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/TableSourceTest.scala
@@ -71,8 +71,8 @@ class TableSourceTest extends TableTestBase {
             term("where", ">(val, 100)")
           ),
           term("groupBy", "name"),
-          term("window", "TumblingGroupWindow(WindowReference(w), 'addTime, 600000.millis)"),
-          term("select", "name", "AVG(val) AS TMP_1", "end(WindowReference(w)) AS TMP_0")
+          term("window", "TumblingGroupWindow('w, 'addTime, 600000.millis)"),
+          term("select", "name", "AVG(val) AS TMP_1", "end('w) AS TMP_0")
         ),
         term("select", "name", "TMP_0", "TMP_1")
       )

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
index ef071b7..b389183 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala
@@ -170,7 +170,6 @@ class GroupWindowTest extends TableTestBase {
       .select('string, weightedAvg('string, 'int)) // invalid UDAGG args
   }
 
-  @Ignore // TODO
   @Test
   def testMultiWindow(): Unit = {
     val util = streamTestUtil()
@@ -179,7 +178,7 @@ class GroupWindowTest extends TableTestBase {
     val windowedTable = table
       .window(Tumble over 50.milli on 'proctime as 'w1)
       .groupBy('w1, 'string)
-      .select('w.end as 'proctime, 'string, 'int.count)
+      .select('w1.proctime as 'proctime, 'string, 'int.count)
       .window(Slide over 20.milli every 10.milli on 'proctime as 'w2)
       .groupBy('w2)
       .select('string.count)
@@ -193,7 +192,7 @@ class GroupWindowTest extends TableTestBase {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(0),
-            term("select", "string", "int")
+            term("select", "string", "int", "proctime")
           ),
           term("groupBy", "string"),
           term(
@@ -202,9 +201,9 @@ class GroupWindowTest extends TableTestBase {
               WindowReference("w1"),
               'proctime,
               50.milli)),
-          term("select", "string", "COUNT(int) AS TMP_0")
+          term("select", "string", "COUNT(int) AS TMP_1", "proctime('w1) AS TMP_0")
         ),
-        term("select", "string")
+        term("select", "string", "TMP_0 AS proctime")
       ),
       term(
         "window",
@@ -213,7 +212,7 @@ class GroupWindowTest extends TableTestBase {
           'proctime,
           20.milli,
           10.milli)),
-      term("select", "COUNT(string) AS TMP_1")
+      term("select", "COUNT(string) AS TMP_2")
     )
     util.verifyTable(windowedTable, expected)
   }
@@ -784,8 +783,8 @@ class GroupWindowTest extends TableTestBase {
       term("select",
         "string",
         "COUNT(int) AS TMP_0",
-        "start(WindowReference(w)) AS TMP_1",
-        "end(WindowReference(w)) AS TMP_2")
+        "start('w) AS TMP_1",
+        "end('w) AS TMP_2")
     )
 
     util.verifyTable(windowedTable, expected)
@@ -852,8 +851,8 @@ class GroupWindowTest extends TableTestBase {
       term("select",
         "string",
         "COUNT(int) AS TMP_0",
-        "start(WindowReference(w)) AS TMP_1",
-        "end(WindowReference(w)) AS TMP_2")
+        "start('w) AS TMP_1",
+        "end('w) AS TMP_2")
     )
 
     util.verifyTable(windowedTable, expected)
@@ -879,8 +878,8 @@ class GroupWindowTest extends TableTestBase {
         term("select",
           "string",
           "COUNT(int) AS TMP_1",
-          "end(WindowReference(w)) AS TMP_0",
-          "start(WindowReference(w)) AS TMP_2")
+          "end('w) AS TMP_0",
+          "start('w) AS TMP_2")
       ),
       term("select", "TMP_0 AS we1", "string", "TMP_1 AS cnt", "TMP_2 AS ws", "TMP_0 AS we2")
     )
@@ -909,8 +908,8 @@ class GroupWindowTest extends TableTestBase {
         term("select",
           "string",
           "SUM(int) AS TMP_0",
-          "start(WindowReference(w)) AS TMP_1",
-          "end(WindowReference(w)) AS TMP_2")
+          "start('w) AS TMP_1",
+          "end('w) AS TMP_2")
       ),
       term("select",
         "string",

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
index 7ac0874..cf55d48 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/calcite/RelTimeIndicatorConverterTest.scala
@@ -243,10 +243,10 @@ class RelTimeIndicatorConverterTest extends TableTestBase {
         term(
           "window",
           TumblingGroupWindow(
-            WindowReference("w"),
+            'w,
             'rowtime,
             100.millis)),
-        term("select", "long", "SUM(int) AS TMP_1", "end(WindowReference(w)) AS TMP_0")
+        term("select", "long", "SUM(int) AS TMP_1", "end('w) AS TMP_0")
       ),
       term("select", "TMP_0 AS rowtime", "long", "TMP_1")
     )
@@ -273,7 +273,7 @@ class RelTimeIndicatorConverterTest extends TableTestBase {
         term(
           "window",
           TumblingGroupWindow(
-            'w$,
+            WindowReference("w$"),
             'rowtime,
             100.millis)),
         term("select", "long", "SUM(int) AS EXPR$2", "start('w$) AS w$start", "end('w$) AS w$end")
@@ -338,6 +338,54 @@ class RelTimeIndicatorConverterTest extends TableTestBase {
     util.verifyTable(result, expected)
   }
 
+  @Test
+  def testMultiWindow(): Unit = {
+    val util = streamTestUtil()
+    val t = util.addTable[(Long, Long, Int)]('rowtime.rowtime, 'long, 'int)
+
+    val result = t
+      .window(Tumble over 100.millis on 'rowtime as 'w)
+      .groupBy('w, 'long)
+      .select('w.rowtime as 'newrowtime, 'long, 'int.sum as 'int)
+      .window(Tumble over 1.second on 'newrowtime as 'w2)
+      .groupBy('w2, 'long)
+      .select('w2.end, 'long, 'int.sum)
+
+    val expected = unaryNode(
+      "DataStreamCalc",
+      unaryNode(
+        "DataStreamGroupWindowAggregate",
+        unaryNode(
+          "DataStreamCalc",
+          unaryNode(
+            "DataStreamGroupWindowAggregate",
+            streamTableNode(0),
+            term("groupBy", "long"),
+            term(
+              "window",
+              TumblingGroupWindow(
+                'w,
+                'rowtime,
+                100.millis)),
+            term("select", "long", "SUM(int) AS TMP_1", "rowtime('w) AS TMP_0")
+          ),
+          term("select", "TMP_0 AS newrowtime", "long", "TMP_1 AS int")
+        ),
+        term("groupBy", "long"),
+        term(
+          "window",
+          TumblingGroupWindow(
+            'w2,
+            'newrowtime,
+            1000.millis)),
+        term("select", "long", "SUM(int) AS TMP_3", "end('w2) AS TMP_2")
+      ),
+      term("select", "TMP_2", "long", "TMP_3")
+    )
+
+    util.verifyTable(result, expected)
+  }
+
 }
 
 object RelTimeIndicatorConverterTest {

http://git-wip-us.apache.org/repos/asf/flink/blob/c86f46cd/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
index 7d7088e..3f12218 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.watermark.Watermark
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.api.scala.stream.utils.StreamITCase
-import org.apache.flink.table.api.{TableEnvironment, TableException, Types}
+import org.apache.flink.table.api.{TableEnvironment, TableException, Types, ValidationException}
 import org.apache.flink.table.calcite.RelTimeIndicatorConverterTest.TableFunc
 import org.apache.flink.table.expressions.TimeIntervalUnit
 import org.apache.flink.table.runtime.datastream.TimeAttributesITCase.TimestampWithEqualWatermark
@@ -62,6 +62,33 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
   }
 
+  @Test(expected = classOf[ValidationException])
+  def testInvalidUseOfRowtime(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+      .select('rowtime.rowtime)
+  }
+
+  @Test(expected = classOf[ValidationException])
+  def testInvalidUseOfRowtime2(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    stream
+      .toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+      .window(Tumble over 2.millis on 'rowtime as 'w)
+      .groupBy('w)
+      .select('w.end.rowtime, 'int.count as 'int) // no rowtime on non-window reference
+  }
+
   @Test
   def testCalcMaterialization(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
@@ -215,6 +242,39 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  @Test
+  def testMultiWindow(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+
+    val stream = env
+      .fromCollection(data)
+      .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark())
+    val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string)
+
+    val t = table
+      .window(Tumble over 2.millis on 'rowtime as 'w)
+      .groupBy('w)
+      .select('w.rowtime as 'rowtime, 'int.count as 'int)
+      .window(Tumble over 4.millis on 'rowtime as 'w2)
+      .groupBy('w2)
+      .select('w2.rowtime, 'w2.end, 'int.count)
+
+    val results = t.toDataStream[Row]
+    results.addSink(new StreamITCase.StringSink)
+    env.execute()
+
+    val expected = Seq(
+      "1970-01-01 00:00:00.003,1970-01-01 00:00:00.004,2",
+      "1970-01-01 00:00:00.007,1970-01-01 00:00:00.008,2",
+      "1970-01-01 00:00:00.011,1970-01-01 00:00:00.012,1",
+      "1970-01-01 00:00:00.019,1970-01-01 00:00:00.02,1"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
 }
 
 object TimeAttributesITCase {